Skip to content

Commit

Permalink
feat: Add SHOW TOPICS EXTENDED
Browse files Browse the repository at this point in the history
Fixes #1268

BREAKING CHANGE: "SHOW TOPICS" no longer includes the "Consumers" and
"ConsumerGroups" columns. You can use "SHOW TOPICS EXTENDED" to get the
output previous emitted from "SHOW TOPICS". See below for examples.

This change splits "SHOW TOPICS" into two commands:

1. "SHOW TOPICS EXTENDED", which shows what was previously shown by
"SHOW TOPICS". Sample output:

```
    ksql> show topics extended;

     Kafka Topic                                                                                   | Partitions | Partition Replicas | Consumers | ConsumerGroups
    --------------------------------------------------------------------------------------------------------------------------------------------------------------
     _confluent-command                                                                            | 1          | 1                  | 1         | 1
     _confluent-controlcenter-5-3-0-1-actual-group-consumption-rekey                               | 1          | 1                  | 1         | 1
```

2. "SHOW TOPICS", which now no longer queries consumer groups and their
active consumers. Sample output:

```
    ksql> show topics;

     Kafka Topic                                                                                   | Partitions | Partition Replicas
    ---------------------------------------------------------------------------------------------------------------------------------
     _confluent-command                                                                            | 1          | 1
     _confluent-controlcenter-5-3-0-1-actual-group-consumption-rekey                               | 1          | 1
```

This changeset primarily involved renaming KafkaTopicXXX classes to
KafkaTopicXXXExtended and then trimming out the consumer group info from
the original KafkaTopicXXX classes.
  • Loading branch information
cpettitt-confluent committed Aug 7, 2019
1 parent b503c8d commit a6fe3b7
Show file tree
Hide file tree
Showing 21 changed files with 501 additions and 236 deletions.
12 changes: 6 additions & 6 deletions docs/developer-guide/query-with-structured-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ Your output should resemble:
::
Kafka Topic | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------
_confluent-metrics | 12 | 1 | 0 | 0
_schemas | 1 | 1 | 0 | 0
raw-topic | 1 | 1 | 0 | 0
----------------------------------------------------------------------------------
Kafka Topic | Partitions | Partition Replicas
------------------------------------------------------
_confluent-metrics | 12 | 1
_schemas | 1 | 1
raw-topic | 1 | 1
------------------------------------------------------
Inspect ``raw-topic`` to ensure that |kcat| populated it:
Expand Down
7 changes: 4 additions & 3 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1338,13 +1338,14 @@ SHOW TOPICS

.. code:: sql
SHOW | LIST TOPICS;
SHOW | LIST TOPICS [EXTENDED];
**Description**

List the available topics in the Kafka cluster that KSQL is configured
SHOW TOPICS lists the available topics in the Kafka cluster that KSQL is configured
to connect to (default setting for ``bootstrap.servers``:
``localhost:9092``).
``localhost:9092``). SHOW TOPICS EXTENDED also displays consumer groups and their active consumer
counts.

.. _show-streams:

Expand Down
14 changes: 7 additions & 7 deletions docs/includes/ksql-includes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ Your output should resemble:

::

Kafka Topic | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------
_confluent-metrics | 12 | 1 | 0 | 0
_schemas | 1 | 1 | 0 | 0
pageviews | 1 | 1 | 0 | 0
users | 1 | 1 | 0 | 0
-----------------------------------------------------------------------------------
Kafka Topic | Partitions | Partition Replicas
------------------------------------------------------
_confluent-metrics | 12 | 1
_schemas | 1 | 1
pageviews | 1 | 1
users | 1 | 1
------------------------------------------------------

Inspect the ``users`` topic by using the PRINT statement:

Expand Down
4 changes: 1 addition & 3 deletions docs/tutorials/basics-control-center.rst
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ statements in KSQL Editor, just like you use them in the KSQL CLI.
"registered": true,
"replicaInfo": [
1
],
"consumerCount": 0,
"consumerGroupCount": 0
]
},
The ``"registered": true`` indicator means that you have registered the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import io.confluent.ksql.rest.entity.FunctionInfo;
import io.confluent.ksql.rest.entity.FunctionNameList;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.KafkaTopicsListExtended;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand Down Expand Up @@ -130,7 +131,11 @@ public class Console implements Closeable {
.put(TablesList.class,
tablePrinter(TablesList.class, TablesListTableBuilder::new))
.put(KafkaTopicsList.class,
tablePrinter(KafkaTopicsList.class, KafkaTopicsListTableBuilder::new))
tablePrinter(KafkaTopicsList.class, KafkaTopicsListTableBuilder.SimpleBuilder::new))
.put(KafkaTopicsListExtended.class,
tablePrinter(
KafkaTopicsListExtended.class,
KafkaTopicsListTableBuilder.ExtendedBuilder::new))
.put(ExecutionPlan.class,
tablePrinter(ExecutionPlan.class, ExecutionPlanTableBuilder::new))
.put(FunctionNameList.class,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,33 +19,57 @@
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.KafkaTopicsListExtended;
import io.confluent.ksql.util.StringUtil;
import java.util.List;
import java.util.stream.Stream;

public class KafkaTopicsListTableBuilder implements TableBuilder<KafkaTopicsList> {
public class KafkaTopicsListTableBuilder {

private static final List<String> HEADERS = ImmutableList.of(
"Kafka Topic",
"Partitions",
"Partition Replicas",
"Consumers",
"ConsumerGroups");
public static class SimpleBuilder implements TableBuilder<KafkaTopicsList> {
private static final List<String> HEADERS = ImmutableList.of(
"Kafka Topic",
"Partitions",
"Partition Replicas");

@Override
public Table buildTable(final KafkaTopicsList entity) {
final Stream<List<String>> rows = entity.getTopics().stream()
.map(t -> ImmutableList.of(
t.getName(),
Integer.toString(t.getReplicaInfo().size()),
getTopicReplicaInfo(t.getReplicaInfo()),
Integer.toString(t.getConsumerCount()),
Integer.toString(t.getConsumerGroupCount())));
@Override
public Table buildTable(final KafkaTopicsList entity) {
final Stream<List<String>> rows = entity.getTopics().stream()
.map(t -> ImmutableList.of(
t.getName(),
Integer.toString(t.getReplicaInfo().size()),
getTopicReplicaInfo(t.getReplicaInfo())));

return new Builder()
.withColumnHeaders(HEADERS)
.withRows(rows)
.build();
return new Builder()
.withColumnHeaders(HEADERS)
.withRows(rows)
.build();
}
}

public static class ExtendedBuilder implements TableBuilder<KafkaTopicsListExtended> {
private static final List<String> HEADERS = ImmutableList.of(
"Kafka Topic",
"Partitions",
"Partition Replicas",
"Consumers",
"ConsumerGroups");

@Override
public Table buildTable(final KafkaTopicsListExtended entity) {
final Stream<List<String>> rows = entity.getTopics().stream()
.map(t -> ImmutableList.of(
t.getName(),
Integer.toString(t.getReplicaInfo().size()),
getTopicReplicaInfo(t.getReplicaInfo()),
Integer.toString(t.getConsumerCount()),
Integer.toString(t.getConsumerGroupCount())));

return new Builder()
.withColumnHeaders(HEADERS)
.withRows(rows)
.build();
}
}

/**
Expand Down
10 changes: 9 additions & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,17 @@ private void selectWithLimit(

@Test
public void shouldPrintResultsForListOrShowCommands() {

assertRunListCommand(
"topics",
hasRow(
equalTo(orderDataProvider.topicName()),
equalTo("1"),
equalTo("1")
)
);

assertRunListCommand(
"topics extended",
hasRow(
equalTo(orderDataProvider.topicName()),
equalTo("1"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ singleExpression
statement
: query #querystatement
| (LIST | SHOW) PROPERTIES #listProperties
| (LIST | SHOW) TOPICS #listTopics
| (LIST | SHOW) TOPICS EXTENDED? #listTopics
| (LIST | SHOW) STREAMS EXTENDED? #listStreams
| (LIST | SHOW) TABLES EXTENDED? #listTables
| (LIST | SHOW) FUNCTIONS #listFunctions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public Node visitRunScript(final SqlBaseParser.RunScriptContext context) {

@Override
public Node visitListTopics(final SqlBaseParser.ListTopicsContext context) {
return new ListTopics(getLocation(context));
return new ListTopics(getLocation(context), context.EXTENDED() != null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,38 @@

public class ListTopics extends Statement {

public ListTopics(final Optional<NodeLocation> location) {
private final boolean showExtended;

public ListTopics(final Optional<NodeLocation> location, final boolean showExtended) {
super(location);
this.showExtended = showExtended;
}

@Override
public int hashCode() {
return Objects.hash(getClass());
public boolean getShowExtended() {
return showExtended;
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ListTopics that = (ListTopics) o;
return showExtended == that.showExtended;
}

return obj != null && obj.getClass().equals(getClass());
@Override
public int hashCode() {
return Objects.hash(showExtended);
}

@Override
public String toString() {
return toStringHelper(this)
.add("showExtended", showExtended)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -619,11 +619,17 @@ public void testSelectSessionWindow() {

@Test
public void testShowTopics() {
// Given:
final String simpleQuery = "SHOW TOPICS;";

// When:
final Statement statement = KsqlParserTestUtil.buildSingleAst(simpleQuery, metaStore).getStatement();
Assert.assertTrue(statement instanceof ListTopics);
final ListTopics listTopics = (ListTopics) statement;
Assert.assertTrue(listTopics.toString().equalsIgnoreCase("ListTopics{}"));

// Then:
Assert.assertTrue(statement instanceof ListTopics);
Assert.assertThat(listTopics.toString(), is("ListTopics{showExtended=false}"));
Assert.assertThat(listTopics.getShowExtended(), is(false));
}

@Test
Expand Down Expand Up @@ -761,6 +767,21 @@ public void shouldSetShowDescriptionsForShowStreamsDescriptions() {
Assert.assertThat(listStreams.getShowExtended(), is(true));
}

@Test
public void shouldSetShowDescriptionsForShowTopicsDescriptions() {
// Given:
final String statementString = "SHOW TOPICS EXTENDED;";

// When:
final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore)
.getStatement();

// Then:
Assert.assertThat(statement, instanceOf(ListTopics.class));
final ListTopics listTopics = (ListTopics)statement;
Assert.assertThat(listTopics.getShowExtended(), is(true));
}

@Test
public void shouldSetShowDescriptionsForShowTablesDescriptions() {
final String statementString = "SHOW TABLES EXTENDED;";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ public class ListTopicsTest {

@Test
public void shouldImplementHashCodeAndEqualsProperty() {
// Note: At the moment location does not take part in equality testing
new EqualsTester()
.addEqualityGroup(
// Note: At the moment location does not take part in equality testing
new ListTopics(Optional.of(SOME_LOCATION)),
new ListTopics(Optional.of(OTHER_LOCATION))
new ListTopics(Optional.of(SOME_LOCATION), true),
new ListTopics(Optional.of(OTHER_LOCATION), true)
)
.addEqualityGroup(
new ListTopics(Optional.of(SOME_LOCATION), false),
new ListTopics(Optional.of(OTHER_LOCATION), false)
)
.testEquals();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,14 @@ public class KafkaTopicInfo {

private final String name;
private final List<Integer> replicaInfo;
private final int consumerGroupCount;
private final int consumerCount;

@JsonCreator
public KafkaTopicInfo(
@JsonProperty("name") final String name,
@JsonProperty("replicaInfo") final List<Integer> replicaInfo,
@JsonProperty("consumerCount") final int consumerCount,
@JsonProperty("consumerGroupCount") final int consumerGroupCount
@JsonProperty("replicaInfo") final List<Integer> replicaInfo
) {
this.name = name;
this.replicaInfo = replicaInfo;
this.consumerGroupCount = consumerGroupCount;
this.consumerCount = consumerCount;
}

public String getName() {
Expand All @@ -52,14 +46,6 @@ public List<Integer> getReplicaInfo() {
return replicaInfo;
}

public int getConsumerCount() {
return consumerCount;
}

public int getConsumerGroupCount() {
return consumerGroupCount;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit a6fe3b7

Please sign in to comment.