Skip to content

Commit

Permalink
fix: default timestamp extractor override is not working (#3176)
Browse files Browse the repository at this point in the history
The use of ksql.streams.default.timestamp.extractor when creating a stream/table is not working. The new value is persisted in the command topic, but KSQL always use a default FailOnInvalidTimestamp

This patch fixes KSQL so it honors the new default specified
  • Loading branch information
spena committed Aug 8, 2019
1 parent e42c2ca commit d1db07b
Show file tree
Hide file tree
Showing 33 changed files with 354 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,30 +17,40 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.google.errorprone.annotations.Immutable;
import java.util.Objects;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;

@Immutable
public class MetadataTimestampExtractionPolicy implements TimestampExtractionPolicy {
private final TimestampExtractor timestampExtractor;

@JsonCreator
public MetadataTimestampExtractionPolicy(){}
public MetadataTimestampExtractionPolicy() {
this(new FailOnInvalidTimestamp());
}

public MetadataTimestampExtractionPolicy(final TimestampExtractor timestampExtractor) {
this.timestampExtractor = timestampExtractor;
}

@Override
public TimestampExtractor create(final int columnIndex) {
return new FailOnInvalidTimestamp();
return timestampExtractor;
}

@Override
public int hashCode() {
return this.getClass().hashCode();
return Objects.hash(this.getClass(), timestampExtractor.getClass());
}

@Override
public boolean equals(final Object other) {
if (this == other) {
return true;
if (!(other instanceof MetadataTimestampExtractionPolicy)) {
return false;
}
return other instanceof MetadataTimestampExtractionPolicy;

final MetadataTimestampExtractionPolicy that = (MetadataTimestampExtractionPolicy)other;
return timestampExtractor.getClass() == that.timestampExtractor.getClass();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,29 @@

import io.confluent.ksql.ddl.DdlConfig;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;

import java.util.Optional;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;

public final class TimestampExtractionPolicyFactory {

private TimestampExtractionPolicyFactory() {
}

public static TimestampExtractionPolicy create(
final KsqlConfig ksqlConfig,
final KsqlSchema schema,
final Optional<String> timestampColumnName,
final Optional<String> timestampFormat
) {
if (!timestampColumnName.isPresent()) {
return new MetadataTimestampExtractionPolicy();
return new MetadataTimestampExtractionPolicy(getDefaultTimestampExtractor(ksqlConfig));
}

final String fieldName = timestampColumnName.get().toUpperCase();
Expand Down Expand Up @@ -71,4 +77,17 @@ public static TimestampExtractionPolicy create(
+ " specified");
}

private static TimestampExtractor getDefaultTimestampExtractor(final KsqlConfig ksqlConfig) {
try {
final Class<?> timestampExtractorClass = (Class<?>) ksqlConfig.getKsqlStreamConfigProps()
.getOrDefault(
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
FailOnInvalidTimestamp.class
);

return (TimestampExtractor) timestampExtractorClass.newInstance();
} catch (final Exception e) {
throw new KsqlException("Cannot override default timestamp extractor: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package io.confluent.ksql.util.timestamp;

import com.google.common.testing.EqualsTester;
import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;
import org.junit.Test;

public class MetadataTimestampExtractionPolicyTest {
Expand All @@ -25,6 +26,10 @@ public void shouldTestEqualityCorrectly() {
.addEqualityGroup(
new MetadataTimestampExtractionPolicy(),
new MetadataTimestampExtractionPolicy())
.addEqualityGroup(
new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp()),
new MetadataTimestampExtractionPolicy(new UsePreviousTimeOnInvalidTimestamp())
)
.testEquals();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,27 +19,114 @@
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.core.IsInstanceOf.instanceOf;

import com.google.common.collect.ImmutableMap;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.UsePreviousTimeOnInvalidTimestamp;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

public class TimestampExtractionPolicyFactoryTest {


private final SchemaBuilder schemaBuilder = SchemaBuilder.struct()
.field("id", Schema.OPTIONAL_INT64_SCHEMA);

private KsqlConfig ksqlConfig;

@Rule
public final ExpectedException expectedException = ExpectedException.none();

@Before
public void setup() {
ksqlConfig = new KsqlConfig(Collections.emptyMap());
}

@Test
public void shouldCreateMetadataPolicyWhenTimestampFieldNotProvided() {
// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
.create(KsqlSchema.of(schemaBuilder.build()), Optional.empty(), Optional.empty());
.create(
ksqlConfig,
KsqlSchema.of(schemaBuilder.build()),
Optional.empty(),
Optional.empty()
);

// Then:
assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class));
}

@Test
public void shouldThrowIfTimestampExtractorConfigIsInvalidClass() {
// Given:
final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of(
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
this.getClass()
));

// Then:
expectedException.expect(KsqlException.class);
expectedException.expectMessage(
"cannot be cast to org.apache.kafka.streams.processor.TimestampExtractor");

// When:
TimestampExtractionPolicyFactory
.create(
ksqlConfig,
KsqlSchema.of(schemaBuilder.build()),
Optional.empty(),
Optional.empty()
);
}

@Test
public void shouldCreateMetadataPolicyWithDefaultFailedOnInvalidTimestamp() {
// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
.create(
ksqlConfig,
KsqlSchema.of(schemaBuilder.build()),
Optional.empty(),
Optional.empty()
);

// Then:
assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class));
}

@Test
public void shouldCreateMetadataPolicyWithConfiguredUsePreviousTimeOnInvalidTimestamp() {
// Given:
final KsqlConfig ksqlConfig = new KsqlConfig(ImmutableMap.of(
StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
UsePreviousTimeOnInvalidTimestamp.class
));

// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
.create(
ksqlConfig,
KsqlSchema.of(schemaBuilder.build()),
Optional.empty(),
Optional.empty()
);

// Then:
assertThat(result, instanceOf(MetadataTimestampExtractionPolicy.class));
assertThat(result.create(0), instanceOf(UsePreviousTimeOnInvalidTimestamp.class));
}

@Test
Expand All @@ -52,17 +139,26 @@ public void shouldCreateLongTimestampPolicyWhenTimestampFieldIsOfTypeLong() {

// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
.create(KsqlSchema.of(schema), Optional.of(timestamp), Optional.empty());
.create(ksqlConfig, KsqlSchema.of(schema), Optional.of(timestamp), Optional.empty());

// Then:
assertThat(result, instanceOf(LongColumnTimestampExtractionPolicy.class));
assertThat(result.timestampField(), equalTo(timestamp.toUpperCase()));
}

@Test(expected = KsqlException.class)
@Test
public void shouldFailIfCantFindTimestampField() {
// Then:
expectedException.expect(KsqlException.class);

// When:
TimestampExtractionPolicyFactory
.create(KsqlSchema.of(schemaBuilder.build()), Optional.of("whateva"), Optional.empty());
.create(
ksqlConfig,
KsqlSchema.of(schemaBuilder.build()),
Optional.of("whateva"),
Optional.empty()
);
}

@Test
Expand All @@ -75,49 +171,58 @@ public void shouldCreateStringTimestampPolicyWhenTimestampFieldIsStringTypeAndFo

// When:
final TimestampExtractionPolicy result = TimestampExtractionPolicyFactory
.create(KsqlSchema.of(schema), Optional.of(field), Optional.of("yyyy-MM-DD"));
.create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.of("yyyy-MM-DD"));

// Then:
assertThat(result, instanceOf(StringTimestampExtractionPolicy.class));
assertThat(result.timestampField(), equalTo(field.toUpperCase()));
}

@Test(expected = KsqlException.class)
@Test
public void shouldFailIfStringTimestampTypeAndFormatNotSupplied() {
// Given:
final String field = "my_string_field";
final Schema schema = schemaBuilder
.field(field.toUpperCase(), Schema.OPTIONAL_STRING_SCHEMA)
.build();

// Then:
expectedException.expect(KsqlException.class);

// When:
TimestampExtractionPolicyFactory
.create(KsqlSchema.of(schema), Optional.of(field), Optional.empty());
.create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.empty());
}

@Test(expected = KsqlException.class)
@Test
public void shouldThorwIfLongTimestampTypeAndFormatIsSupplied() {
// Given:
final String timestamp = "timestamp";
final Schema schema = schemaBuilder
.field(timestamp.toUpperCase(), Schema.OPTIONAL_INT64_SCHEMA)
.build();

// Then:
expectedException.expect(KsqlException.class);

// When:
TimestampExtractionPolicyFactory
.create(KsqlSchema.of(schema), Optional.of(timestamp), Optional.of("b"));
.create(ksqlConfig, KsqlSchema.of(schema), Optional.of(timestamp), Optional.of("b"));
}

@Test(expected = KsqlException.class)
@Test
public void shouldThrowIfTimestampFieldTypeIsNotLongOrString() {
// Given:
final String field = "blah";
final Schema schema = schemaBuilder
.field(field.toUpperCase(), Schema.OPTIONAL_FLOAT64_SCHEMA)
.build();

// Then:
expectedException.expect(KsqlException.class);

// When:
TimestampExtractionPolicyFactory
.create(KsqlSchema.of(schema), Optional.of(field), Optional.empty());
.create(ksqlConfig, KsqlSchema.of(schema), Optional.of(field), Optional.empty());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMapR2;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -62,6 +63,7 @@ public CommandFactories(final ServiceContext serviceContext) {
public DdlCommand create(
final String sqlExpression,
final DdlStatement ddlStatement,
final KsqlConfig ksqlConfig,
final Map<String, Object> properties
) {
return FACTORIES
Expand All @@ -75,7 +77,7 @@ public DdlCommand create(
})
.handle(
this,
new CallInfo(sqlExpression, properties),
new CallInfo(sqlExpression, ksqlConfig, properties),
ddlStatement);
}

Expand All @@ -90,6 +92,7 @@ private CreateStreamCommand handleCreateStream(
return new CreateStreamCommand(
callInfo.sqlExpression,
statement,
callInfo.ksqlConfig,
serviceContext.getTopicClient());
}

Expand All @@ -100,6 +103,7 @@ private CreateTableCommand handleCreateTable(
return new CreateTableCommand(
callInfo.sqlExpression,
statement,
callInfo.ksqlConfig,
serviceContext.getTopicClient());
}

Expand Down Expand Up @@ -140,14 +144,18 @@ private UnsetPropertyCommand handleUnsetProperty(
private static final class CallInfo {

final String sqlExpression;
final KsqlConfig ksqlConfig;
final Map<String, Object> properties;

private CallInfo(
final String sqlExpression,
final KsqlConfig ksqlConfig,
final Map<String, Object> properties
) {
this.sqlExpression = sqlExpression;
this.properties = properties;
this.sqlExpression = Objects.requireNonNull(sqlExpression, "sqlExpression");
this.properties = Objects.requireNonNull(properties, "properties");
this.ksqlConfig = Objects.requireNonNull(ksqlConfig, "ksqlConfig")
.cloneWithPropertyOverwrite(properties);
}
}
}
Loading

0 comments on commit d1db07b

Please sign in to comment.