Skip to content

Commit

Permalink
feat: support CREATE OR REPLACE w/ config guard but w/o restrictions (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
agavra committed Jul 6, 2020
1 parent c6b5e57 commit e7ff81a
Show file tree
Hide file tree
Showing 31 changed files with 372 additions and 95 deletions.
12 changes: 12 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,11 @@ public class KsqlConfig extends AbstractConfig {
+ " and the regex pattern will be matched against the error class name and message of any "
+ "uncaught error and subsequent error causes in the Kafka Streams applications.";

public static final String KSQL_CREATE_OR_REPLACE_ENABLED = "ksql.create.or.replace.enabled";
public static final Boolean KSQL_CREATE_OR_REPLACE_ENABLED_DEFAULT = false;
public static final String KSQL_CREATE_OR_REPLACE_ENABLED_DOC =
"Feature flag for CREATE OR REPLACE";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -653,6 +658,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_ERROR_CLASSIFIER_REGEX_PREFIX_DOC
)
.define(
KSQL_CREATE_OR_REPLACE_ENABLED,
Type.BOOLEAN,
KSQL_CREATE_OR_REPLACE_ENABLED_DEFAULT,
Importance.LOW,
KSQL_CREATE_OR_REPLACE_ENABLED_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ public CreateStreamCommand createStreamCommand(
topic.getKafkaTopicName(),
io.confluent.ksql.execution.plan.Formats
.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
topic.getKeyFormat().getWindowInfo()
topic.getKeyFormat().getWindowInfo(),
Optional.of(statement.isOrReplace())
);
}

Expand Down Expand Up @@ -154,7 +155,8 @@ public CreateTableCommand createTableCommand(
topic.getKafkaTopicName(),
io.confluent.ksql.execution.plan.Formats
.of(topic.getKeyFormat(), topic.getValueFormat(), serdeOptions),
topic.getKeyFormat().getWindowInfo()
topic.getKeyFormat().getWindowInfo(),
Optional.of(statement.isOrReplace())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre
withQuery,
getKsqlTopic(createStream)
);
metaStore.putSource(ksqlStream);
metaStore.putSource(ksqlStream, createStream.isOrReplace());
return new DdlCommandResult(true, "Stream created");
}

Expand All @@ -90,7 +90,7 @@ public DdlCommandResult executeCreateTable(final CreateTableCommand createTable)
withQuery,
getKsqlTopic(createTable)
);
metaStore.putSource(ksqlTable);
metaStore.putSource(ksqlTable, createTable.isOrReplace());
return new DdlCommandResult(true, "Table created");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,15 @@ void registerQuery(final QueryMetadata query) {
final PersistentQueryMetadata persistentQuery = (PersistentQueryMetadata) query;
final QueryId queryId = persistentQuery.getQueryId();

if (persistentQueries.putIfAbsent(queryId, persistentQuery) != null) {
throw new IllegalStateException("Query already registered:" + queryId);
// don't use persistentQueries.put(queryId) here because oldQuery.close()
// will remove any query with oldQuery.getQueryId() from the map of persistent
// queries
final PersistentQueryMetadata oldQuery = persistentQueries.get(queryId);
if (oldQuery != null) {
oldQuery.close();
}

persistentQueries.put(queryId, persistentQuery);
metaStore.updateForPersistentQuery(
queryId.toString(),
persistentQuery.getSourceNames(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateStreamAsSelect;
import io.confluent.ksql.parser.tree.CreateTableAsSelect;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.physical.PhysicalPlan;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.plan.DataSourceNode;
Expand Down Expand Up @@ -163,7 +165,7 @@ KsqlPlan plan(final ConfiguredStatement<?> statement) {
(KsqlStructuredDataOutputNode) plans.logicalPlan.getNode().get();

final Optional<DdlCommand> ddlCommand = maybeCreateSinkDdl(
statement.getStatementText(),
statement,
outputNode
);

Expand Down Expand Up @@ -193,11 +195,12 @@ private ExecutorPlans planQuery(
final Query query,
final Optional<Sink> sink) {
final QueryEngine queryEngine = engineContext.createQueryEngine(serviceContext);
final KsqlConfig config = this.ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties);
final OutputNode outputNode = QueryEngine.buildQueryLogicalPlan(
query,
sink,
engineContext.getMetaStore(),
ksqlConfig.cloneWithPropertyOverwrite(overriddenProperties)
config
);
final LogicalPlanNode logicalPlan = new LogicalPlanNode(
statement.getStatementText(),
Expand All @@ -206,7 +209,8 @@ private ExecutorPlans planQuery(
final QueryId queryId = QueryIdUtil.buildId(
engineContext.getMetaStore(),
engineContext.idGenerator(),
outputNode
outputNode,
config.getBoolean(KsqlConfig.KSQL_CREATE_OR_REPLACE_ENABLED)
);
final PhysicalPlan physicalPlan = queryEngine.buildPhysicalPlan(
logicalPlan,
Expand All @@ -231,7 +235,7 @@ private ExecutorPlans(
}

private Optional<DdlCommand> maybeCreateSinkDdl(
final String sql,
final ConfiguredStatement<?> cfgStatement,
final KsqlStructuredDataOutputNode outputNode
) {
if (!outputNode.createInto()) {
Expand All @@ -245,6 +249,7 @@ private Optional<DdlCommand> maybeCreateSinkDdl(
outputNode.getSerdeOptions()
);

final Statement statement = cfgStatement.getStatement();
final CreateSourceCommand ddl;
if (outputNode.getNodeOutputType() == DataSourceType.KSTREAM) {
ddl = new CreateStreamCommand(
Expand All @@ -253,7 +258,9 @@ private Optional<DdlCommand> maybeCreateSinkDdl(
outputNode.getTimestampColumn(),
outputNode.getKsqlTopic().getKafkaTopicName(),
formats,
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo()
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
Optional.of(
statement instanceof CreateAsSelect && ((CreateAsSelect) statement).isOrReplace())
);
} else {
ddl = new CreateTableCommand(
Expand All @@ -262,12 +269,14 @@ private Optional<DdlCommand> maybeCreateSinkDdl(
outputNode.getTimestampColumn(),
outputNode.getKsqlTopic().getKafkaTopicName(),
formats,
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo()
outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
Optional.of(
statement instanceof CreateAsSelect && ((CreateAsSelect) statement).isOrReplace())
);
}

final SchemaRegistryClient srClient = serviceContext.getSchemaRegistryClient();
AvroUtil.throwOnInvalidSchemaEvolution(sql, ddl, srClient);
AvroUtil.throwOnInvalidSchemaEvolution(cfgStatement.getStatementText(), ddl, srClient);
return Optional.of(ddl);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,14 @@ private QueryIdUtil() {
* @param metaStore the meta store representing the current state of the engine
* @param idGenerator generates query ids
* @param outputNode the logical plan
* @param createOrReplaceEnabled whether or not the queryID can replace an existing one
* @return the {@link QueryId} to be used
*/
static QueryId buildId(
final MetaStore metaStore,
final QueryIdGenerator idGenerator,
final OutputNode outputNode
) {
final OutputNode outputNode,
final boolean createOrReplaceEnabled) {
if (!outputNode.getSinkName().isPresent()) {
return new QueryId(String.valueOf(Math.abs(ThreadLocalRandom.current().nextLong())));
}
Expand All @@ -64,6 +65,15 @@ static QueryId buildId(
throw new KsqlException("REPLACE for sink " + sink + " is not supported because there are "
+ "multiple queries writing into it: " + queriesForSink);
} else if (!queriesForSink.isEmpty()) {
if (!createOrReplaceEnabled) {
final String type = outputNode.getNodeOutputType().getKsqlType().toLowerCase();
throw new UnsupportedOperationException(
String.format(
"Cannot add %s '%s': A %s with the same name already exists",
type,
sink.text(),
type));
}
return new QueryId(Iterables.getOnlyElement(queriesForSink));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void shouldNotInheritNamespaceExplicitlySetUpstreamForAvro() {
ksqlTopic
);

newAvroMetaStore.putSource(ksqlStream);
newAvroMetaStore.putSource(ksqlStream, false);

final List<Statement> statements = parse(simpleQuery, newAvroMetaStore);
final CreateStreamAsSelect createStreamAsSelect = (CreateStreamAsSelect) statements.get(0);
Expand Down Expand Up @@ -392,7 +392,7 @@ private void registerKafkaSource() {
topic
);

jsonMetaStore.putSource(stream);
jsonMetaStore.putSource(stream, false);
}

private static List<Statement> parse(final String simpleQuery, final MetaStore metaStore) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package io.confluent.ksql.ddl.commands;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThrows;
import static org.mockito.Mockito.when;

import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
Expand All @@ -18,6 +20,7 @@
import io.confluent.ksql.metastore.model.KsqlStream;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.types.SqlTypes;
import io.confluent.ksql.serde.FormatFactory;
Expand All @@ -26,6 +29,7 @@
import io.confluent.ksql.serde.SerdeOption;
import io.confluent.ksql.serde.ValueFormat;
import io.confluent.ksql.serde.WindowInfo;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.MetaStoreFixture;
import java.util.Optional;
import java.util.Set;
Expand All @@ -46,6 +50,12 @@ public class DdlCommandExecTest {
.valueColumn(ColumnName.of("F1"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("F2"), SqlTypes.STRING)
.build();
private static final LogicalSchema SCHEMA2 = LogicalSchema.builder()
.keyColumn(ColumnName.of("K0"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("F1"), SqlTypes.BIGINT)
.valueColumn(ColumnName.of("F2"), SqlTypes.STRING)
.valueColumn(ColumnName.of("F3"), SqlTypes.STRING)
.build();
private static final ValueFormat VALUE_FORMAT = ValueFormat.of(FormatInfo.of(FormatFactory.JSON.name()));
private static final KeyFormat KEY_FORMAT = KeyFormat.nonWindowed(FormatInfo.of(FormatFactory.KAFKA.name()));
private static final Set<SerdeOption> SERDE_OPTIONS = SerdeOption.none();
Expand Down Expand Up @@ -92,6 +102,20 @@ public void shouldAddStreamWithCorrectSql() {
assertThat(metaStore.getSource(STREAM_NAME).getSqlExpression(), is(SQL_TEXT));
}

@Test
public void shouldAddStreamWithReplace() {
  // Given: a stream is already registered under STREAM_NAME
  // (the no-arg givenCreateStream() builds the command with SCHEMA).
  givenCreateStream();
  cmdExec.execute(SQL_TEXT, createStream, false);

  // When: a second CREATE for the same name is executed, built with
  // allowReplace=true and a different schema (SCHEMA2 adds column F3).
  givenCreateStream(SCHEMA2, true);
  cmdExec.execute(SQL_TEXT, createStream, false);

  // Then: the metastore entry was replaced and now reports the new schema.
  assertThat(metaStore.getSource(STREAM_NAME).getSchema(), is(SCHEMA2));
}

@Test
public void shouldAddSinkStream() {
// Given:
Expand Down Expand Up @@ -204,7 +228,7 @@ public void shouldDropMissingSource() {
@Test
public void shouldDropSource() {
// Given:
metaStore.putSource(source);
metaStore.putSource(source, false);
givenDropSourceCommand(STREAM_NAME);

// When:
Expand Down Expand Up @@ -245,21 +269,40 @@ public void shouldDropMissingType() {
assertThat(result.getMessage(), is("Type 'type' does not exist"));
}

@Test
public void shouldFailAddDuplicateStreamWithoutReplace() {
  // Given: a stream with this name is already registered.
  givenCreateStream();
  cmdExec.execute(SQL_TEXT, createStream, false);

  // When: a second CREATE for the same name runs without the replace flag.
  givenCreateStream(SCHEMA2, false);
  final KsqlException exception = assertThrows(
      KsqlException.class,
      () -> cmdExec.execute(SQL_TEXT, createStream, false)
  );

  // Then: the command is rejected with a duplicate-source error.
  assertThat(
      exception.getMessage(),
      containsString("A stream with the same name already exists")
  );
}

// Builds a DROP command for the given source name and stores it in the
// dropSource fixture field for the test to execute.
private void givenDropSourceCommand(final SourceName name) {
  dropSource = new DropSourceCommand(name);
}

// Convenience overload: builds the createStream fixture with the default
// SCHEMA and allowReplace=false.
private void givenCreateStream() {
  givenCreateStream(SCHEMA, false);
}

private void givenCreateStream(final LogicalSchema schema, final boolean allowReplace) {
createStream = new CreateStreamCommand(
STREAM_NAME,
SCHEMA,
schema,
Optional.of(timestampColumn),
"topic",
io.confluent.ksql.execution.plan.Formats.of(
KEY_FORMAT,
VALUE_FORMAT,
SERDE_OPTIONS),
Optional.empty()
Optional.empty(),
Optional.of(allowReplace)
);
}

Expand All @@ -273,7 +316,8 @@ private void givenCreateWindowedStream() {
KEY_FORMAT,
VALUE_FORMAT,
SERDE_OPTIONS),
Optional.of(windowInfo)
Optional.of(windowInfo),
Optional.of(false)
);
}

Expand All @@ -288,7 +332,8 @@ private void givenCreateWindowedTable() {
VALUE_FORMAT,
SERDE_OPTIONS
),
Optional.of(windowInfo)
Optional.of(windowInfo),
Optional.of(false)
);
}

Expand All @@ -303,7 +348,8 @@ private void givenCreateTable() {
VALUE_FORMAT,
SERDE_OPTIONS
),
Optional.empty()
Optional.empty(),
Optional.of(false)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -927,7 +927,7 @@ private void givenDataSourceWithSchema(
}

final MetaStoreImpl metaStore = new MetaStoreImpl(TestFunctionRegistry.INSTANCE.get());
metaStore.putSource(dataSource);
metaStore.putSource(dataSource, false);

when(engine.getMetaStore()).thenReturn(metaStore);
}
Expand Down
Loading

0 comments on commit e7ff81a

Please sign in to comment.