Skip to content

Commit

Permalink
feat: Add SHOW TOPICS EXTENDED
Browse files Browse the repository at this point in the history
Fixes #1268

BREAKING CHANGE: "SHOW TOPICS" no longer includes the "Consumers" and
"ConsumerGroups" columns. You can use "SHOW TOPICS EXTENDED" to get the
output previous emitted from "SHOW TOPICS". See below for examples.

This change splits "SHOW TOPICS" into two commands:

1. "SHOW TOPICS EXTENDED", which shows what was previously shown by
"SHOW TOPICS". Sample output:

```
    ksql> show topics extended;

     Kafka Topic                                                                                   | Partitions | Partition Replicas | Consumers | ConsumerGroups
    --------------------------------------------------------------------------------------------------------------------------------------------------------------
     _confluent-command                                                                            | 1          | 1                  | 1         | 1
     _confluent-controlcenter-5-3-0-1-actual-group-consumption-rekey                               | 1          | 1                  | 1         | 1
```

2. "SHOW TOPICS", which now no longer queries consumer groups and their
active consumers. Sample output:

```
    ksql> show topics;

     Kafka Topic                                                                                   | Partitions | Partition Replicas
    ---------------------------------------------------------------------------------------------------------------------------------
     _confluent-command                                                                            | 1          | 1
     _confluent-controlcenter-5-3-0-1-actual-group-consumption-rekey                               | 1          | 1
```

This changeset primarily involved renaming KafkaTopicXXX classes to
KafkaTopicXXXExtended and then trimming out the consumer group info from
the original KafkaTopicXXX classes.
  • Loading branch information
cpettitt-confluent committed Aug 6, 2019
1 parent b503c8d commit df9a263
Show file tree
Hide file tree
Showing 22 changed files with 535 additions and 136 deletions.
12 changes: 6 additions & 6 deletions docs/developer-guide/query-with-structured-data.rst
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ Your output should resemble:
::
Kafka Topic | Partitions | Partition Replicas | Consumers | ConsumerGroups
----------------------------------------------------------------------------------
_confluent-metrics | 12 | 1 | 0 | 0
_schemas | 1 | 1 | 0 | 0
raw-topic | 1 | 1 | 0 | 0
----------------------------------------------------------------------------------
Kafka Topic | Partitions | Partition Replicas
------------------------------------------------------
_confluent-metrics | 12 | 1
_schemas | 1 | 1
raw-topic | 1 | 1
------------------------------------------------------
Inspect ``raw-topic`` to ensure that |kcat| populated it:
Expand Down
7 changes: 4 additions & 3 deletions docs/developer-guide/syntax-reference.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1338,13 +1338,14 @@ SHOW TOPICS

.. code:: sql
SHOW | LIST TOPICS;
SHOW | LIST TOPICS [EXTENDED];
**Description**

List the available topics in the Kafka cluster that KSQL is configured
SHOW TOPICS list the available topics in the Kafka cluster that KSQL is configured
to connect to (default setting for ``bootstrap.servers``:
``localhost:9092``).
``localhost:9092``). SHOW TOPICS EXTENDED also displays consumer groups and their active consumer
counts.

.. _show-streams:

Expand Down
14 changes: 7 additions & 7 deletions docs/includes/ksql-includes.rst
Original file line number Diff line number Diff line change
Expand Up @@ -126,13 +126,13 @@ Your output should resemble:

::

Kafka Topic | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------
_confluent-metrics | 12 | 1 | 0 | 0
_schemas | 1 | 1 | 0 | 0
pageviews | 1 | 1 | 0 | 0
users | 1 | 1 | 0 | 0
-----------------------------------------------------------------------------------
Kafka Topic | Partitions | Partition Replicas
------------------------------------------------------
_confluent-metrics | 12 | 1
_schemas | 1 | 1
pageviews | 1 | 1
users | 1 | 1
------------------------------------------------------

Inspect the ``users`` topic by using the PRINT statement:

Expand Down
4 changes: 1 addition & 3 deletions docs/tutorials/basics-control-center.rst
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,7 @@ statements in KSQL Editor, just like you use them in the KSQL CLI.
"registered": true,
"replicaInfo": [
1
],
"consumerCount": 0,
"consumerGroupCount": 0
]
},
The ``"registered": true`` indicator means that you have registered the topic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import io.confluent.ksql.cli.console.table.builder.ErrorEntityTableBuilder;
import io.confluent.ksql.cli.console.table.builder.ExecutionPlanTableBuilder;
import io.confluent.ksql.cli.console.table.builder.FunctionNameListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.KafkaTopicsListExtendedTableBuilder;
import io.confluent.ksql.cli.console.table.builder.KafkaTopicsListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.PropertiesListTableBuilder;
import io.confluent.ksql.cli.console.table.builder.QueriesTableBuilder;
Expand All @@ -49,6 +50,7 @@
import io.confluent.ksql.rest.entity.FunctionInfo;
import io.confluent.ksql.rest.entity.FunctionNameList;
import io.confluent.ksql.rest.entity.KafkaTopicsList;
import io.confluent.ksql.rest.entity.KafkaTopicsListExtended;
import io.confluent.ksql.rest.entity.KsqlEntity;
import io.confluent.ksql.rest.entity.KsqlEntityList;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
Expand Down Expand Up @@ -131,6 +133,8 @@ public class Console implements Closeable {
tablePrinter(TablesList.class, TablesListTableBuilder::new))
.put(KafkaTopicsList.class,
tablePrinter(KafkaTopicsList.class, KafkaTopicsListTableBuilder::new))
.put(KafkaTopicsListExtended.class,
tablePrinter(KafkaTopicsListExtended.class, KafkaTopicsListExtendedTableBuilder::new))
.put(ExecutionPlan.class,
tablePrinter(ExecutionPlan.class, ExecutionPlanTableBuilder::new))
.put(FunctionNameList.class,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2018 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.cli.console.table.builder;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.cli.console.table.Table;
import io.confluent.ksql.cli.console.table.Table.Builder;
import io.confluent.ksql.rest.entity.KafkaTopicsListExtended;
import io.confluent.ksql.util.StringUtil;
import java.util.List;
import java.util.stream.Stream;

public class KafkaTopicsListExtendedTableBuilder implements TableBuilder<KafkaTopicsListExtended> {

private static final List<String> HEADERS = ImmutableList.of(
"Kafka Topic",
"Partitions",
"Partition Replicas",
"Consumers",
"ConsumerGroups");

@Override
public Table buildTable(final KafkaTopicsListExtended entity) {
final Stream<List<String>> rows = entity.getTopics().stream()
.map(t -> ImmutableList.of(
t.getName(),
Integer.toString(t.getReplicaInfo().size()),
getTopicReplicaInfo(t.getReplicaInfo()),
Integer.toString(t.getConsumerCount()),
Integer.toString(t.getConsumerGroupCount())));

return new Builder()
.withColumnHeaders(HEADERS)
.withRows(rows)
.build();
}

/**
* Pretty print replica info.
*
* @param replicaSizes list of replicas per partition
* @return single value if all values are equal, else a csv representation
*/
private static String getTopicReplicaInfo(final List<Integer> replicaSizes) {
if (replicaSizes.isEmpty()) {
return "0";
} else if (replicaSizes.stream().distinct().limit(2).count() <= 1) {
return String.valueOf(replicaSizes.get(0));
} else {
return StringUtil.join(", ", replicaSizes);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,15 @@ public class KafkaTopicsListTableBuilder implements TableBuilder<KafkaTopicsList
private static final List<String> HEADERS = ImmutableList.of(
"Kafka Topic",
"Partitions",
"Partition Replicas",
"Consumers",
"ConsumerGroups");
"Partition Replicas");

@Override
public Table buildTable(final KafkaTopicsList entity) {
final Stream<List<String>> rows = entity.getTopics().stream()
.map(t -> ImmutableList.of(
t.getName(),
Integer.toString(t.getReplicaInfo().size()),
getTopicReplicaInfo(t.getReplicaInfo()),
Integer.toString(t.getConsumerCount()),
Integer.toString(t.getConsumerGroupCount())));
getTopicReplicaInfo(t.getReplicaInfo())));

return new Builder()
.withColumnHeaders(HEADERS)
Expand Down
10 changes: 9 additions & 1 deletion ksql-cli/src/test/java/io/confluent/ksql/cli/CliTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -355,9 +355,17 @@ private void selectWithLimit(

@Test
public void shouldPrintResultsForListOrShowCommands() {

assertRunListCommand(
"topics",
hasRow(
equalTo(orderDataProvider.topicName()),
equalTo("1"),
equalTo("1")
)
);

assertRunListCommand(
"topics extended",
hasRow(
equalTo(orderDataProvider.topicName()),
equalTo("1"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ singleExpression
statement
: query #querystatement
| (LIST | SHOW) PROPERTIES #listProperties
| (LIST | SHOW) TOPICS #listTopics
| (LIST | SHOW) TOPICS EXTENDED? #listTopics
| (LIST | SHOW) STREAMS EXTENDED? #listStreams
| (LIST | SHOW) TABLES EXTENDED? #listTables
| (LIST | SHOW) FUNCTIONS #listFunctions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -571,7 +571,7 @@ public Node visitRunScript(final SqlBaseParser.RunScriptContext context) {

@Override
public Node visitListTopics(final SqlBaseParser.ListTopicsContext context) {
return new ListTopics(getLocation(context));
return new ListTopics(getLocation(context), context.EXTENDED() != null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,27 +22,38 @@

public class ListTopics extends Statement {

public ListTopics(final Optional<NodeLocation> location) {
private final boolean showExtended;

public ListTopics(final Optional<NodeLocation> location, final boolean showExtended) {
super(location);
this.showExtended = showExtended;
}

@Override
public int hashCode() {
return Objects.hash(getClass());
public boolean getShowExtended() {
return showExtended;
}

@Override
public boolean equals(final Object obj) {
if (this == obj) {
public boolean equals(final Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
final ListTopics that = (ListTopics) o;
return showExtended == that.showExtended;
}

return obj != null && obj.getClass().equals(getClass());
@Override
public int hashCode() {
return Objects.hash(showExtended);
}

@Override
public String toString() {
return toStringHelper(this)
.add("showExtended", showExtended)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -623,7 +623,8 @@ public void testShowTopics() {
final Statement statement = KsqlParserTestUtil.buildSingleAst(simpleQuery, metaStore).getStatement();
Assert.assertTrue(statement instanceof ListTopics);
final ListTopics listTopics = (ListTopics) statement;
Assert.assertTrue(listTopics.toString().equalsIgnoreCase("ListTopics{}"));
Assert.assertThat(listTopics.toString(), is("ListTopics{showExtended=false}"));
Assert.assertThat(listTopics.getShowExtended(), is(false));
}

@Test
Expand Down Expand Up @@ -761,6 +762,16 @@ public void shouldSetShowDescriptionsForShowStreamsDescriptions() {
Assert.assertThat(listStreams.getShowExtended(), is(true));
}

@Test
public void shouldSetShowDescriptionsForShowTopicsDescriptions() {
final String statementString = "SHOW TOPICS EXTENDED;";
final Statement statement = KsqlParserTestUtil.buildSingleAst(statementString, metaStore)
.getStatement();
Assert.assertThat(statement, instanceOf(ListTopics.class));
final ListTopics listTopics = (ListTopics)statement;
Assert.assertThat(listTopics.getShowExtended(), is(true));
}

@Test
public void shouldSetShowDescriptionsForShowTablesDescriptions() {
final String statementString = "SHOW TABLES EXTENDED;";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@ public class ListTopicsTest {

@Test
public void shouldImplementHashCodeAndEqualsProperty() {
// Note: At the moment location does not take part in equality testing
new EqualsTester()
.addEqualityGroup(
// Note: At the moment location does not take part in equality testing
new ListTopics(Optional.of(SOME_LOCATION)),
new ListTopics(Optional.of(OTHER_LOCATION))
new ListTopics(Optional.of(SOME_LOCATION), true),
new ListTopics(Optional.of(OTHER_LOCATION), true)
)
.addEqualityGroup(
new ListTopics(Optional.of(SOME_LOCATION), false),
new ListTopics(Optional.of(OTHER_LOCATION), false)
)
.testEquals();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,14 @@ public class KafkaTopicInfo {

private final String name;
private final List<Integer> replicaInfo;
private final int consumerGroupCount;
private final int consumerCount;

@JsonCreator
public KafkaTopicInfo(
@JsonProperty("name") final String name,
@JsonProperty("replicaInfo") final List<Integer> replicaInfo,
@JsonProperty("consumerCount") final int consumerCount,
@JsonProperty("consumerGroupCount") final int consumerGroupCount
@JsonProperty("replicaInfo") final List<Integer> replicaInfo
) {
this.name = name;
this.replicaInfo = replicaInfo;
this.consumerGroupCount = consumerGroupCount;
this.consumerCount = consumerCount;
}

public String getName() {
Expand All @@ -52,14 +46,6 @@ public List<Integer> getReplicaInfo() {
return replicaInfo;
}

public int getConsumerCount() {
return consumerCount;
}

public int getConsumerGroupCount() {
return consumerGroupCount;
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
Loading

0 comments on commit df9a263

Please sign in to comment.