Skip to content

Commit

Permalink
fix: show queries now returns the correct Kafka Topic if the query st…
Browse files Browse the repository at this point in the history
…ring contains with clause (#4430)
  • Loading branch information
stevenpyzhang committed Feb 4, 2020
1 parent 81f984a commit 1b713cd
Show file tree
Hide file tree
Showing 8 changed files with 31 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
public class QueriesTableBuilder implements TableBuilder<Queries> {

private static final List<String> HEADERS =
ImmutableList.of("Query ID", "Status", "Kafka Topic", "Query String");
ImmutableList.of("Query ID", "Status", "Sink Name", "Sink Kafka Topic", "Query String");

@Override
public Table buildTable(final Queries entity) {
Expand All @@ -34,6 +34,7 @@ public Table buildTable(final Queries entity) {
r.getId().getId(),
r.getState().orElse("N/A"),
String.join(",", r.getSinks()),
String.join(",", r.getSinkKafkaTopics()),
r.getQuerySingleLine()
));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ public void testPrintQueries() {
final List<RunningQuery> queries = new ArrayList<>();
queries.add(
new RunningQuery(
"select * from t1", Collections.singleton("Test"), new QueryId("0"), Optional.of("Foobar")));
"select * from t1", Collections.singleton("Test"), Collections.singleton("Test topic"), new QueryId("0"), Optional.of("Foobar")));

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
new Queries("e", queries)
Expand All @@ -305,17 +305,18 @@ public void testPrintQueries() {
+ " \"queries\" : [ {\n"
+ " \"queryString\" : \"select * from t1\",\n"
+ " \"sinks\" : [ \"Test\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"Test topic\" ],\n"
+ " \"id\" : \"0\",\n"
+ " \"state\" : \"Foobar\"\n"
+ " } ],\n"
+ " \"warnings\" : [ ]\n"
+ "} ]\n"));
} else {
assertThat(output, is("\n"
+ " Query ID | Status | Kafka Topic | Query String \n"
+ "----------------------------------------------------\n"
+ " 0 | Foobar | Test | select * from t1 \n"
+ "----------------------------------------------------\n"
+ " Query ID | Status | Sink Name | Sink Kafka Topic | Query String \n"
+ "---------------------------------------------------------------------\n"
+ " 0 | Foobar | Test | Test topic | select * from t1 \n"
+ "---------------------------------------------------------------------\n"
+ "For detailed information on a Query run: EXPLAIN <Query ID>;\n"));
}
}
Expand All @@ -337,10 +338,10 @@ public void testPrintSourceDescription() {
);

final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running"))
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running"))
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running"))
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running"))
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -384,12 +385,14 @@ public void testPrintSourceDescription() {
+ " \"readQueries\" : [ {\n"
+ " \"queryString\" : \"read query\",\n"
+ " \"sinks\" : [ \"sink1\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink1 topic\" ],\n"
+ " \"id\" : \"readId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
+ " \"writeQueries\" : [ {\n"
+ " \"queryString\" : \"write query\",\n"
+ " \"sinks\" : [ \"sink2\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink2 topic\" ],\n"
+ " \"id\" : \"writeId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
Expand Down Expand Up @@ -990,10 +993,10 @@ public void testPrintExecuptionPlan() {
public void shouldPrintTopicDescribeExtended() {
// Given:
final List<RunningQuery> readQueries = ImmutableList.of(
new RunningQuery("read query", ImmutableSet.of("sink1"), new QueryId("readId"), Optional.of("Running"))
new RunningQuery("read query", ImmutableSet.of("sink1"), ImmutableSet.of("sink1 topic"), new QueryId("readId"), Optional.of("Running"))
);
final List<RunningQuery> writeQueries = ImmutableList.of(
new RunningQuery("write query", ImmutableSet.of("sink2"), new QueryId("writeId"), Optional.of("Running"))
new RunningQuery("write query", ImmutableSet.of("sink2"), ImmutableSet.of("sink2 topic"), new QueryId("writeId"), Optional.of("Running"))
);

final KsqlEntityList entityList = new KsqlEntityList(ImmutableList.of(
Expand Down Expand Up @@ -1036,12 +1039,14 @@ public void shouldPrintTopicDescribeExtended() {
+ " \"readQueries\" : [ {\n"
+ " \"queryString\" : \"read query\",\n"
+ " \"sinks\" : [ \"sink1\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink1 topic\" ],\n"
+ " \"id\" : \"readId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
+ " \"writeQueries\" : [ {\n"
+ " \"queryString\" : \"write query\",\n"
+ " \"sinks\" : [ \"sink2\" ],\n"
+ " \"sinkKafkaTopics\" : [ \"sink2 topic\" ],\n"
+ " \"id\" : \"writeId\",\n"
+ " \"state\" : \"Running\"\n"
+ " } ],\n"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public static Optional<KsqlEntity> execute(
.map(q -> new RunningQuery(
q.getStatementString(),
ImmutableSet.of(q.getSinkName().name()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
Optional.of(q.getState())
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ private static List<RunningQuery> getQueries(
.map(q -> new RunningQuery(
q.getStatementString(),
ImmutableSet.of(q.getSinkName().name()),
ImmutableSet.of(q.getResultTopic().getKafkaTopicName()),
q.getQueryId(),
Optional.of(q.getState())
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ public void shouldListQueriesBasic() {
new RunningQuery(
metadata.getStatementString(),
ImmutableSet.of(metadata.getSinkName().name()),
ImmutableSet.of(metadata.getResultTopic().getKafkaTopicName()),
metadata.getQueryId(),
Optional.of(metadata.getState())
)));
Expand Down Expand Up @@ -124,6 +125,7 @@ public static PersistentQueryMetadata givenPersistentQuery(final String id) {

final KsqlTopic sinkTopic = mock(KsqlTopic.class);
when(sinkTopic.getKeyFormat()).thenReturn(KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA.name())));
when(sinkTopic.getKafkaTopicName()).thenReturn(id);
when(metadata.getResultTopic()).thenReturn(sinkTopic);

return metadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ public void shouldShowColumnsSource() {
ImmutableList.of(new RunningQuery(
metadata.getStatementString(),
ImmutableSet.of(metadata.getSinkName().toString(FormatOptions.noEscape())),
ImmutableSet.of(metadata.getResultTopic().getKafkaTopicName()),
metadata.getQueryId(),
Optional.of(metadata.getState())
)),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1958,6 +1958,7 @@ private List<RunningQuery> createRunningQueries(
.map(md -> new RunningQuery(
md.getStatementString(),
ImmutableSet.of(md.getSinkName().toString(FormatOptions.noEscape())),
ImmutableSet.of(md.getResultTopic().getKafkaTopicName()),
md.getQueryId(),
Optional.of(md.getState())
))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,20 @@ public class RunningQuery {

private final String queryString;
private final Set<String> sinks;
private final Set<String> sinkKafkaTopics;
private final QueryId id;
private final Optional<String> state;

@JsonCreator
public RunningQuery(
@JsonProperty("queryString") final String queryString,
@JsonProperty("sinks") final Set<String> sinks,
@JsonProperty("sinkKafkaTopics") final Set<String> sinkKafkaTopics,
@JsonProperty("id") final QueryId id,
@JsonProperty("state") final Optional<String> state
) {
this.queryString = Objects.requireNonNull(queryString, "queryString");
this.sinkKafkaTopics = Objects.requireNonNull(sinkKafkaTopics, "sinkKafkaTopics");
this.sinks = Objects.requireNonNull(sinks, "sinks");
this.id = Objects.requireNonNull(id, "id");
this.state = Objects.requireNonNull(state, "state");
Expand All @@ -58,6 +61,10 @@ public Set<String> getSinks() {
return sinks;
}

public Set<String> getSinkKafkaTopics() {
return sinkKafkaTopics;
}

public QueryId getId() {
return id;
}
Expand All @@ -78,11 +85,12 @@ public boolean equals(final Object o) {
return Objects.equals(id, that.id)
&& Objects.equals(queryString, that.queryString)
&& Objects.equals(sinks, that.sinks)
&& Objects.equals(sinkKafkaTopics, that.sinkKafkaTopics)
&& Objects.equals(state, that.state);
}

@Override
public int hashCode() {
return Objects.hash(id, queryString, id, state);
return Objects.hash(id, queryString, sinks, sinkKafkaTopics, state);
}
}

0 comments on commit 1b713cd

Please sign in to comment.