[FLINK-28336][python][format] Support parquet-avro format DataStream API
This closes apache#20124.
vancior98 authored and dianfu committed Jul 4, 2022
1 parent 94915d6 commit 75024ce
Showing 10 changed files with 514 additions and 4 deletions.
31 changes: 30 additions & 1 deletion docs/content.zh/docs/connectors/datastream/formats/parquet.md
@@ -133,7 +133,7 @@ final DataStream<RowData> stream =

## Avro Records

Flink supports three ways to read Parquet files and create Avro records:
Flink supports three ways to read Parquet files and create Avro records (only generic records are supported in PyFlink):

- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
@@ -163,6 +163,8 @@
Flink parses the Avro schema from the JSON string. There are many other ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
Then you can create an `AvroParquetRecordFormat` for Avro Generic records via `AvroParquetReaders`.

{{< tabs "GenericRecord" >}}
{{< tab "Java" >}}
```java
// parse the avro schema
final Schema schema =
        new Schema.Parser().parse(
                "{\"type\": \"record\", \"name\": \"User\", \"fields\": [\n"
                        + "    {\"name\": \"name\", \"type\": \"string\"},\n"
                        + "    {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"]},\n"
                        + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"]}\n"
                        + "]}");

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema) /* file paths */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10);

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
{{< /tab >}}
{{< tab "Python" >}}
```python
# parse the avro schema
schema = Schema.parse_string("""
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
}
""")

source = FileSource.for_record_stream_format(
AvroParquetReaders.for_generic_record(schema), # file paths
).build()

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(10)

stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
```
{{< /tab >}}
{{< /tabs >}}
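A note on the schema used above: an Avro union type that contains `"null"`, such as `["int", "null"]`, marks the field as optional. A minimal stdlib-only sketch of that rule (the helper `allows_null` is illustrative, not part of Flink or Avro APIs):

```python
def allows_null(avro_type) -> bool:
    # In Avro, a union (a JSON array of types) that includes "null"
    # makes the field optional; a bare type name makes it required.
    return isinstance(avro_type, list) and "null" in avro_type

# favoriteNumber and favoriteColor above are optional; name is required.
print(allows_null(["int", "null"]))  # → True
print(allows_null("string"))         # → False
```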

### Specific record

31 changes: 30 additions & 1 deletion docs/content/docs/connectors/datastream/formats/parquet.md
@@ -137,7 +137,7 @@ final DataStream<RowData> stream =

## Avro Records

Flink supports producing three types of Avro records by reading Parquet files:
Flink supports producing three types of Avro records by reading Parquet files (only generic records are supported in PyFlink):

- [Generic record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
- [Specific record](https://avro.apache.org/docs/1.10.0/api/java/index.html)
@@ -166,6 +166,8 @@
In the following example, you will create a DataStream containing Parquet records as Avro Generic records.
It will parse the Avro schema based on the JSON string. There are many other ways to parse a schema, e.g. from java.io.File or java.io.InputStream. Please refer to [Avro Schema](https://avro.apache.org/docs/1.10.0/api/java/org/apache/avro/Schema.html) for details.
After that, you will create an `AvroParquetRecordFormat` via `AvroParquetReaders` for Avro Generic records.

{{< tabs "GenericRecord" >}}
{{< tab "Java" >}}
```java
// parsing avro schema
final Schema schema =
        new Schema.Parser().parse(
                "{\"type\": \"record\", \"name\": \"User\", \"fields\": [\n"
                        + "    {\"name\": \"name\", \"type\": \"string\"},\n"
                        + "    {\"name\": \"favoriteNumber\", \"type\": [\"int\", \"null\"]},\n"
                        + "    {\"name\": \"favoriteColor\", \"type\": [\"string\", \"null\"]}\n"
                        + "]}");

final FileSource<GenericRecord> source =
        FileSource.forRecordStreamFormat(
                        AvroParquetReaders.forGenericRecord(schema) /* file paths */)
                .build();

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(10);

final DataStream<GenericRecord> stream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "file-source");
```
{{< /tab >}}
{{< tab "Python" >}}
```python
# parsing avro schema
schema = Schema.parse_string("""
{
"type": "record",
"name": "User",
"fields": [
{"name": "name", "type": "string"},
{"name": "favoriteNumber", "type": ["int", "null"]},
{"name": "favoriteColor", "type": ["string", "null"]}
]
}
""")

source = FileSource.for_record_stream_format(
AvroParquetReaders.for_generic_record(schema), # file paths
).build()

env = StreamExecutionEnvironment.get_execution_environment()
env.enable_checkpointing(10)

stream = env.from_source(source, WatermarkStrategy.no_watermarks(), "file-source")
```
{{< /tab >}}
{{< /tabs >}}
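Since the Avro schema is plain JSON, its shape can be sanity-checked without any Flink dependency. A minimal sketch using only the standard library (`field_names` is an illustrative helper, not part of PyFlink):

```python
import json

USER_SCHEMA = """
{
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favoriteNumber", "type": ["int", "null"]},
        {"name": "favoriteColor", "type": ["string", "null"]}
    ]
}
"""

def field_names(schema_json: str):
    # An Avro record schema declares its columns under "fields";
    # each generic record read from Parquet carries exactly these names.
    schema = json.loads(schema_json)
    assert schema["type"] == "record"
    return [f["name"] for f in schema["fields"]]

print(field_names(USER_SCHEMA))
# → ['name', 'favoriteNumber', 'favoriteColor']
```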

### Specific record

23 changes: 23 additions & 0 deletions flink-formats/flink-sql-parquet/pom.xml
@@ -39,6 +39,22 @@ under the License.
<artifactId>flink-parquet</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>${flink.format.parquet.version}</version>
<exclusions>
<exclusion>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
</exclusion>
<exclusion>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
@@ -57,6 +73,7 @@ under the License.
<artifactSet>
<includes>
<include>org.apache.flink:flink-parquet</include>
<include>org.apache.parquet:parquet-avro</include>
<include>org.apache.parquet:parquet-hadoop</include>
<include>org.apache.parquet:parquet-format</include>
<include>org.apache.parquet:parquet-column</include>
@@ -70,6 +87,12 @@ under the License.
<include>commons-codec:commons-codec</include>
</includes>
</artifactSet>
<relocations>
<relocation>
<pattern>org.apache.avro</pattern>
<shadedPattern>org.apache.flink.avro.shaded.org.apache.avro</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
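The `<relocation>` rule added to the shade plugin above is what keeps the bundled Avro classes from clashing with an Avro version already on the user's classpath. A hedged sketch of the renaming the plugin performs (`relocate` is an illustrative helper, not a real Maven API):

```python
def relocate(class_name: str,
             pattern: str = "org.apache.avro",
             shaded: str = "org.apache.flink.avro.shaded.org.apache.avro") -> str:
    # The shade plugin rewrites every class whose fully qualified name
    # falls under `pattern` to live under `shaded`; all other classes
    # (e.g. parquet-avro's own org.apache.parquet.avro.*) are untouched.
    if class_name == pattern or class_name.startswith(pattern + "."):
        return shaded + class_name[len(pattern):]
    return class_name

print(relocate("org.apache.avro.Schema"))
# → org.apache.flink.avro.shaded.org.apache.avro.Schema
print(relocate("org.apache.parquet.avro.AvroParquetReader"))  # unchanged
```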
@@ -6,6 +6,7 @@ The Apache Software Foundation (http://www.apache.org/).

This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)

- org.apache.parquet:parquet-avro:1.12.2
- org.apache.parquet:parquet-hadoop:1.12.2
- org.apache.parquet:parquet-column:1.12.2
- org.apache.parquet:parquet-common:1.12.2