Skip to content

Commit

Permalink
[HUDI-1330] handle prefix filtering at directory level
Browse files Browse the repository at this point in the history
The current DFSPathSelector only ignores prefixes (_, .) at the file level, while files under subdirectories
(e.g. .checkpoint/*) are still considered, which results in a bad-format exception during reading.
  • Loading branch information
hotienvu committed Oct 20, 2020
1 parent fd269dd commit 155366d
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,15 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

Expand All @@ -63,7 +62,7 @@ public static class Config {
protected final TypedProperties props;

public DFSPathSelector(TypedProperties props, Configuration hadoopConf) {
DataSourceUtils.checkRequiredProperties(props, Arrays.asList(Config.ROOT_INPUT_PATH_PROP));
DataSourceUtils.checkRequiredProperties(props, Collections.singletonList(Config.ROOT_INPUT_PATH_PROP));
this.props = props;
this.fs = FSUtils.getFs(props.getString(Config.ROOT_INPUT_PATH_PROP), hadoopConf);
}
Expand Down Expand Up @@ -101,30 +100,15 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
try {
// obtain all eligible files under root folder.
log.info("Root path => " + props.getString(Config.ROOT_INPUT_PATH_PROP) + " source limit => " + sourceLimit);
List<FileStatus> eligibleFiles = new ArrayList<>();
RemoteIterator<LocatedFileStatus> fitr =
fs.listFiles(new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), true);
while (fitr.hasNext()) {
LocatedFileStatus fileStatus = fitr.next();
if (fileStatus.isDirectory()
|| fileStatus.getLen() == 0
|| IGNORE_FILEPREFIX_LIST.stream().anyMatch(pfx -> fileStatus.getPath().getName().startsWith(pfx))) {
continue;
}
eligibleFiles.add(fileStatus);
}
long lastCheckpointTime = lastCheckpointStr.map(Long::parseLong).orElse(Long.MIN_VALUE);
List<FileStatus> eligibleFiles = listEligibleFiles(fs, new Path(props.getString(Config.ROOT_INPUT_PATH_PROP)), lastCheckpointTime);
// sort them by modification time.
eligibleFiles.sort(Comparator.comparingLong(FileStatus::getModificationTime));
// Filter based on checkpoint & input size, if needed
long currentBytes = 0;
long maxModificationTime = Long.MIN_VALUE;
List<FileStatus> filteredFiles = new ArrayList<>();
for (FileStatus f : eligibleFiles) {
if (lastCheckpointStr.isPresent() && f.getModificationTime() <= Long.valueOf(lastCheckpointStr.get()).longValue()) {
// skip processed files
continue;
}

if (currentBytes + f.getLen() >= sourceLimit) {
// we have enough data, we are done
break;
Expand All @@ -136,7 +120,7 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
}

// no data to read
if (filteredFiles.size() == 0) {
if (filteredFiles.isEmpty()) {
return new ImmutablePair<>(Option.empty(), lastCheckpointStr.orElseGet(() -> String.valueOf(Long.MIN_VALUE)));
}

Expand All @@ -148,4 +132,25 @@ public Pair<Option<String>, String> getNextFilePathsAndMaxModificationTime(Optio
throw new HoodieIOException("Unable to read from source from checkpoint: " + lastCheckpointStr, ioe);
}
}

/**
 * Recursively lists files under {@code path}, filtering out ineligible entries while walking:
 * files and directories whose names start with an ignored prefix (e.g. "_", "."),
 * empty files, and files not modified after {@code lastCheckpointTime}.
 */
private List<FileStatus> listEligibleFiles(FileSystem fs, Path path, long lastCheckpointTime) throws IOException {
  // Prune entries (files AND directories) whose names carry an ignored prefix such as "_" or ".".
  FileStatus[] children = fs.listStatus(path,
      candidate -> IGNORE_FILEPREFIX_LIST.stream().noneMatch(prefix -> candidate.getName().startsWith(prefix)));
  List<FileStatus> eligible = new ArrayList<>();
  for (FileStatus child : children) {
    if (child.isDirectory()) {
      // Skip symlinked directories so a cyclic link cannot cause infinite recursion.
      if (child.isSymlink()) {
        continue;
      }
      eligible.addAll(listEligibleFiles(fs, child.getPath(), lastCheckpointTime));
    } else if (child.getModificationTime() > lastCheckpointTime && child.getLen() > 0) {
      // Keep only non-empty files newer than the last checkpoint.
      eligible.add(child);
    }
  }
  return eligible;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -175,5 +175,18 @@ public void testReadingFromSource() throws IOException {
InputBatch<JavaRDD<GenericRecord>> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat(
Option.empty(), Long.MAX_VALUE);
assertEquals(10100, fetch5.getBatch().get().count());

// 6. Should skip files/directories whose names start with prefixes ("_", ".")
generateOneFile(".checkpoint/3", "002", 100);
generateOneFile("_checkpoint/3", "002", 100);
generateOneFile(".3", "002", 100);
generateOneFile("_3", "002", 100);
// also work with nested directory
generateOneFile("foo/.bar/3", "002", 1); // not ok
generateOneFile("foo/bar/3", "002", 1); // ok
// fetch everything from the beginning
InputBatch<JavaRDD<GenericRecord>> fetch6 = sourceFormatAdapter.fetchNewDataInAvroFormat(
Option.empty(), Long.MAX_VALUE);
assertEquals(10101, fetch6.getBatch().get().count());
}
}

0 comments on commit 155366d

Please sign in to comment.