From 0f5e49e1a2f55fae5958a358c14284fa0a70c78a Mon Sep 17 00:00:00 2001 From: liwei Date: Tue, 24 Nov 2020 00:51:03 +0800 Subject: [PATCH] [HUDI-1354] Block updates and replace on file groups in clustering --- .../hudi/config/HoodieClusteringConfig.java | 70 ++++++++++++++ .../apache/hudi/config/HoodieWriteConfig.java | 9 ++ .../HoodieClusteringUpdateException.java | 29 ++++++ .../update/RejectUpdateStrategy.java | 61 ++++++++++++ .../clustering/update/UpdateStrategy.java | 41 ++++++++ .../commit/BaseCommitActionExecutor.java | 11 +++ .../commit/BaseSparkCommitActionExecutor.java | 2 + .../action/commit/UpsertPartitioner.java | 12 ++- .../TestHoodieClientOnCopyOnWriteStorage.java | 94 +++++++++++++++++++ 9 files changed, 328 insertions(+), 1 deletion(-) create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/RejectUpdateStrategy.java create mode 100644 hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/UpdateStrategy.java diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java new file mode 100644 index 000000000000..eb110fe02ccd --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieClusteringConfig.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.config; + +import org.apache.hudi.common.config.DefaultHoodieConfig; +import org.apache.hudi.table.action.clustering.update.RejectUpdateStrategy; +import org.apache.hudi.table.action.clustering.update.UpdateStrategy; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +public class HoodieClusteringConfig extends DefaultHoodieConfig { + + public static final String CLUSTERING_UPDATES_STRATEGY_PROP = "hoodie.clustering.updates.strategy"; + public static final String DEFAULT_CLUSTERING_UPDATES_STRATEGY = RejectUpdateStrategy.class.getName(); + + public HoodieClusteringConfig(Properties props) { + super(props); + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private final Properties props = new Properties(); + + public Builder fromProperties(Properties props) { + this.props.putAll(props); + return this; + } + + public Builder fromFile(File propertiesFile) throws IOException { + try (FileReader reader = new FileReader(propertiesFile)) { + this.props.load(reader); + return this; + } + } + + public Builder withClusteringUpdatesStrategy(UpdateStrategy updatesStrategy) { + props.setProperty(CLUSTERING_UPDATES_STRATEGY_PROP, updatesStrategy.getClass().getName()); + return this; + } + + public HoodieClusteringConfig build() { + HoodieClusteringConfig config = new HoodieClusteringConfig(props); + setDefaultOnCondition(props, !props.containsKey(CLUSTERING_UPDATES_STRATEGY_PROP), + CLUSTERING_UPDATES_STRATEGY_PROP, DEFAULT_CLUSTERING_UPDATES_STRATEGY); + return config; + } + } +} \ No newline at end of file diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java index 42d3e2b40456..4087197d3884 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java @@ -31,6 +31,7 @@ import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.metrics.MetricsReporterType; import org.apache.hudi.metrics.datadog.DatadogHttpClient.ApiSite; +import org.apache.hudi.table.action.clustering.update.UpdateStrategy; import org.apache.hudi.table.action.compact.strategy.CompactionStrategy; import org.apache.parquet.hadoop.metadata.CompressionCodecName; @@ -49,6 +50,7 @@ import java.util.function.Supplier; import java.util.stream.Collectors; + /** * Class storing configs for the HoodieWriteClient. */ @@ -381,6 +383,10 @@ public Boolean shouldCleanBootstrapBaseFile() { return Boolean.valueOf(props.getProperty(HoodieCompactionConfig.CLEANER_BOOTSTRAP_BASE_FILE_ENABLED)); } + public UpdateStrategy getClusteringUpdatesStrategy() { + return ReflectionUtils.loadClass(props.getProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP)); + } + /** * index properties. 
*/ @@ -774,6 +780,7 @@ public static class Builder { private boolean isIndexConfigSet = false; private boolean isStorageConfigSet = false; private boolean isCompactionConfigSet = false; + private boolean isClusteringConfigSet = false; private boolean isMetricsConfigSet = false; private boolean isBootstrapConfigSet = false; private boolean isMemoryConfigSet = false; @@ -1038,6 +1045,8 @@ protected void setDefaults() { setDefaultOnCondition(props, !isStorageConfigSet, HoodieStorageConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isCompactionConfigSet, HoodieCompactionConfig.newBuilder().fromProperties(props).build()); + setDefaultOnCondition(props, !isClusteringConfigSet, + HoodieClusteringConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isMetricsConfigSet, HoodieMetricsConfig.newBuilder().fromProperties(props).build()); setDefaultOnCondition(props, !isBootstrapConfigSet, HoodieBootstrapConfig.newBuilder().fromProperties(props).build()); diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java new file mode 100644 index 000000000000..68b62a542170 --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/exception/HoodieClusteringUpdateException.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.exception; + +public class HoodieClusteringUpdateException extends HoodieException { + public HoodieClusteringUpdateException(String msg) { + super(msg); + } + + public HoodieClusteringUpdateException(String msg, Throwable e) { + super(msg, e); + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/RejectUpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/RejectUpdateStrategy.java new file mode 100644 index 000000000000..1504405ed38f --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/RejectUpdateStrategy.java @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.table.action.clustering.update; + +import org.apache.hudi.common.model.HoodieFileGroupId; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieClusteringUpdateException; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.WorkloadStat; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +public class RejectUpdateStrategy implements UpdateStrategy { + private static final Logger LOG = LogManager.getLogger(RejectUpdateStrategy.class); + + @Override + public void apply(List> fileGroupsInPendingClustering, WorkloadProfile workloadProfile) { + Set> partitionPathAndFileIds = fileGroupsInPendingClustering.stream() + .map(entry -> Pair.of(entry.getLeft().getPartitionPath(), entry.getLeft().getFileId())).collect(Collectors.toSet()); + if (partitionPathAndFileIds.size() == 0) { + return; + } + + Set> partitionStatEntries = workloadProfile.getPartitionPathStatMap().entrySet(); + for (Map.Entry partitionStat : partitionStatEntries) { + for (Map.Entry> updateLocEntry : + partitionStat.getValue().getUpdateLocationToCount().entrySet()) { + String partitionPath = partitionStat.getKey(); + String fileId = updateLocEntry.getKey(); + if (partitionPathAndFileIds.contains(Pair.of(partitionPath, fileId))) { + String msg = String.format("Not allowed to update the clustering files partition: %s fileID: %s. " + + "For pending clustering operations, we are not going to support update for now.", partitionPath, fileId); + LOG.error(msg); + throw new HoodieClusteringUpdateException(msg); + } + } + } + } +} diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/UpdateStrategy.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/UpdateStrategy.java new file mode 100644 index 000000000000..72a354eca66a --- /dev/null +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/clustering/update/UpdateStrategy.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.hudi.table.action.clustering.update;
+
+import org.apache.hudi.common.model.HoodieFileGroupId;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.table.WorkloadProfile;
+
+import java.util.List;
+
+/**
+ * When file groups are under pending clustering, writes that touch those file groups need to be checked.
+ */
+public interface UpdateStrategy {
+
+  /**
+   * Check the records that update file groups which are in pending clustering.
+   * @param fileGroupsInPendingClustering file groups that have a pending clustering plan, paired with the clustering instant.
+   * @param workloadProfile workload profile carrying the update location info,
+   *                        as used by BaseSparkCommitActionExecutor.getUpsertPartitioner.
+   */
+  void apply(List<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupsInPendingClustering, WorkloadProfile workloadProfile);
+
+}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
index 71de9b6fc6f7..cbb87d2fcf45 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java
@@ -32,6 +32,7 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.config.HoodieWriteConfig;
 import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.table.HoodieTable;
 import org.apache.hudi.table.WorkloadProfile;
@@ -49,6 +50,7 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public abstract class BaseCommitActionExecutor<T extends HoodieRecordPayload, I, K, O, R>
     extends BaseActionExecutor<T, I, K, O, R> {
@@ -68,6 +70,15 @@ public BaseCommitActionExecutor(HoodieEngineContext context, HoodieWriteConfig c
     this.taskContextSupplier = context.getTaskContextSupplier();
   }
 
+  protected void clusteringUpdateCheck(WorkloadProfile profile) {
+    if (profile == null) {
+      throw new HoodieException("Need workload profile to check updates against pending clustering.");
+    }
+    // apply the configured clustering update strategy.
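+    // The strategy (RejectUpdateStrategy unless overridden via hoodie.clustering.updates.strategy) sees
+    // every file group that has a pending clustering plan together with this write's workload profile,
+    // and may abort the commit (e.g. by throwing HoodieClusteringUpdateException) before records are partitioned.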
+    config.getClusteringUpdatesStrategy()
+        .apply(table.getFileSystemView().getFileGroupsInPendingClustering().collect(Collectors.toList()), profile);
+  }
+
   public abstract HoodieWriteMetadata<O> execute(I inputRecords);
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
index ad62db9250ed..48bd189a74b7 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/BaseSparkCommitActionExecutor.java
@@ -106,6 +106,8 @@ public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>
       saveWorkloadProfileMetadataToInflight(profile, instantTime);
     }
 
+    // use the workload profile to check for updates against file groups in pending clustering
+    this.clusteringUpdateCheck(profile);
     // partition using the insert partitioner
     final Partitioner partitioner = getPartitioner(profile);
     JavaRDD<HoodieRecord<T>> partitionedRecords = partition(inputRecordsRDD, partitioner);
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
index b28c89a53646..cb46920f86b8 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/commit/UpsertPartitioner.java
@@ -140,11 +140,21 @@ private void assignInserts(WorkloadProfile profile, HoodieEngineContext context)
 
     Map<String, List<SmallFile>> partitionSmallFilesMap =
         getSmallFilesForPartitions(new ArrayList<>(partitionPaths), context);
 
+    // get the file ids under pending clustering for each partition path
+    Map<String, List<String>> partitionPathToInPendingClusteringFileId =
+        table.getFileSystemView().getFileGroupsInPendingClustering()
+            .map(fileGroupIdAndInstantPair ->
+                Pair.of(fileGroupIdAndInstantPair.getKey().getPartitionPath(), fileGroupIdAndInstantPair.getKey().getFileId()))
+            .collect(Collectors.groupingBy(Pair::getKey, Collectors.mapping(Pair::getValue, Collectors.toList())));
+
     for (String partitionPath : partitionPaths) {
       WorkloadStat pStat = profile.getWorkloadStat(partitionPath);
       if (pStat.getNumInserts() > 0) {
 
+        // exclude small files that belong to file groups in pending clustering, since updates to those file groups are not supported yet.
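+        // Packing inserts into such a small file would grow a file group that a pending clustering plan
+        // is about to replace, so those file groups are skipped and the inserts go to new file groups instead.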
+ List inPendingClusteringFileId = partitionPathToInPendingClusteringFileId.getOrDefault(partitionPath, Collections.emptyList()); + List smallFiles = partitionSmallFilesMap.get(partitionPath).stream() + .filter(smallFile -> !inPendingClusteringFileId.contains(smallFile.location.getFileId())).collect(Collectors.toList()); - List smallFiles = partitionSmallFilesMap.get(partitionPath); this.smallFiles.addAll(smallFiles); LOG.info("For partitionPath : " + partitionPath + " Small Files => " + smallFiles); diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java index bbb40488bb04..190f352786d7 100644 --- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestHoodieClientOnCopyOnWriteStorage.java @@ -21,6 +21,8 @@ import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hudi.avro.model.HoodieClusteringPlan; +import org.apache.hudi.avro.model.HoodieRequestedReplaceMetadata; import org.apache.hudi.common.fs.ConsistencyGuardConfig; import org.apache.hudi.common.fs.FSUtils; import org.apache.hudi.common.model.HoodieBaseFile; @@ -29,15 +31,19 @@ import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.model.HoodieWriteStat; import org.apache.hudi.common.model.IOType; +import org.apache.hudi.common.model.FileSlice; +import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableMetaClient; import org.apache.hudi.common.table.timeline.HoodieActiveTimeline; import org.apache.hudi.common.table.timeline.HoodieInstant; import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.timeline.TimelineMetadataUtils; import org.apache.hudi.common.table.timeline.versioning.TimelineLayoutVersion; import org.apache.hudi.common.table.view.TableFileSystemView.BaseFileOnlyView; import org.apache.hudi.common.testutils.HoodieTestDataGenerator; import org.apache.hudi.common.testutils.HoodieTestTable; import org.apache.hudi.common.testutils.RawTripTestPayload; +import org.apache.hudi.common.util.ClusteringUtils; import org.apache.hudi.common.util.FileIOUtils; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.ParquetUtils; @@ -49,9 +55,11 @@ import org.apache.hudi.exception.HoodieCommitException; import org.apache.hudi.exception.HoodieIOException; import org.apache.hudi.exception.HoodieRollbackException; +import org.apache.hudi.exception.HoodieUpsertException; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.HoodieIndex.IndexType; import org.apache.hudi.table.HoodieSparkTable; +import org.apache.hudi.table.HoodieSparkCopyOnWriteTable; import org.apache.hudi.table.HoodieTable; import org.apache.hudi.table.MarkerFiles; import org.apache.hudi.table.action.commit.SparkWriteHelper; @@ -73,6 +81,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -104,6 +113,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends HoodieClientTestBase { private static final Logger LOG = LogManager.getLogger(TestHoodieClientOnCopyOnWriteStorage.class); + private 
static final String CLUSTERING_STRATEGY_CLASS = "org.apache.hudi.DefaultClusteringStrategy"; + private static final Map STRATEGY_PARAMS = new HashMap() { + { + put("sortColumn", "record_key"); + } + }; + private HoodieTestTable testTable; @BeforeEach @@ -627,6 +643,72 @@ private void assertActualAndExpectedPartitionPathRecordKeyMatches(Set, List> insertBatchRecords(SparkRDDWriteClient client, String commitTime, Integer recordNum, int expectStatueSize) { + client.startCommitWithTime(commitTime); + List inserts1 = dataGen.generateInserts(commitTime, recordNum); + JavaRDD insertRecordsRDD1 = jsc.parallelize(inserts1, 1); + List statuses = client.upsert(insertRecordsRDD1, commitTime).collect(); + assertNoWriteErrors(statuses); + assertEquals(expectStatueSize, statuses.size(), "check expect statue size."); + return Pair.of(statuses, inserts1); + } + + @Test + public void testUpdateRejectForClustering() throws IOException { + final String testPartitionPath = "2016/09/26"; + dataGen = new HoodieTestDataGenerator(new String[] {testPartitionPath}); + HoodieWriteConfig config = getSmallInsertWriteConfig(100); + SparkRDDWriteClient client = getHoodieWriteClient(config, false); + HoodieSparkCopyOnWriteTable table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, metaClient); + + //1. insert to generate 2 file group + String commitTime1 = "001"; + Pair, List> upsertResult = insertBatchRecords(client, commitTime1, 600, 2); + List statuses = upsertResult.getKey(); + List inserts1 = upsertResult.getValue(); + List fileGroupIds1 = table.getFileSystemView().getAllFileGroups(testPartitionPath) + .map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList()); + assertEquals(2, fileGroupIds1.size()); + + // 2. generate clustering plan for fileGroupIds1 file groups + String commitTime2 = "002"; + List> firstInsertFileSlicesList = table.getFileSystemView().getAllFileGroups(testPartitionPath) + .map(fileGroup -> fileGroup.getAllFileSlices().collect(Collectors.toList())).collect(Collectors.toList()); + List[] fileSlices = (List[])firstInsertFileSlicesList.toArray(new List[firstInsertFileSlicesList.size()]); + createRequestedReplaceInstant(this.metaClient, commitTime2, fileSlices); + + + // 3. insert one record with no updating reject exception, and not merge the small file, just generate a new file group + String commitTime3 = "003"; + statuses = insertBatchRecords(client, commitTime3, 1, 1).getKey(); + List fileGroupIds2 = table.getFileSystemView().getAllFileGroups(testPartitionPath) + .map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList()); + assertEquals(3, fileGroupIds2.size()); + + + // 4. update one record for the clustering two file groups, throw reject update exception + String commitTime4 = "004"; + client.startCommitWithTime(commitTime4); + List insertsAndUpdates3 = new ArrayList<>(); + insertsAndUpdates3.addAll(dataGen.generateUpdates(commitTime4, inserts1)); + assertNoWriteErrors(statuses); + String assertMsg = String.format("Not allowed to update the clustering files in partition: %s " + + "For pending clustering operations, we are not going to support update for now.", testPartitionPath); + assertThrows(HoodieUpsertException.class, () -> { + writeClient.upsert(jsc.parallelize(insertsAndUpdates3, 1), commitTime3).collect(); }, assertMsg); + + + // 5. 
+    // insert one record; no update-reject exception is expected and it should be merged into the small file created in step 3
+    String commitTime5 = "005";
+    statuses = insertBatchRecords(client, commitTime5, 1, 1).getKey();
+    fileGroupIds2.removeAll(fileGroupIds1);
+    assertEquals(fileGroupIds2.get(0), statuses.get(0).getFileId());
+    List<String> firstInsertFileGroupIds4 = table.getFileSystemView().getAllFileGroups(testPartitionPath)
+        .map(fileGroup -> fileGroup.getFileGroupId().getFileId()).collect(Collectors.toList());
+    assertEquals(3, firstInsertFileGroupIds4.size());
+  }
+
+
   /**
    * Test scenario of new file-group getting added during upsert().
    */
@@ -1348,4 +1430,16 @@ private HoodieWriteConfig getSmallInsertWriteConfig(int insertSplitSize, boolean
         .parquetMaxFileSize(dataGen.getEstimatedFileSizeInBytes(200)).build())
         .build();
   }
+
+  protected HoodieInstant createRequestedReplaceInstant(HoodieTableMetaClient metaClient, String clusterTime, List<FileSlice>[] fileSlices) throws IOException {
+    HoodieClusteringPlan clusteringPlan =
+        ClusteringUtils.createClusteringPlan(CLUSTERING_STRATEGY_CLASS, STRATEGY_PARAMS, fileSlices, Collections.emptyMap());
+
+    HoodieInstant clusteringInstant = new HoodieInstant(HoodieInstant.State.REQUESTED, HoodieTimeline.REPLACE_COMMIT_ACTION, clusterTime);
+    HoodieRequestedReplaceMetadata requestedReplaceMetadata = HoodieRequestedReplaceMetadata.newBuilder()
+        .setClusteringPlan(clusteringPlan).setOperationType(WriteOperationType.CLUSTER.name()).build();
+    metaClient.getActiveTimeline().saveToPendingReplaceCommit(clusteringInstant, TimelineMetadataUtils.serializeRequestedReplaceMetadata(requestedReplaceMetadata));
+    return clusteringInstant;
+  }
+
 }
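Reviewer note: a minimal sketch of how a downstream job could plug in a more permissive strategy than the default RejectUpdateStrategy added here. The UpdateStrategy interface, the hoodie.clustering.updates.strategy key, and the lookup against the workload profile are taken from this patch; the org.apache.hudi.examples package, the LogOnlyUpdateStrategy class, and the asWriterProps helper are illustrative assumptions only.

package org.apache.hudi.examples;  // hypothetical package, not part of this patch

import org.apache.hudi.common.model.HoodieFileGroupId;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieClusteringConfig;
import org.apache.hudi.table.WorkloadProfile;
import org.apache.hudi.table.action.clustering.update.UpdateStrategy;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.stream.Collectors;

/**
 * Warns about updates that touch file groups with a pending clustering plan, but lets the commit
 * proceed (the pending plan would then have to be regenerated). Mirrors RejectUpdateStrategy's lookup.
 */
public class LogOnlyUpdateStrategy implements UpdateStrategy {
  private static final Logger LOG = LogManager.getLogger(LogOnlyUpdateStrategy.class);

  @Override
  public void apply(List<Pair<HoodieFileGroupId, HoodieInstant>> fileGroupsInPendingClustering,
                    WorkloadProfile workloadProfile) {
    Set<Pair<String, String>> pendingPartitionAndFileIds = fileGroupsInPendingClustering.stream()
        .map(entry -> Pair.of(entry.getLeft().getPartitionPath(), entry.getLeft().getFileId()))
        .collect(Collectors.toSet());
    workloadProfile.getPartitionPathStatMap().forEach((partitionPath, stat) ->
        stat.getUpdateLocationToCount().keySet().forEach(fileId -> {
          if (pendingPartitionAndFileIds.contains(Pair.of(partitionPath, fileId))) {
            // unlike the default RejectUpdateStrategy, do not throw HoodieClusteringUpdateException
            LOG.warn("Update touches file group " + fileId + " in partition " + partitionPath
                + " which has a pending clustering plan; not blocking the write.");
          }
        }));
  }

  /** Builds writer properties that select this strategy through the new config key. */
  public static Properties asWriterProps() {
    Properties props = new Properties();
    props.setProperty(HoodieClusteringConfig.CLUSTERING_UPDATES_STRATEGY_PROP, LogOnlyUpdateStrategy.class.getName());
    return props;
  }
}

Equivalently, HoodieClusteringConfig.newBuilder().withClusteringUpdatesStrategy(new LogOnlyUpdateStrategy()).build() sets the same property through the builder introduced in this patch.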