fix: default timestamp extractor override is not working #3176

Merged: 7 commits merged into confluentinc:5.3.x from override_extraction_policy on Aug 8, 2019

@spena spena commented Aug 6, 2019

Description

The use of ksql.streams.default.timestamp.extractor when creating a stream/table is not working. The new value is persisted in the command topic, but KSQL always uses the default FailOnInvalidTimestamp.

This patch fixes KSQL so it honors the new default specified.

Testing done

  • Added unit tests.

  • Verified a stream created with 'ksql.streams.default.timestamp.extractor'='org.apache.kafka.streams.processor.WallclockTimestampExtractor' results in queries displaying the current Wallclock time.

  • Verified a stream created with no timestamp extractor uses the default FailOnInvalidTimestamp, which displays the timestamp that was used when the row was inserted.

To test it, I had to call the REST API instead of the CLI because I couldn't get the CLI to recognize a different timestamp extractor:

curl -X POST http://localhost:8088/ksql -H 'content-type: application/vnd.ksql.v1+json; charset=utf-8' -d "{\"ksql\":\"create stream t1 (id int, name string) with (kafka_topic='t1', value_format='json');\", \"streamsProperties\": { \"ksql.streams.default.timestamp.extractor\": \"org.apache.kafka.streams.processor.WallclockTimestampExtractor\" }}"

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@spena spena added the bug label Aug 6, 2019
@spena spena added this to the 5.4 milestone Aug 6, 2019
@spena spena requested review from hjafarpour and a team August 6, 2019 14:04
@spena spena self-assigned this Aug 6, 2019
Contributor

@agavra agavra left a comment

Overall this LGTM, but I'm trying to understand why it doesn't work in the CLI.


return (TimestampExtractor) defaultTimestampExtractor.newInstance();
} catch (final Exception e) {
// TODO: Need to log invalid timestamp configuration here?
Contributor

Why not violently fail (e.g. raise an exception) here? I think I'd rather have my command fail than do something that wasn't what I defined.

Member Author

I forgot to remove that line. I left it as a reminder for me to do something there before submitting the PR.

Anyway, you're right, I should fail instead of just logging the error. I submitted another PR (this time for the 5.3.x branch instead of master) which fails in that case.
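
The fail-fast behavior discussed in this thread could look roughly like the sketch below. It is not the code from this PR: Extractor, FailOnInvalid, Wallclock, ExtractorFactory and the KEY constant are hypothetical stand-ins, so the example compiles without Kafka Streams on the classpath. It only illustrates the idea of falling back to the default when nothing is configured, and throwing (rather than logging and silently using the default) when the configured value cannot be instantiated.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for Kafka Streams' TimestampExtractor interface. */
interface Extractor {
  long extract(long recordTimestamp);
}

/** Hypothetical default: rejects records that carry a negative timestamp. */
final class FailOnInvalid implements Extractor {
  @Override
  public long extract(final long recordTimestamp) {
    if (recordTimestamp < 0) {
      throw new IllegalStateException("Invalid timestamp: " + recordTimestamp);
    }
    return recordTimestamp;
  }
}

/** Hypothetical override: ignores the record timestamp and uses wall-clock time. */
final class Wallclock implements Extractor {
  @Override
  public long extract(final long recordTimestamp) {
    return System.currentTimeMillis();
  }
}

final class ExtractorFactory {
  /** Assumed property name, used only for this illustration. */
  static final String KEY = "default.timestamp.extractor";

  /** Returns the configured extractor, the default if none is set, or throws if the value is invalid. */
  static Extractor create(final Map<String, ?> props) {
    final Object value = props.get(KEY);
    if (value == null) {
      return new FailOnInvalid();
    }
    try {
      // The value may arrive as a Class or as a class name.
      final Class<?> clazz = value instanceof Class
          ? (Class<?>) value
          : Class.forName(value.toString());
      return (Extractor) clazz.getDeclaredConstructor().newInstance();
    } catch (final ReflectiveOperationException | ClassCastException e) {
      // Fail the command instead of quietly using the default extractor.
      throw new IllegalArgumentException("Invalid timestamp extractor: " + value, e);
    }
  }

  public static void main(final String[] args) {
    final Map<String, Object> props = new HashMap<>();
    System.out.println(create(props).getClass().getSimpleName());
    props.put(KEY, Wallclock.class.getName());
    System.out.println(create(props).getClass().getSimpleName());
  }
}
```

With this shape, a typo in the extractor class name surfaces as an error on the statement that supplied it, which is the behavior the reviewer asked for.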

@@ -230,6 +233,51 @@ public void shouldBuildSerdeOptions() {
assertThat(cmd.getSerdeOptions(), is(SOME_SERDE_OPTIONS));
}

@Test
Contributor

Can we add a QTT for this? We should be able to pass it in the property overrides for a command.

Member Author

Just added one for this.

@agavra agavra requested a review from a team August 6, 2019 15:47
@spena spena force-pushed the override_extraction_policy branch from 7970a25 to 4f2d7d5 Compare August 6, 2019 18:43
@spena spena changed the base branch from master to 5.3.x August 6, 2019 18:43
@spena spena force-pushed the override_extraction_policy branch from 4f2d7d5 to ae653d6 Compare August 6, 2019 18:47
Contributor

@agavra agavra left a comment

Thanks @spena! LGTM

@agavra agavra requested a review from a team August 6, 2019 20:05
@blueedgenick
Contributor

It seems very strange to me too that this doesn't work using SET from the CLI. Arguably the fix is incomplete if this doesn't work. Is there at least another issue opened to track that?

@hjafarpour
Contributor

I agree with @blueedgenick, this should work from the CLI too. The problem with SET in the CLI is that we validate the config properties on the CLI side before sending the request to the server. The validation for the default.timestamp.extractor property uses Class.forName, so if the class is not on the client-side classpath, it will throw an exception.
You can either make sure that the streams classes are available on the CLI side, or skip validation of this config property on the CLI side.
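
To make the two options concrete, here is a small sketch of what strict versus lenient client-side validation might look like. ClientSideValidation and both method names are hypothetical and are not taken from the KSQL code base. The strict variant loads the class with Class.forName and therefore depends on the client's classpath, while the lenient variant only checks that the value looks like a class name and leaves loading to the server.

```java
import java.util.Optional;

final class ClientSideValidation {

  /** Strict check: fails whenever the class is missing from the local classpath. */
  static Optional<String> validateByLoading(final String className) {
    try {
      Class.forName(className);
      return Optional.empty();
    } catch (final ClassNotFoundException e) {
      return Optional.of("Class not found on the client classpath: " + className);
    }
  }

  /** Lenient check: only verifies the value is a dot-separated list of Java identifiers. */
  static Optional<String> validateNameOnly(final String className) {
    final String error = "Not a valid class name: " + className;
    // The -1 limit keeps empty trailing parts, so "a.b." is rejected.
    for (final String part : className.split("\\.", -1)) {
      if (part.isEmpty() || !Character.isJavaIdentifierStart(part.charAt(0))) {
        return Optional.of(error);
      }
      for (int i = 1; i < part.length(); i++) {
        if (!Character.isJavaIdentifierPart(part.charAt(i))) {
          return Optional.of(error);
        }
      }
    }
    return Optional.empty();
  }

  public static void main(final String[] args) {
    final String missing = "com.example.MissingExtractor";
    System.out.println(validateByLoading(missing).orElse("ok"));
    System.out.println(validateNameOnly(missing).orElse("ok"));
  }
}
```

The lenient variant trades early feedback for portability: a misspelled class name is only reported once the server tries to load it.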

@spena
Member Author

spena commented Aug 7, 2019

@agavra I submitted another commit to fix the issue where the CLI fails to send a REST request that contains a Class value in the streams properties field.

@spena
Member Author

spena commented Aug 7, 2019

@hjafarpour I decided to convert the Class values in the streams properties to String values. For some reason, Jackson does not find the class on the classpath when creating the JSON object. The jar is on the classpath; in fact, the value is present in the streams properties as a Class. But the JSON conversion with Jackson still fails.

The code is working now. If you're fine with it, then I can merge it later.
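
The conversion described above could be sketched as follows. The method name serializeClassValues appears in the diff for KsqlRequest, but the body shown here is only a guess at its behavior, and ClassValueSerializer is a hypothetical holder class. The idea is to copy the property map and replace every Class value with its fully qualified name, so that the JSON serializer only ever sees plain strings.

```java
import java.util.HashMap;
import java.util.Map;

final class ClassValueSerializer {

  /** Returns a copy of the map in which every Class value is replaced by its class name. */
  static Map<String, Object> serializeClassValues(final Map<String, ?> properties) {
    final Map<String, Object> result = new HashMap<>();
    for (final Map.Entry<String, ?> entry : properties.entrySet()) {
      final Object value = entry.getValue();
      // Assumption: only Class values need special handling; everything else is copied as-is.
      result.put(
          entry.getKey(),
          value instanceof Class ? ((Class<?>) value).getName() : value);
    }
    return result;
  }

  public static void main(final String[] args) {
    final Map<String, Object> props = new HashMap<>();
    props.put("default.timestamp.extractor", String.class);
    props.put("auto.offset.reset", "earliest");
    System.out.println(serializeClassValues(props));
  }
}
```

Because the server already accepts the extractor as a class name string (as in the curl request in the description), sending the name instead of the Class object should not change what the server receives.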

Contributor

@hjafarpour hjafarpour left a comment

Thanks @spena
Added a few comments.

@@ -31,9 +37,23 @@ public static TimestampExtractionPolicy create(
final KsqlSchema schema,
final Optional<String> timestampColumnName,
final Optional<String> timestampFormat
) {
Contributor

This was used only in one place, LogicalPlanner. Why not remove this method and just use the next one?

Member Author

I'll do that, thanks.

Member Author

Done

assertThat(result.create(0), instanceOf(FailOnInvalidTimestamp.class));
}

@Test(expected = ConfigException.class)
Contributor

Why not use ExpectedException so you can also check the correctness of the error messages?

Member Author

I followed how the other test cases of this class verify exceptions.
Btw, what is our preference? Should I modify the other test cases to use ExpectedException?

Contributor

ExpectedException would be preferred. Check KsqlRequestTest for examples.

Member Author

Done

);
}

@Test(expected = KsqlException.class)
Contributor

Ditto here, use ExpectedException

Member Author

Done

@@ -46,7 +47,7 @@ public KsqlRequest(
this.ksql = ksql == null ? "" : ksql;
this.streamsProperties = streamsProperties == null
? Collections.emptyMap()
- : Collections.unmodifiableMap(new HashMap<>(streamsProperties));
+ : Collections.unmodifiableMap(new HashMap<>(serializeClassValues(streamsProperties)));
Contributor

It would be good to add a test for this.

Member Author

Done

@spena spena force-pushed the override_extraction_policy branch from 5b72f5f to 598c27a Compare August 8, 2019 14:25
Contributor

@hjafarpour hjafarpour left a comment

LGTM.

@spena spena merged commit d1db07b into confluentinc:5.3.x Aug 8, 2019
@spena spena deleted the override_extraction_policy branch August 8, 2019 19:14