From 664b29d62e658415a3ca8fe37de2d2b88251b1e4 Mon Sep 17 00:00:00 2001 From: Michael Li Date: Mon, 26 Apr 2021 14:05:28 +0800 Subject: [PATCH] [FLINK-18199][doc] translate FileSystem SQL Connector page into chinese This closes #13459 --- .../docs/connectors/table/filesystem.md | 234 +++++++++--------- 1 file changed, 115 insertions(+), 119 deletions(-) diff --git a/docs/content.zh/docs/connectors/table/filesystem.md b/docs/content.zh/docs/connectors/table/filesystem.md index b3dfeb2bfcc79..bbb08ff4c1d9f 100644 --- a/docs/content.zh/docs/connectors/table/filesystem.md +++ b/docs/content.zh/docs/connectors/table/filesystem.md @@ -1,5 +1,5 @@ --- -title: FileSystem +title: 文件系统 weight: 8 type: docs aliases: @@ -24,15 +24,13 @@ specific language governing permissions and limitations under the License. --> -# FileSystem SQL Connector +# 文件系统 SQL 连接器 -This connector provides access to partitioned files in filesystems -supported by the [Flink FileSystem abstraction]({{< ref "docs/deployment/filesystems/overview" >}}). +该连接器提供了对 [Flink 文件系统抽象]({{< ref "docs/deployment/filesystems/overview" >}}) 支持的文件系统中的分区文件的访问. -The file system connector itself is included in Flink and does not require an additional dependency. -A corresponding format needs to be specified for reading and writing rows from and to a file system. +文件系统连接器本身就被包括在 Flink 中,不需要任何额外的依赖。当从文件系统中读取或向文件系统写入记录时,需要指定相应的记录格式。 -The file system connector allows for reading and writing from a local or distributed filesystem. A filesystem table can be defined as: +文件系统连接器支持对本地文件系统或分布式文件系统的读取和写入。 可以通过如下方式定义文件系统表: ```sql CREATE TABLE MyUserTable ( @@ -42,37 +40,30 @@ CREATE TABLE MyUserTable ( part_name1 INT, part_name2 STRING ) PARTITIONED BY (part_name1, part_name2) WITH ( - 'connector' = 'filesystem', -- required: specify the connector - 'path' = 'file:///path/to/whatever', -- required: path to a directory - 'format' = '...', -- required: file system connector requires to specify a format, - -- Please refer to Table Formats - -- section for more details - 'partition.default-name' = '...', -- optional: default partition name in case the dynamic partition - -- column value is null/empty string - - -- optional: the option to enable shuffle data by dynamic partition fields in sink phase, this can greatly - -- reduce the number of file for filesystem sink but may lead data skew, the default value is false. - 'sink.shuffle-by-partition.enable' = '...', + 'connector' = 'filesystem', -- 必选: 指定连接器类型 + 'path' = 'file:///path/to/whatever', -- 必选: 指向目录的路径 + 'format' = '...', -- 必选: 文件系统连接器需要指定格式,请查阅 表格式 部分以获取更多细节 + 'partition.default-name' = '...', -- 可选: 动态分区模式下分区字段值是 null 或空字符串时,默认的分区名。 + 'sink.shuffle-by-partition.enable' = '...', -- 可选: 该选项开启了在 sink 阶段通过动态分区字段来 shuffle 数据,该功能可以大大减少文件系统 sink 的文件数,但可能会导致数据倾斜,默认值是 false. ... ) ``` {{< hint info >}} -Make sure to include [Flink File System specific dependencies]({{< ref "docs/deployment/filesystems/overview" >}}). +需要确保包含以下依赖 [Flink File System specific dependencies]({{< ref "docs/deployment/filesystems/overview" >}}). {{< /hint >}} {{< hint info >}} -File system sources for streaming is still under development. In the future, the community will add support for common streaming use cases, i.e., partition and directory monitoring. 
+针对流的文件系统 sources 目前还在开发中。 将来,社区会不断添加对常见的流处理场景的支持, 比如对分区和目录的检测等。 {{< /hint >}} {{< hint warning >}} -The behaviour of file system connector is much different from `previous legacy filesystem connector`: -the path parameter is specified for a directory not for a file and you can't get a human-readable file in the path that you declare. +新版的文件系统连接器和旧版的文件系统连接器有很大不同:path 参数指定的是一个目录而不是一个文件,该目录下文件的格式也不是肉眼可读的。 {{< /hint >}} -## Partition Files +## 分区文件 -Flink's file system partition support uses the standard hive format. However, it does not require partitions to be pre-registered with a table catalog. Partitions are discovered and inferred based on directory structure. For example, a table partitioned based on the directory below would be inferred to contain `datetime` and `hour` partitions. +Flink 的文件系统连接器在对分区的支持上,使用了标准的 hive 格式。 不过,它不需要预先注册分区,而是基于目录结构自动做了分区发现。比如,以下目录结构的表, 会被自动推导为包含 `datetime` 和 `hour` 分区的分区表。 ``` path @@ -87,35 +78,34 @@ path ├── part-0.parquet ``` -The file system table supports both partition inserting and overwrite inserting. See [INSERT Statement]({{< ref "docs/dev/table/sql/insert" >}}). When you insert overwrite to a partitioned table, only the corresponding partition will be overwritten, not the entire table. +文件系统连接器支持分区新增插入和分区覆盖插入。 参见 [INSERT Statement]({{< ref "docs/dev/table/sql/insert" >}}). 当对分区表进行分区覆盖插入时,只有相应的分区会被覆盖,而不是整个表。 -## File Formats +## 文件格式 -The file system connector supports multiple formats: +文件系统连接器支持多种格式: - - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). Uncompressed. - - JSON: Note JSON format for file system connector is not a typical JSON file but uncompressed [newline delimited JSON](http://jsonlines.org/). - - Avro: [Apache Avro](http://avro.apache.org). Support compression by configuring `avro.codec`. - - Parquet: [Apache Parquet](http://parquet.apache.org). Compatible with Hive. - - Orc: [Apache Orc](http://orc.apache.org). Compatible with Hive. + - CSV: [RFC-4180](https://tools.ietf.org/html/rfc4180). 非压缩格式。 + - JSON: 注意文件系统连接器中的 JSON 不是传统的标准的 JSON 格式,而是非压缩的 [newline delimited JSON](http://jsonlines.org/). + - Avro: [Apache Avro](http://avro.apache.org). 可以通过配置 `avro.codec` 支持压缩. + - Parquet: [Apache Parquet](http://parquet.apache.org). 与 Hive 兼容. + - Orc: [Apache Orc](http://orc.apache.org). 与 Hive 兼容. - Debezium-JSON: [debezium-json]({{< ref "docs/connectors/table/formats/debezium" >}}). - Canal-JSON: [canal-json]({{< ref "docs/connectors/table/formats/canal" >}}). - Raw: [raw]({{< ref "docs/connectors/table/formats/raw" >}}). -## Streaming Sink +## 流式 Sink -The file system connector supports streaming writes, based on Flink's [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) -to write records to file. Row-encoded Formats are csv and json. Bulk-encoded Formats are parquet, orc and avro. +文件系统连接器支持流式的写, 它基于 Flink 的 [Streaming File Sink]({{< ref "docs/connectors/datastream/streamfile_sink" >}}) +将记录写入文件。按行编码的格式支持 csv 和 json。 按块编码的格式支持 parquet, orc 和 avro。 -You can write SQL directly, insert the stream data into the non-partitioned table. -If it is a partitioned table, you can configure partition related operations. See [Partition Commit](filesystem.html#partition-commit) for details. +你可以直接编写 SQL,把流数据插入到非分区表。 +如果是分区表,可以配置分区操作相关的参数,参见 [分区提交](#分区提交) 以查阅更多细节. -### Rolling Policy +### 滚动策略 -Data within the partition directories are split into part files. Each partition will contain at least one part file for -each subtask of the sink that has received data for that partition. 
The in-progress part file will be closed and additional -part file will be created according to the configurable rolling policy. The policy rolls part files based on size, -a timeout that specifies the maximum duration for which a file can be open. +分区目录下的数据被分割到分区文件中。每个分区对应的sink的每个接受到了数据的子任务都至少会为该分区生成一个分区文件。 +根据可配置的滚动策略,当前正在写入的分区文件会被关闭,新的分区文件也会被生成。 +该策略基于大小,和指定的文件可被打开的最大 timeout 时长,来滚动分区文件。 @@ -131,33 +121,31 @@ a timeout that specifies the maximum duration for which a file can be open. - + - + - +
sink.rolling-policy.file-size
128MB MemorySize The maximum part file size before rolling. 滚动前,分区文件的最大大小。
sink.rolling-policy.rollover-interval
30 min DurationThe maximum time duration a part file can stay open before rolling (by default 30 min to avoid to many small files). - The frequency at which this is checked is controlled by the 'sink.rolling-policy.check-interval' option. 滚动前,分区文件处于打开状态的最大时长 (默认值是30分钟,以避免产生大量小文件)。 检查该选项的频率由参数 'sink.rolling-policy.check-interval' 控制。
sink.rolling-policy.check-interval
1 min Duration The interval for checking time based rolling policies. This controls the frequency to check whether a part file should rollover based on 'sink.rolling-policy.rollover-interval'. 基于时间的滚动策略的检查间隔。该参数控制了按照 'sink.rolling-policy.rollover-interval' 检查分区文件是否应该滚动的频率。
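下面是一个在建表语句中组合使用上述滚动策略参数的示意(表名、路径与参数取值均为假设的示例,format 选用 json 仅作演示):

```sql
-- 示意:为文件系统表配置滚动策略(参数取值仅作演示)
CREATE TABLE rolling_fs_table (
  log_id STRING,
  log_content STRING
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/output',
  'format' = 'json',
  -- 单个分区文件达到 64MB 即滚动
  'sink.rolling-policy.file-size' = '64MB',
  -- 分区文件打开超过 15 分钟即滚动
  'sink.rolling-policy.rollover-interval' = '15 min',
  -- 每分钟检查一次是否需要基于时间滚动
  'sink.rolling-policy.check-interval' = '1 min'
);
```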
-**NOTE:** For bulk formats (parquet, orc, avro), the rolling policy in combination with the checkpoint interval(pending files -become finished on the next checkpoint) control the size and number of these parts. +**注意:** 对于 bulk 格式 (parquet, orc, avro), 滚动策略和检查点间隔控制了分区文件的大小和个数 (未完成的文件会在下个检查点完成). -**NOTE:** For row formats (csv, json), you can set the parameter `sink.rolling-policy.file-size` or `sink.rolling-policy.rollover-interval` in the connector properties and parameter `execution.checkpointing.interval` in flink-conf.yaml together -if you don't want to wait a long period before observe the data exists in file system. For other formats (avro, orc), you can just set parameter `execution.checkpointing.interval` in flink-conf.yaml. +**注意:** 对于行格式 (csv, json), 如果想使得分区文件更快地在文件系统中可见,可以设置连接器参数 `sink.rolling-policy.file-size` 或 `sink.rolling-policy.rollover-interval` ,以及 flink-conf.yaml 中的 `execution.checkpointing.interval` 。 +对于其他格式 (avro, orc), 可以只设置 flink-conf.yaml 中的 `execution.checkpointing.interval` 。 -### File Compaction +### 文件合并 -The file sink supports file compactions, which allows applications to have smaller checkpoint intervals without generating a large number of files. +file sink 支持文件合并,以允许应用程序可以使用较小的检查点间隔而不产生大量文件。 @@ -173,35 +161,36 @@ The file sink supports file compactions, which allows applications to have small - + - +
auto-compaction
false BooleanWhether to enable automatic compaction in streaming sink or not. The data will be written to temporary files. After the checkpoint is completed, the temporary files generated by a checkpoint will be compacted. The temporary files are invisible before compaction. 在流式 sink 中是否开启自动合并功能。数据首先会被写入到临时文件,在检查点完成后,该检查点产生的临时文件会被合并。这些临时文件在合并前不可见.
compaction.file-size
(none) MemorySizeThe compaction target file size, the default value is the rolling file size. 合并目标文件大小,默认值是滚动文件大小.
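下面的片段演示了如何在 WITH 子句中开启文件合并(仅作示意,表名、路径与参数取值均为假设的示例):

```sql
-- 示意:开启自动文件合并(参数取值仅作演示)
CREATE TABLE compacted_fs_table (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/output',
  'format' = 'parquet',
  -- 开启流式 sink 的自动合并
  'auto-compaction' = 'true',
  -- 合并后的目标文件大小
  'compaction.file-size' = '128MB'
);
```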
-If enabled, file compaction will merge multiple small files into larger files based on the target file size. -When running file compaction in production, please be aware that: -- Only files in a single checkpoint are compacted, that is, at least the same number of files as the number of checkpoints is generated. -- The file before merging is invisible, so the visibility of the file may be: checkpoint interval + compaction time. -- If the compaction takes too long, it will backpressure the job and extend the time period of checkpoint. +启用该参数后,文件合并功能会根据设定的目标文件大小,合并多个小文件到大文件。 +当在生产环境使用文件合并功能时,需要注意: +- 只有检查点内部的文件才会被合并,也就是说,至少会生成跟检查点个数一样多的文件。 +- 合并前文件是可见的,所以文件的可见性是:检查点间隔 + 合并时长。 +- 如果合并花费的时间很长,会对作业产生反压,延长检查点所需时间。 -### Partition Commit +### 分区提交 + -After writing a partition, it is often necessary to notify downstream applications. For example, add the partition to a Hive metastore or writing a `_SUCCESS` file in the directory. The file system sink contains a partition commit feature that allows configuring custom policies. Commit actions are based on a combination of `triggers` and `policies`. +分区数据写完毕后,经常需要通知下游应用。比如,在 Hive metastore 中新增分区或者在目录下新增 `_SUCCESS` 文件。 分区提交策略是可定制的,具体的分区提交行为是基于 `triggers` 和 `policies` 的组合. -- Trigger: The timing of the commit of the partition can be determined by the watermark with the time extracted from the partition, or by processing time. -- Policy: How to commit a partition, built-in policies support for the commit of success files and metastore, you can also implement your own policies, such as triggering hive's analysis to generate statistics, or merging small files, etc. +- Trigger: 分区提交的时机,可以基于从分区中提取的时间对应的水印,或者基于处理时间。 +- Policy: 分区提交策略,内置的策略包括提交 `_SUCCESS` 文件和 hive metastore, 也可以自己定制提交策略, 比如触发 hive 生成统计信息,合并小文件等。 -**NOTE:** Partition Commit only works in dynamic partition inserting. +**注意:** 分区提交只有在动态分区插入模式下才有效。 -#### Partition commit trigger +#### 分区提交触发器 -To define when to commit a partition, providing partition commit trigger: +通过配置分区提交的触发策略,来配置何时提交分区: @@ -217,53 +206,53 @@ To define when to commit a partition, providing partition commit trigger: - + - + - +
sink.partition-commit.trigger
process-time String Trigger type for partition commit: 'process-time': based on the time of the machine, it neither requires partition time extraction nor watermark generation. Commit partition once the 'current system time' passes 'partition creation system time' plus 'delay'. 'partition-time': based on the time that extracted from partition values, it requires watermark generation. Commit partition once the 'watermark' passes 'time extracted from partition values' plus 'delay'. 分区提交触发器类型。 + 'process-time': 基于机器时间,既不需要分区时间提取器也不需要水印生成器,一旦“当前系统时间”超过了“分区创建系统时间”和 'sink.partition-commit.delay' 之和,就提交分区; + 'partition-time': 基于从分区字段提取的时间,需要水印生成器,一旦“水印”超过了“从分区字段提取的时间”和 'sink.partition-commit.delay' 之和,就提交分区。
sink.partition-commit.delay
0 s DurationThe partition will not commit until the delay time. If it is a daily partition, should be '1 d', if it is a hourly partition, should be '1 h'.该延迟时间之前分区不会被提交。如果是按天的分区,应配置为 '1 d', 如果是按小时的分区,应配置为 '1 h'.
sink.partition-commit.watermark-time-zone
UTC StringThe time zone to parse the long watermark value to TIMESTAMP value, the parsed watermark timestamp is used to compare with partition time to decide the partition should commit or not. This option is only take effect when `sink.partition-commit.trigger` is set to 'partition-time'. If this option is not configured correctly, e.g. source rowtime is defined on TIMESTAMP_LTZ column, but this config is not configured, then users may see the partition committed after a few hours. The default value is 'UTC', which means the watermark is defined on TIMESTAMP column or not defined. If the watermark is defined on TIMESTAMP_LTZ column, the time zone of watermark is the session time zone. The option value is either a full name such as 'America/Los_Angeles', or a custom timezone id such as 'GMT-8:00'.解析 LONG 类型的水印到 TIMESTAMP 类型时所采用的时区,解析得到的水印的 TIMESTAMP 会被用来跟分区时间进行比较以判断分区是否该被提交。 + 该参数只有在参数 `sink.partition-commit.trigger` 被设置为 'partition-time' 时才生效。 + 如果该参数设置的不正确,比如在 TIMESTAMP_LTZ 列上定义了 source rowtime, 但没有设置该参数,则用户可能在若干个小时后才看到分区的提交。 + 该参数的默认值是 'UTC', 代表水印是定义在 TIMESTAMP 列上或没有定义水印。 如果水印定义在 TIMESTAMP_LTZ 列上,则水印的时区是会话的时区。 + 该参数的可选值要么是完整的时区名比如 'America/Los_Angeles',要么是自定义的时区 id 比如 'GMT-8:00'.
-There are two types of trigger: -- The first is partition processing time. It neither requires partition time extraction nor watermark -generation. The trigger of partition commit according to partition creation time and current system time. This trigger -is more universal, but not so precise. For example, data delay or failover will lead to premature partition commit. -- The second is the trigger of partition commit according to the time that extracted from partition values and watermark. -This requires that your job has watermark generation, and the partition is divided according to time, such as -hourly partition or daily partition. +有两种类型的触发器: +- 第一种是根据分区的处理时间。 该触发器不需要分区时间提取,也不需要生成水印。通过分区创建时间和当前系统时间来触发分区提交。该触发器更通用但不是很精确。比如,数据的延迟或故障转移都会导致分区的提前提交。 +- 第二种是根据从分区字段提取的时间以及水印。这需要你的作业支持生成水印,分区是根据时间来切割的,比如按小时或按天分区。 -If you want to let downstream see the partition as soon as possible, no matter whether its data is complete or not: -- 'sink.partition-commit.trigger'='process-time' (Default value) -- 'sink.partition-commit.delay'='0s' (Default value) -Once there is data in the partition, it will immediately commit. Note: the partition may be committed multiple times. +如果想让下游系统尽快感知到分区,而不管分区数据是否完整: +- 'sink.partition-commit.trigger'='process-time' (默认值) +- 'sink.partition-commit.delay'='0s' (默认值) +一旦分区中有数据,分区立马就会被提交。注意:分区可能会被提交多次。 -If you want to let downstream see the partition only when its data is complete, and your job has watermark generation, and you can extract the time from partition values: +如果想让下游系统只有在分区数据完整时才感知到分区,且你的作业有水印生成的逻辑,也能从分区字段的值中提取到时间: - 'sink.partition-commit.trigger'='partition-time' -- 'sink.partition-commit.delay'='1h' ('1h' if your partition is hourly partition, depends on your partition type) -This is the most accurate way to commit partition, and it will try to ensure that the committed partitions are as data complete as possible. +- 'sink.partition-commit.delay'='1h' (根据分区类型指定,如果是按小时的分区可配置为 '1h') +该方式是最精确的提交分区的方式,该方式尽力确保提交的分区包含尽量完整的数据。 -If you want to let downstream see the partition only when its data is complete, but there is no watermark, or the time cannot be extracted from partition values: -- 'sink.partition-commit.trigger'='process-time' (Default value) -- 'sink.partition-commit.delay'='1h' ('1h' if your partition is hourly partition, depends on your partition type) -Try to commit partition accurately, but data delay or failover will lead to premature partition commit. +如果想让下游系统只有在数据完整时才感知到分区,但是没有水印,或者无法从分区字段的值中提取时间: +- 'sink.partition-commit.trigger'='process-time' (默认值) +- 'sink.partition-commit.delay'='1h' (根据分区类型指定,如果是按小时的分区可配置为 '1h') +该方式尽量精确地提交分区,但是数据延迟或故障转移会导致分区的提前提交。 -Late data processing: The record will be written into its partition when a record is supposed to be -written into a partition that has already been committed, and then the committing of this partition -will be triggered again. +延迟数据的处理:延迟的记录会被写入到已经提交的对应分区中,且会再次触发该分区的提交。 -#### Partition Time Extractor +#### 分区时间提取器 -Time extractors define extracting time from partition values. +时间提取器定义了如何从分区字段值中提取时间. @@ -279,24 +268,27 @@ Time extractors define extracting time from partition values. - + - + - +
partition.time-extractor.kind
default String Time extractor to extract time from partition values. Support default and custom. For default, can configure timestamp pattern. For custom, should configure extractor class. 从分区字段值中提取时间的时间提取器。支持 'default' 和 'custom' 两种。使用 'default' 提取器时,可以配置时间戳模式;使用 'custom' 提取器时,需要指定提取器的实现类。
partition.time-extractor.class
(none) StringThe extractor class for implement PartitionTimeExtractor interface.实现了接口 PartitionTimeExtractor 的提取器类.
partition.time-extractor.timestamp-pattern
(none) String The 'default' construction way allows users to use partition fields to get a legal timestamp pattern. Default support 'yyyy-mm-dd hh:mm:ss' from first field. If timestamp should be extracted from a single partition field 'dt', can configure: '$dt'. If timestamp should be extracted from multiple partition fields, say 'year', 'month', 'day' and 'hour', can configure: '$year-$month-$day $hour:00:00'. If timestamp should be extracted from two partition fields 'dt' and 'hour', can configure: '$dt $hour:00:00'. 'default' 时间提取器允许用户从分区字段中提取合法的时间戳模式。默认支持从第一个字段按 'yyyy-mm-dd hh:mm:ss' 时间戳模式提取。 + 如果需要从一个分区字段比如 'dt' 提取时间戳,可以配置为:'$dt'; + 如果需要从多个分区字段,比如 'year', 'month', 'day' 和 'hour' 提取时间戳,可以配置为:'$year-$month-$day $hour:00:00'; + 如果需要从两个分区字段,比如 'dt' 和 'hour' 提取时间戳,可以配置为:'$dt $hour:00:00'。
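例如(仅作示意,表名、路径与字段均为假设的示例),对按 'dt' 和 'hour' 两个字段分区的表,可以按如下方式配置默认的时间提取器:

```sql
-- 示意:使用默认(default)时间提取器,从 dt 和 hour 两个分区字段中拼接出时间戳
CREATE TABLE extractor_fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING,
  `hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/output',
  'format' = 'parquet',
  -- dt 形如 '2020-05-20',hour 形如 '12',拼接后即为完整的时间戳
  'partition.time-extractor.timestamp-pattern' = '$dt $hour:00:00'
);
```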
-The default extractor is based on a timestamp pattern composed of your partition fields. You can also specify an implementation for fully custom partition extraction based on the `PartitionTimeExtractor` interface. +默认的提取器是基于由分区字段组合而成的时间戳模式。你也可以指定一个实现了 `PartitionTimeExtractor` 接口的自定义的提取器。 ```java @@ -311,12 +303,12 @@ public class HourPartTimeExtractor implements PartitionTimeExtractor { ``` -#### Partition Commit Policy +#### 分区提交策略 -The partition commit policy defines what action is taken when partitions are committed. +分区提交策略指定了提交分区时的具体操作. -- The first is metastore, only hive table supports metastore policy, file system manages partitions through directory structure. -- The second is the success file, which will write an empty file in the directory corresponding to the partition. +- 第一种是 metastore, 只有 hive 表支持该策略, 该策略下文件系统通过目录层次结构来管理分区. +- 第二种是 success 文件, 该策略下会在分区对应的目录下写入一个名为 `_SUCCESS` 的空文件. @@ -332,24 +324,28 @@ The partition commit policy defines what action is taken when partitions are com - + - + - +
sink.partition-commit.policy.kind
(none) StringPolicy to commit a partition is to notify the downstream application that the partition has finished writing, the partition is ready to be read. metastore: add partition to metastore. Only hive table supports metastore policy, file system manages partitions through directory structure. success-file: add '_success' file to directory. Both can be configured at the same time: 'metastore,success-file'. custom: use policy class to create a commit policy. Support to configure multiple policies: 'metastore,success-file'.分区提交策略用来通知下游应用系统某个分区已经写完毕可以被读取了。 + metastore: 向 metastore 中增加分区,只有 hive 支持 metastore 策略,文件系统通过目录结构管理分区; + success-file: 向目录下增加 '_success' 文件; + custom: 使用指定的类来创建提交策略; + 支持同时指定多个提交策略,如:'metastore,success-file'.
sink.partition-commit.policy.class
(none) StringThe partition commit policy class for implement PartitionCommitPolicy interface. Only work in custom commit policy. 实现了 PartitionCommitPolicy 接口的分区提交策略。只有在 custom 提交策略下适用。
sink.partition-commit.success-file.name
_SUCCESS StringThe file name for success-file partition commit policy, default is '_SUCCESS'. 使用 success-file 分区提交策略时的文件名,默认值是 '_SUCCESS'.
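下面的片段演示了使用内置的 success-file 提交策略并自定义 success 文件名(仅作示意,表名、路径与取值均为假设的示例;如上所述,metastore 策略仅适用于 hive 表):

```sql
-- 示意:使用 success-file 分区提交策略(参数取值仅作演示)
CREATE TABLE success_file_fs_table (
  user_id STRING,
  order_amount DOUBLE,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/output',
  'format' = 'parquet',
  'sink.partition-commit.policy.kind' = 'success-file',
  -- 自定义 success 文件名,默认是 '_SUCCESS'
  'sink.partition-commit.success-file.name' = '_COMPLETED'
);
```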
-You can extend the implementation of commit policy, The custom commit policy implementation like: +你也可以实现自己的提交策略,如: ```java @@ -378,9 +374,9 @@ public class AnalysisCommitPolicy implements PartitionCommitPolicy { ``` -## Sink Parallelism +## Sink 并行度 -The parallelism of writing files into external file system (including Hive) can be configured by the corresponding table option, which is supported both in streaming mode and in batch mode. By default, the parallelism is configured to being the same as the parallelism of its last upstream chained operator. When the parallelism which is different from the parallelism of the upstream parallelism is configured, the operator of writing files and the operator compacting files (if used) will apply the parallelism. +向外部文件系统(包括 hive) 写文件时的并行度,在流处理模式和批处理模式下,都可以通过对应的 table 选项指定。默认情况下,该并行度跟上一个上游的 chained operator 的并行度一样。当配置了跟上一个上游的 chained operator 不一样的并行度时,写文件的算子和合并文件的算子(如果使用了的话)会使用指定的并行度。 @@ -397,17 +393,17 @@ The parallelism of writing files into external file system (including Hive) can - +
sink.parallelism
(none) IntegerParallelism of writing files into external file system. The value should greater than zero otherwise exception will be thrown. 向外部文件系统写文件时的并行度。必须大于 0,否则会抛出异常.
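例如(仅作示意,表名、路径与取值均为假设的示例),可以按如下方式为写文件算子单独指定并行度:

```sql
-- 示意:为文件系统 sink 指定并行度(取值仅作演示)
CREATE TABLE parallel_fs_table (
  user_id STRING,
  order_amount DOUBLE
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///path/to/output',
  'format' = 'parquet',
  -- 写文件算子以及文件合并算子(若开启)将使用该并行度
  'sink.parallelism' = '4'
);
```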
-**NOTE:** Currently, Configuring sink parallelism is supported if and only if the changelog mode of upstream is **INSERT-ONLY**. Otherwise, exception will be thrown. +**注意:** 当前,只有在上游的 changelog 模式是 **INSERT-ONLY** 时,才支持设置 sink 的并行度。否则的话,会抛出异常。 -## Full Example +## 完整示例 -The below examples show how the file system connector can be used to write a streaming query to write data from Kafka into a file system and runs a batch query to read that data back out. +如下示例演示了如何使用文件系统连接器编写流查询语句查询 kafka 中的数据并写入到文件系统中,以及通过批查询把结果数据读取出来. ```sql @@ -415,7 +411,7 @@ CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), - WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column + WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- 在 TIMESTAMP 列上定义水印 ) WITH (...); CREATE TABLE fs_table ( @@ -431,7 +427,7 @@ CREATE TABLE fs_table ( 'sink.partition-commit.policy.kind'='success-file' ); --- streaming sql, insert into file system table +-- streaming sql, 插入数据到文件系统表中 INSERT INTO fs_table SELECT user_id, @@ -440,19 +436,19 @@ SELECT DATE_FORMAT(log_ts, 'HH') FROM kafka_table; --- batch sql, select with partition pruning +-- batch sql, 分区裁剪查询 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12'; ``` -If the watermark is defined on TIMESTAMP_LTZ column and used `partition-time` to commit, the `sink.partition-commit.watermark-time-zone` is required to set to the session time zone, otherwise the partition committed may happen after a few hours. +如果水印是定义在 TIMESTAMP_LTZ 列上,且使用了 `partition-time` 来提交分区, 则参数 `sink.partition-commit.watermark-time-zone` 需要被设置为会话的时区,否则分区会在若干小时后才会被提交。 ```sql CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, - ts BIGINT, -- time in epoch milliseconds + ts BIGINT, -- epoch 毫秒时间 ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3), - WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column + WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在 TIMESTAMP_LTZ 列上定义水印 ) WITH (...); CREATE TABLE fs_table ( @@ -467,11 +463,11 @@ CREATE TABLE fs_table ( 'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00', 'sink.partition-commit.delay'='1 h', 'sink.partition-commit.trigger'='partition-time', - 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user configured time zone is 'Asia/Shanghai' + 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 假定用户配置的时区是 'Asia/Shanghai' 'sink.partition-commit.policy.kind'='success-file' ); --- streaming sql, insert into file system table +-- streaming sql, 插入数据到文件系统表中 INSERT INTO fs_table SELECT user_id, @@ -480,7 +476,7 @@ SELECT DATE_FORMAT(ts_ltz, 'HH') FROM kafka_table; --- batch sql, select with partition pruning +-- batch sql, 分区裁剪查询 SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12'; ```
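另外,如“分区文件”一节所述,文件系统表也支持分区覆盖插入。下面是一个在批模式下覆盖写入单个分区的示意(沿用上文示例中的 fs_table 表结构,my_bounded_source 为假设的有界数据源,取值仅作演示):

```sql
-- 示意:批模式下覆盖写入指定分区,只会覆盖对应分区而不是整张表
INSERT OVERWRITE fs_table PARTITION (dt='2020-05-20', `hour`='12')
SELECT user_id, order_amount FROM my_bounded_source;
```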