[FLINK-23498][streaming-java] Remove StreamExecutionEnvironment.getConfiguration()
twalthr committed Aug 4, 2021
1 parent cbdd052 commit 659ffa7
Showing 10 changed files with 57 additions and 61 deletions.
@@ -97,11 +97,11 @@ private JobExecutionResult getJobExecutionResult(final JobClient jobClient) thro
checkNotNull(jobClient);

JobExecutionResult jobExecutionResult;
if (getConfiguration().getBoolean(DeploymentOptions.ATTACHED)) {
if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
CompletableFuture<JobExecutionResult> jobExecutionResultFuture =
jobClient.getJobExecutionResult();

if (getConfiguration().getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
if (configuration.getBoolean(DeploymentOptions.SHUTDOWN_IF_ATTACHED)) {
Thread shutdownHook =
ShutdownHookUtil.addShutdownHook(
() -> {
7 changes: 3 additions & 4 deletions flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -40,16 +40,15 @@
TimeWindowSerializer)
from pyflink.java_gateway import get_gateway
from pyflink.datastream.tests.test_util import DataStreamTestSinkFunction
from pyflink.testing.test_case_utils import invoke_java_object_method, \
PyFlinkBatchTestCase, PyFlinkStreamingTestCase
from pyflink.testing.test_case_utils import PyFlinkBatchTestCase, PyFlinkStreamingTestCase
from pyflink.util.java_utils import get_j_env_configuration


class DataStreamTests(object):

def setUp(self) -> None:
super(DataStreamTests, self).setUp()
config = invoke_java_object_method(
self.env._j_stream_execution_environment, "getConfiguration")
config = get_j_env_configuration(self.env._j_stream_execution_environment)
config.setString("akka.ask.timeout", "20 s")
self.test_sink = DataStreamTestSinkFunction()

@@ -42,8 +42,7 @@
from pyflink.pyflink_gateway_server import on_windows
from pyflink.table import DataTypes, CsvTableSource, CsvTableSink, StreamTableEnvironment, \
EnvironmentSettings
from pyflink.testing.test_case_utils import PyFlinkTestCase, exec_insert_table, \
invoke_java_object_method
from pyflink.testing.test_case_utils import PyFlinkTestCase, exec_insert_table
from pyflink.util.java_utils import get_j_env_configuration


@@ -129,8 +128,7 @@ def test_get_set_max_parallelism(self):
def test_set_runtime_mode(self):
self.env.set_runtime_mode(RuntimeExecutionMode.BATCH)

config = invoke_java_object_method(
self.env._j_stream_execution_environment, "getConfiguration")
config = get_j_env_configuration(self.env._j_stream_execution_environment)
runtime_mode = config.getValue(
get_gateway().jvm.org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE)

@@ -43,7 +43,7 @@
from pyflink.testing import source_sink_utils
from pyflink.testing.test_case_utils import (
PyFlinkBatchTableTestCase, PyFlinkStreamTableTestCase, PyFlinkTestCase,
_load_specific_flink_module_jars, invoke_java_object_method)
_load_specific_flink_module_jars)
from pyflink.util.java_utils import get_j_env_configuration


@@ -246,8 +246,7 @@ def setUp(self) -> None:
self.t_env = StreamTableEnvironment.create(self.env)

self.env.set_parallelism(2)
config = invoke_java_object_method(
self.env._j_stream_execution_environment, "getConfiguration")
config = get_j_env_configuration(self.env._j_stream_execution_environment)
config.setString("akka.ask.timeout", "20 s")
self.t_env.get_config().get_configuration().set_string(
"python.fn-execution.bundle.size", "1")
10 changes: 5 additions & 5 deletions flink-python/pyflink/util/java_utils.py
@@ -84,11 +84,11 @@ def get_j_env_configuration(j_env):
if is_instance_of(j_env, "org.apache.flink.api.java.ExecutionEnvironment"):
return j_env.getConfiguration()
else:
return invoke_method(
j_env,
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment",
"getConfiguration"
)
env_clazz = load_java_class(
"org.apache.flink.streaming.api.environment.StreamExecutionEnvironment")
field = env_clazz.getDeclaredField("configuration")
field.setAccessible(True)
return field.get(j_env)


def invoke_method(obj, object_type, method_name, args=None, arg_types=None):
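The Python helper above now reads the environment's private `configuration` field directly through JVM reflection (via py4j) rather than invoking the removed getter. For reference, a self-contained Java sketch of that field-reflection pattern, using an illustrative holder class instead of Flink's own types:

```java
import java.lang.reflect.Field;

public class PrivateFieldReadSketch {

    // Illustrative stand-in for a class with a private field named 'configuration'.
    static class Holder {
        private final String configuration = "some value";
    }

    public static void main(String[] args) throws Exception {
        Holder holder = new Holder();
        // getDeclaredField sees private fields declared on the class itself.
        Field field = Holder.class.getDeclaredField("configuration");
        // Lift the Java access check so the private field can be read from outside.
        field.setAccessible(true);
        System.out.println(field.get(holder)); // prints "some value"
    }
}
```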
@@ -47,7 +47,6 @@

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.List;

/**
@@ -66,34 +65,34 @@ public class PythonConfigUtil {
* python dependency management configurations.
*/
public static Configuration getEnvConfigWithDependencies(StreamExecutionEnvironment env)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
throws InvocationTargetException, IllegalAccessException, NoSuchFieldException {
return PythonDependencyUtils.configurePythonDependencies(
env.getCachedFiles(), getEnvironmentConfig(env));
}

/**
* Get the private method {@link StreamExecutionEnvironment#getConfiguration()} by reflection
* recursively. Then access the method to get the configuration of the given
* Get the private field {@code StreamExecutionEnvironment#configuration} by reflection
* recursively. Then access the field to get the configuration of the given
* StreamExecutionEnvironment.
*/
public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env)
throws InvocationTargetException, IllegalAccessException, NoSuchMethodException {
Method getConfigurationMethod = null;
throws InvocationTargetException, IllegalAccessException, NoSuchFieldException {
Field configurationField = null;
for (Class<?> clz = env.getClass(); clz != Object.class; clz = clz.getSuperclass()) {
try {
getConfigurationMethod = clz.getDeclaredMethod("getConfiguration");
configurationField = clz.getDeclaredField("configuration");
break;
} catch (NoSuchMethodException e) {
} catch (NoSuchFieldException e) {
// ignore
}
}

if (getConfigurationMethod == null) {
throw new NoSuchMethodException("Method getConfigurationMethod not found.");
if (configurationField == null) {
throw new NoSuchFieldException("Field 'configuration' not found.");
}

getConfigurationMethod.setAccessible(true);
return (Configuration) getConfigurationMethod.invoke(env);
configurationField.setAccessible(true);
return (Configuration) configurationField.get(env);
}

/**
@@ -105,8 +104,7 @@ public static Configuration getEnvironmentConfig(StreamExecutionEnvironment env)
*/
public static StreamGraph generateStreamGraphWithDependencies(
StreamExecutionEnvironment env, boolean clearTransformations)
throws IllegalAccessException, NoSuchMethodException, InvocationTargetException,
NoSuchFieldException {
throws IllegalAccessException, InvocationTargetException, NoSuchFieldException {
configPythonOperator(env);

String jobName =
@@ -118,8 +116,7 @@ public static StreamGraph generateStreamGraphWithDependencies(

@SuppressWarnings("unchecked")
public static void configPythonOperator(StreamExecutionEnvironment env)
throws IllegalAccessException, NoSuchMethodException, InvocationTargetException,
NoSuchFieldException {
throws IllegalAccessException, InvocationTargetException, NoSuchFieldException {
Configuration mergedConfig = getEnvConfigWithDependencies(env);

boolean executedInBatchMode = isExecuteInBatchMode(env, mergedConfig);
@@ -166,7 +163,7 @@ public static Configuration getMergedConfig(
PythonDependencyUtils.configurePythonDependencies(env.getCachedFiles(), config);
mergedConfig.setString("table.exec.timezone", tableConfig.getLocalTimeZone().getId());
return mergedConfig;
} catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
} catch (IllegalAccessException | NoSuchFieldException | InvocationTargetException e) {
throw new TableException("Method getMergedConfig failed.", e);
}
}
@@ -36,7 +36,7 @@ public class PythonConfigUtilTest {

@Test
public void testGetEnvironmentConfig()
throws IllegalAccessException, NoSuchMethodException, InvocationTargetException {
throws IllegalAccessException, NoSuchFieldException, InvocationTargetException {
StreamExecutionEnvironment executionEnvironment =
StreamExecutionEnvironment.getExecutionEnvironment();
Configuration envConfig =
@@ -46,8 +46,7 @@ public void testGetEnvironmentConfig()

@Test
public void testJobName()
throws IllegalAccessException, NoSuchMethodException, InvocationTargetException,
NoSuchFieldException {
throws IllegalAccessException, InvocationTargetException, NoSuchFieldException {
String jobName = "MyTestJob";
Configuration config = new Configuration();
config.set(PipelineOptions.NAME, jobName);
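The test hunks above are truncated before the assertions. As a rough standalone illustration (not part of this commit; the `org.apache.flink.python.util` package path is assumed from the flink-python module layout), the reflective accessor can be exercised like this:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.python.util.PythonConfigUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class EnvironmentConfigSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Reflectively reads the environment's 'configuration' field,
        // walking up the class hierarchy when env is a subclass.
        Configuration envConfig = PythonConfigUtil.getEnvironmentConfig(env);

        // Options written into the environment's configuration are visible here.
        System.out.println(envConfig.getOptional(PipelineOptions.NAME));
    }
}
```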
@@ -74,7 +74,6 @@ public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
}

private void updateDependencies() throws Exception {
final Configuration configuration = getConfiguration();
checkState(
configuration.getBoolean(DeploymentOptions.ATTACHED),
"Only ATTACHED mode is supported by the scala shell.");
@@ -85,7 +84,7 @@ private void updateDependencies() throws Exception {
}

public Configuration getClientConfiguration() {
return getConfiguration();
return configuration;
}

private List<URL> getUpdatedJarFiles() throws MalformedURLException {
@@ -220,8 +220,8 @@ public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {

@Override
public String toString() {
final String host = getConfiguration().getString(JobManagerOptions.ADDRESS);
final int port = getConfiguration().getInteger(JobManagerOptions.PORT);
final String host = configuration.getString(JobManagerOptions.ADDRESS);
final int port = configuration.getInteger(JobManagerOptions.PORT);
final String parallelism = (getParallelism() == -1 ? "default" : "" + getParallelism());

return "Remote Environment ("
@@ -239,7 +239,7 @@ public String toString() {
* @return The hostname of the master
*/
public String getHost() {
return getConfiguration().getString(JobManagerOptions.ADDRESS);
return configuration.getString(JobManagerOptions.ADDRESS);
}

/**
@@ -248,12 +248,12 @@ public int getPort() {
* @return The port of the master
*/
public int getPort() {
return getConfiguration().getInteger(JobManagerOptions.PORT);
return configuration.getInteger(JobManagerOptions.PORT);
}

/** @deprecated This method is going to be removed in the next releases. */
@Deprecated
public Configuration getClientConfiguration() {
return getConfiguration();
return configuration;
}
}
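Since `configuration` is now a protected field of `StreamExecutionEnvironment`, environment subclasses such as the remote and scala-shell environments above replace calls to the removed protected getter with direct field reads. A minimal sketch of that migration in a hypothetical subclass (assuming the public `StreamExecutionEnvironment(Configuration)` constructor available in this version):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Hypothetical subclass, shown only to illustrate the migration pattern.
public class MyRemoteLikeEnvironment extends StreamExecutionEnvironment {

    public MyRemoteLikeEnvironment(Configuration configuration) {
        super(configuration);
    }

    public String getHost() {
        // Before this commit: getConfiguration().getString(JobManagerOptions.ADDRESS)
        return configuration.getString(JobManagerOptions.ADDRESS);
    }

    public int getPort() {
        // Before this commit: getConfiguration().getInteger(JobManagerOptions.PORT)
        return configuration.getInteger(JobManagerOptions.PORT);
    }
}
```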
@@ -52,6 +52,7 @@
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
@@ -190,7 +191,16 @@ public class StreamExecutionEnvironment {

private final PipelineExecutorServiceLoader executorServiceLoader;

private final Configuration configuration;
/**
* Currently, configuration is split across multiple member variables and classes such as {@link
* ExecutionConfig} or {@link CheckpointConfig}. This architecture makes it quite difficult to
* handle/merge/enrich configuration or restrict access in other APIs.
*
* <p>In the long-term, this {@link Configuration} object should be the source of truth for
* newly added {@link ConfigOption}s that are relevant for DataStream API. Make sure to also
* update {@link #configure(ReadableConfig, ClassLoader)}.
*/
protected final Configuration configuration;

private final ClassLoader userClassloader;

@@ -262,10 +272,6 @@ public StreamExecutionEnvironment(
this.configure(this.configuration, this.userClassloader);
}

protected Configuration getConfiguration() {
return this.configuration;
}

protected ClassLoader getUserClassloader() {
return userClassloader;
}
@@ -976,26 +982,25 @@ public void configure(ReadableConfig configuration, ClassLoader classLoader) {
.getOptional(ExecutionOptions.SORT_INPUTS)
.ifPresent(
sortInputs ->
this.getConfiguration()
.set(ExecutionOptions.SORT_INPUTS, sortInputs));
this.configuration.set(ExecutionOptions.SORT_INPUTS, sortInputs));
configuration
.getOptional(ExecutionOptions.USE_BATCH_STATE_BACKEND)
.ifPresent(
sortInputs ->
this.getConfiguration()
.set(ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs));
this.configuration.set(
ExecutionOptions.USE_BATCH_STATE_BACKEND, sortInputs));
configuration
.getOptional(PipelineOptions.NAME)
.ifPresent(jobName -> this.getConfiguration().set(PipelineOptions.NAME, jobName));
.ifPresent(jobName -> this.configuration.set(PipelineOptions.NAME, jobName));

configuration
.getOptional(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH)
.ifPresent(
flag ->
this.getConfiguration()
.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
flag));
this.configuration.set(
ExecutionCheckpointingOptions
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
flag));

config.configure(configuration, classLoader);
checkpointCfg.configure(configuration);
@@ -2130,7 +2135,7 @@ private StreamGraphGenerator getStreamGraphGenerator() {

final RuntimeExecutionMode executionMode = configuration.get(ExecutionOptions.RUNTIME_MODE);

return new StreamGraphGenerator(transformations, config, checkpointCfg, getConfiguration())
return new StreamGraphGenerator(transformations, config, checkpointCfg, configuration)
.setRuntimeExecutionMode(executionMode)
.setStateBackend(defaultStateBackend)
.setChangelogStateBackendEnabled(changelogStateBackendEnabled)
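The `configure(ReadableConfig, ClassLoader)` hunk above shows options such as `PipelineOptions.NAME` being merged straight into the internal `configuration` field. A caller-side sketch of feeding such an option into the environment (the job name value is an arbitrary example):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ConfigureSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Options passed to configure() are merged into the environment's
        // internal configuration (and related config objects), as shown in
        // the configure() hunk above.
        Configuration options = new Configuration();
        options.set(PipelineOptions.NAME, "my-sketch-job");
        env.configure(options, Thread.currentThread().getContextClassLoader());
    }
}
```

After this call, the job name is retrievable through the field-based accessors used elsewhere in this commit rather than through the removed getter.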