[HUDI-1354] Block updates and replace on file groups in clustering
lw309637554 committed Dec 18, 2020
1 parent 4d05680 commit 0f5e49e
Showing 9 changed files with 328 additions and 1 deletion.
org.apache.hudi.config.HoodieClusteringConfig (new file)
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.config;

import org.apache.hudi.common.config.DefaultHoodieConfig;
import org.apache.hudi.table.action.clustering.update.RejectUpdateStrategy;
import org.apache.hudi.table.action.clustering.update.UpdateStrategy;

import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;

public class HoodieClusteringConfig extends DefaultHoodieConfig {

public static final String CLUSTERING_UPDATES_STRATEGY_PROP = "hoodie.clustering.updates.strategy";
public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY = RejectUpdateStrategy.class.getName();

public HoodieClusteringConfig(Properties props) {
super(props);
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {
private final Properties props = new Properties();

public Builder fromProperties(Properties props) {
this.props.putAll(props);
return this;
}

public Builder fromFile(File propertiesFile) throws IOException {
try (FileReader reader = new FileReader(propertiesFile)) {
this.props.load(reader);
return this;
}
}

public Builder withClusteringUpdatesStrategy(UpdateStrategy updatesStrategy) {
props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP, updatesStrategy.getClass().getName());
return this;
}

public HoodieClusteringConfig build() {
HoodieClusteringConfig config = new HoodieClusteringConfig(props);
setDefaultOnCondition(props, !props.containsKey(CLUSTERING_UPDATES_STRATEGY_PROP),
CLUSTERING_UPDATES_STRATEGY_PROP, DEFAULT_CLUSTERING_UPDATES_STRATEGY);
return config;
}
}
}
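
A minimal usage sketch (not part of the commit; variable names are illustrative) showing how the new builder wires the strategy into the write properties. Note that withClusteringUpdatesStrategy stores the class name, not the instance, so the strategy must have a no-arg constructor, as RejectUpdateStrategy above does:

import java.util.Properties;

import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.table.action.clustering.update.RejectUpdateStrategy;

Properties base = new Properties();
HoodieClusteringConfig clusteringConfig = HoodieClusteringConfig.newBuilder()
    .fromProperties(base)
    // records org.apache.hudi.table.action.clustering.update.RejectUpdateStrategy
    // under hoodie.clustering.updates.strategy
    .withClusteringUpdatesStrategy(new RejectUpdateStrategy())
    .build();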
org.apache.hudi.config.HoodieWriteConfig
@@ -31,6 +31,7 @@
import org.apache.hudi.index.HoodieIndex;
import org.apache.hudi.metrics.MetricsReporterType;
import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite;
import org.apache.hudi.table.action.clustering.update.UpdateStrategy;
import org.apache.hudi.table.action.compact.strategy.CompactionStrategy;

import org.apache.parquet.hadoop.metadata.CompressionCodecName;
@@ -49,6 +50,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;


/**
* Class storing configs for the HoodieWriteClient.
*/
@@ -381,6 +383,10 @@ public Boolean shouldCleanBootstrapBaseFile() {
return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED));
}

public UpdateStrategy getClusteringUpdatesStrategy() {
return ReflectionUtils.loadClass(props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP));
}

/**
* index properties.
*/
@@ -774,6 +780,7 @@ public static class Builder {
private boolean isIndexConfigSet = false;
private boolean isStorageConfigSet = false;
private boolean isCompactionConfigSet = false;
private boolean isClusteringConfigSet = false;
private boolean isMetricsConfigSet = false;
private boolean isBootstrapConfigSet = false;
private boolean isMemoryConfigSet = false;
@@ -1038,6 +1045,8 @@ protected void setDefaults() {
setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isCompactionConfigSet,
HoodieCompactionConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isClusteringConfigSet,
HoodieClusteringConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(props).build());
setDefaultOnCondition(props, !isBootstrapConfigSet,
HoodieBootstrapConfig.newBuilder().fromProperties(props).build());
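
getClusteringUpdatesStrategy() above instantiates whatever class name is stored under the property. A rough plain-Java equivalent of that reflection step (a sketch, assuming ReflectionUtils.loadClass amounts to Class.forName plus the no-arg constructor):

String className = props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP);
// resolve and instantiate the configured strategy; defaults to RejectUpdateStrategy
UpdateStrategy strategy =
    (UpdateStrategy) Class.forName(className).getDeclaredConstructor().newInstance();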
org.apache.hudi.exception.HoodieClusteringUpdateException (new file)
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.exception;

public class HoodieClusteringUpdateException extends HoodieException {
public HoodieClusteringUpdateException(String msg) {
super(msg);
}

public HoodieClusteringUpdateException(String msg, Throwable e) {
super(msg, e);
}
}
org.apache.hudi.table.action.clustering.update.RejectUpdateStrategy (new file)
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.clustering.update;

import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieClusteringUpdateException;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.WorkloadStat;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class RejectUpdateStrategy implements UpdateStrategy {
private static final Logger LOG = LogManager.getLogger(RejectUpdateStrategy.class);

@Override
public void apply(List<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupsInPendingClustering, WorkloadProfile workloadProfile) {
Set<Pair<String, String>> partitionPathAndFileIds = fileGroupsInPendingClustering.stream()
.map(entry -> Pair.of(entry.getLeft().getPartitionPath(), entry.getLeft().getFileId())).collect(Collectors.toSet());
if (partitionPathAndFileIds.size() == 0) {
return;
}

Set<Map.Entry<String, WorkloadStat>> partitionStatEntries = workloadProfile.getPartitionPathStatMap().entrySet();
for (Map.Entry<String, WorkloadStat> partitionStat : partitionStatEntries) {
for (Map.Entry<String, Pair<String, Long>> updateLocEntry :
partitionStat.getValue().getUpdateLocationToCount().entrySet()) {
String partitionPath = partitionStat.getKey();
String fileId = updateLocEntry.getKey();
if (partitionPathAndFileIds.contains(Pair.of(partitionPath, fileId))) {
          String msg = String.format("Not allowed to update file group in pending clustering. Partition: %s, fileId: %s. "
              + "Updates to file groups with pending clustering are not supported for now.", partitionPath, fileId);
LOG.error(msg);
throw new HoodieClusteringUpdateException(msg);
}
}
}
}
}
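
From a writer's point of view the strategy is fail-fast: if any record in the batch updates a file group with pending clustering, the whole commit aborts. A sketch of what a caller might see (writeClient, recordsRDD, and instantTime are hypothetical names, assuming a Spark write client whose upsert lets this runtime exception propagate):

try {
  writeClient.upsert(recordsRDD, instantTime);
} catch (HoodieClusteringUpdateException e) {
  // the batch hit a file group in pending clustering; retry once the
  // clustering instant completes, or configure a different update strategy
}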
org.apache.hudi.table.action.clustering.update.UpdateStrategy (new file)
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hudi.table.action.clustering.update;

import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.WorkloadProfile;

import java.util.List;

/**
 * When file groups are in pending clustering, records written to these file groups need to be checked.
 */
public interface UpdateStrategy {

  /**
   * Check the records that update file groups in pending clustering.
   * @param fileGroupsInPendingClustering file groups in pending clustering, paired with the clustering instant.
   * @param workloadProfile the workload profile carrying the records' update location info,
   *                        as BaseSparkCommitActionExecutor.getUpsertPartitioner uses it.
   */
void apply(List<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupsInPendingClustering, WorkloadProfile workloadProfile);

}
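
Since the strategy class is pluggable through hoodie.clustering.updates.strategy, other policies can be swapped in. A hypothetical alternative (not part of this commit) that merely logs conflicts instead of rejecting them:

import java.util.List;

import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.table.WorkloadProfile;

import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

public class LogOnlyUpdateStrategy implements UpdateStrategy {
  private static final Logger LOG = LogManager.getLogger(LogOnlyUpdateStrategy.class);

  @Override
  public void apply(List<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupsInPendingClustering,
                    WorkloadProfile workloadProfile) {
    // no rejection: surface the potential conflict and let the write proceed
    LOG.warn("Updates may touch " + fileGroupsInPendingClustering.size()
        + " file groups in pending clustering");
  }
}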
org.apache.hudi.table.action.commit.BaseCommitActionExecutor
@@ -32,6 +32,7 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieCommitException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.table.HoodieTable;
import org.apache.hudi.table.WorkloadProfile;
@@ -49,6 +50,7 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
extends BaseActionExecutor<T, I, K, O, R> {
@@ -68,6 +70,15 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
this.taskContextSupplier = context.getTaskContextSupplier();
}

protected void clusteringUpdateCheck(WorkloadProfile profile) {
if (profile == null) {
throw new HoodieException("Need a workload profile to check updates against pending clustering.");
}
// apply clustering update strategy.
config.getClusteringUpdatesStrategy()
.apply(table.getFileSystemView().getFileGroupsInPendingClustering().collect(Collectors.toList()), profile);
}

public abstract HoodieWriteMetadata<O> execute(I inputRecords);

/**
org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor
@@ -106,6 +106,8 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>
saveWorkloadProfileMetadataToInflight(profile, instantTime);
}

// use the workload profile to check updates against file groups in pending clustering
this.clusteringUpdateCheck(profile);
// partition using the insert partitioner
final Partitioner partitioner = getPartitioner(profile);
JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, partitioner);
org.apache.hudi.table.action.commit.UpsertPartitioner
@@ -140,11 +140,21 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
Map<String, List<SmallFile>> partitionSmallFilesMap =
getSmallFilesForPartitions(new ArrayList<String>(partitionPaths), context);

// collect the fileIds in pending clustering for each partition path
Map<String, List<String>> partitionPathToInPendingClusteringFileId =
table.getFileSystemView().getFileGroupsInPendingClustering()
.map(fileGroupIdAndInstantPair ->
Pair.of(fileGroupIdAndInstantPair.getKey().getPartitionPath(), fileGroupIdAndInstantPair.getKey().getFileId()))
.collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));

for (String partitionPath : partitionPaths) {
WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
if (pStat.getNumInserts() > 0) {
        // exclude small files in pending clustering, because file groups in pending clustering do not support updates for now
        List<String> inPendingClusteringFileId = partitionPathToInPendingClusteringFileId.getOrDefault(partitionPath, Collections.emptyList());
        List<SmallFile> smallFiles = partitionSmallFilesMap.get(partitionPath).stream()
            .filter(smallFile -> !inPendingClusteringFileId.contains(smallFile.location.getFileId())).collect(Collectors.toList());
        this.smallFiles.addAll(smallFiles);

LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles);
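
The groupingBy/mapping collector above turns the flat stream of (partitionPath, fileId) pairs into a per-partition lookup. A tiny standalone example of the same idiom, with hypothetical values:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.hudi.common.util.collection.Pair;

List<Pair<String, String>> pairs = Arrays.asList(
    Pair.of("2020/12/18", "f1"), Pair.of("2020/12/18", "f2"), Pair.of("2020/12/19", "f3"));
Map<String, List<String>> byPartition = pairs.stream()
    .collect(Collectors.groupingBy(Pair::getKey,
        Collectors.mapping(Pair::getValue, Collectors.toList())));
// byPartition => {2020/12/18=[f1, f2], 2020/12/19=[f3]}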
(diff for the remaining changed file not shown)
