[FLINK-28140][python][docs] Improve the documentation by adding Python examples in DataStream API Integration page

This closes apache#20121.
pengmide authored and dianfu committed Jul 1, 2022
1 parent 2f4f462 commit ce11ce3
Showing 2 changed files with 600 additions and 0 deletions.
300 changes: 300 additions & 0 deletions docs/content.zh/docs/dev/table/data_stream_api.md
@@ -405,6 +405,36 @@ val tableEnv = StreamTableEnvironment.create(env)
// +U[Alice, 112]
```
{{< /tab >}}
{{< tab "Python" >}}
```python
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment

# setup DataStream API
env = StreamExecutionEnvironment.get_execution_environment()

# set the batch runtime mode
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

# uncomment this for streaming mode
# env.set_runtime_mode(RuntimeExecutionMode.STREAMING)

# setup Table API
# the table environment adopts the runtime mode during initialization
table_env = StreamTableEnvironment.create(env)

# define the same pipeline as above
# prints in BATCH mode:
# +I[Bob, 10]
# +I[Alice, 112]

# prints in STREAMING mode:
# +I[Alice, 12]
# +I[Bob, 10]
# -U[Alice, 12]
# +U[Alice, 112]
```
{{< /tab >}}
{{< /tabs >}}
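The Python tab above only hints at "the same pipeline as above", because the Java/Scala lines it refers to are collapsed in this diff. For orientation, here is a self-contained PyFlink sketch of what such a pipeline could look like; the input rows and column names are assumptions inferred from the printed changelog, not the exact code from the page:

```python
from pyflink.common import Row, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.STREAMING)
table_env = StreamTableEnvironment.create(env)

# hypothetical input values chosen so the aggregation matches the output above
ds = env.from_collection(
    [Row("Alice", 12), Row("Bob", 10), Row("Alice", 100)],
    type_info=Types.ROW_NAMED(["name", "score"], [Types.STRING(), Types.INT()]))

table_env.create_temporary_view("InputTable", table_env.from_data_stream(ds))

result_table = table_env.sql_query(
    "SELECT name, SUM(score) AS score FROM InputTable GROUP BY name")

# the changelog sink lives in the DataStream API, so env.execute() triggers it
table_env.to_changelog_stream(result_table).print()
env.execute()
```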

Once the changelog is applied to an external system (e.g. a key-value store), one can see that both
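Applying either changelog to an external key-value store converges to the same materialized table; a toy illustration in plain Python (not Flink API — the op codes simply mirror the +I/-U/+U markers printed above) makes this concrete:

```python
# apply a changelog of (op, key, value) records to a dict acting as the K/V store
def apply(changelog):
    store = {}
    for op, key, value in changelog:
        if op in ("+I", "+U"):    # insert or updated value: upsert the key
            store[key] = value
        elif op in ("-U", "-D"):  # retraction or delete: drop the old value
            store.pop(key, None)
    return store

batch_log = [("+I", "Bob", 10), ("+I", "Alice", 112)]
streaming_log = [("+I", "Alice", 12), ("+I", "Bob", 10),
                 ("-U", "Alice", 12), ("+U", "Alice", 112)]

# both runtime modes materialize to the same table
assert apply(batch_log) == apply(streaming_log) == {"Bob": 10, "Alice": 112}
```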
@@ -467,6 +497,15 @@ import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
```
{{< /tab >}}
{{< tab "Python" >}}
```python
# imports for Python DataStream API
from pyflink.datastream import *

# imports for Table API to Python DataStream API
from pyflink.table import *
```
{{< /tab >}}
{{< /tabs >}}

See the [Configuration]({{< ref "docs/dev/configuration/overview" >}}) section for more details.
@@ -598,6 +637,8 @@ declare a final sink. Both `TableEnvironment` and also `StreamTableEnvironment`
general `execute()` method. Instead, they offer methods for submitting a single source-to-sink
pipeline or a statement set:

{{< tabs "47a32814-abea-11eb-8529-0242ac133403" >}}
{{< tab "Java" >}}
```java
// execute with explicit sink
tableEnv.from("InputTable").insertInto("OutputTable").execute();
@@ -620,13 +661,40 @@ tableEnv.from("InputTable").execute().print();

tableEnv.executeSql("SELECT * FROM InputTable").print();
```
{{< /tab >}}
{{< tab "Python" >}}
```python
# execute with explicit sink
table_env.from_path("input_table").execute_insert("output_table")

table_env.execute_sql("INSERT INTO output_table SELECT * FROM input_table")

input_table = table_env.from_path("input_table")

table_env.create_statement_set() \
    .add_insert("output_table", input_table) \
    .add_insert("output_table2", input_table) \
    .execute()

table_env.create_statement_set() \
    .add_insert_sql("INSERT INTO output_table SELECT * FROM input_table") \
    .add_insert_sql("INSERT INTO output_table2 SELECT * FROM input_table") \
    .execute()

# execute with implicit local sink
table_env.from_path("input_table").execute().print()

table_env.execute_sql("SELECT * FROM input_table").print()
```
{{< /tab >}}
{{< /tabs >}}

To combine both execution behaviors, every call to `StreamTableEnvironment.toDataStream`
or `StreamTableEnvironment.toChangelogStream` will materialize (i.e. compile) the Table API sub-pipeline
and insert it into the DataStream API pipeline builder. This means that `StreamExecutionEnvironment.execute()`
or `DataStream.executeAndCollect` must be called afterwards. An execution in Table API will not trigger
these "external parts".

{{< tabs "47a32814-abea-11eb-8529-0242ac133504" >}}
{{< tab "Java" >}}
```java
// (1)

@@ -643,6 +711,25 @@ table.execute().print();
// Flink job, (2) was already running before
env.execute();
```
{{< /tab >}}
{{< tab "Python" >}}
```python
# (1)

# adds a branch with a printing sink to the StreamExecutionEnvironment
table_env.to_data_stream(table).print()

# (2)
# executes a Table API end-to-end pipeline as a Flink job and prints locally,
# thus (1) has still not been executed
table.execute().print()

# executes the DataStream API pipeline with the sink defined in (1) as a
# Flink job, (2) was already running before
env.execute()
```
{{< /tab >}}
{{< /tabs >}}

{{< top >}}

@@ -705,6 +792,24 @@ val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.inBatchMode)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

# adopt mode from StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
env.set_runtime_mode(RuntimeExecutionMode.BATCH)
table_env = StreamTableEnvironment.create(env)

# or

# set mode explicitly for StreamTableEnvironment
# it will be propagated to StreamExecutionEnvironment during planning
env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env, EnvironmentSettings.in_batch_mode())
```
{{< /tab >}}
{{< /tabs >}}

One must meet the following prerequisites before setting the runtime mode to `BATCH`:
@@ -808,6 +913,35 @@ tableEnv.toDataStream(table)
// ...
```
{{< /tab >}}
{{< tab "Python" >}}
```python
from pyflink.table import TableDescriptor, Schema, DataTypes

table = table_env.from_descriptor(
    TableDescriptor.for_connector("datagen")
        .option("number-of-rows", "10")
        .schema(
            Schema.new_builder()
                .column("uid", DataTypes.TINYINT())
                .column("payload", DataTypes.STRING())
                .build())
        .build())

# convert the Table to a DataStream and further transform the pipeline
collect = table_env.to_data_stream(table) \
    .key_by(lambda r: r[0]) \
    .map(lambda r: "My custom operator: " + r[1]) \
    .execute_and_collect()

for c in collect:
    print(c)

# prints:
# My custom operator: 9660912d30a43c7b035e15bd...
# My custom operator: 29f5f706d2144f4a4f9f52a0...
# ...
```
{{< /tab >}}
{{< /tabs >}}

### Changelog Unification
Expand All @@ -824,6 +958,8 @@ in the resulting changelog stream. The example joins two tables in SQL (`UserTab
an interval join based on the time attributes in both tables (`ts`). It uses DataStream API to implement
a custom operator that deduplicates the user name using a `KeyedProcessFunction` and value state.

{{< tabs "3f5f5d4e-cd03-48d1-9309-917a6cf66aba" >}}
{{< tab "Java" >}}
```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.ValueState;
@@ -940,6 +1076,107 @@ env.execute();
// Bob
// Alice
```
{{< /tab >}}
{{< tab "Python" >}}
```python
from datetime import datetime
from pyflink.common import Row, Types
from pyflink.datastream import StreamExecutionEnvironment, RuntimeExecutionMode, \
    KeyedProcessFunction, RuntimeContext
from pyflink.datastream.state import ValueStateDescriptor
from pyflink.table import StreamTableEnvironment, Schema, DataTypes

# setup DataStream API
env = StreamExecutionEnvironment.get_execution_environment()

# use BATCH or STREAMING mode
env.set_runtime_mode(RuntimeExecutionMode.BATCH)

# setup Table API
table_env = StreamTableEnvironment.create(env)

# create a user stream
t_format = "%Y-%m-%dT%H:%M:%S"
user_stream = env.from_collection(
    [Row(datetime.strptime("2021-08-21T13:00:00", t_format), 1, "Alice"),
     Row(datetime.strptime("2021-08-21T13:05:00", t_format), 2, "Bob"),
     Row(datetime.strptime("2021-08-21T13:10:00", t_format), 2, "Bob")],
    type_info=Types.ROW_NAMED(["ts1", "uid", "name"],
                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.STRING()]))

# create an order stream
order_stream = env.from_collection(
    [Row(datetime.strptime("2021-08-21T13:02:00", t_format), 1, 122),
     Row(datetime.strptime("2021-08-21T13:07:00", t_format), 2, 239),
     Row(datetime.strptime("2021-08-21T13:11:00", t_format), 2, 999)],
    type_info=Types.ROW_NAMED(["ts1", "uid", "amount"],
                              [Types.SQL_TIMESTAMP(), Types.INT(), Types.INT()]))

# create corresponding tables
table_env.create_temporary_view(
    "user_table",
    user_stream,
    Schema.new_builder()
        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
        .column("uid", DataTypes.INT())
        .column("name", DataTypes.STRING())
        .watermark("ts", "ts - INTERVAL '1' SECOND")
        .build())

table_env.create_temporary_view(
    "order_table",
    order_stream,
    Schema.new_builder()
        .column_by_expression("ts", "CAST(ts1 AS TIMESTAMP(3))")
        .column("uid", DataTypes.INT())
        .column("amount", DataTypes.INT())
        .watermark("ts", "ts - INTERVAL '1' SECOND")
        .build())

# perform interval join
joined_table = table_env.sql_query(
    "SELECT U.name, O.amount " +
    "FROM user_table U, order_table O " +
    "WHERE U.uid = O.uid AND O.ts BETWEEN U.ts AND U.ts + INTERVAL '5' MINUTES")

joined_stream = table_env.to_data_stream(joined_table)

joined_stream.print()

# implement a custom operator using ProcessFunction and value state
class MyProcessFunction(KeyedProcessFunction):

    def __init__(self):
        self.seen = None

    def open(self, runtime_context: RuntimeContext):
        state_descriptor = ValueStateDescriptor("seen", Types.STRING())
        self.seen = runtime_context.get_state(state_descriptor)

    def process_element(self, value, ctx):
        name = value[0]
        if self.seen.value() is None:
            self.seen.update(name)
            yield name

joined_stream \
    .key_by(lambda r: r[0]) \
    .process(MyProcessFunction()) \
    .print()

# execute unified pipeline
env.execute()

# prints (in both BATCH and STREAMING mode):
# +I[Bob, 239]
# +I[Alice, 122]
# +I[Bob, 999]
#
# Bob
# Alice
```
{{< /tab >}}
{{< /tabs >}}

{{< top >}}

@@ -1422,6 +1659,9 @@ data structures. The following example in Java shows what is possible. Check als
[Data Types & Serialization]({{< ref "docs/dev/datastream/fault-tolerance/serialization/types_serialization" >}}) page of
the DataStream API for more information about the supported types there.


{{< tabs "079cdf25-21ef-4393-ad69-623510038b1b" >}}
{{< tab "Java" >}}
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.api.DataTypes;
@@ -1494,6 +1734,13 @@ table.printSchema();
// `user` *User<`name` STRING,`score` INT>*
// )
```
{{< /tab >}}
{{< tab "Python" >}}
```python
Custom POJO classes are not supported in PyFlink yet.
```
{{< /tab >}}
{{< /tabs >}}
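Although PyFlink cannot derive a structured type from a user-defined POJO-like class the way the Java example above does, a comparable result can be approximated with named `Row` types. A minimal sketch under that assumption (column names and sample values are illustrative, not taken from this page):

```python
from pyflink.common import Row, Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

# model the user record as a named Row instead of a POJO-style class
ds = env.from_collection(
    [Row("Alice", 4), Row("Bob", 6), Row("Alice", 10)],
    type_info=Types.ROW_NAMED(["name", "score"], [Types.STRING(), Types.INT()]))

# the derived table schema picks up the Row field names and types
table = table_env.from_data_stream(ds)
table.print_schema()
# expected to print a schema roughly like:
# (
#   `name` STRING,
#   `score` INT
# )
```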

### Examples for `createTemporaryView`

@@ -2641,6 +2888,59 @@ env.execute()
// +I[3]
```
{{< /tab >}}
{{< tab "Python" >}}
```python
from pyflink.common import Encoder
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import StreamingFileSink
from pyflink.table import StreamTableEnvironment, TableDescriptor, Schema, DataTypes

env = StreamExecutionEnvironment.get_execution_environment()
table_env = StreamTableEnvironment.create(env)

statement_set = table_env.create_statement_set()

# create some source
source_descriptor = TableDescriptor.for_connector("datagen") \
.option("number-of-rows", "3") \
.schema(
Schema.new_builder()
.column("my_col", DataTypes.INT())
.column("my_other_col", DataTypes.BOOLEAN())
.build()) \
.build()

# create some sink
sink_descriptor = TableDescriptor.for_connector("print").build()

# add a pure Table API pipeline
table_from_source = table_env.from_descriptor(source_descriptor)
statement_set.add_insert(sink_descriptor, table_from_source)


# use table sinks for the DataStream API pipeline
data_stream = env.from_collection([1, 2, 3])
table_from_stream = table_env.from_data_stream(data_stream)
statement_set.add_insert(sink_descriptor, table_from_stream)

# define other DataStream API parts
env.from_collection([4, 5, 6]) \
    .add_sink(StreamingFileSink
              .for_row_format('/tmp/output', Encoder.simple_string_encoder())
              .build())

# use DataStream API to submit the pipelines
env.execute()

# prints similar to:
# +I[1618440447, false]
# +I[1259693645, true]
# +I[158588930, false]
# +I[1]
# +I[2]
# +I[3]
```
{{< /tab >}}
{{< /tabs >}}

{{< top >}}