Skip to content

Commit

Permalink
[FLINK-21935][state backends] Remove async parameter for HashMapState…
Browse files Browse the repository at this point in the history
…Backend.

Checkpoints are always asynchronous now.
  • Loading branch information
StephanEwen committed Mar 31, 2021
1 parent a93251b commit a31d0fe
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 123 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -212,12 +212,6 @@ public class DataStreamAllroundTestJobFactory {
.withDescription(
"Activate or deactivate incremental snapshots if RocksDBStateBackend is selected.");

private static final ConfigOption<Boolean> STATE_BACKEND_FILE_ASYNC =
ConfigOptions.key("state_backend.file.async")
.defaultValue(true)
.withDescription(
"Activate or deactivate asynchronous snapshots if FileStateBackend is selected.");

private static final ConfigOption<Integer> SEQUENCE_GENERATOR_SRC_KEYSPACE =
ConfigOptions.key("sequence_generator_source.keyspace").defaultValue(200);

Expand Down Expand Up @@ -356,12 +350,7 @@ private static void setupStateBackend(
final String stateBackend = pt.get(STATE_BACKEND.key(), STATE_BACKEND.defaultValue());

if ("hashmap".equalsIgnoreCase(stateBackend)) {
boolean asyncCheckpoints =
pt.getBoolean(
STATE_BACKEND_FILE_ASYNC.key(),
STATE_BACKEND_FILE_ASYNC.defaultValue());

env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
env.setStateBackend(new HashMapStateBackend());
} else if ("rocks".equalsIgnoreCase(stateBackend)) {
boolean incrementalCheckpoints =
pt.getBoolean(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
* default <code>file</code>.
* <li>killJvmOnFail: flag that determines whether or not an artificial failure induced by the
* test kills the JVM or not.
* <li>asyncCheckpoints: flag for async checkpoints with file state backend, default <code>true
* </code>.
* <li>incrementalCheckpoints: flag for incremental checkpoint with rocks state backend, default
* <code>false</code>.
* <li>delay: sleep delay to throttle down the production of the source, default 0.
Expand Down Expand Up @@ -110,8 +108,7 @@ public static void main(String[] args) throws Exception {

String stateBackend = pt.get("stateBackend", "hashmap");
if ("hashmap".equals(stateBackend)) {
boolean asyncCheckpoints = pt.getBoolean("asyncCheckpoints", true);
env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
env.setStateBackend(new HashMapStateBackend());
} else if ("rocks".equals(stateBackend)) {
boolean incrementalCheckpoints = pt.getBoolean("incrementalCheckpoints", false);
env.setStateBackend(new EmbeddedRocksDBStateBackend(incrementalCheckpoints));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ public static void main(String[] args) throws Exception {
System.out.println("Options for both the above setups: ");
System.out.println("\t[--backend <hashmap|rocks>]");
System.out.println("\t[--checkpoint-dir <filepath>]");
System.out.println("\t[--async-checkpoints <true|false>]");
System.out.println("\t[--incremental-checkpoints <true|false>]");
System.out.println("\t[--output <filepath> OR null for stdout]");
System.out.println();
Expand All @@ -79,8 +78,7 @@ public static void main(String[] args) throws Exception {
final String stateBackend = params.get("backend", "memory");
if ("hashmap".equals(stateBackend)) {
final String checkpointDir = params.get("checkpoint-dir");
boolean asyncCheckpoints = params.getBoolean("async-checkpoints", false);
env.setStateBackend(new HashMapStateBackend(asyncCheckpoints));
env.setStateBackend(new HashMapStateBackend());
env.getCheckpointConfig().setCheckpointStorage(checkpointDir);
} else if ("rocks".equals(stateBackend)) {
final String checkpointDir = params.get("checkpoint-dir");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,9 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
Expand All @@ -43,16 +41,12 @@
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.TernaryBoolean;

import javax.annotation.Nonnull;

import java.io.IOException;
import java.net.URI;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* This state backend holds the working state in the memory (JVM heap) of the TaskManagers and
* checkpoints based on the configured {@link org.apache.flink.runtime.state.CheckpointStorage}.
Expand Down Expand Up @@ -80,81 +74,12 @@ public class HashMapStateBackend extends AbstractStateBackend implements Configu

private static final long serialVersionUID = 1L;

// ------------------------------------------------------------------------

/**
* Switch to chose between synchronous and asynchronous snapshots. A value of 'undefined' means
* not yet configured, in which case the default will be used.
*/
private final TernaryBoolean asynchronousSnapshots;

// -----------------------------------------------------------------------

/**
* Creates a new state backend that stores its checkpoint data in the file system and location
* defined by the given URI.
*
* <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
* 'S3://') must be accessible via {@link FileSystem#get(URI)}.
*
* <p>For a state backend targeting HDFS, this means that the URI must either specify the
* authority (host and port), or that the Hadoop configuration that describes that information
* must be in the classpath.
*/
public HashMapStateBackend() {
this(TernaryBoolean.UNDEFINED);
}

/**
* Creates a new state backend that stores its checkpoint data in the file system and location
* defined by the given URI.
*
* <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
* 'S3://') must be accessible via {@link FileSystem#get(URI)}.
*
* <p>For a state backend targeting HDFS, this means that the URI must either specify the
* authority (host and port), or that the Hadoop configuration that describes that information
* must be in the classpath.
*
* @param asynchronousSnapshots Flag to switch between synchronous and asynchronous snapshot
* mode.
*/
public HashMapStateBackend(boolean asynchronousSnapshots) {
this(TernaryBoolean.fromBoolean(asynchronousSnapshots));
}

/**
* Creates a new state backend that stores its checkpoint data in the file system and location
* defined by the given URI.
*
* <p>A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or
* 'S3://') must be accessible via {@link FileSystem#get(URI)}.
*
* <p>For a state backend targeting HDFS, this means that the URI must either specify the
* authority (host and port), or that the Hadoop configuration that describes that information
* must be in the classpath.
*
* @param asynchronousSnapshots Flag to switch between synchronous and asynchronous snapshot
* mode. If UNDEFINED, the value configured in the runtime configuration will be used.
*/
public HashMapStateBackend(TernaryBoolean asynchronousSnapshots) {
checkNotNull(asynchronousSnapshots, "asynchronousSnapshots");

this.asynchronousSnapshots = asynchronousSnapshots;
}

private HashMapStateBackend(HashMapStateBackend original, ReadableConfig config) {
// if asynchronous snapshots were configured, use that setting,
// else check the configuration
this.asynchronousSnapshots =
original.asynchronousSnapshots.resolveUndefined(
config.get(CheckpointingOptions.ASYNC_SNAPSHOTS));
}

@Override
public HashMapStateBackend configure(ReadableConfig config, ClassLoader classLoader)
throws IllegalConfigurationException {
return new HashMapStateBackend(this, config);
return this;
}

@Override
Expand Down Expand Up @@ -189,7 +114,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
getCompressionDecorator(env.getExecutionConfig()),
localRecoveryConfig,
priorityQueueSetFactory,
isUsingAsynchronousSnapshots(),
true,
cancelStreamRegistry)
.build();
}
Expand All @@ -205,20 +130,9 @@ public OperatorStateBackend createOperatorStateBackend(
return new DefaultOperatorStateBackendBuilder(
env.getUserCodeClassLoader().asClassLoader(),
env.getExecutionConfig(),
isUsingAsynchronousSnapshots(),
true,
stateHandles,
cancelStreamRegistry)
.build();
}

/**
* Gets whether the key/value data structures are asynchronously snapshotted.
*
* <p>If not explicitly configured, this is the default value of {@link
* CheckpointingOptions#ASYNC_SNAPSHOTS}.
*/
public boolean isUsingAsynchronousSnapshots() {
return asynchronousSnapshots.getOrDefault(
CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,12 +48,10 @@ public static List<Object[]> modes() {
return Arrays.asList(
new Object[][] {
{
true,
(SupplierWithException<CheckpointStorage, IOException>)
JobManagerCheckpointStorage::new
},
{
false,
(SupplierWithException<CheckpointStorage, IOException>)
() -> {
String checkpointPath =
Expand All @@ -64,15 +62,12 @@ public static List<Object[]> modes() {
});
}

@Parameterized.Parameter(value = 0)
public boolean useAsyncMode;

@Parameterized.Parameter(value = 1)
@Parameterized.Parameter
public SupplierWithException<CheckpointStorage, IOException> storageSupplier;

@Override
protected HashMapStateBackend getStateBackend() {
return new HashMapStateBackend(useAsyncMode);
return new HashMapStateBackend();
}

@Override
Expand All @@ -82,7 +77,7 @@ protected CheckpointStorage getCheckpointStorage() throws Exception {

@Override
protected boolean supportsAsynchronousSnapshots() {
return useAsyncMode;
return true;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;

import org.junit.Rule;
Expand All @@ -39,12 +38,6 @@ public void testConfigOptionDefaultsToAsync() {
assertTrue(CheckpointingOptions.ASYNC_SNAPSHOTS.defaultValue());
}

@Test
public void testHashMapStateBackendDefaultToAsync() {
HashMapStateBackend backend = new HashMapStateBackend();
assertTrue(backend.isUsingAsynchronousSnapshots());
}

@Test
public void testFsStateBackendDefaultsToAsync() throws Exception {
FsStateBackend backend = new FsStateBackend(tmpFolder.newFolder().toURI());
Expand Down

0 comments on commit a31d0fe

Please sign in to comment.