diff --git a/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java b/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java index 5611fceed2681..5c882d8cab485 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/RuntimeExecutionMode.java @@ -17,7 +17,7 @@ package org.apache.flink.api.common; -import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.PublicEvolving; /** * Runtime execution mode of DataStream programs. Among other things, this controls task scheduling, @@ -27,7 +27,7 @@ * @see * https://cwiki.apache.org/confluence/display/FLINK/FLIP-134%3A+Batch+execution+for+the+DataStream+API */ -@Internal +@PublicEvolving public enum RuntimeExecutionMode { /** diff --git a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py index 8cb4c5a3eae20..08262276e65ed 100644 --- a/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py +++ b/flink-python/pyflink/datastream/tests/test_stream_execution_environment_completeness.py @@ -48,7 +48,8 @@ def excluded_methods(cls): 'createInput', 'createLocalEnvironmentWithWebUI', 'fromCollection', 'socketTextStream', 'initializeContextEnvironment', 'readTextFile', 'addSource', 'setNumberOfExecutionRetries', 'configure', 'executeAsync', 'registerJobListener', - 'clearJobListeners', 'getJobListeners', "fromSource", "fromSequence"} + 'clearJobListeners', 'getJobListeners', "fromSource", "fromSequence", + 'setRuntimeMode'} if __name__ == '__main__': diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 9a30d5dc59e71..2bf27bf5f66c7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -284,6 +284,26 @@ public StreamExecutionEnvironment setParallelism(int parallelism) { return this; } + /** + * Sets the runtime execution mode for the application (see {@link RuntimeExecutionMode}). + * This is equivalent to setting the {@code execution.runtime-mode} in your application's + * configuration file. + * + *
We recommend users to NOT use this method but set the {@code execution.runtime-mode}
+ * using the command-line when submitting the application. Keeping the application code
+ * configuration-free allows for more flexibility as the same application will be able to
+ * be executed in any execution mode.
+ *
+ * @param executionMode the desired execution mode.
+ * @return The execution environment of your application.
+ */
+ @PublicEvolving
+ public StreamExecutionEnvironment setRuntimeMode(final RuntimeExecutionMode executionMode) {
+ checkNotNull(executionMode);
+ configuration.set(ExecutionOptions.RUNTIME_MODE, executionMode);
+ return this;
+ }
+
/**
* Sets the maximum degree of parallelism defined for the program. The upper limit (inclusive)
* is Short.MAX_VALUE.
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
index 43fd7dbaedf51..6bb2acabbcb33 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamExecutionEnvironment.scala
@@ -20,6 +20,7 @@ package org.apache.flink.streaming.api.scala
import com.esotericsoftware.kryo.Serializer
import org.apache.flink.annotation.{Experimental, Internal, Public, PublicEvolving}
+import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.io.{FileInputFormat, FilePathFilter, InputFormat}
import org.apache.flink.api.common.restartstrategy.RestartStrategies.RestartStrategyConfiguration
@@ -75,6 +76,25 @@ class StreamExecutionEnvironment(javaEnv: JavaEnv) {
javaEnv.setParallelism(parallelism)
}
+ /**
+ * Sets the runtime execution mode for the application (see [[RuntimeExecutionMode]]).
+ * This is equivalent to setting the "execution.runtime-mode" in your application's
+ * configuration file.
+ *
+ * We recommend users to NOT use this method but set the "execution.runtime-mode"
+ * using the command-line when submitting the application. Keeping the application code
+ * configuration-free allows for more flexibility as the same application will be able to
+ * be executed in any execution mode.
+ *
+ * @param executionMode the desired execution mode.
+ * @return The execution environment of your application.
+ */
+ @PublicEvolving
+ def setRuntimeMode(executionMode: RuntimeExecutionMode): StreamExecutionEnvironment = {
+ javaEnv.setRuntimeMode(executionMode)
+ this
+ }
+
/**
* Sets the maximum degree of parallelism defined for the program.
* The maximum degree of parallelism specifies the upper limit for dynamic scaling. It also
diff --git a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java
index e9617c08f70c9..dd8c6744aa74e 100644
--- a/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/api/datastream/DataStreamBatchExecutionITCase.java
@@ -22,8 +22,6 @@
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
@@ -156,12 +154,8 @@ public void batchSumSingleResultPerKey() throws Exception {
}
private StreamExecutionEnvironment getExecutionEnvironment() {
-
- Configuration config = new Configuration();
- config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
-
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(
- config);
+ final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setRuntimeMode(RuntimeExecutionMode.BATCH);
env.setParallelism(1);
// trick the collecting sink into working even in the face of failures 🙏
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
index ececbd31a3f2c..ac55c0277de35 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SinkITCase.java
@@ -20,8 +20,6 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.IntegerTypeInfo;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.runtime.operators.sink.TestSink;
import org.apache.flink.streaming.util.FiniteTestSource;
@@ -235,17 +233,14 @@ private static List