From d1db07bcccaf5096d1d3f49d790ea49433aa05a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sergio=20Pe=C3=B1a?= Date: Thu, 8 Aug 2019 14:13:35 -0500 Subject: [PATCH] fix: default timestamp extractor override is not working (#3176) The use of ksql.streams.default.timestamp.extractor when creating a stream/table is not working. The new value is persisted in the command topic, but KSQL always use a default FailOnInvalidTimestamp This patch fixes KSQL so it honors the new default specified --- .../MetadataTimestampExtractionPolicy.java | 22 ++- .../TimestampExtractionPolicyFactory.java | 21 ++- ...MetadataTimestampExtractionPolicyTest.java | 5 + .../TimestampExtractionPolicyFactoryTest.java | 127 ++++++++++++++++-- .../ksql/ddl/commands/CommandFactories.java | 14 +- .../ddl/commands/CreateSourceCommand.java | 4 +- .../ddl/commands/CreateStreamCommand.java | 4 +- .../ksql/ddl/commands/CreateTableCommand.java | 4 +- .../ksql/ddl/commands/DdlCommandFactory.java | 3 + .../confluent/ksql/engine/EngineContext.java | 3 + .../confluent/ksql/engine/EngineExecutor.java | 1 + .../io/confluent/ksql/engine/QueryEngine.java | 2 +- .../ksql/planner/LogicalPlanner.java | 7 +- .../ddl/commands/CommandFactoriesTest.java | 18 ++- .../ddl/commands/CreateSourceCommandTest.java | 20 ++- .../ddl/commands/CreateStreamCommandTest.java | 9 +- .../ddl/commands/CreateTableCommandTest.java | 9 +- .../physical/PhysicalPlanBuilderTest.java | 4 +- .../ksql/planner/LogicalPlannerTest.java | 7 +- .../PlanSourceExtractorVisitorTest.java | 7 +- .../ksql/planner/plan/AggregateNodeTest.java | 2 +- .../ksql/planner/plan/JoinNodeTest.java | 2 +- .../planner/plan/KsqlBareOutputNodeTest.java | 5 +- .../structured/SchemaKGroupedTableTest.java | 2 +- .../ksql/structured/SchemaKStreamTest.java | 6 +- .../ksql/structured/SchemaKTableTest.java | 8 +- .../structured/SelectValueMapperTest.java | 2 +- .../ksql/structured/SqlPredicateTest.java | 6 +- .../ksql/testutils/AnalysisTestUtil.java | 9 +- .../timestamp-extractor.json | 44 ++++++ .../ksql/rest/entity/KsqlRequest.java | 21 ++- .../ksql/rest/server/KsqlRestApplication.java | 1 + .../ksql/rest/entity/KsqlRequestTest.java | 17 ++- 33 files changed, 354 insertions(+), 62 deletions(-) create mode 100644 ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java index d966da17bdab..7b104a171cfb 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicy.java @@ -17,30 +17,40 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.google.errorprone.annotations.Immutable; +import java.util.Objects; import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; @Immutable public class MetadataTimestampExtractionPolicy implements TimestampExtractionPolicy { + private final TimestampExtractor timestampExtractor; @JsonCreator - public MetadataTimestampExtractionPolicy(){} + public MetadataTimestampExtractionPolicy() { + this(new FailOnInvalidTimestamp()); + } + + public MetadataTimestampExtractionPolicy(final TimestampExtractor timestampExtractor) { + this.timestampExtractor = timestampExtractor; + } @Override public TimestampExtractor create(final int columnIndex) { - return new FailOnInvalidTimestamp(); + return timestampExtractor; } @Override public int hashCode() { - return this.getClass().hashCode(); + return Objects.hash(this.getClass(), timestampExtractor.getClass()); } @Override public boolean equals(final Object other) { - if (this == other) { - return true; + if (!(other instanceof MetadataTimestampExtractionPolicy)) { + return false; } - return other instanceof MetadataTimestampExtractionPolicy; + + final MetadataTimestampExtractionPolicy that = (MetadataTimestampExtractionPolicy)other; + return timestampExtractor.getClass() == that.timestampExtractor.getClass(); } } diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java index 907f52077744..3b8443244fd9 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactory.java @@ -17,10 +17,15 @@ import io.confluent.ksql.ddl.DdlConfig; import io.confluent.ksql.schema.ksql.KsqlSchema; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; + import java.util.Optional; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.TimestampExtractor; public final class TimestampExtractionPolicyFactory { @@ -28,12 +33,13 @@ private TimestampExtractionPolicyFactory() { } public static TimestampExtractionPolicy create( + final KsqlConfig ksqlConfig, final KsqlSchema schema, final Optional timestampColumnName, final Optional timestampFormat ) { if (!timestampColumnName.isPresent()) { - return new MetadataTimestampExtractionPolicy(); + return new MetadataTimestampExtractionPolicy(getDefaultTimestampExtractor(ksqlConfig)); } final String fieldName = timestampColumnName.get().toUpperCase(); @@ -71,4 +77,17 @@ public static TimestampExtractionPolicy create( + " specified"); } + private static TimestampExtractor getDefaultTimestampExtractor(final KsqlConfig ksqlConfig) { + try { + final Class timestampExtractorClass = (Class) ksqlConfig.getKsqlStreamConfigProps() + .getOrDefault( + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + FailOnInvalidTimestamp.class + ); + + return (TimestampExtractor) timestampExtractorClass.newInstance(); + } catch (final Exception e) { + throw new KsqlException("Cannot override default timestamp extractor: " + e.getMessage(), e); + } + } } diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java index 9befcf39b569..4f5e8252edd1 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/MetadataTimestampExtractionPolicyTest.java @@ -16,6 +16,7 @@ package io.confluent.ksql.util.timestamp; import com.google.common.testing.EqualsTester; +import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp; import org.junit.Test; public class MetadataTimestampExtractionPolicyTest { @@ -25,6 +26,10 @@ public void shouldTestEqualityCorrectly() { .addEqualityGroup( new MetadataTimestampExtractionPolicy(), new MetadataTimestampExtractionPolicy()) + .addEqualityGroup( + new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp()), + new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp()) + ) .testEquals(); } } diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java index 294f4824f846..860eaf864707 100644 --- a/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java +++ b/ksql-common/src/test/java/io/confluent/ksql/util/timestamp/TimestampExtractionPolicyFactoryTest.java @@ -19,12 +19,22 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.IsInstanceOf.instanceOf; +import com.google.common.collect.ImmutableMap; import io.confluent.ksql.schema.ksql.KsqlSchema; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; +import java.util.Collections; import java.util.Optional; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp; +import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; public class TimestampExtractionPolicyFactoryTest { @@ -32,14 +42,91 @@ public class TimestampExtractionPolicyFactoryTest { private final SchemaBuilder schemaBuilder = SchemaBuilder.struct() .field("id", Schema.OPTIONAL_INT64_SCHEMA); + private KsqlConfig ksqlConfig; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Before + public void setup() { + ksqlConfig = new KsqlConfig(Collections.emptyMap()); + } + @Test public void shouldCreateMetadataPolicyWhenTimestampFieldNotProvided() { // When: final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory - .create(KsqlSchema.of(schemaBuilder.build()), Optional.empty(), Optional.empty()); + .create( + ksqlConfig, + KsqlSchema.of(schemaBuilder.build()), + Optional.empty(), + Optional.empty() + ); + + // Then: + assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); + assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class)); + } + + @Test + public void shouldThrowIfTimestampExtractorConfigIsInvalidClass() { + // Given: + final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of( + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + this.getClass() + )); + + // Then: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "cannot be cast to org.apache.kafka.streams.processor.TimestampExtractor"); + + // When: + TimestampExtractionPolicyFactory + .create( + ksqlConfig, + KsqlSchema.of(schemaBuilder.build()), + Optional.empty(), + Optional.empty() + ); + } + + @Test + public void shouldCreateMetadataPolicyWithDefaultFailedOnInvalidTimestamp() { + // When: + final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory + .create( + ksqlConfig, + KsqlSchema.of(schemaBuilder.build()), + Optional.empty(), + Optional.empty() + ); + + // Then: + assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); + assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class)); + } + + @Test + public void shouldCreateMetadataPolicyWithConfiguredUsePreviousTimeOnInvalidTimestamp() { + // Given: + final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of( + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + UsePreviousTimeOnInvalidTimestamp.class + )); + + // When: + final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory + .create( + ksqlConfig, + KsqlSchema.of(schemaBuilder.build()), + Optional.empty(), + Optional.empty() + ); // Then: assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class)); + assertThat(result.create(0), instanceOf(UsePreviousTimeOnInvalidTimestamp.class)); } @Test @@ -52,17 +139,26 @@ public void shouldCreateLongTimestampPolicyWhenTimestampFieldIsOfTypeLong() { // When: final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory - .create(KsqlSchema.of(schema), Optional.of(timestamp), Optional.empty()); + .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(timestamp), Optional.empty()); // Then: assertThat(result, instanceOf(LongColumnTimestampExtractionPolicy.class)); assertThat(result.timestampField(), equalTo(timestamp.toUpperCase())); } - @Test(expected = KsqlException.class) + @Test public void shouldFailIfCantFindTimestampField() { + // Then: + expectedException.expect(KsqlException.class); + + // When: TimestampExtractionPolicyFactory - .create(KsqlSchema.of(schemaBuilder.build()), Optional.of("whateva"), Optional.empty()); + .create( + ksqlConfig, + KsqlSchema.of(schemaBuilder.build()), + Optional.of("whateva"), + Optional.empty() + ); } @Test @@ -75,14 +171,14 @@ public void shouldCreateStringTimestampPolicyWhenTimestampFieldIsStringTypeAndFo // When: final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory - .create(KsqlSchema.of(schema), Optional.of(field), Optional.of("yyyy-MM-DD")); + .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.of("yyyy-MM-DD")); // Then: assertThat(result, instanceOf(StringTimestampExtractionPolicy.class)); assertThat(result.timestampField(), equalTo(field.toUpperCase())); } - @Test(expected = KsqlException.class) + @Test public void shouldFailIfStringTimestampTypeAndFormatNotSupplied() { // Given: final String field = "my_string_field"; @@ -90,12 +186,15 @@ public void shouldFailIfStringTimestampTypeAndFormatNotSupplied() { .field(field.toUpperCase(), Schema.OPTIONAL_STRING_SCHEMA) .build(); + // Then: + expectedException.expect(KsqlException.class); + // When: TimestampExtractionPolicyFactory - .create(KsqlSchema.of(schema), Optional.of(field), Optional.empty()); + .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.empty()); } - @Test(expected = KsqlException.class) + @Test public void shouldThorwIfLongTimestampTypeAndFormatIsSupplied() { // Given: final String timestamp = "timestamp"; @@ -103,12 +202,15 @@ public void shouldThorwIfLongTimestampTypeAndFormatIsSupplied() { .field(timestamp.toUpperCase(), Schema.OPTIONAL_INT64_SCHEMA) .build(); + // Then: + expectedException.expect(KsqlException.class); + // When: TimestampExtractionPolicyFactory - .create(KsqlSchema.of(schema), Optional.of(timestamp), Optional.of("b")); + .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(timestamp), Optional.of("b")); } - @Test(expected = KsqlException.class) + @Test public void shouldThrowIfTimestampFieldTypeIsNotLongOrString() { // Given: final String field = "blah"; @@ -116,8 +218,11 @@ public void shouldThrowIfTimestampFieldTypeIsNotLongOrString() { .field(field.toUpperCase(), Schema.OPTIONAL_FLOAT64_SCHEMA) .build(); + // Then: + expectedException.expect(KsqlException.class); + // When: TimestampExtractionPolicyFactory - .create(KsqlSchema.of(schema), Optional.of(field), Optional.empty()); + .create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.empty()); } } \ No newline at end of file diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java index c8672f752f79..0d8686d9bccb 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CommandFactories.java @@ -29,6 +29,7 @@ import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.HandlerMaps; import io.confluent.ksql.util.HandlerMaps.ClassHandlerMapR2; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.util.Map; import java.util.Objects; @@ -62,6 +63,7 @@ public CommandFactories(final ServiceContext serviceContext) { public DdlCommand create( final String sqlExpression, final DdlStatement ddlStatement, + final KsqlConfig ksqlConfig, final Map properties ) { return FACTORIES @@ -75,7 +77,7 @@ public DdlCommand create( }) .handle( this, - new CallInfo(sqlExpression, properties), + new CallInfo(sqlExpression, ksqlConfig, properties), ddlStatement); } @@ -90,6 +92,7 @@ private CreateStreamCommand handleCreateStream( return new CreateStreamCommand( callInfo.sqlExpression, statement, + callInfo.ksqlConfig, serviceContext.getTopicClient()); } @@ -100,6 +103,7 @@ private CreateTableCommand handleCreateTable( return new CreateTableCommand( callInfo.sqlExpression, statement, + callInfo.ksqlConfig, serviceContext.getTopicClient()); } @@ -140,14 +144,18 @@ private UnsetPropertyCommand handleUnsetProperty( private static final class CallInfo { final String sqlExpression; + final KsqlConfig ksqlConfig; final Map properties; private CallInfo( final String sqlExpression, + final KsqlConfig ksqlConfig, final Map properties ) { - this.sqlExpression = sqlExpression; - this.properties = properties; + this.sqlExpression = Objects.requireNonNull(sqlExpression, "sqlExpression"); + this.properties = Objects.requireNonNull(properties, "properties"); + this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig") + .cloneWithPropertyOverwrite(properties); } } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java index ccd28fd6bbc6..ea9e02fb92c3 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateSourceCommand.java @@ -24,6 +24,7 @@ import io.confluent.ksql.schema.ksql.KsqlSchema; import io.confluent.ksql.schema.ksql.LogicalSchemas; import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.SchemaUtil; import io.confluent.ksql.util.StringUtil; @@ -55,6 +56,7 @@ abstract class CreateSourceCommand implements DdlCommand { CreateSourceCommand( final String sqlExpression, final CreateSource statement, + final KsqlConfig ksqlConfig, final KafkaTopicClient kafkaTopicClient ) { this.sqlExpression = sqlExpression; @@ -92,7 +94,7 @@ abstract class CreateSourceCommand implements DdlCommand { final Optional timestampName = properties.getTimestampName(); final Optional timestampFormat = properties.getTimestampFormat(); this.timestampExtractionPolicy = TimestampExtractionPolicyFactory - .create(schema, timestampName, timestampFormat); + .create(ksqlConfig, schema, timestampName, timestampFormat); this.keySerdeFactory = extractKeySerde(properties); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java index 774c571ff03e..b7120c8005ea 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateStreamCommand.java @@ -19,6 +19,7 @@ import io.confluent.ksql.metastore.model.KsqlStream; import io.confluent.ksql.parser.tree.CreateStream; import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; public class CreateStreamCommand extends CreateSourceCommand { @@ -26,9 +27,10 @@ public class CreateStreamCommand extends CreateSourceCommand { public CreateStreamCommand( final String sqlExpression, final CreateStream createStream, + final KsqlConfig ksqlConfig, final KafkaTopicClient kafkaTopicClient ) { - super(sqlExpression, createStream, kafkaTopicClient); + super(sqlExpression, createStream, ksqlConfig, kafkaTopicClient); } @Override diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java index fbd4968d4de7..32a1f7026130 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/CreateTableCommand.java @@ -19,6 +19,7 @@ import io.confluent.ksql.metastore.model.KsqlTable; import io.confluent.ksql.parser.tree.CreateTable; import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; public class CreateTableCommand extends CreateSourceCommand { @@ -26,9 +27,10 @@ public class CreateTableCommand extends CreateSourceCommand { CreateTableCommand( final String sqlExpression, final CreateTable createTable, + final KsqlConfig ksqlConfig, final KafkaTopicClient kafkaTopicClient ) { - super(sqlExpression, createTable, kafkaTopicClient); + super(sqlExpression, createTable, ksqlConfig, kafkaTopicClient); } @Override diff --git a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java index 9f12bdc36580..1db22ccdcece 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/ddl/commands/DdlCommandFactory.java @@ -16,12 +16,15 @@ package io.confluent.ksql.ddl.commands; import io.confluent.ksql.parser.tree.DdlStatement; +import io.confluent.ksql.util.KsqlConfig; + import java.util.Map; public interface DdlCommandFactory { DdlCommand create( String sqlExpression, DdlStatement ddlStatement, + KsqlConfig ksqlConfig, Map properties ); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java index cd0e596cbbdb..263fcf5afa2f 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineContext.java @@ -29,6 +29,7 @@ import io.confluent.ksql.query.QueryId; import io.confluent.ksql.services.SandboxedServiceContext; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; import io.confluent.ksql.util.PersistentQueryMetadata; @@ -154,6 +155,7 @@ QueryEngine createQueryEngine(final ServiceContext serviceContext) { String executeDdlStatement( final String sqlExpression, final ExecutableDdlStatement statement, + final KsqlConfig ksqlConfig, final Map overriddenProperties ) { KsqlEngineProps.throwOnImmutableOverride(overriddenProperties); @@ -161,6 +163,7 @@ String executeDdlStatement( final DdlCommand command = ddlCommandFactory.create( sqlExpression, statement, + ksqlConfig, overriddenProperties ); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java index 9d690a54beb0..55dbcf167411 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/EngineExecutor.java @@ -89,6 +89,7 @@ ExecuteResult execute(final ConfiguredStatement statement) { final String msg = engineContext.executeDdlStatement( statement.getStatementText(), (ExecutableDdlStatement) statement.getStatement(), + ksqlConfig, overriddenProperties ); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java index 3b1421475ee3..6705aaacc7da 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/QueryEngine.java @@ -150,6 +150,6 @@ private static OutputNode buildQueryLogicalPlan( final Analysis analysis = queryAnalyzer.analyze(sqlExpression, query, sink); final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis); - return new LogicalPlanner(analysis, aggAnalysis, metaStore).buildPlan(); + return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan(); } } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java index 94070ad74b1f..192f4de1df99 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/planner/LogicalPlanner.java @@ -36,6 +36,7 @@ import io.confluent.ksql.planner.plan.ProjectNode; import io.confluent.ksql.schema.ksql.KsqlSchema; import io.confluent.ksql.util.ExpressionTypeManager; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.Pair; import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy; @@ -49,15 +50,18 @@ public class LogicalPlanner { // CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling + private final KsqlConfig ksqlConfig; private final Analysis analysis; private final AggregateAnalysisResult aggregateAnalysis; private final FunctionRegistry functionRegistry; public LogicalPlanner( + final KsqlConfig ksqlConfig, final Analysis analysis, final AggregateAnalysisResult aggregateAnalysis, final FunctionRegistry functionRegistry ) { + this.ksqlConfig = ksqlConfig; this.analysis = analysis; this.aggregateAnalysis = aggregateAnalysis; this.functionRegistry = functionRegistry; @@ -125,11 +129,12 @@ private OutputNode buildOutputNode(final PlanNode sourcePlanNode) { ); } - private static TimestampExtractionPolicy getTimestampExtractionPolicy( + private TimestampExtractionPolicy getTimestampExtractionPolicy( final KsqlSchema inputSchema, final Analysis analysis ) { return TimestampExtractionPolicyFactory.create( + ksqlConfig, inputSchema, analysis.getTimestampColumnName(), analysis.getTimestampFormat()); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java index deb03a05dd9e..5c524425c020 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CommandFactoriesTest.java @@ -21,6 +21,7 @@ import static org.hamcrest.MatcherAssert.assertThat; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient; import io.confluent.ksql.ddl.DdlConfig; import io.confluent.ksql.parser.tree.CreateStream; @@ -38,6 +39,7 @@ import io.confluent.ksql.parser.tree.Type.SqlType; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.services.ServiceContext; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.util.Collections; import java.util.HashMap; @@ -58,6 +60,8 @@ public class CommandFactoriesTest { private final CommandFactories commandFactories = new CommandFactories(serviceContext); private final HashMap properties = new HashMap<>(); + private KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of()); + @Before public void before() { @@ -79,7 +83,7 @@ public void before() { public void shouldCreateDDLCommandForRegisterTopic() { final DdlCommand result = commandFactories.create( sqlExpression, new RegisterTopic(QualifiedName.of("blah"), - true, properties), NO_PROPS); + true, properties), ksqlConfig, NO_PROPS); assertThat(result, instanceOf(RegisterTopicCommand.class)); } @@ -88,8 +92,7 @@ sqlExpression, new RegisterTopic(QualifiedName.of("blah"), public void shouldCreateCommandForCreateStream() { final DdlCommand result = commandFactories.create( sqlExpression, new CreateStream(QualifiedName.of("foo"), - SOME_ELEMENTS, true, properties), - NO_PROPS); + SOME_ELEMENTS, true, properties), ksqlConfig, NO_PROPS); assertThat(result, instanceOf(CreateStreamCommand.class)); } @@ -99,8 +102,7 @@ public void shouldCreateCommandForCreateTable() { final HashMap tableProperties = validTableProps(); final DdlCommand result = commandFactories - .create(sqlExpression, createTable(tableProperties), - NO_PROPS); + .create(sqlExpression, createTable(tableProperties), ksqlConfig, NO_PROPS); assertThat(result, instanceOf(CreateTableCommand.class)); } @@ -109,6 +111,7 @@ public void shouldCreateCommandForCreateTable() { public void shouldCreateCommandForDropStream() { final DdlCommand result = commandFactories.create(sqlExpression, new DropStream(QualifiedName.of("foo"), true, true), + ksqlConfig, NO_PROPS ); assertThat(result, instanceOf(DropSourceCommand.class)); @@ -118,6 +121,7 @@ public void shouldCreateCommandForDropStream() { public void shouldCreateCommandForDropTable() { final DdlCommand result = commandFactories.create(sqlExpression, new DropTable(QualifiedName.of("foo"), true, true), + ksqlConfig, NO_PROPS ); assertThat(result, instanceOf(DropSourceCommand.class)); @@ -127,6 +131,7 @@ public void shouldCreateCommandForDropTable() { public void shouldCreateCommandForDropTopic() { final DdlCommand result = commandFactories.create(sqlExpression, new DropTopic(QualifiedName.of("foo"), true), + ksqlConfig, NO_PROPS ); assertThat(result, instanceOf(DropTopicCommand.class)); @@ -134,8 +139,7 @@ public void shouldCreateCommandForDropTopic() { @Test(expected = KsqlException.class) public void shouldThowKsqlExceptionIfCommandFactoryNotFound() { - commandFactories.create(sqlExpression, new ExecutableDdlStatement() {}, - NO_PROPS); + commandFactories.create(sqlExpression, new ExecutableDdlStatement() {}, ksqlConfig, NO_PROPS); } private HashMap validTableProps() { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java index b673d554bc49..43f297c88410 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateSourceCommandTest.java @@ -35,6 +35,7 @@ import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.Type.SqlType; import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import java.util.Collections; import java.util.HashMap; @@ -62,12 +63,16 @@ public class CreateSourceCommandTest { @Mock private KafkaTopicClient kafkaTopicClient; + private KsqlConfig ksqlConfig; + @Before public void setUp() { when(statement.getElements()).thenReturn(SOME_ELEMENTS); when(statement.getName()).thenReturn(QualifiedName.of("bob")); givenPropertiesWith(ImmutableMap.of()); when(kafkaTopicClient.isTopicExists(any())).thenReturn(true); + + ksqlConfig = new KsqlConfig(ImmutableMap.of()); } @Test @@ -81,7 +86,7 @@ public void shouldThrowOnNoElements() { "The statement does not define any columns."); // When: - new TestCmd("look mum, no columns", statement, kafkaTopicClient); + new TestCmd("look mum, no columns", statement, ksqlConfig, kafkaTopicClient); } @Test @@ -90,7 +95,7 @@ public void shouldNotThrowWhenThereAreElements() { when(statement.getElements()).thenReturn(SOME_ELEMENTS); // When: - new TestCmd("look mum, columns", statement, kafkaTopicClient); + new TestCmd("look mum, columns", statement, ksqlConfig, kafkaTopicClient); // Then: not exception thrown } @@ -105,7 +110,7 @@ public void shouldThrowIfTopicDoesNotExist() { expectedException.expectMessage("Kafka topic does not exist: " + TOPIC_NAME); // When: - new TestCmd("what, no value topic?", statement, kafkaTopicClient); + new TestCmd("what, no value topic?", statement, ksqlConfig, kafkaTopicClient); } @Test @@ -114,7 +119,7 @@ public void shouldNotThrowIfTopicDoesExist() { when(kafkaTopicClient.isTopicExists(TOPIC_NAME)).thenReturn(true); // When: - new TestCmd("what, no value topic?", statement, kafkaTopicClient); + new TestCmd("what, no value topic?", statement, ksqlConfig, kafkaTopicClient); // Then: verify(kafkaTopicClient).isTopicExists(TOPIC_NAME); @@ -133,7 +138,7 @@ public void shouldThrowIfKeyFieldNotInSchema() { + "'WILL-NOT-FIND-ME'"); // When: - new TestCmd("key not in schema!", statement, kafkaTopicClient); + new TestCmd("key not in schema!", statement, ksqlConfig, kafkaTopicClient); } @Test @@ -150,7 +155,7 @@ public void shouldThrowIfTimestampColumnDoesNotExist() { + "'WILL-NOT-FIND-ME'"); // When: - new TestCmd("key not in schema!", statement, kafkaTopicClient); + new TestCmd("key not in schema!", statement, ksqlConfig, kafkaTopicClient); } private static Map minValidProps() { @@ -178,9 +183,10 @@ private static final class TestCmd extends CreateSourceCommand { private TestCmd( final String sqlExpression, final CreateSource statement, + final KsqlConfig ksqlConfig, final KafkaTopicClient kafkaTopicClient ) { - super(sqlExpression, statement, kafkaTopicClient); + super(sqlExpression, statement, ksqlConfig, kafkaTopicClient); } @Override diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java index 1a0d474ec527..d55850e306a7 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateStreamCommandTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.Type.SqlType; import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.MetaStoreFixture; import java.util.Collections; @@ -67,6 +68,8 @@ public class CreateStreamCommandTest { private KafkaTopicClient topicClient; @Mock private CreateStream createStreamStatement; + @Mock + private KsqlConfig ksqlConfig; @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -190,7 +193,11 @@ public void shouldAddSourceWithNoKeyField() { } private CreateStreamCommand createCmd() { - return new CreateStreamCommand("some sql", createStreamStatement, topicClient); + return new CreateStreamCommand( + "some sql", + createStreamStatement, + ksqlConfig, + topicClient); } private void givenPropertiesWith(final Map props) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java index 52a248f3f33c..2364563f32c4 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/ddl/commands/CreateTableCommandTest.java @@ -37,6 +37,7 @@ import io.confluent.ksql.parser.tree.TableElement; import io.confluent.ksql.parser.tree.Type.SqlType; import io.confluent.ksql.services.KafkaTopicClient; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.MetaStoreFixture; import java.util.Collections; @@ -62,6 +63,8 @@ public class CreateTableCommandTest { private KafkaTopicClient topicClient; @Mock private CreateTable createTableStatement; + @Mock + private KsqlConfig ksqlConfig; @Rule public final ExpectedException expectedException = ExpectedException.none(); @@ -188,7 +191,11 @@ public void shouldAddSourceWithNoKeyField() { private CreateTableCommand createCmd() { - return new CreateTableCommand("some sql", createTableStatement, topicClient); + return new CreateTableCommand( + "some sql", + createTableStatement, + ksqlConfig, + topicClient); } private void givenPropertiesWith(final Map props) { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java index b99de58ccdc8..cd311910069f 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/physical/PhysicalPlanBuilderTest.java @@ -228,7 +228,7 @@ private PhysicalPlanBuilder buildPhysicalPlanBuilder( } private QueryMetadata buildPhysicalPlan(final String query) { - final OutputNode logical = AnalysisTestUtil.buildLogicalPlan(query, metaStore);; + final OutputNode logical = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);; return physicalPlanBuilder.buildPhysicalPlan(new LogicalPlanNode(query, Optional.of(logical))); } @@ -682,7 +682,7 @@ public void shouldConfigureProducerErrorHandlerLogger() { final ProcessingLogger logger = mock(ProcessingLogger.class); when(processingLogContext.getLoggerFactory()).thenReturn(loggerFactory); final OutputNode spyNode = spy( - AnalysisTestUtil.buildLogicalPlan(simpleSelectFilter, metaStore)); + AnalysisTestUtil.buildLogicalPlan(ksqlConfig, simpleSelectFilter, metaStore)); doReturn(new QueryId("foo")).when(spyNode).getQueryId(any()); when(loggerFactory.getLogger("foo")).thenReturn(logger); when(loggerFactory.getLogger(ArgumentMatchers.startsWith("foo."))) diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java index c60b444fefb0..186a3c6f54b8 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/LogicalPlannerTest.java @@ -34,7 +34,10 @@ import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.planner.plan.ProjectNode; import io.confluent.ksql.testutils.AnalysisTestUtil; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; + +import java.util.Collections; import java.util.Optional; import org.apache.kafka.connect.data.Schema; import org.junit.Assert; @@ -44,10 +47,12 @@ public class LogicalPlannerTest { private MetaStore metaStore; + private KsqlConfig ksqlConfig; @Before public void init() { metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); + ksqlConfig = new KsqlConfig(Collections.emptyMap()); } @Test @@ -249,6 +254,6 @@ public void shouldUpdateKeyToReflectProjectionAlias() { } private PlanNode buildLogicalPlan(final String query) { - return AnalysisTestUtil.buildLogicalPlan(query, metaStore); + return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java index bd2eac805de6..4f6a73fa75d0 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/PlanSourceExtractorVisitorTest.java @@ -22,7 +22,10 @@ import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.planner.plan.PlanNode; import io.confluent.ksql.testutils.AnalysisTestUtil; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.MetaStoreFixture; + +import java.util.Collections; import java.util.Set; import org.apache.kafka.common.utils.Utils; import org.junit.Before; @@ -31,10 +34,12 @@ public class PlanSourceExtractorVisitorTest { private MetaStore metaStore; + private KsqlConfig ksqlConfig; @Before public void init() { metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); + ksqlConfig = new KsqlConfig(Collections.emptyMap()); } @Test @@ -61,6 +66,6 @@ public void shouldExtractCorrectSourceForJoinQuery() { } private PlanNode buildLogicalPlan(final String query) { - return AnalysisTestUtil.buildLogicalPlan(query, metaStore); + return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java index 3acb2e55e88a..5abc99d366b6 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/AggregateNodeTest.java @@ -395,7 +395,7 @@ private SchemaKStream buildQuery(final AggregateNode aggregateNode, final KsqlCo private static AggregateNode buildAggregateNode(final String queryString) { final MetaStore newMetaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); final KsqlBareOutputNode planNode = (KsqlBareOutputNode) AnalysisTestUtil - .buildLogicalPlan(queryString, newMetaStore); + .buildLogicalPlan(KSQL_CONFIG, queryString, newMetaStore); return (AggregateNode) planNode.getSource(); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java index 718cb39ef090..f94be8207b43 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/JoinNodeTest.java @@ -1069,7 +1069,7 @@ private void buildJoinNode(final String queryString) { final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); final KsqlBareOutputNode planNode = - (KsqlBareOutputNode) AnalysisTestUtil.buildLogicalPlan(queryString, metaStore); + (KsqlBareOutputNode) AnalysisTestUtil.buildLogicalPlan(ksqlConfig, queryString, metaStore); joinNode = (JoinNode) ((ProjectNode) planNode.getSource()).getSource(); } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java index b236815ea4ef..e90bd902a78e 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/planner/plan/KsqlBareOutputNodeTest.java @@ -68,6 +68,7 @@ public class KsqlBareOutputNodeTest { private StreamsBuilder builder; private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry()); private final QueryId queryId = new QueryId("output-test"); + private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap()); @Mock private KsqlQueryBuilder ksqlStreamBuilder; @@ -87,7 +88,7 @@ public void before() { .push(inv.getArgument(0).toString())); final KsqlBareOutputNode planNode = (KsqlBareOutputNode) AnalysisTestUtil - .buildLogicalPlan(SIMPLE_SELECT_WITH_FILTER, metaStore); + .buildLogicalPlan(ksqlConfig, SIMPLE_SELECT_WITH_FILTER, metaStore); stream = planNode.buildStream(ksqlStreamBuilder); } @@ -134,7 +135,7 @@ public void shouldComputeQueryIdCorrectly() { // Given: final KsqlBareOutputNode node = (KsqlBareOutputNode) AnalysisTestUtil - .buildLogicalPlan("select col0 from test1;", metaStore); + .buildLogicalPlan(ksqlConfig, "select col0 from test1;", metaStore); final QueryIdGenerator queryIdGenerator = mock(QueryIdGenerator.class); // When: diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java index 03cfc6ffc509..1dd26bd9e5c3 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKGroupedTableTest.java @@ -111,7 +111,7 @@ private SchemaKGroupedTable buildSchemaKGroupedTableFromQuery( final String query, final String...groupByColumns ) { - final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(query, metaStore); + final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore); final SchemaKTable initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), kTable, diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java index 32dadd2cb0e1..c9669d2e4991 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKStreamTest.java @@ -922,7 +922,11 @@ private static KsqlSchema getJoinSchema( } private PlanNode givenInitialKStreamOf(final String selectQuery) { - final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(selectQuery, metaStore); + final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan( + ksqlConfig, + selectQuery, + metaStore + ); initialSchemaKStream = new SchemaKStream( logicalPlan.getTheSourceNode().getSchema(), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java index e6979d603ecf..e6d6780f8b1b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SchemaKTableTest.java @@ -633,7 +633,11 @@ private static KsqlSchema getJoinSchema(final KsqlSchema leftSchema, } private List givenInitialKTableOf(final String selectQuery) { - final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(selectQuery, metaStore); + final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan( + ksqlConfig, + selectQuery, + metaStore + ); initialSchemaKTable = new SchemaKTable<>( logicalPlan.getTheSourceNode().getSchema(), @@ -659,6 +663,6 @@ private List givenInitialKTableOf(final String selectQuery) { } private PlanNode buildLogicalPlan(final String query) { - return AnalysisTestUtil.buildLogicalPlan(query, metaStore); + return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore); } } diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java index f68cd09e9007..7fa9ffeb2855 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SelectValueMapperTest.java @@ -137,7 +137,7 @@ public void shouldWriteProcessingLogOnError() { } private SelectValueMapper givenSelectMapperFor(final String query) { - final PlanNode planNode = AnalysisTestUtil.buildLogicalPlan(query, metaStore); + final PlanNode planNode = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore); final ProjectNode projectNode = (ProjectNode) planNode.getSources().get(0); final KsqlSchema schema = planNode.getTheSourceNode().getSchema(); final List selectExpressions = projectNode.getProjectSelectExpressions(); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java b/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java index ec1352fe060e..9f6c4776697b 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/structured/SqlPredicateTest.java @@ -136,7 +136,11 @@ public void shouldWriteProcessingLogOnError() { } private SqlPredicate givenSqlPredicateFor(final String statement) { - final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(statement, metaStore); + final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan( + ksqlConfig, + statement, + metaStore + ); final FilterNode filterNode = (FilterNode) logicalPlan.getSources().get(0).getSources().get(0); return new SqlPredicate( filterNode.getPredicate(), diff --git a/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java b/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java index 07c839b9ab57..2e4eff01b717 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/testutils/AnalysisTestUtil.java @@ -30,6 +30,8 @@ import io.confluent.ksql.parser.tree.Statement; import io.confluent.ksql.planner.LogicalPlanner; import io.confluent.ksql.planner.plan.OutputNode; +import io.confluent.ksql.util.KsqlConfig; + import java.util.List; import java.util.Optional; @@ -42,10 +44,15 @@ public static Analysis analyzeQuery(final String queryStr, final MetaStore metaS return new Analyzer(queryStr, metaStore).analysis; } - public static OutputNode buildLogicalPlan(final String queryStr, final MetaStore metaStore) { + public static OutputNode buildLogicalPlan( + final KsqlConfig ksqlConfig, + final String queryStr, + final MetaStore metaStore + ) { final Analyzer analyzer = new Analyzer(queryStr, metaStore); final LogicalPlanner logicalPlanner = new LogicalPlanner( + ksqlConfig, analyzer.analysis, analyzer.aggregateAnalys(), metaStore); diff --git a/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json b/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json new file mode 100644 index 000000000000..020d4a33d748 --- /dev/null +++ b/ksql-functional-tests/src/test/resources/query-validation-tests/timestamp-extractor.json @@ -0,0 +1,44 @@ +{ + "comments": [ + "Tests to verify override of default.timestamp.extractor on streams" + ], + "tests": [ + { + "name": "KSQL default timestamp extractor", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM TS AS SELECT id FROM test;" + ], + "inputs": [ + {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000}, + {"topic": "test_topic", "value": {"ID": 2}, "timestamp": 1557611913000}, + {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000} + ], + "outputs": [ + {"topic": "TS", "value": {"ID": 1}, "timestamp": 1526075913000}, + {"topic": "TS", "value": {"ID": 2}, "timestamp": 1557611913000}, + {"topic": "TS", "value": {"ID": 3}, "timestamp": 1589234313000} + ] + }, + { + "name": "KSQL override timestamp extractor", + "statements": [ + "CREATE STREAM TEST (ID bigint) WITH (kafka_topic='test_topic', value_format='JSON');", + "CREATE STREAM TS AS SELECT id FROM test;" + ], + "properties": { + "ksql.streams.default.timestamp.extractor": "org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp" + }, + "inputs": [ + {"topic": "test_topic", "value": {"ID": 1}, "timestamp": 1526075913000}, + {"topic": "test_topic", "value": {"ID": 2}, "timestamp": -1}, + {"topic": "test_topic", "value": {"ID": 3}, "timestamp": 1589234313000} + ], + "outputs": [ + {"topic": "TS", "value": {"ID": 1}, "timestamp": 1526075913000}, + {"topic": "TS", "value": {"ID": 2}, "timestamp": 1526075913000}, + {"topic": "TS", "value": {"ID": 3}, "timestamp": 1589234313000} + ] + } + ] +} \ No newline at end of file diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java index dbf1a797c3d5..606f8ec2ec4c 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/entity/KsqlRequest.java @@ -27,6 +27,7 @@ import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.stream.Collectors; @JsonIgnoreProperties(ignoreUnknown = true) @JsonSubTypes({}) @@ -46,7 +47,7 @@ public KsqlRequest( this.ksql = ksql == null ? "" : ksql; this.streamsProperties = streamsProperties == null ? Collections.emptyMap() - : Collections.unmodifiableMap(new HashMap<>(streamsProperties)); + : Collections.unmodifiableMap(new HashMap<>(serializeClassValues(streamsProperties))); this.commandSequenceNumber = Optional.ofNullable(commandSequenceNumber); } @@ -83,6 +84,24 @@ public int hashCode() { return Objects.hash(ksql, streamsProperties, commandSequenceNumber); } + /** + * Converts all Class references values to their canonical String value. + *

+ * This conversion avoids the JsonMappingException error thrown by Jackson when attempting + * to serialize the class properties prior to send this KsqlRequest object as part of the HTTP + * request. The error thrown by Jackson is "Class ... not be found". + */ + private Map serializeClassValues(final Map properties) { + return properties.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, kv -> { + if (kv.getValue() instanceof Class) { + return ((Class)kv.getValue()).getCanonicalName(); + } + + return kv.getValue(); + })); + } + private static Map coerceTypes(final Map streamsProperties) { if (streamsProperties == null) { return Collections.emptyMap(); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 8105f08a1d31..6c80edd09134 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -286,6 +286,7 @@ private void initialize() { .put(DdlConfig.TOPIC_NAME_PROPERTY, new StringLiteral(COMMANDS_KSQL_TOPIC_NAME)) .build() ), + ksqlConfig, serviceContext.getTopicClient() )); diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java index 784bbcbb374b..d5a8a16d97d5 100644 --- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java +++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/entity/KsqlRequestTest.java @@ -31,6 +31,8 @@ import java.util.Map; import java.util.Optional; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TimestampExtractor; import org.junit.Rule; import org.junit.Test; import org.junit.rules.ExpectedException; @@ -42,23 +44,30 @@ public class KsqlRequestTest { private static final String A_JSON_REQUEST = "{" + "\"ksql\":\"sql\"," + "\"streamsProperties\":{" - + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"" + + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"," + + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\"" + + TimestampExtractor.class.getCanonicalName() + "\"" + "}}"; private static final String A_JSON_REQUEST_WITH_COMMAND_NUMBER = "{" + "\"ksql\":\"sql\"," + "\"streamsProperties\":{" - + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"" + + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"," + + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\"" + + TimestampExtractor.class.getCanonicalName() + "\"" + "}," + "\"commandSequenceNumber\":2}"; private static final String A_JSON_REQUEST_WITH_NULL_COMMAND_NUMBER = "{" + "\"ksql\":\"sql\"," + "\"streamsProperties\":{" - + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"" + + "\"" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + "\":\"earliest\"," + + "\"" + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG + "\":\"" + + TimestampExtractor.class.getCanonicalName() + "\"" + "}," + "\"commandSequenceNumber\":null}"; private static final ImmutableMap SOME_PROPS = ImmutableMap.of( - ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest" + ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest", + StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, TimestampExtractor.class ); private static final long SOME_COMMAND_NUMBER = 2L;