[FLINK-24912][state-processor-api] Update documentation for DataStream based API

Co-authored-by: Jun Qin <11677043+qinjunjerry@users.noreply.github.com>

This closes apache#18170
sjwiesman committed Dec 28, 2021
1 parent 035ddf6 commit 3b54d27
Showing 11 changed files with 119 additions and 84 deletions.
84 changes: 45 additions & 39 deletions docs/content.zh/docs/libs/state_processor_api.md
@@ -24,12 +24,13 @@ specific language governing permissions and limitations
under the License.
-->


# State Processor API

- Apache Flink's State Processor API provides powerful functionality to reading, writing, and modifying savepoints and checkpoints using Flink’s batch DataSet API.
- Due to the [interoperability of DataSet and Table API](https://nightlies.apache.org/flink/flink-docs-master/dev/table/common.html#integration-with-datastream-and-dataset-api), you can even use relational Table API or SQL queries to analyze and process state data.
+ Apache Flink's State Processor API provides powerful functionality for reading, writing, and modifying savepoints and checkpoints using Flink’s DataStream API under `BATCH` execution.
+ Due to the [interoperability of DataStream and Table API]({{< ref "docs/dev/table/data_stream_api" >}}), you can even use relational Table API or SQL queries to analyze and process state data.

- For example, you can take a savepoint of a running stream processing application and analyze it with a DataSet batch program to verify that the application behaves correctly.
+ For example, you can take a savepoint of a running stream processing application and analyze it with a DataStream batch program to verify that the application behaves correctly.
Or you can read a batch of data from any store, preprocess it, and write the result to a savepoint that you use to bootstrap the state of a streaming application.
It is also possible to fix inconsistent state entries.
Finally, the State Processor API opens up many ways to evolve a stateful application that was previously blocked by parameter and design choices that could not be changed without losing all the state of the application after it was started.
@@ -39,7 +40,7 @@ To get started with the state processor api, include the following library in yo

{{< artifact flink-state-processor-api >}}

## Mapping Application State to DataSets

The State Processor API maps the state of a streaming application to one or more data sets that can be processed separately.
In order to be able to use the API, you need to understand how this mapping works.
@@ -49,7 +50,7 @@ A Flink job is composed of operators; typically one or more source operators, a
Each operator runs in parallel in one or more tasks and can work with different types of state.
An operator can have zero, one, or more *“operator states”* which are organized as lists that are scoped to the operator's tasks.
If the operator is applied on a keyed stream, it can also have zero, one, or more *“keyed states”* which are scoped to a key that is extracted from each processed record.
You can think of keyed state as a distributed key-value map.

The following figure shows the application “MyApp” which consists of three operators called “Src”, “Proc”, and “Snk”.
Src has one operator state (os1), Proc has one operator state (os2) and two keyed states (ks1, ks2) and Snk is stateless.
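To make the mapping concrete, here is a minimal sketch of how MyApp's operator states could each be surfaced as their own DataStream (the operator uids, state names, and element types are assumptions for illustration; keyed states such as ks1 and ks2 are read with a `KeyedStateReaderFunction`, shown later):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SavepointReader savepoint = SavepointReader.read(env, "hdfs://path/to/savepoint", new HashMapStateBackend());

// One DataStream per state: operator state os1 of "Src" and os2 of "Proc".
DataStream<String> os1 = savepoint.readListState("src-uid", "os1", Types.STRING);
DataStream<String> os2 = savepoint.readListState("proc-uid", "os2", Types.STRING);
```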
@@ -77,8 +78,8 @@ Reading state begins by specifying the path to a valid savepoint or checkpoint a
The compatibility guarantees for restoring state are identical to those when restoring a `DataStream` application.

```java
- ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
- ExistingSavepoint savepoint = Savepoint.load(bEnv, "hdfs://path/", new HashMapStateBackend());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ SavepointReader savepoint = SavepointReader.read(env, "hdfs://path/", new HashMapStateBackend());
```


@@ -94,7 +95,7 @@ Operator state stored in a `CheckpointedFunction` using `getListState` can be re
The state name and type information should match those used to define the `ListStateDescriptor` that declared this state in the DataStream application.

```java
- DataSet<Integer> listState = savepoint.readListState<>(
+ DataStream<Integer> listState = savepoint.readListState(
"my-uid",
"list-state",
Types.INT);
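// For comparison, a hypothetical write-side declaration that this reader would match --
// the state name "list-state" and the Integer type must be identical to those of the
// ListStateDescriptor registered in the original DataStream application:
//
//   ListStateDescriptor<Integer> descriptor = new ListStateDescriptor<>("list-state", Types.INT);
//   checkpointedState = context.getOperatorStateStore().getListState(descriptor);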
@@ -107,7 +108,7 @@ The state name and type information should match those used to define the `ListS
The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1.

```java
- DataSet<Integer> listState = savepoint.readUnionState<>(
+ DataStream<Integer> listState = savepoint.readUnionState(
"my-uid",
"union-state",
Types.INT);
@@ -120,7 +121,7 @@ The state name and type information should match those used to define the `MapSt
The framework will return a _single_ copy of the state, equivalent to restoring a DataStream with parallelism 1.

```java
- DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState<>(
+ DataStream<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState(
"my-uid",
"broadcast-state",
Types.INT,
@@ -129,10 +130,10 @@ DataSet<Tuple2<Integer, Integer>> broadcastState = savepoint.readBroadcastState<

#### Using Custom Serializers

Each of the operator state readers supports using a custom `TypeSerializer` if one was used to define the `StateDescriptor` that wrote out the state.

```java
- DataSet<Integer> listState = savepoint.readListState<>(
+ DataStream<Integer> listState = savepoint.readListState(
"uid",
"list-state",
Types.INT,
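// ... followed by the custom serializer instance as the final argument, e.g. a
// hypothetical new MyCustomIntSerializer() matching the serializer used by the writing job.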
@@ -171,10 +172,10 @@ public class StatefulFunctionWithTime extends KeyedProcessFunction<Integer, Inte
}
```

Then it can be read by defining an output type and a corresponding `KeyedStateReaderFunction`.
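For instance, a minimal sketch of such a reader function for the output type `KeyedState` shown below, assuming `StatefulFunctionWithTime` registered a value state named `state` of type `Integer` (the state name, type, and the `value` field on `KeyedState` are assumptions for illustration):

```java
public class ReaderFunction extends KeyedStateReaderFunction<Integer, KeyedState> {

  ValueState<Integer> state;

  @Override
  public void open(Configuration parameters) {
    // Must use the same state name and type as the original KeyedProcessFunction.
    ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("state", Types.INT);
    state = getRuntimeContext().getState(stateDescriptor);
  }

  @Override
  public void readKey(Integer key, Context ctx, Collector<KeyedState> out) throws Exception {
    KeyedState data = new KeyedState();
    data.key   = key;
    data.value = state.value();
    out.collect(data);
  }
}
```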

```java
- DataSet<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());
+ DataStream<KeyedState> keyedState = savepoint.readKeyedState("my-uid", new ReaderFunction());

public class KeyedState {
public int key;
@@ -290,7 +291,11 @@ class ClickState {
class ClickReader extends WindowReaderFunction<Integer, ClickState, String, TimeWindow> {

@Override
- public void readWindow(String key, Context<TimeWindow> context, Iterable<Integer> elements, Collector<ClickState> out) {
+ public void readWindow(
+     String key,
+     Context<TimeWindow> context,
+     Iterable<Integer> elements,
+     Collector<ClickState> out) {
ClickState state = new ClickState();
state.userId = key;
state.count = elements.iterator().next();
@@ -301,8 +306,8 @@ class ClickReader extends WindowReaderFunction<Integer, ClickState, String, Time
}
}

- ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
- ExistingSavepoint savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new HashMapStateBackend());
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ SavepointReader savepoint = SavepointReader.read(env, "hdfs://checkpoint-dir", new HashMapStateBackend());

savepoint
.window(TumblingEventTimeWindows.of(Time.minutes(1)))
@@ -317,19 +322,23 @@ Additionally, trigger state - from `CountTrigger`s or custom triggers - can be r
## Writing New Savepoints

`Savepoint`s may also be written, which allows for such use cases as bootstrapping state based on historical data.
- Each savepoint is made up of one or more `BootstrapTransformation`'s (explained below), each of which defines the state for an individual operator.
+ Each savepoint is made up of one or more `StateBootstrapTransformation`s (explained below), each of which defines the state for an individual operator.

+ {{< hint info >}}
+ When using the `SavepointWriter`, your application must be executed under [BATCH]({{< ref "docs/dev/datastream/execution_mode" >}}) execution.
+ {{< /hint >}}
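For example, a minimal sketch of selecting batch execution programmatically (the runtime mode can equally be set through the `execution.runtime-mode` configuration option):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
```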

{{< hint info >}}
**Note** The State Processor API does not currently provide a Scala API. As a result
it will always auto-derive serializers using the Java type stack. To bootstrap
a savepoint for the Scala DataStream API, please manually pass in all type information.
{{< /hint >}}

```java
int maxParallelism = 128;

- Savepoint
-   .create(new HashMapStateBackend(), maxParallelism)
+ SavepointWriter
+   .newSavepoint(new HashMapStateBackend(), maxParallelism)
.withOperator("uid1", transformation1)
.withOperator("uid2", transformation2)
.write(savepointPath);
@@ -339,7 +348,7 @@ The [UIDs]({{< ref "docs/ops/state/savepoints" >}}#assigning-operator-ids) assoc

### Operator State

Simple operator state, using `CheckpointedFunction`, can be created using the `StateBootstrapFunction`.

```java
public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
Expand All @@ -361,17 +370,17 @@ public class SimpleBootstrapFunction extends StateBootstrapFunction<Integer> {
}
}

- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Integer> data = env.fromElements(1, 2, 3);
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+ DataStream<Integer> data = env.fromElements(1, 2, 3);

- BootstrapTransformation transformation = OperatorTransformation
+ StateBootstrapTransformation transformation = OperatorTransformation
.bootstrapWith(data)
.transform(new SimpleBootstrapFunction());
```
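The resulting transformation can then be registered with a `SavepointWriter` and written out, for example (the max parallelism and target path here are hypothetical):

```java
SavepointWriter
    .newSavepoint(new HashMapStateBackend(), 128)
    .withOperator("uid1", transformation)
    .write("file:///tmp/bootstrapped-savepoint");
```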

### Broadcast State

[BroadcastState]({{< ref "docs/dev/datastream/fault-tolerance/broadcast_state" >}}) can be written using a `BroadcastStateBootstrapFunction`. Similar to broadcast state in the `DataStream` API, the full state must fit in memory.

```java
public class CurrencyRate {
@@ -391,10 +400,10 @@ public class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<C
}
}

- DataSet<CurrencyRate> currencyDataSet = bEnv.fromCollection(
+ DataStream<CurrencyRate> currencyDataSet = env.fromElements(
new CurrencyRate("USD", 1.0), new CurrencyRate("EUR", 1.3));

- BootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
+ StateBootstrapTransformation<CurrencyRate> broadcastTransformation = OperatorTransformation
.bootstrapWith(currencyDataSet)
.transform(new CurrencyBootstrapFunction());
```
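A minimal sketch of what `CurrencyBootstrapFunction` might look like, assuming hypothetical `currency` and `rate` fields on `CurrencyRate` and a broadcast state named `rates` (all names here are assumptions for illustration):

```java
public class CurrencyBootstrapFunction extends BroadcastStateBootstrapFunction<CurrencyRate> {

    public static final MapStateDescriptor<String, Double> descriptor =
        new MapStateDescriptor<>("rates", Types.STRING, Types.DOUBLE);

    @Override
    public void processElement(CurrencyRate value, Context ctx) throws Exception {
        // Register each record in the broadcast (map) state.
        ctx.getBroadcastState(descriptor).put(value.currency, value.rate);
    }
}
```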
@@ -427,11 +436,11 @@ public class AccountBootstrapper extends KeyedStateBootstrapFunction<Integer, Ac
}
}

- ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

- DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
+ DataStream<Account> accountDataSet = env.fromCollection(accounts);

- BootstrapTransformation<Account> transformation = OperatorTransformation
+ StateBootstrapTransformation<Account> transformation = OperatorTransformation
.bootstrapWith(accountDataSet)
.keyBy(acc -> acc.id)
.transform(new AccountBootstrapper());
Expand All @@ -458,15 +467,12 @@ public class Account {
public long timestamp;
}

- ExecutionEnvironment bEnv = ExecutionEnvironment.getExecutionEnvironment();
+ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

- DataSet<Account> accountDataSet = bEnv.fromCollection(accounts);
+ DataStream<Account> accountDataSet = env.fromCollection(accounts);

- BootstrapTransformation<Account> transformation = OperatorTransformation
+ StateBootstrapTransformation<Account> transformation = OperatorTransformation
.bootstrapWith(accountDataSet)
- // When using event time windows, it is important
- // to assign timestamps to each record.
- .assignTimestamps(account -> account.timestamp)
.keyBy(acc -> acc.id)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.reduce((left, right) -> left + right);
@@ -477,8 +483,8 @@ BootstrapTransformation<Account> transformation = OperatorTransformation
Besides creating a savepoint from scratch, you can base one on an existing savepoint, such as when bootstrapping a single new operator for an existing job.

```java
- Savepoint
-   .load(bEnv, new HashMapStateBackend(), oldPath)
+ SavepointWriter
+   .fromExistingSavepoint(env, new HashMapStateBackend(), oldPath)
.withOperator("uid", transformation)
.write(newPath);
```
