diff --git a/docs-md/developer-guide/ksqldb-reference/show-topics.md b/docs-md/developer-guide/ksqldb-reference/show-topics.md
index ba27367e4390..87337eda6956 100644
--- a/docs-md/developer-guide/ksqldb-reference/show-topics.md
+++ b/docs-md/developer-guide/ksqldb-reference/show-topics.md
@@ -13,7 +13,7 @@ Synopsis
 --------
 
 ```sql
-SHOW | LIST TOPICS [EXTENDED];
+SHOW | LIST [ALL] TOPICS [EXTENDED];
 ```
 
 Description
@@ -24,10 +24,37 @@ configured to connect to (default setting for
 `bootstrap.servers`: `localhost:9092`). SHOW TOPICS EXTENDED also displays
 consumer groups and their active consumer counts.
 
+`SHOW TOPICS` does not display hidden topics by default, such as:
+
+* KSQL internal topics, like the KSQL command topic or changelog and repartition topics
+* Topics that match any pattern in the `ksql.hidden.topics` configuration
+
+`SHOW ALL TOPICS` lists all topics, including hidden topics.
+
 Example
 -------
 
-TODO: example
+```sql
+ksql> SHOW TOPICS;
+
+ Kafka Topic                 | Partitions | Partition Replicas
+---------------------------------------------------------------
+ default_ksql_processing_log | 1          | 1
+ pageviews                   | 1          | 1
+ users                       | 1          | 1
+---------------------------------------------------------------
+```
 
-Page last revised on: {{ git_revision_date }}
+```sql
+ksql> SHOW ALL TOPICS;
+
+ Kafka Topic                                                                           | Partitions | Partition Replicas
+------------------------------------------------------------------------------------------------------------------------
+ _confluent-ksql-default__command_topic                                                | 1          | 1
+ _confluent-ksql-default_query_CTAS_USERS_0-Aggregate-Aggregate-Materialize-changelog | 1          | 1
+ _confluent-ksql-default_query_CTAS_USERS_0-Aggregate-GroupBy-repartition              | 1          | 1
+ default_ksql_processing_log                                                           | 1          | 1
+ pageviews                                                                             | 1          | 1
+ users                                                                                 | 1          | 1
+------------------------------------------------------------------------------------------------------------------------
+```
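The hidden-topic filtering that backs `SHOW TOPICS` versus `SHOW ALL TOPICS` is implemented by the `ReservedInternalTopics` helper introduced later in this patch. The following is a minimal sketch of the resulting behaviour, assuming default configuration; the class name and topic names are illustrative and this snippet is not part of the patch:

```java
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.ReservedInternalTopics;
import java.util.Set;

public final class ShowTopicsFilteringSketch {
  public static void main(final String[] args) {
    // Default configs: ksql.hidden.topics falls back to the built-in system-topic list.
    final ReservedInternalTopics reserved =
        new ReservedInternalTopics(new KsqlConfig(ImmutableMap.of()));

    final Set<String> allTopics = ImmutableSet.of(
        "pageviews",                                // user topic, listed by SHOW TOPICS
        "users",                                    // user topic, listed by SHOW TOPICS
        "__consumer_offsets",                       // hidden by the default ksql.hidden.topics value
        "_confluent-ksql-default__command_topic");  // hidden by the KSQL internal topic prefix

    // SHOW TOPICS applies this filter; SHOW ALL TOPICS skips it.
    final Set<String> visible = reserved.removeHiddenTopics(allTopics);
    System.out.println(visible); // only "pageviews" and "users" remain
  }
}
```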
" + + "LIST of STRING defs"); + } + + final StringBuilder regexBuilder = new StringBuilder(); + for (Object item : (List)val) { + if (!(item instanceof String)) { + throw new IllegalArgumentException("validator should only be used with " + + "LIST of STRING defs"); + } + + if (regexBuilder.length() > 0) { + regexBuilder.append("|"); + } + + regexBuilder.append((String)item); + } + + try { + Pattern.compile(regexBuilder.toString()); + } catch (final Exception e) { + throw new ConfigException(name, val, "Not valid regular expression: " + e.getMessage()); + } + }; + } + public static final class ValidCaseInsensitiveString implements Validator { private final List validStrings; diff --git a/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogConfig.java b/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogConfig.java index 50c905938124..0bcd1bff2417 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/logging/processing/ProcessingLogConfig.java @@ -17,6 +17,8 @@ import io.confluent.ksql.util.KsqlConfig; import java.util.Map; +import java.util.Set; + import org.apache.kafka.common.config.AbstractConfig; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; @@ -125,6 +127,10 @@ private static String propertyName(final String name) { INCLUDE_ROWS_DOC ); + public static Set configNames() { + return CONFIG_DEF.names(); + } + public ProcessingLogConfig(final Map properties) { super(CONFIG_DEF, properties); } diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index bacc45658bd7..4d9b781d2891 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -20,8 +20,10 @@ import com.google.common.collect.ImmutableMap; import io.confluent.ksql.config.ConfigItem; import io.confluent.ksql.config.KsqlConfigResolver; +import io.confluent.ksql.configdef.ConfigValidators; import io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler; import io.confluent.ksql.errors.ProductionExceptionHandlerUtil; +import io.confluent.ksql.logging.processing.ProcessingLogConfig; import io.confluent.ksql.model.SemanticVersion; import io.confluent.ksql.testing.EffectivelyImmutable; import java.util.Collection; @@ -202,6 +204,36 @@ public class KsqlConfig extends AbstractConfig { public static final String KSQL_AUTH_CACHE_MAX_ENTRIES_DOC = "Controls the size of the cache " + "to a maximum number of KSQL authorization responses entries."; + public static final String KSQL_HIDDEN_TOPICS_CONFIG = "ksql.hidden.topics"; + public static final String KSQL_HIDDEN_TOPICS_DEFAULT = "_confluent.*,__confluent.*" + + ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets," + + "connect-status,connect-statuses"; + public static final String KSQL_HIDDEN_TOPICS_DOC = "Comma-separated list of topics that will " + + "be hidden. Entries in the list may be literal topic names or " + + "[Java regular expressions](https://docs.oracle.com/javase/8/docs/api/java/util/regex/" + + "Pattern.html). " + + "For example, `_confluent.*` will match any topic whose name starts with the `_confluent`)." + + "\nHidden topics will not visible when running the `SHOW TOPICS` command unless " + + "`SHOW ALL TOPICS` is used." 
+ + "\nThe default value hides known system topics from Kafka and Confluent products." + + "\nKSQL also marks its own internal topics as hidden. This is not controlled by this " + + "config."; + + public static final String KSQL_READONLY_TOPICS_CONFIG = "ksql.readonly.topics"; + public static final String KSQL_READONLY_TOPICS_DEFAULT = "_confluent.*,__confluent.*" + + ",_schemas,__consumer_offsets,__transaction_state,connect-configs,connect-offsets," + + "connect-status,connect-statuses"; + public static final String KSQL_READONLY_TOPICS_DOC = "Comma-separated list of topics that " + + "should be marked as read-only. Entries in the list may be literal topic names or " + + "[Java regular expressions](https://docs.oracle.com/javase/8/docs/api/java/util/regex/" + + "Pattern.html). " + + "For example, `_confluent.*` will match any topic whose name starts with the `_confluent`)." + + "\nRead-only topics cannot be modified by any KSQL command." + + "\nThe default value marks known system topics from Kafka and Confluent products as " + + "read-only." + + "\nKSQL also marks its own internal topics as read-only. This is not controlled by this " + + "config."; + private enum ConfigGeneration { LEGACY, CURRENT @@ -532,6 +564,20 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { KSQL_AUTH_CACHE_MAX_ENTRIES_DEFAULT, Importance.LOW, KSQL_AUTH_CACHE_MAX_ENTRIES_DOC + ).define( + KSQL_HIDDEN_TOPICS_CONFIG, + Type.LIST, + KSQL_HIDDEN_TOPICS_DEFAULT, + ConfigValidators.validRegex(), + Importance.LOW, + KSQL_HIDDEN_TOPICS_DOC + ).define( + KSQL_READONLY_TOPICS_CONFIG, + Type.LIST, + KSQL_READONLY_TOPICS_DEFAULT, + ConfigValidators.validRegex(), + Importance.LOW, + KSQL_READONLY_TOPICS_DOC ) .withClientSslSupport(); @@ -669,6 +715,10 @@ public Map getProducerClientConfigProps() { return getConfigsFor(ProducerConfig.configNames()); } + public Map getProcessingLogConfigProps() { + return getConfigsFor(ProcessingLogConfig.configNames()); + } + private Map getConfigsFor(final Set configs) { final Map props = new HashMap<>(); ksqlStreamConfigProps.values().stream() diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java index 5778f6ae0983..0f3ce1aeee5d 100644 --- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java +++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java @@ -22,9 +22,6 @@ private KsqlConstants() { public static final String CONFLUENT_AUTHOR = "Confluent"; - public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-"; - public static final String CONFLUENT_INTERNAL_TOPIC_PREFIX = "__confluent"; - public static final String STREAMS_CHANGELOG_TOPIC_SUFFIX = "-changelog"; public static final String STREAMS_REPARTITION_TOPIC_SUFFIX = "-repartition"; diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java new file mode 100644 index 000000000000..82069a784885 --- /dev/null +++ b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java @@ -0,0 +1,138 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. 
diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java
index 5778f6ae0983..0f3ce1aeee5d 100644
--- a/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/KsqlConstants.java
@@ -22,9 +22,6 @@ private KsqlConstants() {
 
   public static final String CONFLUENT_AUTHOR = "Confluent";
 
-  public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";
-  public static final String CONFLUENT_INTERNAL_TOPIC_PREFIX = "__confluent";
-
   public static final String STREAMS_CHANGELOG_TOPIC_SUFFIX = "-changelog";
   public static final String STREAMS_REPARTITION_TOPIC_SUFFIX = "-repartition";
 
diff --git a/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
new file mode 100644
index 000000000000..82069a784885
--- /dev/null
+++ b/ksql-common/src/main/java/io/confluent/ksql/util/ReservedInternalTopics.java
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2020 Confluent Inc.
+ *
+ * Licensed under the Confluent Community License (the "License"); you may not use
+ * this file except in compliance with the License. You may obtain a copy of the
+ * License at
+ *
+ * http://www.confluent.io/confluent-community-license
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+
+package io.confluent.ksql.util;
+
+import com.google.common.collect.Streams;
+import io.confluent.ksql.logging.processing.ProcessingLogConfig;
+
+import java.util.Set;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public final class ReservedInternalTopics {
+  private static final Logger LOG = LoggerFactory.getLogger(ReservedInternalTopics.class);
+
+  // These constants should not be part of KsqlConfig.SYSTEM_INTERNAL_TOPICS_CONFIG because
+  // they're not configurable.
+  public static final String KSQL_INTERNAL_TOPIC_PREFIX = "_confluent-ksql-";
+  public static final String KSQL_COMMAND_TOPIC_SUFFIX = "command_topic";
+  public static final String KSQL_CONFIGS_TOPIC_SUFFIX = "configs";
+
+  /**
+   * Returns the internal KSQL command topic.
+   *
+   * @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
+   * @return The command topic name.
+   */
+  public static String commandTopic(final KsqlConfig ksqlConfig) {
+    return toKsqlInternalTopic(ksqlConfig, KSQL_COMMAND_TOPIC_SUFFIX);
+  }
+
+  /**
+   * Returns the internal KSQL configs topic (used for KSQL standalone).
+   *
+   * @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
+   * @return The configurations topic name.
+   */
+  public static String configsTopic(final KsqlConfig ksqlConfig) {
+    return toKsqlInternalTopic(ksqlConfig, KSQL_CONFIGS_TOPIC_SUFFIX);
+  }
+
+  /**
+   * Returns the KSQL processing log topic.
+   *
+   * <p>This is not quite an internal topic: users are intentionally meant to read from it to
+   * identify deserialization and other processing errors, define a KSQL stream on it, and
+   * potentially issue queries that filter it. This is why it is not prefixed the way KSQL
+   * internal topics are.
+   *
+   * @param config The processing log config, which is used to extract the processing topic suffix.
+   * @param ksqlConfig The KSQL config, which is used to extract the KSQL service id.
+   * @return The processing log topic name.
+   */
+  public static String processingLogTopic(
+      final ProcessingLogConfig config,
+      final KsqlConfig ksqlConfig
+  ) {
+    final String topicNameConfig = config.getString(ProcessingLogConfig.TOPIC_NAME);
+    if (topicNameConfig.equals(ProcessingLogConfig.TOPIC_NAME_NOT_SET)) {
+      return String.format(
+          "%s%s",
+          ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
+          ProcessingLogConfig.TOPIC_NAME_DEFAULT_SUFFIX
+      );
+    } else {
+      return topicNameConfig;
+    }
+  }
+
+  /**
+   * Compute a name for a KSQL internal topic.
+   *
+   * @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
+   * @param topicSuffix A suffix that is appended to the topic name.
+   * @return The computed topic name.
+   */
+  private static String toKsqlInternalTopic(final KsqlConfig ksqlConfig, final String topicSuffix) {
+    return String.format(
+        "%s%s_%s",
+        KSQL_INTERNAL_TOPIC_PREFIX,
+        ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
+        topicSuffix
+    );
+  }
+
+  private final Pattern hiddenTopicsPattern;
+  private final Pattern readOnlyTopicsPattern;
+
+  public ReservedInternalTopics(final KsqlConfig ksqlConfig) {
+    final ProcessingLogConfig processingLogConfig =
+        new ProcessingLogConfig(ksqlConfig.getProcessingLogConfigProps());
+
+    this.hiddenTopicsPattern = Pattern.compile(
+        Streams.concat(
+            Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"),
+            ksqlConfig.getList(KsqlConfig.KSQL_HIDDEN_TOPICS_CONFIG).stream()
+        ).collect(Collectors.joining("|"))
+    );
+
+    this.readOnlyTopicsPattern = Pattern.compile(
+        Streams.concat(
+            Stream.of(processingLogTopic(processingLogConfig, ksqlConfig)),
+            Stream.of(KSQL_INTERNAL_TOPIC_PREFIX + ".*"),
+            ksqlConfig.getList(KsqlConfig.KSQL_READONLY_TOPICS_CONFIG).stream()
+        ).collect(Collectors.joining("|"))
+    );
+  }
+
+  public Set<String> removeHiddenTopics(final Set<String> topicNames) {
+    return topicNames.stream()
+        .filter(t -> !isHidden(t))
+        .collect(Collectors.toSet());
+  }
+
+  public boolean isHidden(final String topicName) {
+    return hiddenTopicsPattern.matcher(topicName).matches();
+  }
+
+  public boolean isReadOnly(final String topicName) {
+    return readOnlyTopicsPattern.matcher(topicName).matches();
+  }
+}
diff --git a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java
index b635e9a72f86..9aef40cae5e2 100644
--- a/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java
+++ b/ksql-common/src/test/java/io/confluent/ksql/configdef/ConfigValidatorsTest.java
@@ -19,6 +19,7 @@
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.function.Function;
 import org.apache.kafka.common.config.ConfigDef.Validator;
 import org.apache.kafka.common.config.ConfigException;
@@ -207,6 +208,56 @@ public void shouldNotThrowOnValidURL() {
     // Then: did not throw.
} + @Test + public void shouldNotThrowOnValidRegex() { + // Given + final Validator validator = ConfigValidators.validRegex(); + + // When: + validator.ensureValid("propName", Collections.singletonList("prefix_.*")); + + // Then: did not throw. + } + + @Test + public void shouldThrowOnInvalidRegex() { + // Given: + final Validator validator = ConfigValidators.validRegex(); + + // Then: + expectedException.expect(ConfigException.class); + expectedException.expectMessage("Not valid regular expression: "); + + // When: + validator.ensureValid("propName", Collections.singletonList("*_suffix")); + } + + @Test + public void shouldThrowOnNoRegexList() { + // Given: + final Validator validator = ConfigValidators.validRegex(); + + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("validator should only be used with LIST of STRING defs"); + + // When: + validator.ensureValid("propName", "*.*"); + } + + @Test + public void shouldThrowOnNoStringRegexList() { + // Given: + final Validator validator = ConfigValidators.validRegex(); + + // Then: + expectedException.expect(IllegalArgumentException.class); + expectedException.expectMessage("validator should only be used with LIST of STRING defs"); + + // When: + validator.ensureValid("propName", Collections.singletonList(1)); + } + private enum TestEnum { FOO, BAR } diff --git a/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java b/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java new file mode 100644 index 000000000000..52587aa75876 --- /dev/null +++ b/ksql-common/src/test/java/io/confluent/ksql/util/ReservedInternalTopicsTest.java @@ -0,0 +1,170 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ */ + +package io.confluent.ksql.util; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; + +import java.util.List; +import java.util.Set; + +import com.google.common.collect.ImmutableSet; +import io.confluent.ksql.logging.processing.ProcessingLogConfig; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import static org.hamcrest.Matchers.is; +import static org.junit.Assert.assertThat; + +public class ReservedInternalTopicsTest { + private static final String KSQL_PROCESSING_LOG_TOPIC = "default_ksql_processing_log"; + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + private ReservedInternalTopics internalTopics; + private KsqlConfig ksqlConfig; + + @Before + public void setUp() { + ksqlConfig = new KsqlConfig(ImmutableMap.of( + KsqlConfig.KSQL_HIDDEN_TOPICS_CONFIG, "prefix_.*,literal,.*_suffix", + KsqlConfig.KSQL_READONLY_TOPICS_CONFIG, "ro_prefix_.*,ro_literal,.*_suffix_ro" + )); + + internalTopics = new ReservedInternalTopics(ksqlConfig); + } + + + @Test + public void shouldReturnTrueOnAllHiddenTopics() { + // Given + final List topicNames = ImmutableList.of( + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test", + "prefix_", "_suffix", "prefix_topic", "topic_suffix", "literal" + ); + + topicNames.forEach(topic -> { + // When + final boolean isHidden = internalTopics.isHidden(topic); + + // Then + assertThat("Should return true on hidden topic: " + topic, + isHidden, is(true)); + }); + } + + @Test + public void shouldReturnTrueOnAllReadOnlyTopics() { + // Given + final List topicNames = ImmutableList.of( + KSQL_PROCESSING_LOG_TOPIC, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + "_test", + "ro_prefix_", "_suffix_ro", "ro_prefix_topic", "topic_suffix_ro", "ro_literal" + ); + + topicNames.forEach(topic -> { + // When + final boolean isReadOnly = internalTopics.isReadOnly(topic); + + // Then + assertThat("Should return true on read-only topic: " + topic, + isReadOnly, is(true)); + }); + } + + @Test + public void shouldReturnFalseOnNonHiddenTopics() { + // Given + final List topicNames = ImmutableList.of( + KSQL_PROCESSING_LOG_TOPIC, + "topic_prefix_", "_suffix_topic" + ); + + // Given + topicNames.forEach(topic -> { + // When + final boolean isHidden = internalTopics.isHidden(topic); + + // Then + assertThat("Should return false on non-hidden topic: " + topic, + isHidden, is(false)); + }); + } + + @Test + public void shouldReturnFalseOnNonReadOnlyTopics() { + // Given + final List topicNames = ImmutableList.of( + "topic_prefix_", "_suffix_topic" + ); + + // Given + topicNames.forEach(topic -> { + // When + final boolean isReadOnly = internalTopics.isReadOnly(topic); + + // Then + assertThat("Should return false on non read-only topic: " + topic, + isReadOnly, is(false)); + }); + } + + @Test + public void shouldRemoveAllHiddenTopics() { + // Given + final Set topics = ImmutableSet.of( + "prefix_name", "literal", "tt", "name1", "suffix", "p_suffix" + ); + + // When + final Set filteredTopics = internalTopics.removeHiddenTopics(topics); + + // Then + assertThat(filteredTopics, is(ImmutableSet.of("tt", "name1", "suffix"))); + } + + @Test + public void shouldReturnCommandTopic() { + // Given/When + final String commandTopic = ReservedInternalTopics.commandTopic(ksqlConfig); + + // Then + assertThat(commandTopic, is("_confluent-ksql-default__command_topic")); + } + + @Test + public void shouldReturnConfigsTopic() { + // 
Given/When + final String commandTopic = ReservedInternalTopics.configsTopic(ksqlConfig); + + // Then + assertThat(commandTopic, is("_confluent-ksql-default__configs")); + } + + @Test + public void shouldReturnProcessingLogTopic() { + // Given/When + final ProcessingLogConfig processingLogConfig = new ProcessingLogConfig(ImmutableMap.of()); + final String processingLogTopic = ReservedInternalTopics.processingLogTopic( + processingLogConfig, ksqlConfig); + + // Then + assertThat(processingLogTopic, is("default_ksql_processing_log")); + } +} diff --git a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java index 6941ed6dd909..3f73952a08e6 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/engine/InsertValuesExecutor.java @@ -28,6 +28,7 @@ import io.confluent.ksql.execution.expression.tree.VisitParentExpressionVisitor; import io.confluent.ksql.function.FunctionRegistry; import io.confluent.ksql.logging.processing.NoopProcessingLogContext; +import io.confluent.ksql.metastore.MetaStore; import io.confluent.ksql.metastore.model.DataSource; import io.confluent.ksql.metastore.model.DataSource.DataSourceType; import io.confluent.ksql.metastore.model.KeyField; @@ -54,6 +55,7 @@ import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlStatementException; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.SchemaUtil; import java.time.Duration; import java.util.HashMap; @@ -145,11 +147,14 @@ public void execute( final ServiceContext serviceContext ) { final InsertValues insertValues = statement.getStatement(); + final MetaStore metaStore = executionContext.getMetaStore(); final KsqlConfig config = statement.getConfig() .cloneWithPropertyOverwrite(statement.getOverrides()); + final DataSource dataSource = getDataSource(config, metaStore, insertValues); + final ProducerRecord record = - buildRecord(statement, executionContext, serviceContext); + buildRecord(statement, metaStore, dataSource, serviceContext); try { producer.sendRecord(record, serviceContext, config.getProducerClientConfigProps()); @@ -168,21 +173,12 @@ public void execute( } } - private ProducerRecord buildRecord( - final ConfiguredStatement statement, - final KsqlExecutionContext executionContext, - final ServiceContext serviceContext + private DataSource getDataSource( + final KsqlConfig ksqlConfig, + final MetaStore metaStore, + final InsertValues insertValues ) { - throwIfDisabled(statement.getConfig()); - - final InsertValues insertValues = statement.getStatement(); - final KsqlConfig config = statement.getConfig() - .cloneWithPropertyOverwrite(statement.getOverrides()); - - final DataSource dataSource = executionContext - .getMetaStore() - .getSource(insertValues.getTarget()); - + final DataSource dataSource = metaStore.getSource(insertValues.getTarget()); if (dataSource == null) { throw new KsqlException("Cannot insert values into an unknown stream/table: " + insertValues.getTarget()); @@ -192,11 +188,32 @@ private ProducerRecord buildRecord( throw new KsqlException("Cannot insert values into windowed stream/table!"); } + final ReservedInternalTopics internalTopics = new ReservedInternalTopics(ksqlConfig); + if (internalTopics.isReadOnly(dataSource.getKafkaTopicName())) { + throw new KsqlException("Cannot insert values into read-only topic: " + 
+ dataSource.getKafkaTopicName()); + } + + return dataSource; + } + + private ProducerRecord buildRecord( + final ConfiguredStatement statement, + final MetaStore metaStore, + final DataSource dataSource, + final ServiceContext serviceContext + ) { + throwIfDisabled(statement.getConfig()); + + final InsertValues insertValues = statement.getStatement(); + final KsqlConfig config = statement.getConfig() + .cloneWithPropertyOverwrite(statement.getOverrides()); + try { final RowData row = extractRow( insertValues, dataSource, - executionContext.getMetaStore(), + metaStore, config); final byte[] key = serializeKey(row.key, dataSource, config, serviceContext); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java index 02ef0b2e0c2c..8340c7842061 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/internal/KsqlEngineMetrics.java @@ -17,8 +17,8 @@ import io.confluent.ksql.engine.KsqlEngine; import io.confluent.ksql.metrics.MetricCollectors; -import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.ReservedInternalTopics; import java.io.Closeable; import java.util.ArrayList; import java.util.Arrays; @@ -88,7 +88,8 @@ public KsqlEngineMetrics( final Optional metricsExtension ) { this.ksqlEngine = ksqlEngine; - this.ksqlServiceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlEngine.getServiceId(); + this.ksqlServiceId = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + ksqlEngine.getServiceId(); this.sensors = new ArrayList<>(); this.countMetrics = new ArrayList<>(); this.metricGroupPrefix = Objects.requireNonNull(metricGroupPrefix, "metricGroupPrefix"); diff --git a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java index 765853399d43..08218e27fa79 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/query/QueryExecutor.java @@ -46,10 +46,10 @@ import io.confluent.ksql.serde.KeyFormat; import io.confluent.ksql.services.ServiceContext; import io.confluent.ksql.util.KsqlConfig; -import io.confluent.ksql.util.KsqlConstants; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.PersistentQueryMetadata; import io.confluent.ksql.util.QueryMetadata; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.TransientQueryMetadata; import java.util.Arrays; import java.util.HashMap; @@ -274,7 +274,7 @@ private KsqlQueryBuilder queryBuilder(final QueryId queryId) { } private String getServiceId() { - return KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + return ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG); } diff --git a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java index 31c19dbc477f..d311d2678a40 100644 --- a/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java +++ b/ksql-engine/src/main/java/io/confluent/ksql/services/KafkaTopicClientImpl.java @@ -175,14 +175,6 @@ public Set listTopicNames() { } } - @Override - public Set listNonInternalTopicNames() { - return listTopicNames().stream() - .filter((topic) -> 
!(topic.startsWith(KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX) - || topic.startsWith(KsqlConstants.CONFLUENT_INTERNAL_TOPIC_PREFIX))) - .collect(Collectors.toSet()); - } - @Override public Map describeTopics(final Collection topicNames) { try { diff --git a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java index 938b8fad77e7..79dc15bb762c 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/engine/InsertValuesExecutorTest.java @@ -538,6 +538,66 @@ public void shouldAllowUpcast() { verify(producer).send(new ProducerRecord<>(TOPIC_NAME, null, 1L, KEY, VALUE)); } + @Test + public void shouldThrowWhenInsertValuesOnReservedInternalTopic() { + // Given + givenDataSourceWithSchema("_confluent-ksql-default__command-topic", SCHEMA, + SerdeOption.none(), Optional.of(COL0), false); + + final ConfiguredStatement statement = ConfiguredStatement.of( + PreparedStatement.of( + "", + new InsertValues(SourceName.of("TOPIC"), + allFieldNames(SCHEMA), + ImmutableList.of( + new LongLiteral(1L), + new StringLiteral("str"), + new StringLiteral("str"), + new LongLiteral(2L) + ))), + ImmutableMap.of(), + new KsqlConfig(ImmutableMap.of()) + ); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Cannot insert values into read-only topic: _confluent-ksql-default__command-topic"); + + // When: + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); + } + + @Test + public void shouldThrowWhenInsertValuesOnProcessingLogTopic() { + // Given + givenDataSourceWithSchema("default_ksql_processing_log", SCHEMA, + SerdeOption.none(), Optional.of(COL0), false); + + final ConfiguredStatement statement = ConfiguredStatement.of( + PreparedStatement.of( + "", + new InsertValues(SourceName.of("TOPIC"), + allFieldNames(SCHEMA), + ImmutableList.of( + new LongLiteral(1L), + new StringLiteral("str"), + new StringLiteral("str"), + new LongLiteral(2L) + ))), + ImmutableMap.of(), + new KsqlConfig(ImmutableMap.of()) + ); + + // Expect: + expectedException.expect(KsqlException.class); + expectedException.expectMessage( + "Cannot insert values into read-only topic: default_ksql_processing_log"); + + // When: + executor.execute(statement, ImmutableMap.of(), engine, serviceContext); + } + @Test public void shouldThrowOnProducerSendError() throws ExecutionException, InterruptedException { // Given: @@ -850,24 +910,25 @@ private void givenSourceStreamWithSchema( final Set serdeOptions, final Optional keyField ) { - givenDataSourceWithSchema(schema, serdeOptions, keyField, false); + givenDataSourceWithSchema(TOPIC_NAME, schema, serdeOptions, keyField, false); } private void givenSourceTableWithSchema( final Set serdeOptions, final Optional keyField ) { - givenDataSourceWithSchema(SCHEMA, serdeOptions, keyField, true); + givenDataSourceWithSchema(TOPIC_NAME, SCHEMA, serdeOptions, keyField, true); } private void givenDataSourceWithSchema( + final String topicName, final LogicalSchema schema, final Set serdeOptions, final Optional keyField, final boolean table ) { final KsqlTopic topic = new KsqlTopic( - TOPIC_NAME, + topicName, KeyFormat.nonWindowed(FormatInfo.of(Format.KAFKA)), ValueFormat.of(FormatInfo.of(Format.JSON)) ); diff --git a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java 
b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java index 6c7a9a9def4e..abe19ab2da40 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/internal/KsqlEngineMetricsTest.java @@ -40,6 +40,8 @@ import java.util.Map; import java.util.Optional; import java.util.function.Supplier; + +import io.confluent.ksql.util.ReservedInternalTopics; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -66,7 +68,8 @@ public class KsqlEngineMetricsTest { private static final String METRIC_GROUP = "testGroup"; private KsqlEngineMetrics engineMetrics; private static final String KSQL_SERVICE_ID = "test-ksql-service-id"; - private static final String metricNamePrefix = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + KSQL_SERVICE_ID; + private static final String metricNamePrefix = ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + + KSQL_SERVICE_ID; private static final Map CUSTOM_TAGS = ImmutableMap.of("tag1", "value1", "tag2", "value2"); @Mock diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java b/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java index 3c2867fc215f..2ceb8aa5c9ff 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/FakeKafkaTopicClient.java @@ -20,7 +20,7 @@ import com.google.common.collect.Sets; import io.confluent.ksql.topic.TopicProperties; -import io.confluent.ksql.util.KsqlConstants; + import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -31,6 +31,7 @@ import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.IntStream; + import org.apache.kafka.clients.admin.CreateTopicsOptions; import org.apache.kafka.clients.admin.TopicDescription; import org.apache.kafka.common.Node; @@ -155,14 +156,6 @@ public Set listTopicNames() { return topicMap.keySet(); } - @Override - public Set listNonInternalTopicNames() { - return topicMap.keySet().stream() - .filter((topic) -> (!topic.startsWith(KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX) - || !topic.startsWith(KsqlConstants.CONFLUENT_INTERNAL_TOPIC_PREFIX))) - .collect(Collectors.toSet()); - } - @Override public Map describeTopics(final Collection topicNames) { return topicNames.stream() diff --git a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java index 061e422737b7..0daf095db777 100644 --- a/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java +++ b/ksql-engine/src/test/java/io/confluent/ksql/services/KafkaTopicClientImplTest.java @@ -33,7 +33,6 @@ import io.confluent.ksql.exception.KafkaResponseGetFailedException; import io.confluent.ksql.exception.KafkaTopicExistsException; import io.confluent.ksql.exception.KsqlTopicAuthorizationException; -import io.confluent.ksql.util.KsqlConstants; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -47,6 +46,8 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; + +import io.confluent.ksql.util.ReservedInternalTopics; import org.apache.kafka.clients.admin.AdminClient; import 
org.apache.kafka.clients.admin.AlterConfigOp; import org.apache.kafka.clients.admin.AlterConfigOp.OpType; @@ -95,18 +96,17 @@ public class KafkaTopicClientImplTest { private static final String topicName2 = "topic2"; private static final String topicName3 = "topic3"; private static final String internalTopic1 = String.format("%s%s_%s", - KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX, "default", "query_CTAS_USERS_BY_CITY-KSTREAM-AGGREGATE" + "-STATE-STORE-0000000006-repartition"); private static final String internalTopic2 = String.format("%s%s_%s", - KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX, "default", "query_CTAS_USERS_BY_CITY-KSTREAM-AGGREGATE" + "-STATE-STORE-0000000006-changelog"); - private static final String confluentInternalTopic = - String.format("%s-%s", KsqlConstants.CONFLUENT_INTERNAL_TOPIC_PREFIX, - "confluent-control-center"); + private static final String internalTopic = String.format("%s_internal", + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX); private Node node; @Mock private AdminClient adminClient; @@ -276,16 +276,6 @@ public void shouldRetryListTopics() { verify(adminClient); } - @Test - public void shouldFilterInternalTopics() { - expect(adminClient.listTopics()).andReturn(getListTopicsResultWithInternalTopics()); - replay(adminClient); - final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(() -> adminClient); - final Set names = kafkaTopicClient.listNonInternalTopicNames(); - assertThat(names, equalTo(Utils.mkSet(topicName1, topicName2, topicName3))); - verify(adminClient); - } - @Test public void shouldListTopicNames() { expect(adminClient.listTopics()).andReturn(getListTopicsResult()); @@ -326,7 +316,7 @@ public void shouldDeleteInternalTopics() { replay(adminClient); final KafkaTopicClient kafkaTopicClient = new KafkaTopicClientImpl(() -> adminClient); final String applicationId = String.format("%s%s", - KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX, "default_query_CTAS_USERS_BY_CITY"); kafkaTopicClient.deleteInternalTopics(applicationId); verify(adminClient); @@ -656,7 +646,7 @@ private static ListTopicsResult getListTopicsResultWithInternalTopics() { final ListTopicsResult listTopicsResult = mock(ListTopicsResult.class); final List topicNamesList = Arrays.asList(topicName1, topicName2, topicName3, internalTopic1, internalTopic2, - confluentInternalTopic); + internalTopic); expect(listTopicsResult.names()) .andReturn(KafkaFuture.completedFuture(new HashSet<>(topicNamesList))); replay(listTopicsResult); diff --git a/ksql-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java b/ksql-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java index 54df75227c3a..449fa3c96ccb 100644 --- a/ksql-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java +++ b/ksql-execution/src/main/java/io/confluent/ksql/services/KafkaTopicClient.java @@ -140,13 +140,6 @@ void createTopic( */ Set listTopicNames(); - /** - * Call to retrieve list of internal topics - * - * @return set of all non-internal topics - */ - Set listNonInternalTopicNames(); - /** * Call to get a one or more topic's description. 
* diff --git a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaTopicClient.java b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaTopicClient.java index 0ff06f205953..78fe1e272545 100644 --- a/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaTopicClient.java +++ b/ksql-functional-tests/src/main/java/io/confluent/ksql/test/tools/stubs/StubKafkaTopicClient.java @@ -22,7 +22,6 @@ import io.confluent.ksql.exception.KafkaTopicExistsException; import io.confluent.ksql.services.KafkaTopicClient; import io.confluent.ksql.topic.TopicProperties; -import io.confluent.ksql.util.KsqlConstants; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -137,14 +136,6 @@ public Set listTopicNames() { return topicMap.keySet(); } - @Override - public Set listNonInternalTopicNames() { - return topicMap.keySet().stream() - .filter((topic) -> (!topic.startsWith(KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX) - || !topic.startsWith(KsqlConstants.CONFLUENT_INTERNAL_TOPIC_PREFIX))) - .collect(Collectors.toSet()); - } - @Override public Map describeTopics(final Collection topicNames) { return topicNames.stream() diff --git a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 index fd8edda4c95f..049de85a48a3 100644 --- a/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 +++ b/ksql-parser/src/main/antlr4/io/confluent/ksql/parser/SqlBase.g4 @@ -34,7 +34,7 @@ singleExpression statement : query #queryStatement | (LIST | SHOW) PROPERTIES #listProperties - | (LIST | SHOW) TOPICS EXTENDED? #listTopics + | (LIST | SHOW) ALL? TOPICS EXTENDED? #listTopics | (LIST | SHOW) STREAMS EXTENDED? #listStreams | (LIST | SHOW) TABLES EXTENDED? 
#listTables | (LIST | SHOW) FUNCTIONS #listFunctions diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java index 66e891b6ddf0..540db70d009b 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/AstBuilder.java @@ -579,7 +579,8 @@ public Node visitSelectSingle(final SqlBaseParser.SelectSingleContext context) { @Override public Node visitListTopics(final SqlBaseParser.ListTopicsContext context) { - return new ListTopics(getLocation(context), context.EXTENDED() != null); + return new ListTopics(getLocation(context), + context.ALL() != null, context.EXTENDED() != null); } @Override diff --git a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java index 5ed87de7d479..32ffef24c189 100644 --- a/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java +++ b/ksql-parser/src/main/java/io/confluent/ksql/parser/tree/ListTopics.java @@ -23,13 +23,23 @@ public class ListTopics extends Statement { + private final boolean showAll; private final boolean showExtended; - public ListTopics(final Optional location, final boolean showExtended) { + public ListTopics( + final Optional location, + final boolean showAll, + final boolean showExtended + ) { super(location); + this.showAll = showAll; this.showExtended = showExtended; } + public boolean getShowAll() { + return showAll; + } + public boolean getShowExtended() { return showExtended; } @@ -43,17 +53,18 @@ public boolean equals(final Object o) { return false; } final ListTopics that = (ListTopics) o; - return showExtended == that.showExtended; + return showAll == that.showAll && showExtended == that.showExtended; } @Override public int hashCode() { - return Objects.hash(showExtended); + return Objects.hash(showAll, showExtended); } @Override public String toString() { return toStringHelper(this) + .add("showAll", showAll) .add("showExtended", showExtended) .toString(); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java index 43757b0b5e48..577ee33ea02d 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/KsqlParserTest.java @@ -692,7 +692,7 @@ public void testShowTopics() { // Then: Assert.assertTrue(statement instanceof ListTopics); - Assert.assertThat(listTopics.toString(), is("ListTopics{showExtended=false}")); + Assert.assertThat(listTopics.toString(), is("ListTopics{showAll=false, showExtended=false}")); Assert.assertThat(listTopics.getShowExtended(), is(false)); } diff --git a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java index 59e33b0b28f9..6b5badaa354b 100644 --- a/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java +++ b/ksql-parser/src/test/java/io/confluent/ksql/parser/tree/ListTopicsTest.java @@ -30,12 +30,14 @@ public void shouldImplementHashCodeAndEqualsProperty() { // Note: At the moment location does not take part in equality testing new EqualsTester() .addEqualityGroup( - new ListTopics(Optional.of(SOME_LOCATION), true), - new ListTopics(Optional.of(OTHER_LOCATION), true) + new ListTopics(Optional.of(SOME_LOCATION), true, true), + new 
ListTopics(Optional.of(OTHER_LOCATION), true, true) ) .addEqualityGroup( - new ListTopics(Optional.of(SOME_LOCATION), false), - new ListTopics(Optional.of(OTHER_LOCATION), false) + new ListTopics(Optional.of(SOME_LOCATION), false, true) + ) + .addEqualityGroup( + new ListTopics(Optional.of(SOME_LOCATION), true,false) ) .testEquals(); } diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index 7d78f466dd59..eeea549f4573 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -86,6 +86,7 @@ import io.confluent.ksql.statement.ConfiguredStatement; import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlServerException; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.util.RetryUtil; import io.confluent.ksql.util.Version; import io.confluent.ksql.util.WelcomeMsgUtils; @@ -565,8 +566,7 @@ static KsqlRestApplication buildApplication( UserFunctionLoader.newInstance(ksqlConfig, functionRegistry, ksqlInstallDir).load(); - final String commandTopicName = KsqlInternalTopicUtils.getTopicName( - ksqlConfig, KsqlRestConfig.COMMAND_TOPIC_SUFFIX); + final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig); final CommandStore commandStore = CommandStore.Factory.create( commandTopicName, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java index 5c1d1b134e2e..d523732fb3bc 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestConfig.java @@ -77,8 +77,6 @@ public class KsqlRestConfig extends RestConfig { private static final String INSTALL_DIR_DOC = "The directory that ksql is installed in. 
This is set in the ksql-server-start script."; - static final String COMMAND_TOPIC_SUFFIX = "command_topic"; - static final String KSQL_WEBSOCKETS_NUM_THREADS = KSQL_CONFIG_PREFIX + "server.websockets.num.threads"; private static final String KSQL_WEBSOCKETS_NUM_THREADS_DOC = diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java index f56a42fc667c..0033fb044022 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/StandaloneExecutorFactory.java @@ -34,6 +34,7 @@ import io.confluent.ksql.statement.Injector; import io.confluent.ksql.statement.Injectors; import io.confluent.ksql.util.KsqlConfig; +import io.confluent.ksql.util.ReservedInternalTopics; import io.confluent.ksql.version.metrics.KsqlVersionCheckerAgent; import io.confluent.ksql.version.metrics.VersionCheckerAgent; import java.util.Map; @@ -43,8 +44,6 @@ public final class StandaloneExecutorFactory { - static final String CONFIG_TOPIC_SUFFIX = "configs"; - private StandaloneExecutorFactory(){ } @@ -93,8 +92,7 @@ static StandaloneExecutor create( final ServiceContext serviceContext = serviceContextFactory.apply(baseConfig); - final String configTopicName - = KsqlInternalTopicUtils.getTopicName(baseConfig, CONFIG_TOPIC_SUFFIX); + final String configTopicName = ReservedInternalTopics.configsTopic(baseConfig); KsqlInternalTopicUtils.ensureTopic( configTopicName, baseConfig, diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java index ebb9ee968ea2..9bf96e39ac01 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandRunnerStatusMetric.java @@ -17,7 +17,7 @@ import com.google.common.annotations.VisibleForTesting; import io.confluent.ksql.metrics.MetricCollectors; -import io.confluent.ksql.util.KsqlConstants; +import io.confluent.ksql.util.ReservedInternalTopics; import java.io.Closeable; import java.util.Collections; @@ -63,7 +63,7 @@ public class CommandRunnerStatusMetric implements Closeable { this.metricGroupName = metricsGroupPrefix + METRIC_GROUP_POST_FIX; this.metricName = metrics.metricName( "status", - KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId + metricGroupName, + ReservedInternalTopics.KSQL_INTERNAL_TOPIC_PREFIX + ksqlServiceId + metricGroupName, "The status of the commandRunner thread as it processes the command topic.", Collections.emptyMap() ); diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java index 75144e15d21d..df4f9c461e5b 100644 --- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java +++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/DistributingExecutor.java @@ -16,6 +16,8 @@ import io.confluent.ksql.KsqlExecutionContext; import io.confluent.ksql.metastore.MetaStore; +import io.confluent.ksql.metastore.model.DataSource; +import io.confluent.ksql.parser.tree.InsertInto; import io.confluent.ksql.parser.tree.Statement; import 
io.confluent.ksql.rest.entity.CommandId;
 import io.confluent.ksql.rest.entity.CommandStatus;
@@ -26,7 +28,10 @@
 import io.confluent.ksql.services.ServiceContext;
 import io.confluent.ksql.statement.ConfiguredStatement;
 import io.confluent.ksql.statement.Injector;
+import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.KsqlException;
 import io.confluent.ksql.util.KsqlServerException;
+import io.confluent.ksql.util.ReservedInternalTopics;
 import java.time.Duration;
 import java.util.Objects;
 import java.util.Optional;
@@ -49,8 +54,10 @@ public class DistributingExecutor {
   private final Optional<KsqlAuthorizationValidator> authorizationValidator;
   private final ValidatedCommandFactory validatedCommandFactory;
   private final CommandIdAssigner commandIdAssigner;
+  private final ReservedInternalTopics internalTopics;

   public DistributingExecutor(
+      final KsqlConfig ksqlConfig,
       final CommandQueue commandQueue,
       final Duration distributedCmdResponseTimeout,
       final BiFunction<KsqlExecutionContext, ServiceContext, Injector> injectorFactory,
@@ -68,6 +75,8 @@ public DistributingExecutor(
         "validatedCommandFactory"
     );
     this.commandIdAssigner = new CommandIdAssigner();
+    this.internalTopics =
+        new ReservedInternalTopics(Objects.requireNonNull(ksqlConfig, "ksqlConfig"));
   }

   /**
@@ -89,6 +98,13 @@ public Optional<KsqlEntity> execute(
         .apply(executionContext, securityContext.getServiceContext())
         .inject(statement);

+    if (injected.getStatement() instanceof InsertInto) {
+      throwIfInsertOnReadOnlyTopic(
+          executionContext.getMetaStore(),
+          (InsertInto)injected.getStatement()
+      );
+    }
+
     checkAuthorization(injected, securityContext, executionContext);

     final Producer<CommandId, Command> transactionalProducer =
@@ -159,4 +175,20 @@ private void checkAuthorization(
       throw new KsqlServerException("The KSQL server is not permitted to execute the command", e);
     }
   }
+
+  private void throwIfInsertOnReadOnlyTopic(
+      final MetaStore metaStore,
+      final InsertInto insertInto
+  ) {
+    final DataSource dataSource = metaStore.getSource(insertInto.getTarget());
+    if (dataSource == null) {
+      throw new KsqlException("Cannot insert into an unknown stream/table: "
+          + insertInto.getTarget());
+    }
+
+    if (internalTopics.isReadOnly(dataSource.getKafkaTopicName())) {
+      throw new KsqlException("Cannot insert into read-only topic: "
+          + dataSource.getKafkaTopicName());
+    }
+  }
 }
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java
index 96da8ecec210..db671e1fffaf 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutor.java
@@ -28,8 +28,7 @@
 import io.confluent.ksql.util.KafkaConsumerGroupClient;
 import io.confluent.ksql.util.KafkaConsumerGroupClient.ConsumerSummary;
 import io.confluent.ksql.util.KafkaConsumerGroupClientImpl;
-import io.confluent.ksql.util.KsqlConfig;
-import io.confluent.ksql.util.KsqlConstants;
+import io.confluent.ksql.util.ReservedInternalTopics;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -58,12 +57,7 @@ public static Optional<KsqlEntity> execute(
       final ServiceContext serviceContext
   ) {
     final KafkaTopicClient client = serviceContext.getTopicClient();
-
-    final Map<String, TopicDescription> kafkaTopicDescriptions
-        = client.describeTopics(client.listNonInternalTopicNames());
-
-    final Map<String, TopicDescription> filteredDescriptions = new TreeMap<>(
-        filterKsqlInternalTopics(kafkaTopicDescriptions, statement.getConfig()));
+    final Map<String, TopicDescription> topicDescriptions = listTopics(client, statement);

     if (statement.getStatement().getShowExtended()) {
       final KafkaConsumerGroupClient consumerGroupClient
@@ -71,7 +65,7 @@
       final Map<String, List<Integer>> topicConsumersAndGroupCount
           = getTopicConsumerAndGroupCounts(consumerGroupClient);

-      final List<KafkaTopicInfoExtended> topicInfoExtendedList = filteredDescriptions.values()
+      final List<KafkaTopicInfoExtended> topicInfoExtendedList = topicDescriptions.values()
          .stream().map(desc -> topicDescriptionToTopicInfoExtended(desc, topicConsumersAndGroupCount))
          .collect(Collectors.toList());

@@ -79,7 +73,7 @@
       return Optional.of(
           new KafkaTopicsListExtended(statement.getStatementText(), topicInfoExtendedList));
     } else {
-      final List<KafkaTopicInfo> topicInfoList = filteredDescriptions.values()
+      final List<KafkaTopicInfo> topicInfoList = topicDescriptions.values()
          .stream().map(desc -> topicDescriptionToTopicInfo(desc))
          .collect(Collectors.toList());

@@ -87,6 +81,20 @@
       return Optional.of(new KafkaTopicsList(statement.getStatementText(), topicInfoList));
     }
   }
+
+  private static Map<String, TopicDescription> listTopics(
+      final KafkaTopicClient topicClient,
+      final ConfiguredStatement<ListTopics> statement
+  ) {
+    final ReservedInternalTopics internalTopics = new ReservedInternalTopics(statement.getConfig());
+
+    final Set<String> topics = statement.getStatement().getShowAll()
+        ? topicClient.listTopicNames()
+        : internalTopics.removeHiddenTopics(topicClient.listTopicNames());
+
+    // TreeMap is used to keep elements sorted
+    return new TreeMap<>(topicClient.describeTopics(topics));
+  }
+
   private static KafkaTopicInfo topicDescriptionToTopicInfo(final TopicDescription description) {
     return new KafkaTopicInfo(
         description.name(),
@@ -110,27 +118,6 @@
         consumerAndGroupCount.get(1));
   }

-  private static Map<String, TopicDescription> filterKsqlInternalTopics(
-      final Map<String, TopicDescription> kafkaTopicDescriptions,
-      final KsqlConfig ksqlConfig
-  ) {
-    final Map<String, TopicDescription> filteredKafkaTopics = new HashMap<>();
-    final String serviceId = KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX
-        + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG);
-    final String persistentQueryPrefix = ksqlConfig.getString(
-        KsqlConfig.KSQL_PERSISTENT_QUERY_NAME_PREFIX_CONFIG);
-    final String transientQueryPrefix = ksqlConfig.getString(
-        KsqlConfig.KSQL_TRANSIENT_QUERY_NAME_PREFIX_CONFIG);
-
-    for (final Map.Entry<String, TopicDescription> entry : kafkaTopicDescriptions.entrySet()) {
-      if (!entry.getKey().startsWith(serviceId + persistentQueryPrefix)
-          && !entry.getKey().startsWith(serviceId + transientQueryPrefix)) {
-        filteredKafkaTopics.put(entry.getKey(), entry.getValue());
-      }
-    }
-    return filteredKafkaTopics;
-  }
-
   /**
    * @return all topics with their associated consumerCount and consumerGroupCount
    */
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
index f7d0044b99ae..3f36e789ca6f 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/server/resources/KsqlResource.java
@@ -162,6 +162,7 @@ public void configure(final KsqlConfig config) {
     this.handler = new RequestHandler(
         CustomExecutors.EXECUTOR_MAP,
         new DistributingExecutor(
+            config,
             commandQueue,
             distributedCmdResponseTimeout,
             injectorFactory,
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java
index d949e531fcb6..efb034e0057f 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/KsqlInternalTopicUtils.java
@@ -18,7 +18,6 @@
 import com.google.common.collect.ImmutableMap;
 import io.confluent.ksql.services.KafkaTopicClient;
 import io.confluent.ksql.util.KsqlConfig;
-import io.confluent.ksql.util.KsqlConstants;
 import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.config.TopicConfig;
 import org.slf4j.Logger;
@@ -39,22 +38,6 @@ public final class KsqlInternalTopicUtils {
   private KsqlInternalTopicUtils() {
   }

-  /**
-   * Compute a name for an internal topic.
-   *
-   * @param ksqlConfig The KSQL config, which is used to extract the internal topic prefix.
-   * @param topicSuffix A suffix that is appended to the topic name.
-   * @return The computed topic name.
-   */
-  public static String getTopicName(final KsqlConfig ksqlConfig, final String topicSuffix) {
-    return String.format(
-        "%s%s_%s",
-        KsqlConstants.KSQL_INTERNAL_TOPIC_PREFIX,
-        ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
-        topicSuffix
-    );
-  }
-
   /**
    * Ensure that an internal topic exists with the requested configuration, creating it
    * if necessary. This function will also fix the retention time on topics if it detects
diff --git a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
index 44d4d541091f..c7cb98b73594 100644
--- a/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
+++ b/ksql-rest-app/src/main/java/io/confluent/ksql/rest/util/ProcessingLogServerUtils.java
@@ -24,6 +24,7 @@
 import io.confluent.ksql.services.KafkaTopicClient;
 import io.confluent.ksql.util.IdentifierUtil;
 import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.ReservedInternalTopics;
 import java.util.Optional;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
@@ -50,16 +51,7 @@ static Schema getMessageSchema() {
   public static String getTopicName(
       final ProcessingLogConfig config,
       final KsqlConfig ksqlConfig) {
-    final String topicNameConfig = config.getString(ProcessingLogConfig.TOPIC_NAME);
-    if (topicNameConfig.equals(ProcessingLogConfig.TOPIC_NAME_NOT_SET)) {
-      return String.format(
-          "%s%s",
-          ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG),
-          ProcessingLogConfig.TOPIC_NAME_DEFAULT_SUFFIX
-      );
-    } else {
-      return topicNameConfig;
-    }
+    return ReservedInternalTopics.processingLogTopic(config, ksqlConfig);
   }

   public static Optional maybeCreateProcessingLogTopic(
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java
index 9679fe77f600..d857a25140c2 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/StandaloneExecutorFactoryTest.java
@@ -16,10 +16,10 @@
 import io.confluent.ksql.engine.KsqlEngine;
 import io.confluent.ksql.rest.server.StandaloneExecutorFactory.StandaloneExecutorConstructor;
 import io.confluent.ksql.rest.server.computation.ConfigStore;
-import io.confluent.ksql.rest.util.KsqlInternalTopicUtils;
 import io.confluent.ksql.services.KafkaTopicClient;
 import io.confluent.ksql.services.ServiceContext;
 import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.ReservedInternalTopics;
 import io.confluent.ksql.version.metrics.VersionCheckerAgent;
 import java.util.Collections;
 import java.util.Map;
@@ -47,10 +47,7 @@ public class StandaloneExecutorFactoryTest {
   private final Map<String, Object> properties = Collections.emptyMap();
   private final KsqlConfig baseConfig = new KsqlConfig(properties);
   private final KsqlConfig mergedConfig = new KsqlConfig(Collections.emptyMap());
-  private final String configTopicName = KsqlInternalTopicUtils.getTopicName(
-      baseConfig,
-      StandaloneExecutorFactory.CONFIG_TOPIC_SUFFIX
-  );
+  private final String configTopicName = ReservedInternalTopics.configsTopic(baseConfig);

   @Mock
   private Function<KsqlConfig, ServiceContext> serviceContextFactory;
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java
index f81cd49fcc66..dc30a58d1766 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/TestKsqlRestApp.java
@@ -48,6 +48,7 @@
 import io.confluent.ksql.services.SimpleKsqlClient;
 import io.confluent.ksql.test.util.EmbeddedSingleNodeKafkaCluster;
 import io.confluent.ksql.util.KsqlConfig;
+import io.confluent.ksql.util.ReservedInternalTopics;
 import io.confluent.ksql.version.metrics.VersionCheckerAgent;
 import io.confluent.rest.ApplicationServer;
 import io.confluent.rest.validation.JacksonMessageBodyProvider;
@@ -192,10 +193,7 @@ public KsqlRestClient buildKsqlClient(final Optional credentia
   }

   public static String getCommandTopicName() {
-    return KsqlInternalTopicUtils.getTopicName(
-        new KsqlConfig(ImmutableMap.of()),
-        KsqlRestConfig.COMMAND_TOPIC_SUFFIX
-    );
+    return ReservedInternalTopics.commandTopic(new KsqlConfig(ImmutableMap.of()));
   }

   public Set<String> getPersistentQueries() {
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java
index 3fa668628ff6..3b0b3a2056b6 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/computation/DistributingExecutorTest.java
@@ -21,6 +21,7 @@
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
@@ -32,11 +33,15 @@
 import io.confluent.ksql.exception.KsqlTopicAuthorizationException;
 import io.confluent.ksql.execution.expression.tree.StringLiteral;
 import io.confluent.ksql.metastore.MetaStore;
+import io.confluent.ksql.metastore.model.DataSource;
+import io.confluent.ksql.metastore.model.KsqlStream;
 import io.confluent.ksql.name.SourceName;
 import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
 import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
 import io.confluent.ksql.parser.tree.CreateStream;
+import io.confluent.ksql.parser.tree.InsertInto;
 import io.confluent.ksql.parser.tree.ListProperties;
+import io.confluent.ksql.parser.tree.Query;
 import io.confluent.ksql.parser.tree.Statement;
 import io.confluent.ksql.parser.tree.TableElements;
 import io.confluent.ksql.properties.with.CommonCreateConfigs;
@@ -148,6 +153,7 @@ public void setUp() throws InterruptedException {
     securityContext = new KsqlSecurityContext(Optional.empty(), serviceContext);

     distributor = new DistributingExecutor(
+        KSQL_CONFIG,
         queue,
         DURATION_10_MS,
         (ec, sc) -> InjectorChain.of(schemaInjector, topicInjector),
@@ -282,4 +288,61 @@ public void shouldThrowServerExceptionIfServerServiceContextIsDeniedAuthorizatio
     // When:
     distributor.execute(configured, executionContext, userSecurityContext);
   }
+
+  @Test
+  public void shouldThrowExceptionWhenInsertIntoUnknownStream() {
+    // Given
+    final PreparedStatement<Statement> preparedStatement =
+        PreparedStatement.of("", new InsertInto(SourceName.of("s1"), mock(Query.class)));
+    final ConfiguredStatement<Statement> configured =
+        ConfiguredStatement.of(preparedStatement, ImmutableMap.of(), KSQL_CONFIG);
+    doReturn(null).when(metaStore).getSource(SourceName.of("s1"));
+
+    // Expect:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Cannot insert into an unknown stream/table: `s1`");
+
+    // When:
+    distributor.execute(configured, executionContext, mock(KsqlSecurityContext.class));
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenInsertIntoReadOnlyTopic() {
+    // Given
+    final PreparedStatement<Statement> preparedStatement =
+        PreparedStatement.of("", new InsertInto(SourceName.of("s1"), mock(Query.class)));
+    final ConfiguredStatement<Statement> configured =
+        ConfiguredStatement.of(preparedStatement, ImmutableMap.of(), KSQL_CONFIG);
+    final DataSource dataSource = mock(DataSource.class);
+    doReturn(dataSource).when(metaStore).getSource(SourceName.of("s1"));
+    when(dataSource.getKafkaTopicName()).thenReturn("_confluent-ksql-default__command-topic");
+
+    // Expect:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Cannot insert into read-only topic: "
+        + "_confluent-ksql-default__command-topic");
+
+    // When:
+    distributor.execute(configured, executionContext, mock(KsqlSecurityContext.class));
+  }
+
+  @Test
+  public void shouldThrowExceptionWhenInsertIntoProcessingLogTopic() {
+    // Given
+    final PreparedStatement<Statement> preparedStatement =
+        PreparedStatement.of("", new InsertInto(SourceName.of("s1"), mock(Query.class)));
+    final ConfiguredStatement<Statement> configured =
+        ConfiguredStatement.of(preparedStatement, ImmutableMap.of(), KSQL_CONFIG);
+    final DataSource dataSource = mock(DataSource.class);
+    doReturn(dataSource).when(metaStore).getSource(SourceName.of("s1"));
+    when(dataSource.getKafkaTopicName()).thenReturn("default_ksql_processing_log");
+
+    // Expect:
+    expectedException.expect(KsqlException.class);
+    expectedException.expectMessage("Cannot insert into read-only topic: "
+        + "default_ksql_processing_log");
+
+    // When:
+    distributor.execute(configured, executionContext, mock(KsqlSecurityContext.class));
+  }
 }
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java
index 008b7faedc07..42c0efdf4c40 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/execution/ListTopicsExecutorTest.java
@@ -62,10 +62,11 @@ public void setUp() {
   }

   @Test
-  public void shouldListKafkaTopics() {
+  public void shouldListKafkaTopicsWithoutInternalTopics() {
     // Given:
     engine.givenKafkaTopic("topic1");
     engine.givenKafkaTopic("topic2");
+    engine.givenKafkaTopic("_confluent_any_topic");

     // When:
     final KafkaTopicsList topicsList =
@@ -83,6 +84,30 @@ public void shouldListKafkaTopics() {
     ));
   }

+  @Test
+  public void shouldListKafkaTopicsIncludingInternalTopics() {
+    // Given:
+    engine.givenKafkaTopic("topic1");
+    engine.givenKafkaTopic("topic2");
+    engine.givenKafkaTopic("_confluent_any_topic");
+
+    // When:
+    final KafkaTopicsList topicsList =
+        (KafkaTopicsList) CustomExecutors.LIST_TOPICS.execute(
+            engine.configure("LIST ALL TOPICS;"),
+            ImmutableMap.of(),
+            engine.getEngine(),
+            serviceContext
+        ).orElseThrow(IllegalStateException::new);
+
+    // Then:
+    assertThat(topicsList.getTopics(), containsInAnyOrder(
+        new KafkaTopicInfo("topic1", ImmutableList.of(1)),
+        new KafkaTopicInfo("topic2", ImmutableList.of(1)),
+        new KafkaTopicInfo("_confluent_any_topic", ImmutableList.of(1))
+    ));
+  }
+
   @Test
   public void shouldListKafkaTopicsThatDifferByCase() {
     // Given:
diff --git a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java
index 16ba03540727..1ef19c1fa9ec 100644
--- a/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java
+++ b/ksql-rest-app/src/test/java/io/confluent/ksql/rest/server/mock/MockKafkaTopicClient.java
@@ -47,11 +47,6 @@ public Set<String> listTopicNames() {
     return Collections.emptySet();
   }

-  @Override
-  public Set<String> listNonInternalTopicNames() {
-    return Collections.EMPTY_SET;
-  }
-
   @Override
   public Map<String, TopicDescription> describeTopics(final Collection<String> topicNames) {
     return Collections.emptyMap();
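
Note (illustration only, not part of the patch): `ListTopicsExecutor` above now delegates hiding to `ReservedInternalTopics.removeHiddenTopics`, which conceptually amounts to dropping every topic whose name matches one of the regex patterns configured via `ksql.hidden.topics`. The sketch below shows that idea in isolation; the class name `HiddenTopicFilterSketch`, its constructor, and the hard-coded pattern list are illustrative assumptions, not the API introduced by this change.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Simplified stand-in (assumption, not the patch's ReservedInternalTopics): hide any
// topic whose name fully matches one of the configured hidden-topic patterns.
public final class HiddenTopicFilterSketch {

  private final Pattern hiddenTopics;

  public HiddenTopicFilterSketch(final List<String> hiddenTopicPatterns) {
    // Join the configured patterns into a single alternation and match whole names.
    this.hiddenTopics = Pattern.compile(String.join("|", hiddenTopicPatterns));
  }

  public Set<String> removeHiddenTopics(final Set<String> topicNames) {
    return topicNames.stream()
        .filter(name -> !hiddenTopics.matcher(name).matches())
        .collect(Collectors.toSet());
  }

  public static void main(final String[] args) {
    final HiddenTopicFilterSketch filter = new HiddenTopicFilterSketch(
        Arrays.asList("_confluent.*", "__consumer_offsets"));

    final Set<String> topics = new TreeSet<>(Arrays.asList(
        "pageviews", "users", "_confluent-ksql-default__command_topic"));

    // Keeps only "pageviews" and "users"; the internal command topic is filtered out.
    System.out.println(filter.removeHiddenTopics(topics));
  }
}
```

The patch's actual `ReservedInternalTopics` also exposes the `isReadOnly` check used by the new `DistributingExecutor` guard, presumably driven by the analogous `ksql.readonly.topics` patterns; the sketch above does not model that part.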