Skip to content

Commit

Permalink
fix: missing topic classifier now uses MissingSourceTopicException (c…
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Sep 10, 2020
1 parent cf99425 commit 1cc0699
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@
package io.confluent.ksql.query;

import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.services.KafkaTopicClient;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -31,37 +30,24 @@ public class MissingTopicClassifier implements QueryErrorClassifier {

private static final Logger LOG = LoggerFactory.getLogger(MissingTopicClassifier.class);

private final Set<String> requiredTopics;
private final KafkaTopicClient topicClient;
private final String queryId;

public MissingTopicClassifier(
final String queryId,
final Set<String> requiredTopics,
final KafkaTopicClient topicClient
) {
public MissingTopicClassifier(final String queryId) {
this.queryId = Objects.requireNonNull(queryId, "queryId");
this.requiredTopics = Objects.requireNonNull(requiredTopics, "requiredTopics");
this.topicClient = Objects.requireNonNull(topicClient, "topicClient");
LOG.info("Query {} requires topics {}", queryId, requiredTopics);
}

@Override
public Type classify(final Throwable e) {
LOG.info(
"Attempting to classify missing topic error. Query ID: {} Required topics: {}",
queryId,
requiredTopics
);
final Type type = e instanceof MissingSourceTopicException ? Type.USER : Type.UNKNOWN;

for (String requiredTopic : requiredTopics) {
if (!topicClient.isTopicExists(requiredTopic)) {
LOG.warn("Query {} requires topic {} which cannot be found.", queryId, requiredTopic);
return Type.USER;
}
if (type == Type.USER) {
LOG.info(
"Classified error as USER error based on missing topic. Query ID: {} Exception: {}",
queryId,
e);
}

return Type.UNKNOWN;
return type;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import static io.confluent.ksql.util.KsqlConfig.KSQL_SHUTDOWN_TIMEOUT_MS_CONFIG;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.errors.ProductionExceptionHandlerUtil;
Expand Down Expand Up @@ -51,7 +50,6 @@
import io.confluent.ksql.util.TransientQueryMetadata;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
Expand All @@ -65,10 +63,6 @@
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyDescription.Node;
import org.apache.kafka.streams.TopologyDescription.Sink;
import org.apache.kafka.streams.TopologyDescription.Source;
import org.apache.kafka.streams.TopologyDescription.Subtopology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;

Expand Down Expand Up @@ -212,10 +206,7 @@ public PersistentQueryMetadata buildPersistentQuery(
applicationId
));

final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(
applicationId,
extractTopics(topology),
serviceContext.getTopicClient());
final QueryErrorClassifier topicClassifier = new MissingTopicClassifier(applicationId);
final QueryErrorClassifier classifier = buildConfiguredClassifiers(ksqlConfig, applicationId)
.map(topicClassifier::and)
.orElse(topicClassifier);
Expand Down Expand Up @@ -326,20 +317,6 @@ private static Optional<QueryErrorClassifier> buildConfiguredClassifiers(
return Optional.ofNullable(combined);
}

private static Set<String> extractTopics(final Topology topology) {
final Set<String> usedTopics = new HashSet<>();
for (final Subtopology subtopology : topology.describe().subtopologies()) {
for (final Node node : subtopology.nodes()) {
if (node instanceof Source) {
usedTopics.addAll(((Source) node).topicSet());
} else if (node instanceof Sink) {
usedTopics.add(((Sink) node).topic());
}
}
}
return ImmutableSet.copyOf(usedTopics);
}

private static void updateListProperty(
final Map<String, Object> properties,
final String key,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.confluent.ksql.query.QueryError.Type;
import io.confluent.ksql.services.KafkaTopicClient;
import java.util.Set;
import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
Expand All @@ -31,20 +32,13 @@
@RunWith(MockitoJUnitRunner.class)
public class MissingTopicClassifierTest {

@Mock
private KafkaTopicClient topicClient;
@Mock
private Throwable error;

@Test
public void shouldClassifyMissingTopicAsUserError() {
// Given:
final Set<String> requiredTopics = ImmutableSet.of("A", "B");
when(topicClient.isTopicExists("A")).thenReturn(true);
when(topicClient.isTopicExists("B")).thenReturn(false);
final Exception e = new MissingSourceTopicException("foo");

// When:
final Type type = new MissingTopicClassifier("", requiredTopics, topicClient).classify(error);
final Type type = new MissingTopicClassifier("").classify(e);

// Then:
assertThat(type, is(Type.USER));
Expand All @@ -53,15 +47,13 @@ public void shouldClassifyMissingTopicAsUserError() {
@Test
public void shouldClassifyNoMissingTopicAsUnknownError() {
// Given:
final Set<String> requiredTopics = ImmutableSet.of("A", "B");
when(topicClient.isTopicExists("A")).thenReturn(true);
when(topicClient.isTopicExists("B")).thenReturn(false);
final Exception e = new Exception("foo");

// When:
final Type type = new MissingTopicClassifier("", requiredTopics, topicClient).classify(error);
final Type type = new MissingTopicClassifier("").classify(e);

// Then:
assertThat(type, is(Type.USER));
assertThat(type, is(Type.UNKNOWN));
}

}

0 comments on commit 1cc0699

Please sign in to comment.