Skip to content

Commit

Permalink
[FLINK-21925][core] Introduce fine-grained.shuffle-mode.all-blocking …
Browse files Browse the repository at this point in the history
…to avoid resource deadlock in batch jobs that apply fine-grained resource management

This closes apache#16307
  • Loading branch information
KarmaGYZ authored and xintongsong committed Jul 5, 2021
1 parent 3f36bbf commit 25fdf54
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ public class ClusterOptions {
.withDescription(
"Defines whether the cluster uses fine-grained resource management.");

/**
 * Boolean option with key {@code fine-grained.shuffle-mode.all-blocking}; defaults to
 * {@code false}. Per its description, it decides whether all PIPELINE edges are converted to
 * BLOCKING when fine-grained resource management is applied in batch jobs.
 *
 * <p>NOTE(review): the {@code ExcludeFromDocumentation} annotation presumably keeps this
 * option out of the generated configuration docs - confirm.
 */
@Documentation.ExcludeFromDocumentation
public static final ConfigOption<Boolean> FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING =
ConfigOptions.key("fine-grained.shuffle-mode.all-blocking")
.booleanType()
.defaultValue(false)
.withDescription(
"Whether to convert all PIPELINE edges to BLOCKING when apply fine-grained resource management in batch jobs.");

public static JobManagerOptions.SchedulerType getSchedulerType(Configuration configuration) {
if (isAdaptiveSchedulerEnabled(configuration) || isReactiveModeEnabled(configuration)) {
return JobManagerOptions.SchedulerType.Adaptive;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,12 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.ReadableConfig;
Expand Down Expand Up @@ -91,6 +93,7 @@
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.StatefulSequenceSource;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.graph.GlobalDataExchangeMode;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamGraphGenerator;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
Expand Down Expand Up @@ -2099,6 +2102,24 @@ public StreamGraph getStreamGraph(String jobName) {
@Internal
public StreamGraph getStreamGraph(String jobName, boolean clearTransformations) {
StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();

// Applying fine-grained resource management to batch jobs with PIPELINE edges might
// cause a resource deadlock. Until that issue is fixed, users need to enable
// fine-grained.shuffle-mode.all-blocking, which converts all edges to BLOCKING;
// otherwise graph generation fails with an IllegalConfigurationException.
if (configuration.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.BATCH
&& streamGraph.hasFineGrainedResource()) {
if (configuration.get(ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING)) {
streamGraph.setGlobalDataExchangeMode(GlobalDataExchangeMode.ALL_EDGES_BLOCKING);
} else {
throw new IllegalConfigurationException(
"At the moment, fine-grained resource management requires batch workloads to "
+ "be executed with types of all edges being BLOCKING. To do that, you need to configure '"
+ ClusterOptions.FINE_GRAINED_SHUFFLE_MODE_ALL_BLOCKING.key()
+ "' to 'true'. Notice that this may affect the performance. See FLINK-20865 for more details.");
}
}

if (clearTransformations) {
this.transformations.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,11 @@ public Optional<ResourceProfile> getSlotSharingGroupResource(String groupId) {
return Optional.ofNullable(slotSharingGroupResources.get(groupId));
}

/**
 * Tells whether any slot sharing group resource held by this graph differs from
 * {@link ResourceProfile#UNKNOWN}.
 *
 * @return {@code true} if at least one value of {@code slotSharingGroupResources} is not
 *     equal to {@code ResourceProfile.UNKNOWN}; {@code false} otherwise, including when no
 *     resources are registered
 */
public boolean hasFineGrainedResource() {
    // Stop at the first profile that is not UNKNOWN.
    for (ResourceProfile profile : slotSharingGroupResources.values()) {
        if (!profile.equals(ResourceProfile.UNKNOWN)) {
            return true;
        }
    }
    return false;
}

/**
* Set whether to put all vertices into the same slot sharing group by default.
*
Expand Down

0 comments on commit 25fdf54

Please sign in to comment.