fix: missing topic classifier now uses MissingSourceTopicException #6172
Conversation
final Set<String> usedTopics = new HashSet<>();
for (final Subtopology subtopology : topology.describe().subtopologies()) {
    for (final Node node : subtopology.nodes()) {
        if (node instanceof Source) {
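The excerpt above can be filled out end-to-end with stand-in types (a hedged reconstruction, not the actual ksqlDB code; the real implementation uses Kafka Streams' TopologyDescription.Subtopology, Node, and Source interfaces, which the local record types below merely imitate):

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Stand-in types imitating Kafka Streams' TopologyDescription interfaces.
interface Node { }
record Source(Set<String> topicSet) implements Node { }
record Sink(String topic) implements Node { }
record Subtopology(List<Node> nodes) { }

public class UsedTopics {
    // Collect every topic consumed by a source node across all subtopologies.
    static Set<String> usedSourceTopics(final List<Subtopology> subtopologies) {
        final Set<String> usedTopics = new HashSet<>();
        for (final Subtopology subtopology : subtopologies) {
            for (final Node node : subtopology.nodes()) {
                if (node instanceof Source source) {
                    usedTopics.addAll(source.topicSet());
                }
            }
        }
        return usedTopics;
    }
}
```

As the review thread below notes, only Source nodes are inspected, so sink topics and repartition topics are not covered by this check.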
I assume the new exception is thrown for both a missing source topic and sink topic, replicating this behavior. Is that right?
Thanks for pointing this out. I thought about this and just assumed it did, but now that I checked it... it seems like it does not actually capture missing sink topics :( cc @ableegoldman

I created a stream A and then a stream B that was just SELECT * FROM A. Then I deleted topic B and waited:
[2020-09-09 17:34:24,536] ERROR Unhandled exception caught in streams thread _confluent-ksql-default_query_CSAS_B_0-3ebb4954-2cfb-40b0-b703-ae2828d3aa0a-StreamThread-2. (io.confluent.ksql.util.QueryMetadata:158)
org.apache.kafka.streams.errors.StreamsException: Error encountered sending record to topic B for task 0_0 due to:
org.apache.kafka.common.errors.TimeoutException: Topic B not present in metadata after 60000 ms.
This PR is probably still an improvement over what was there beforehand, which seemed to just not work (at least in the Confluent Cloud environment with multiple brokers). I'm going to test to see what happens if a repartition topic is deleted.
It's only for missing source topics specifically. It doesn't apply to repartition topics either; that's trickier: how do we tell the difference between a deleted repartition topic and a repartition topic that we just need to create? There's follow-up work planned for all that, but this change was just a quick and easy fix for the user input topic case so we could make some progress.
Thanks @ableegoldman - I'll go ahead and merge this and we can improve it as we go on.
Description
Improves the MissingTopicClassifier to leverage the new Kafka Streams exception that is thrown, instead of making a call out to Kafka. This is more reliable, as Kafka's metadata sometimes reports that topics have not been deleted when they actually have been.

Testing done

Unit testing and manual testing.
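The classification the description refers to can be sketched as checking the exception type on the error path rather than querying broker metadata (a hedged sketch, not ksqlDB's actual MissingTopicClassifier; the local exception class below stands in for org.apache.kafka.streams.errors.MissingSourceTopicException so the example is self-contained):

```java
// Stand-in for org.apache.kafka.streams.errors.MissingSourceTopicException.
class MissingSourceTopicException extends RuntimeException {
    MissingSourceTopicException(final String message) {
        super(message);
    }
}

public class MissingTopicSketch {
    // Walk the cause chain: Kafka Streams often wraps the root cause in a
    // StreamsException, so a direct instanceof check on the top-level error
    // is not enough.
    static boolean isMissingSourceTopicError(final Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof MissingSourceTopicException) {
                return true;
            }
        }
        return false;
    }
}
```

Classifying by exception type avoids the staleness problem described above, where a metadata lookup can still report a topic as present after it has been deleted.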
Reviewer checklist