HBASE-26707: Reduce number of renames during bulkload #4066

Merged: 2 commits, Feb 17, 2022. Showing changes from the first commit only.
Commit 5ec17eaffe79463792d8e3cf800ab4274059500d (BukrosSzabolcs, committed Jan 26, 2022)

HBASE-26707: Reduce number of renames during bulkload

- move files directly to the store dir when requireWritingToTmpDirFirst is false
- fix failedBulkLoad so it also works on a second call
- change existing tests to also run without the tmp folder
- add tests for SecureBulkLoadListener
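
In short: when the store engine does not require writing to a tmp dir first, the listener now stages the incoming HFile directly into the region directory, and the later commit rename becomes a no-op. A condensed sketch of the new decision in HRegion.bulkLoadHFiles (abridged from the diff below, not verbatim):

    // Abridged from the HRegion hunk in this commit; names as in the diff.
    boolean reqTmp = store.storeEngine.requireWritingToTmpDirFirst();
    String finalPath = path;
    if (bulkLoadListener != null) {
      // When no tmp dir is required, the region dir itself is passed as the
      // "custom staging" dir, so prepareBulkLoad moves the file in place.
      finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile,
        reqTmp ? null : regionDir.toString());
    }
    Pair<Path, Path> pair;
    if (reqTmp) {
      pair = store.preBulkLoadHFile(finalPath, seqId); // classic: stage now, rename later
    } else {
      Path livePath = new Path(finalPath);
      pair = new Pair<>(livePath, livePath);           // already in place, nothing to rename
    }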
New file: IntegrationTestFileBasedSFTBulkLoad.java
@@ -0,0 +1,166 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.mapreduce;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.IntegrationTestBase;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.storefiletracker.StoreFileTrackerFactory;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.tool.BulkLoadHFiles;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.RegionSplitter;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hbase.thirdparty.com.google.common.base.Joiner;
import org.apache.hbase.thirdparty.com.google.common.collect.Sets;
import org.apache.hbase.thirdparty.org.apache.commons.cli.CommandLine;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertEquals;

/**
 * Test bulk load and MR on a distributed cluster, with the file-based
 * StoreFileTracker enabled.
 * It starts an MR job that creates linked chains.
 *
 * The format of rows is like this:
 * Row Key -> Long
 *
 * L:<< Chain Id >> -> Row Key of the next link in the chain
 * S:<< Chain Id >> -> The step in the chain that this link is.
 * D:<< Chain Id >> -> Random Data.
 *
 * All chains start on row 0.
 * All row keys are > 0.
 *
 * After creating the linked lists they are walked over using a TableMapper based Mapreduce Job.
 *
 * There are a few options exposed:
 *
 * hbase.IntegrationTestBulkLoad.chainLength
 * The number of rows that will be part of each and every chain.
 *
 * hbase.IntegrationTestBulkLoad.numMaps
 * The number of mappers that will be run. Each mapper creates one linked list chain.
 *
 * hbase.IntegrationTestBulkLoad.numImportRounds
 * How many jobs will be run to create linked lists.
 *
 * hbase.IntegrationTestBulkLoad.tableName
 * The name of the table.
 *
 * hbase.IntegrationTestBulkLoad.replicaCount
 * How many region replicas to configure for the table under test.
 */
@Category(IntegrationTests.class)
public class IntegrationTestFileBasedSFTBulkLoad extends IntegrationTestBulkLoad {

  private static final Logger LOG =
    LoggerFactory.getLogger(IntegrationTestFileBasedSFTBulkLoad.class);

  private static String NUM_MAPS_KEY = "hbase.IntegrationTestBulkLoad.numMaps";
  private static String NUM_IMPORT_ROUNDS_KEY = "hbase.IntegrationTestBulkLoad.numImportRounds";
  private static String NUM_REPLICA_COUNT_KEY = "hbase.IntegrationTestBulkLoad.replicaCount";
  private static int NUM_REPLICA_COUNT_DEFAULT = 1;

  @Test
  public void testFileBasedSFTBulkLoad() throws Exception {
    super.testBulkLoad();
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.getConfiguration().set(StoreFileTrackerFactory.TRACKER_IMPL,
      "org.apache.hadoop.hbase.regionserver.storefiletracker.FileBasedStoreFileTracker");
    util.initializeCluster(1);
    int replicaCount = getConf().getInt(NUM_REPLICA_COUNT_KEY, NUM_REPLICA_COUNT_DEFAULT);
    if (LOG.isDebugEnabled() && replicaCount != NUM_REPLICA_COUNT_DEFAULT) {
      LOG.debug("Region Replicas enabled: " + replicaCount);
    }

    // Scale this up on a real cluster
    if (util.isDistributedCluster()) {
      util.getConfiguration().setIfUnset(NUM_MAPS_KEY,
        Integer.toString(util.getAdmin().getRegionServers().size() * 10));
      util.getConfiguration().setIfUnset(NUM_IMPORT_ROUNDS_KEY, "5");
    } else {
      util.startMiniMapReduceCluster();
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestFileBasedSFTBulkLoad(), args);
    System.exit(status);
  }
}
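
The test flips the tracker globally through StoreFileTrackerFactory.TRACKER_IMPL. For a single table the same key can also be set on the table descriptor; a minimal sketch, assuming per-table SFT configuration and the "FILE" shortname are available on the targeted branch:

    // Hypothetical per-table variant of what setUpCluster() does cluster-wide;
    // table and family names are illustrative.
    TableDescriptor td = TableDescriptorBuilder
      .newBuilder(TableName.valueOf("bulkload_test"))
      .setColumnFamily(ColumnFamilyDescriptorBuilder.of("D"))
      .setValue(StoreFileTrackerFactory.TRACKER_IMPL, "FILE")
      .build();
    admin.createTable(td);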
File: HRegion.java
@@ -7128,7 +7128,7 @@ public interface BulkLoadListener {
    * @return final path to be used for actual loading
    * @throws IOException
    */
-  String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile)
+  String prepareBulkLoad(byte[] family, String srcPath, boolean copyFile, String customStaging)
       throws IOException;

/**
@@ -7250,12 +7250,21 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
         familyWithFinalPath.put(familyName, new ArrayList<>());
       }
       List<Pair<Path, Path>> lst = familyWithFinalPath.get(familyName);
+      String finalPath = path;
       try {
-        String finalPath = path;
+        boolean reqTmp = store.storeEngine.requireWritingToTmpDirFirst();
         if (bulkLoadListener != null) {
-          finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile);
+          finalPath = bulkLoadListener.prepareBulkLoad(familyName, path, copyFile,
+            reqTmp ? null : regionDir.toString());
         }
+        Pair<Path, Path> pair = null;
+        if (reqTmp) {
+          pair = store.preBulkLoadHFile(finalPath, seqId);
+        }
+        else {
+          Path livePath = new Path(finalPath);
+          pair = new Pair<>(livePath, livePath);
+        }
-        Pair<Path, Path> pair = store.preBulkLoadHFile(finalPath, seqId);
         lst.add(pair);
       } catch (IOException ioe) {
         // A failure here can cause an atomicity violation that we currently
@@ -7265,7 +7274,7 @@ public Map<byte[], List<Path>> bulkLoadHFiles(Collection<Pair<byte[], String>> f
" load " + Bytes.toString(p.getFirst()) + " : " + p.getSecond(), ioe);
if (bulkLoadListener != null) {
try {
bulkLoadListener.failedBulkLoad(familyName, path);
bulkLoadListener.failedBulkLoad(familyName, finalPath);
} catch (Exception ex) {
LOG.error("Error while calling failedBulkLoad for family " +
Bytes.toString(familyName) + " with path " + path, ex);
Expand Down
File: HRegionFileSystem.java
@@ -508,6 +508,10 @@ private Path preCommitStoreFile(final String familyName, final Path buildPath,
    * @throws IOException
    */
   Path commitStoreFile(final Path buildPath, Path dstPath) throws IOException {
+    // rename is not necessary in case of direct-insert stores
+    if (buildPath.equals(dstPath)) {
+      return dstPath;
+    }
     // buildPath exists, therefore not doing an exists() check.
     if (!rename(buildPath, dstPath)) {
       throw new IOException("Failed rename of " + buildPath + " to " + dstPath);
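
This early return is what makes the direct-insert path cheap: when requireWritingToTmpDirFirst() is false, bulkLoadHFiles hands commitStoreFile the same live path as both source and destination (the Pair<>(livePath, livePath) in the HRegion hunk above), so no rename is attempted. Illustratively (hypothetical path and caller):

    // Both arguments resolve to the same live store file, so commitStoreFile
    // returns immediately instead of renaming.
    Path live = new Path("/hbase/data/default/t1/r1/f1/hfile1");
    Path committed = regionFs.commitStoreFile(live, live); // == live, no rename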
File: SecureBulkLoadManager.java
@@ -27,6 +27,7 @@
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.mutable.MutableInt;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -341,27 +342,36 @@ private User getActiveUser() throws IOException {
     return user;
   }

-  private static class SecureBulkLoadListener implements BulkLoadListener {
+  protected static class SecureBulkLoadListener implements BulkLoadListener {
     // Target filesystem
     private final FileSystem fs;
     private final String stagingDir;
     private final Configuration conf;
     // Source filesystem
     private FileSystem srcFs = null;
     private Map<String, FsPermission> origPermissions = null;
+    private Map<String, String> origlSources = null;
Review thread on origlSources:

  Contributor: Could this be <String, Path> and save the need for path conversion again in failedBulkLoad?

  Contributor: nit: origSources instead of origlSources?

  BukrosSzabolcs (author), Feb 4, 2022: I left it like that because origPermissions uses String as well, so a conversion would be done anyway. And while I could switch both of those to use Paths, log messages will need a string too.

  Contributor: Ok, let's just follow the var naming pattern then.


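The resulting round-trip, condensed from the hunks below: the staged path string is the map key, and the source only becomes a Path again at the rename-back, so a String-valued map costs no extra conversions in practice.

    // prepareBulkLoad: remember where the staged file came from.
    origlSources.put(stageP.toString(), srcPath);

    // failedBulkLoad: an absent entry means the file was copied or never
    // staged, so there is nothing to move back.
    String src = origlSources.get(stagedPath);
    if (StringUtils.isEmpty(src)) {
      return;
    }
    Path srcPath = new Path(src); // the String-to-Path conversion happens here either way
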
     public SecureBulkLoadListener(FileSystem fs, String stagingDir, Configuration conf) {
       this.fs = fs;
       this.stagingDir = stagingDir;
       this.conf = conf;
       this.origPermissions = new HashMap<>();
+      this.origlSources = new HashMap<>();
     }

     @Override
-    public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile)
-        throws IOException {
+    public String prepareBulkLoad(final byte[] family, final String srcPath, boolean copyFile,
+        String customStaging) throws IOException {
       Path p = new Path(srcPath);
-      Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));
+
+      // store customStaging for failedBulkLoad
+      String currentStaging = stagingDir;
+      if (StringUtils.isNotEmpty(customStaging)) {
+        currentStaging = customStaging;
+      }
+
+      Path stageP = new Path(currentStaging, new Path(Bytes.toString(family), p.getName()));

       // In case of Replication for bulk load files, hfiles are already copied in staging directory
       if (p.equals(stageP)) {
@@ -390,11 +400,16 @@ public String prepareBulkLoad(final byte[] family, final String srcPath, boolean
LOG.debug("Moving " + p + " to " + stageP);
FileStatus origFileStatus = fs.getFileStatus(p);
origPermissions.put(srcPath, origFileStatus.getPermission());
origlSources.put(stageP.toString(), srcPath);
Review thread on relying on rename:

  Contributor: One idea for FILE SFT: could we always force the copy option above, instead of this one that relies on rename?

  BukrosSzabolcs (author): We could. My only concern is that it might be impractical over a certain data size, especially if there is no config or param that would allow the user to use rename if they need to. What do you think?

  Contributor: True. My main concern is how safe this rename would be if we don't have HBOSS in the picture. Could concurrent bulkloads be a problem in such cases? Maybe a workaround for such deployments would be to always pass a custom staging dir?

  BukrosSzabolcs (author): I do not think concurrency is an issue. SecureBulkLoadListener keeps track of the moved files, and a separate listener is created for each region in each bulkload, so even with parallel bulkloads they cannot touch each other's files. Using the same source folder can be an issue, but it always was.

  I'm not sure I understand your comment about "always pass a custom staging dir". The staging dir by default is different for each bulkload process. I break this by introducing the "custom staging dir", which always points to the live data folder, as a workaround to skip moving hfiles to an actual staging dir without losing the existing error handling. We can't change it and decrease the number of moves at the same time.

  Contributor: Ah yeah, then we would need to rename from the custom stage to the actual dir anyway. I think we can leave it this way. For S3 with SFT, I think the best practice would be to pass the copy option, for consistency; it shouldn't make much difference, as S3 renames are basically copies.

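For deployments that follow that advice, the copy behaviour can be requested from the client side. A hedged sketch, assuming the pre-existing BulkLoadHFiles.ALWAYS_COPY_FILES key ("always.copy.files"); verify the constant against the HBase version in use:

    // Sketch: force copying instead of renaming when staging bulk load files,
    // as suggested above for S3-backed clusters. Table and path are illustrative.
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean(BulkLoadHFiles.ALWAYS_COPY_FILES, true);
    BulkLoadHFiles.create(conf)
      .bulkLoad(TableName.valueOf("my_table"), new Path("/staging/hfiles_out"));
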
         if (!fs.rename(p, stageP)) {
           throw new IOException("Failed to move HFile: " + p + " to " + stageP);
         }
       }
-      fs.setPermission(stageP, PERM_ALL_ACCESS);
+
+      if (StringUtils.isNotEmpty(customStaging)) {
+        fs.setPermission(stageP, PERM_ALL_ACCESS);
+      }
+
       return stageP.toString();
     }

@@ -412,35 +427,37 @@ private void closeSrcFs() throws IOException {
     }

     @Override
-    public void failedBulkLoad(final byte[] family, final String srcPath) throws IOException {
+    public void failedBulkLoad(final byte[] family, final String stagedPath) throws IOException {
       try {
-        Path p = new Path(srcPath);
-        if (srcFs == null) {
-          srcFs = FileSystem.newInstance(p.toUri(), conf);
-        }
-        if (!FSUtils.isSameHdfs(conf, srcFs, fs)) {
-          // files are copied so no need to move them back
+        String src = origlSources.get(stagedPath);
+        if (StringUtils.isEmpty(src)) {
+          LOG.debug(stagedPath + " was not moved to staging. No need to move back");
           return;
         }
-        Path stageP = new Path(stagingDir, new Path(Bytes.toString(family), p.getName()));

-        // In case of Replication for bulk load files, hfiles are not renamed by end point during
-        // prepare stage, so no need of rename here again
-        if (p.equals(stageP)) {
-          LOG.debug(p.getName() + " is already available in source directory. Skipping rename.");
+        Path stageP = new Path(stagedPath);
+        if (!fs.exists(stageP)) {
+          throw new IOException(
+            "Missing HFile: " + stageP + ", can't be moved back to it's original place");
+        }
+
+        // we should not move back files if the original exists
+        Path srcPath = new Path(src);
+        if (srcFs.exists(srcPath)) {
+          LOG.debug(src + " is already at it's original place. No need to move.");
           return;
         }

-        LOG.debug("Moving " + stageP + " back to " + p);
-        if (!fs.rename(stageP, p)) {
-          throw new IOException("Failed to move HFile: " + stageP + " to " + p);
+        LOG.debug("Moving " + stageP + " back to " + srcPath);
+        if (!fs.rename(stageP, srcPath)) {
+          throw new IOException("Failed to move HFile: " + stageP + " to " + srcPath);
         }

         // restore original permission
-        if (origPermissions.containsKey(srcPath)) {
-          fs.setPermission(p, origPermissions.get(srcPath));
+        if (origPermissions.containsKey(stagedPath)) {
+          fs.setPermission(srcPath, origPermissions.get(src));
         } else {
-          LOG.warn("Can't find previous permission for path=" + srcPath);
+          LOG.warn("Can't find previous permission for path=" + stagedPath);
         }
       } finally {
         closeSrcFs();
File: TestBulkLoad.java
@@ -54,6 +54,10 @@ public class TestBulkLoad extends TestBulkloadBase {
   public static final HBaseClassTestRule CLASS_RULE =
     HBaseClassTestRule.forClass(TestBulkLoad.class);

+  public TestBulkLoad(boolean useFileBasedSFT) {
+    super(useFileBasedSFT);
+  }
+
   @Test
   public void verifyBulkLoadEvent() throws IOException {
     TableName tableName = TableName.valueOf("test", "test");