
fix: missing topic classifier now uses MissingSourceTopicException #6172

Merged
merged 1 commit into confluentinc:master on Sep 10, 2020

Conversation

agavra
Contributor

@agavra agavra commented Sep 9, 2020

Description

Improves the MissingTopicClassifier to leverage the new Kafka Streams exception that is thrown, instead of making a call out to Kafka. This is more reliable, since Kafka metadata sometimes still reports topics as present after they have actually been deleted.
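The classification approach described above can be sketched as follows. This is a minimal illustration, not the actual ksqlDB code: the exception classes are defined locally as stand-ins for `org.apache.kafka.streams.errors.StreamsException` and `MissingSourceTopicException`, so the example runs without Kafka on the classpath.

```java
// Stand-in for org.apache.kafka.streams.errors.StreamsException (assumption:
// local definition so this sketch compiles without Kafka dependencies).
class StreamsException extends RuntimeException {
    StreamsException(final String msg) { super(msg); }
}

// Stand-in for org.apache.kafka.streams.errors.MissingSourceTopicException.
class MissingSourceTopicException extends StreamsException {
    MissingSourceTopicException(final String msg) { super(msg); }
}

public class MissingTopicClassifierSketch {
    enum Type { MISSING_TOPIC, UNKNOWN }

    // Walk the cause chain: if any cause is a MissingSourceTopicException,
    // classify the error as a missing-topic error. No broker metadata call
    // is needed, which avoids relying on possibly stale topic state.
    static Type classify(final Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof MissingSourceTopicException) {
                return Type.MISSING_TOPIC;
            }
        }
        return Type.UNKNOWN;
    }

    public static void main(final String[] args) {
        final Throwable wrapped = new StreamsException("task failed");
        wrapped.initCause(new MissingSourceTopicException("source topic deleted"));
        System.out.println(classify(wrapped));                        // MISSING_TOPIC
        System.out.println(classify(new RuntimeException("other")));  // UNKNOWN
    }
}
```

The key design point, per the PR description, is that the classifier reacts to the exception Kafka Streams already throws rather than querying the broker for topic metadata.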

Testing done

Unit testing and manual testing.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@agavra agavra requested a review from a team as a code owner September 9, 2020 23:44
final Set<String> usedTopics = new HashSet<>();
for (final Subtopology subtopology : topology.describe().subtopologies()) {
  for (final Node node : subtopology.nodes()) {
    if (node instanceof Source) {
Member

I assume the new exception is thrown for both a missing source topic and sink topic, replicating this behavior. Is that right?

Contributor Author

Thanks for pointing this out. I thought about this and had just assumed it did, but now that I've checked... it seems it does not actually capture missing sink topics :( cc @ableegoldman

I created a stream A and then B that was just SELECT * FROM A. Then I deleted topic B and waited:

[2020-09-09 17:34:24,536] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CSAS_B_0-3ebb4954-2cfb-40b0-b703-ae2828d3aa0a-StreamThread-2. (io.confluent.ksql.util.QueryMetadata:158)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic B for task 0_0 due to:
org.apache.kafka.common.errors.TimeoutException: Topic B not present in metadata after 60000 ms.

This PR is probably still an improvement over what was there beforehand, which seemed to just not work (at least in the Confluent Cloud environment with multiple brokers). I'm going to test what happens if a repartition topic is deleted.

Contributor
It's only for missing source topics specifically. It doesn't apply to repartition topics either; that case is trickier, since how do we tell the difference between a repartition topic that was deleted and one we just haven't created yet? There's follow-up work planned for all of that, but this change was a quick and easy fix for the user input topic case so we could make some progress.

Contributor Author
Thanks @ableegoldman - I'll go ahead and merge this and we can improve it as we go on.

@agavra agavra merged commit cf8e15d into confluentinc:master Sep 10, 2020
@agavra agavra deleted the missing_topics branch September 10, 2020 01:21
AlanConfluent added a commit that referenced this pull request Sep 11, 2020
AlanConfluent added a commit that referenced this pull request Sep 16, 2020