Commit

Improvements to SchemaTransform implementations for BQ and Kafka (apache#23045)

* Improvements to SchemaTransform implementations for BQ and Kafka

* Fix Kafka null checks

* fixup

* fixup?

* few improvements

* fixup

* adding test

* fix issue with hashcode
pabloem committed Sep 10, 2022
1 parent d179148 commit 1526ca8
Showing 4 changed files with 337 additions and 144 deletions.
@@ -23,6 +23,9 @@
import java.util.HashMap;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.io.gcp.testing.FakeBigQueryServices;
import org.apache.beam.sdk.io.gcp.testing.FakeDatasetService;
import org.apache.beam.sdk.io.gcp.testing.FakeJobService;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.io.SchemaIO;
@@ -34,6 +37,7 @@
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.sdk.values.Row;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

/**
* An implementation of {@link SchemaIOProvider} for reading and writing to BigQuery with {@link
@@ -90,6 +94,7 @@ public Schema configurationSchema() {
.addNullableField("query", FieldType.STRING)
.addNullableField("queryLocation", FieldType.STRING)
.addNullableField("createDisposition", FieldType.STRING)
.addNullableField("useTestingBigQueryServices", FieldType.BOOLEAN)
.build();
}

@@ -194,7 +199,20 @@ public PDone expand(PCollection<Row> input) {
BigQueryIO.Write<Row> write =
BigQueryIO.<Row>write()
.useBeamSchema()
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API);
.withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
.withTriggeringFrequency(Duration.standardSeconds(5))
.withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)
.withAutoSharding();

final Boolean useTestingBigQueryServices =
config.getBoolean("useTestingBigQueryServices");
if (useTestingBigQueryServices != null && useTestingBigQueryServices) {
FakeBigQueryServices fbqs =
new FakeBigQueryServices()
.withDatasetService(new FakeDatasetService())
.withJobService(new FakeJobService());
write = write.withTestServices(fbqs);
}

String table = config.getString("table");
if (table != null) {
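
The new useTestingBigQueryServices flag lets the write path be exercised against in-memory fakes instead of the live BigQuery backend. As a rough sketch of how a caller might set it, the snippet below builds a configuration Row against the schema declared in configurationSchema() above, reusing the Schema, FieldType, and Row types already imported in this file; only the fields visible in this diff are included, the table value is a placeholder, and the surrounding wiring is an assumption rather than part of this change.

// Illustrative sketch only: field names come from configurationSchema() above;
// fields hidden by the diff ellipsis are omitted, and the table spec is a placeholder.
Schema configSchema =
    Schema.builder()
        .addNullableField("table", FieldType.STRING)
        .addNullableField("query", FieldType.STRING)
        .addNullableField("queryLocation", FieldType.STRING)
        .addNullableField("createDisposition", FieldType.STRING)
        .addNullableField("useTestingBigQueryServices", FieldType.BOOLEAN)
        .build();

// Only the write destination and the testing flag are set here.
Row configRow =
    Row.withSchema(configSchema)
        .withFieldValue("table", "my-project:my_dataset.my_table")
        .withFieldValue("useTestingBigQueryServices", true)
        .build();

Passing such a Row as the configuration argument to the provider's from(...) method should yield a SchemaIO whose write transform routes rows through FakeBigQueryServices rather than the real service, which is what the new branch in expand() above does.
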
@@ -18,13 +18,13 @@
package org.apache.beam.sdk.io.kafka;

import com.google.auto.value.AutoValue;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.annotation.Nullable;
import org.apache.beam.sdk.annotations.Experimental;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.kafka.common.TopicPartition;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;

/**
* Configuration for reading from a Kafka topic.
@@ -38,187 +38,71 @@
@AutoValue
public abstract class KafkaSchemaTransformReadConfiguration {

public static final Set<String> VALID_START_OFFSET_VALUES = Sets.newHashSet("earliest", "latest");
public static final Set<String> VALID_DATA_FORMATS = Sets.newHashSet("AVRO", "JSON");

public void validate() {
final String startOffset = this.getAutoOffsetResetConfig();
assert startOffset == null || VALID_START_OFFSET_VALUES.contains(startOffset)
: "Valid Kafka Start offset values are " + VALID_START_OFFSET_VALUES;
final String dataFormat = this.getDataFormat();
assert dataFormat == null || VALID_DATA_FORMATS.contains(dataFormat)
: "Valid data formats are " + VALID_DATA_FORMATS;
}

/** Instantiates a {@link KafkaSchemaTransformReadConfiguration.Builder} instance. */
public static Builder builder() {
return new AutoValue_KafkaSchemaTransformReadConfiguration.Builder();
}

/** Sets the bootstrap servers for the Kafka consumer. */
@Nullable
public abstract String getBootstrapServers();

/** Flags whether finalized offsets are committed to Kafka. */
@Nullable
public abstract Boolean getCommitOffsetsInFinalize();

/** Configuration updates for the backend main consumer. */
@Nullable
public abstract Map<String, Object> getConsumerConfigUpdates();

/**
* Sets the timestamps policy based on KafkaTimestampType.CREATE_TIME timestamp of the records.
*/
@Nullable
public abstract Long getCreateTimeMillisecondsMaximumDelay();

/**
* Configure the KafkaIO to use WatchKafkaTopicPartitionDoFn to detect and emit any new available
* {@link TopicPartition} for ReadFromKafkaDoFn to consume during pipeline execution time.
*/
@Nullable
public abstract Long getDynamicReadMillisecondsDuration();

/** Additional configuration for the backend offset consumer. */
@Nullable
public abstract Map<String, Object> getOffsetConsumerConfiguration();
public abstract String getConfluentSchemaRegistryUrl();

/** Specifies whether to include metadata when reading from Kafka topic. */
// TODO(pabloem): Make data format an ENUM
@Nullable
public abstract Boolean getReadWithMetadata();
public abstract String getDataFormat();

/** Sets "isolation_level" to "read_committed" in Kafka consumer configuration. */
@Nullable
public abstract Boolean getReadCommitted();
public abstract String getConfluentSchemaRegistrySubject();

/** Use timestamp to set up start offset. */
@Nullable
public abstract Long getStartReadTimeMillisecondsEpoch();
public abstract String getAvroSchema();

/** Use timestamp to set up stop offset. */
@Nullable
public abstract Long getStopReadTimeMillisecondsEpoch();
public abstract String getAutoOffsetResetConfig();

/**
* A timestamp policy to assign event time for messages in a Kafka partition and watermark for it.
*/
@Nullable
public abstract TimestampPolicyConfiguration getTimestampPolicy();
public abstract Map<String, String> getConsumerConfigUpdates();

/** Sets the topic from which to read. */
@Nullable
public abstract String getTopic();

/** Kafka partitions from which to read. */
@Nullable
public abstract List<TopicPartitionConfiguration> getTopicPartitions();

/** Builder for the {@link KafkaSchemaTransformReadConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {

/** Sets the bootstrap servers for the Kafka consumer. */
public abstract Builder setBootstrapServers(String value);

/** Flags whether finalized offsets are committed to Kafka. */
public abstract Builder setCommitOffsetsInFinalize(Boolean value);

/** Configuration updates for the backend main consumer. */
public abstract Builder setConsumerConfigUpdates(Map<String, Object> value);

/**
* Sets the timestamps policy based on KafkaTimestampType.CREATE_TIME timestamp of the records.
*/
public abstract Builder setCreateTimeMillisecondsMaximumDelay(Long value);

/**
* Configure the KafkaIO to use WatchKafkaTopicPartitionDoFn to detect and emit any new
* available {@link TopicPartition} for ReadFromKafkaDoFn to consume during pipeline execution
* time.
*/
public abstract Builder setDynamicReadMillisecondsDuration(Long value);
public abstract Builder setConfluentSchemaRegistryUrl(String schemaRegistry);

/** Additional configuration for the backend offset consumer. */
public abstract Builder setOffsetConsumerConfiguration(Map<String, Object> value);
public abstract Builder setConfluentSchemaRegistrySubject(String subject);

/** Specifies whether to include metadata when reading from Kafka topic. */
public abstract Builder setReadWithMetadata(Boolean value);
public abstract Builder setAvroSchema(String schema);

/** Sets "isolation_level" to "read_committed" in Kafka consumer configuration. */
public abstract Builder setReadCommitted(Boolean value);
public abstract Builder setDataFormat(String dataFormat);

/** Use timestamp to set up start offset. */
public abstract Builder setStartReadTimeMillisecondsEpoch(Long value);
public abstract Builder setAutoOffsetResetConfig(String startOffset);

/** Use timestamp to set up stop offset. */
public abstract Builder setStopReadTimeMillisecondsEpoch(Long value);

/**
* A timestamp policy to assign event time for messages in a Kafka partition and watermark for
* it.
*/
public abstract Builder setTimestampPolicy(TimestampPolicyConfiguration value);
public abstract Builder setConsumerConfigUpdates(Map<String, String> consumerConfigUpdates);

/** Sets the topic from which to read. */
public abstract Builder setTopic(String value);

/** Kafka partitions from which to read. */
public abstract Builder setTopicPartitions(List<TopicPartitionConfiguration> value);

/** Builds a {@link KafkaSchemaTransformReadConfiguration} instance. */
public abstract KafkaSchemaTransformReadConfiguration build();
}

/**
* A configuration for a {@link TopicPartition}.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the
* Beam repository.
*/
@Experimental
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract static class TopicPartitionConfiguration {

/** Instantiates a {@link TopicPartitionConfiguration.Builder} instance. */
public static Builder builder() {
return new AutoValue_KafkaSchemaTransformReadConfiguration_TopicPartitionConfiguration
.Builder();
}

/** The name of the topic defining the partition. */
public abstract String getTopic();

/** The number of the topic partition. */
public abstract Integer getPartition();

/** Builder for the {@link TopicPartitionConfiguration}. */
@AutoValue.Builder
public abstract static class Builder {

/** The name of the topic defining the partition. */
public abstract Builder setTopic(String value);

/** The number of the topic partition. */
public abstract Builder setPartition(Integer value);

/** Builds a {@link TopicPartitionConfiguration} instance. */
public abstract TopicPartitionConfiguration build();
}
}

/**
* A timestamp policy to assign event time for messages in a Kafka partition and watermark for it.
*
* <p><b>Internal only:</b> This class is actively being worked on, and it will likely change. We
* provide no backwards compatibility guarantees, and it should not be implemented outside the
* Beam repository.
*/
@Experimental
public enum TimestampPolicyConfiguration {

/**
* Assigns Kafka's log append time (server side ingestion time) to each record. The watermark
* for each Kafka partition is the timestamp of the last record read. If a partition is idle,
* the watermark advances roughly to 'current time - 2 seconds'. See {@link
* KafkaIO.Read#withLogAppendTime()} for longer description.
*/
LOG_APPEND_TIME,

/**
* A simple policy that uses current time for event time and watermark. This should be used when
* better timestamps like LogAppendTime are not available for a topic.
*/
PROCESSING_TIME,
}
}
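
For orientation, here is a minimal usage sketch of the reshaped read configuration. It relies only on the builder methods visible in this diff; the broker address, topic, registry URL, and subject are placeholder values, and which setters are actually required depends on which properties end up non-nullable.

// Illustrative sketch only: all literal values are placeholders.
KafkaSchemaTransformReadConfiguration configuration =
    KafkaSchemaTransformReadConfiguration.builder()
        .setBootstrapServers("localhost:9092")                    // placeholder broker
        .setTopic("transactions")                                 // placeholder topic
        .setDataFormat("AVRO")                                    // must be in VALID_DATA_FORMATS
        .setAutoOffsetResetConfig("earliest")                     // must be in VALID_START_OFFSET_VALUES
        .setConfluentSchemaRegistryUrl("http://localhost:8081")   // placeholder registry URL
        .setConfluentSchemaRegistrySubject("transactions-value")  // placeholder subject
        .build();

// validate() is built on Java assert statements, so these checks only fire when the
// JVM runs with assertions enabled (-ea); otherwise invalid values pass through silently.
configuration.validate();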