fix: CREATE IF NOT EXISTS does not work at all #6073

Merged (9 commits, Dec 8, 2020)
Changes from all commits:
docs/developer-guide/ksqldb-reference/create-stream.md (6 additions, 2 deletions)
@@ -13,7 +13,7 @@
Synopsis
--------

```sql
-CREATE [OR REPLACE] STREAM stream_name ( { column_name data_type [KEY] } [, ...] )
+CREATE [OR REPLACE] STREAM [IF NOT EXISTS] stream_name ( { column_name data_type [KEY] } [, ...] )
WITH ( property_name = expression [, ...] );
```

@@ -22,6 +22,10 @@
Description

Create a new stream with the specified columns and properties.

If the IF NOT EXISTS clause is present, the statement won't fail if a
stream with the same name already exists.
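
For illustration only (this example is not part of the diff), the clause turns a repeated create into a no-op:

```sql
-- Succeeds on first run; on later runs it is accepted without error
-- (stream name and columns are illustrative).
CREATE STREAM IF NOT EXISTS pageviews (
    viewtime BIGINT,
    user_id VARCHAR,
    page_id VARCHAR
  ) WITH (
    KAFKA_TOPIC = 'pageviews',
    VALUE_FORMAT = 'JSON'
  );
```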


A ksqlDB STREAM is a stream of _facts_. Each fact is immutable and unique.
A stream can store its data in either `KEY` or `VALUE` columns.
Both `KEY` and `VALUE` columns can be NULL. No special processing is done if two rows have the same
@@ -72,7 +76,7 @@
The WITH clause supports the following properties:
| VALUE_DELIMITER | Used when VALUE_FORMAT='DELIMITED'. Supports single character to be a delimiter, defaults to ','. For space and tab delimited values you must use the special values 'SPACE' or 'TAB', not an actual space or tab character. |
| TIMESTAMP | By default, the pseudo `ROWTIME` column is the timestamp of the message in the Kafka topic. The TIMESTAMP property can be used to override `ROWTIME` with the contents of the specified column within the Kafka message (similar to timestamp extractors in Kafka's Streams API). Timestamps have a millisecond accuracy. Time-based operations, such as windowing, will process a record according to the timestamp in `ROWTIME`. |
| TIMESTAMP_FORMAT | Used in conjunction with TIMESTAMP. If not set the timestamp column must be of type `bigint`. If it is set, then the TIMESTAMP column must be of type `varchar` and have a format that can be parsed with the java `DateTimeFormatter`. If your timestamp format has characters requiring single quotes, you can escape them with successive single quotes, `''`, for example: `'yyyy-MM-dd''T''HH:mm:ssX'`. For more information on timestamp formats, see [DateTimeFormatter](https://cnfl.io/java-dtf). |
-| WRAP_SINGLE_VALUE | Controls how values are deserialized where the value schema contains only a single column. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single column.<br>If set to `true`, ksqlDB expects the column to have been serialized as a named column within a record.<br>If set to `false`, ksqlDB expects the column to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues), then the format's default is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-column schemas where the value can be `null`. For more information, see [Single column (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple columns, will result in an error. |
+| WRAP_SINGLE_VALUE | Controls how values are deserialized where the value schema contains only a single column. The setting controls how ksqlDB will deserialize the value of the records in the supplied `KAFKA_TOPIC` that contain only a single column.<br>If set to `true`, ksqlDB expects the column to have been serialized as a named column within a record.<br>If set to `false`, ksqlDB expects the column to have been serialized as an anonymous value.<br>If not supplied, the system default, defined by [ksql.persistence.wrap.single.values](../../operate-and-deploy/installation/server-config/config-reference.md#ksqlpersistencewrapsinglevalues) and defaulting to `true`, is used.<br>**Note:** `null` values have special meaning in ksqlDB. Care should be taken when dealing with single-column schemas where the value can be `null`. For more information, see [Single column (un)wrapping](../serialization.md#single-field-unwrapping).<br>**Note:** Supplying this property for formats that do not support wrapping, for example `DELIMITED`, or when the value schema has multiple columns, will result in an error. |
| WINDOW_TYPE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e., was created using ksqlDB using a query that contains a `WINDOW` clause, then the `WINDOW_TYPE` property can be used to provide the window type. Valid values are `SESSION`, `HOPPING`, and `TUMBLING`. |
| WINDOW_SIZE | By default, the topic is assumed to contain non-windowed data. If the data is windowed, i.e., was created using ksqlDB using a query that contains a `WINDOW` clause, and the `WINDOW_TYPE` property is TUMBLING or HOPPING, then the WINDOW_SIZE property should be set. The property is a string with two literals, window size (a number) and window size unit (a time unit). For example: `10 SECONDS`. |

docs/developer-guide/ksqldb-reference/create-table.md (4 additions, 1 deletion)
@@ -10,7 +10,7 @@
Synopsis
--------

```sql
-CREATE [OR REPLACE] TABLE table_name ( { column_name data_type [PRIMARY KEY] } [, ...] )
+CREATE [OR REPLACE] TABLE [IF NOT EXISTS] table_name ( { column_name data_type [PRIMARY KEY] } [, ...] )
WITH ( property_name = expression [, ...] );
```

@@ -19,6 +19,9 @@
Description

Create a new table with the specified columns and properties.

If the IF NOT EXISTS clause is present, the statement won't fail if a
table with the same name already exists.
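
A minimal hypothetical example (not part of the diff):

```sql
-- Re-running this statement is accepted without error if USERS already
-- exists (table name and columns are illustrative).
CREATE TABLE IF NOT EXISTS users (
    user_id VARCHAR PRIMARY KEY,
    registered_at BIGINT
  ) WITH (
    KAFKA_TOPIC = 'users',
    VALUE_FORMAT = 'JSON'
  );
```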

A ksqlDB TABLE works much like tables in other SQL systems. A table has zero or more rows. Each
row is identified by its `PRIMARY KEY`. A row's `PRIMARY KEY` can not be `NULL`. A message in the
underlying Kafka topic with the same key as an existing row will _replace_ the existing row in the
CommandFactories.java

@@ -67,7 +67,7 @@ public class CommandFactories implements DdlCommandFactory {

public CommandFactories(final ServiceContext serviceContext, final MetaStore metaStore) {
this(
-new CreateSourceFactory(serviceContext),
+new CreateSourceFactory(serviceContext, metaStore),
new DropSourceFactory(metaStore),
new RegisterTypeFactory(metaStore),
new DropTypeFactory(metaStore),
CreateSourceFactory.java

@@ -25,6 +25,8 @@
import io.confluent.ksql.execution.streams.timestamp.TimestampExtractionPolicyFactory;
import io.confluent.ksql.execution.timestamp.TimestampColumn;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.ColumnName;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.properties.with.CreateSourceProperties;
@@ -58,14 +60,16 @@ public final class CreateSourceFactory {
private final SerdeFeaturessSupplier valueSerdeFeaturesSupplier;
private final KeySerdeFactory keySerdeFactory;
private final ValueSerdeFactory valueSerdeFactory;
private final MetaStore metaStore;

-public CreateSourceFactory(final ServiceContext serviceContext) {
+public CreateSourceFactory(final ServiceContext serviceContext, final MetaStore metaStore) {
this(
serviceContext,
(s, f, e, k) -> SerdeFeaturesFactory.buildKeyFeatures(s, f),
SerdeFeaturesFactory::buildValueFeatures,
new GenericKeySerDe(),
-new GenericRowSerDe()
+new GenericRowSerDe(),
+metaStore
);
}

@@ -75,7 +79,8 @@ public CreateSourceFactory(final ServiceContext serviceContext) {
final SerdeFeaturessSupplier keySerdeFeaturesSupplier,
final SerdeFeaturessSupplier valueSerdeFeaturesSupplier,
final KeySerdeFactory keySerdeFactory,
-final ValueSerdeFactory valueSerdeFactory
+final ValueSerdeFactory valueSerdeFactory,
+final MetaStore metaStore
) {
this.serviceContext = requireNonNull(serviceContext, "serviceContext");
this.keySerdeFeaturesSupplier =
@@ -84,6 +89,7 @@
requireNonNull(valueSerdeFeaturesSupplier, "valueSerdeFeaturesSupplier");
this.keySerdeFactory = requireNonNull(keySerdeFactory, "keySerdeFactory");
this.valueSerdeFactory = requireNonNull(valueSerdeFactory, "valueSerdeFactory");
this.metaStore = requireNonNull(metaStore);
}

public CreateStreamCommand createStreamCommand(final KsqlStructuredDataOutputNode outputNode) {
@@ -108,6 +114,14 @@ public CreateStreamCommand createStreamCommand(
final LogicalSchema schema = buildSchema(statement.getElements());
final Optional<TimestampColumn> timestampColumn =
buildTimestampColumn(ksqlConfig, props, schema);
final DataSource dataSource = metaStore.getSource(sourceName);

if (dataSource != null && !statement.isOrReplace() && !statement.isNotExists()) {
final String sourceType = dataSource.getDataSourceType().getKsqlType();
throw new KsqlException(
String.format("Cannot add stream '%s': A %s with the same name already exists",
sourceName.text(), sourceType.toLowerCase()));
}

return new CreateStreamCommand(
sourceName,
@@ -140,6 +154,14 @@ public CreateTableCommand createTableCommand(
final CreateSourceProperties props = statement.getProperties();
final String topicName = ensureTopicExists(props, serviceContext);
final LogicalSchema schema = buildSchema(statement.getElements());
final DataSource dataSource = metaStore.getSource(sourceName);

if (dataSource != null && !statement.isOrReplace() && !statement.isNotExists()) {
final String sourceType = dataSource.getDataSourceType().getKsqlType();
throw new KsqlException(
String.format("Cannot add table '%s': A %s with the same name already exists",
sourceName.text(), sourceType.toLowerCase()));
}
if (schema.key().isEmpty()) {
final boolean usingSchemaInference = props.getValueSchemaId().isPresent();

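Together, these two checks produce the analysis-time behavior the docs above describe. A hypothetical session (stream name and columns are illustrative):

```sql
CREATE STREAM clicks (id INT KEY, url VARCHAR)
  WITH (KAFKA_TOPIC = 'clicks', VALUE_FORMAT = 'JSON');

-- Repeating the statement now fails with:
--   Cannot add stream 'CLICKS': A stream with the same name already exists
CREATE STREAM clicks (id INT KEY, url VARCHAR)
  WITH (KAFKA_TOPIC = 'clicks', VALUE_FORMAT = 'JSON');

-- With the new clause, the repeat is accepted as a no-op:
CREATE STREAM IF NOT EXISTS clicks (id INT KEY, url VARCHAR)
  WITH (KAFKA_TOPIC = 'clicks', VALUE_FORMAT = 'JSON');
```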
DDL command executor:

@@ -81,6 +81,17 @@ private Executor(

@Override
public DdlCommandResult executeCreateStream(final CreateStreamCommand createStream) {
final SourceName sourceName = createStream.getSourceName();
final DataSource dataSource = metaStore.getSource(sourceName);

if (dataSource != null && !createStream.isOrReplace()) {
final String sourceType = dataSource.getDataSourceType().getKsqlType();
return new DdlCommandResult(true,
String.format("Cannot add stream %s: A %s with the same name "
+ "already exists.",
sourceName, sourceType.toLowerCase()));
}

final KsqlStream<?> ksqlStream = new KsqlStream<>(
sql,
createStream.getSourceName(),
@@ -89,13 +100,24 @@ public DdlCommandResult executeCreateStream(final CreateStreamCommand createStre
withQuery,
getKsqlTopic(createStream)
);

Review comment (contributor, author):
@big-andy-coates @spena
I noticed that CreateStreamCommand did not have the boolean flag for ifNotExists set here. If this approach is OK, I will add that too, and fail the query when the flag is not set and the source exists.

Reply (contributor):
The place to put the check is in the CreateSourceFactory. You'll need to pass the MetaStore into the factory's constructor.

See DropSourceFactory for an example of the pattern.

metaStore.putSource(ksqlStream, createStream.isOrReplace());
metaStore.addSourceReferences(ksqlStream.getName(), withQuerySources);
return new DdlCommandResult(true, "Stream created");
}

@Override
public DdlCommandResult executeCreateTable(final CreateTableCommand createTable) {
final SourceName sourceName = createTable.getSourceName();
final DataSource dataSource = metaStore.getSource(sourceName);

if (dataSource != null && !createTable.isOrReplace()) {
final String sourceType = dataSource.getDataSourceType().getKsqlType();
return new DdlCommandResult(true,
String.format("Cannot add table %s: A %s with the same name "
+ "already exists.",
sourceName, sourceType.toLowerCase()));
}
final KsqlTable<?> ksqlTable = new KsqlTable<>(
sql,
createTable.getSourceName(),
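This executor-level guard mirrors the factory check but reports success with an explanatory message instead of throwing, presumably so that replaying an already-applied command stays idempotent. A hypothetical statement exercising it (name and columns are illustrative):

```sql
-- If STOCK_TRADES already exists, executing this again yields a successful
-- DDL result whose message notes the existing source, rather than an error.
CREATE STREAM IF NOT EXISTS stock_trades (symbol VARCHAR KEY, price DOUBLE)
  WITH (KAFKA_TOPIC = 'stock_trades', VALUE_FORMAT = 'JSON');
```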
EngineExecutor.java

@@ -38,6 +38,7 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.physical.PhysicalPlan;
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
@@ -109,7 +110,9 @@ static EngineExecutor create(
}

ExecuteResult execute(final KsqlPlan plan) {
-if (!plan.getQueryPlan().isPresent()) {
+final Optional<QueryPlan> queryPlan = plan.getQueryPlan();
+
+if (!queryPlan.isPresent()) {
final String ddlResult = plan
.getDdlCommand()
.map(ddl -> executeDdl(ddl, plan.getStatementText(), false, Collections.emptySet()))
@@ -119,11 +122,16 @@ ExecuteResult execute(final KsqlPlan plan) {
return ExecuteResult.of(ddlResult);
}

-final QueryPlan queryPlan = plan.getQueryPlan().get();
-plan.getDdlCommand().map(ddl ->
-executeDdl(ddl, plan.getStatementText(), true, queryPlan.getSources()));
+final Optional<String> ddlResult = plan.getDdlCommand().map(ddl ->
+executeDdl(ddl, plan.getStatementText(), true, queryPlan.get().getSources()));

// Return if the source to create already exists.
if (ddlResult.isPresent() && ddlResult.get().contains("already exists")) {
return ExecuteResult.of(ddlResult.get());
}

return ExecuteResult.of(executePersistentQuery(
-queryPlan,
+queryPlan.get(),
plan.getStatementText(),
plan.getDdlCommand().isPresent())
);
@@ -233,6 +241,7 @@ KsqlPlan plan(final ConfiguredStatement<?> statement) {
(KsqlStructuredDataOutputNode) plans.logicalPlan.getNode().get();

final Optional<DdlCommand> ddlCommand = maybeCreateSinkDdl(
statement,
outputNode
);

@@ -340,13 +349,32 @@ private ExecutorPlans(
}

private Optional<DdlCommand> maybeCreateSinkDdl(
final ConfiguredStatement<?> cfgStatement,
final KsqlStructuredDataOutputNode outputNode
) {
if (!outputNode.createInto()) {
validateExistingSink(outputNode);
return Optional.empty();
}

final Statement statement = cfgStatement.getStatement();
final SourceName intoSource = outputNode.getIntoSourceName();
final boolean orReplace = statement instanceof CreateAsSelect
&& ((CreateAsSelect) statement).isOrReplace();
final boolean ifNotExists = statement instanceof CreateAsSelect
&& ((CreateAsSelect) statement).isNotExists();

final DataSource dataSource = engineContext.getMetaStore().getSource(intoSource);
if (dataSource != null && !ifNotExists && !orReplace) {
final String failedSourceType = outputNode.getNodeOutputType().getKsqlType();
final String foundSourceType = dataSource.getDataSourceType().getKsqlType();

throw new KsqlException(String.format(
"Cannot add %s '%s': A %s with the same name already exists",
failedSourceType.toLowerCase(), intoSource.text(), foundSourceType.toLowerCase()
));
}

return Optional.of(engineContext.createDdlCommand(outputNode));
}

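The same guard now covers CREATE ... AS SELECT statements, and the early return in execute() above skips launching a duplicate persistent query. A hypothetical example (names are illustrative):

```sql
-- Fails if USER_COUNTS exists and IF NOT EXISTS is omitted; with the clause,
-- the statement is accepted without starting a second query.
CREATE TABLE IF NOT EXISTS user_counts AS
  SELECT user_id, COUNT(*) AS total
  FROM pageviews
  GROUP BY user_id;
```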
Schema registration injector:

@@ -21,7 +21,7 @@
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.ksql.KsqlExecutionContext;
-import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
+import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
import io.confluent.ksql.parser.tree.CreateAsSelect;
import io.confluent.ksql.parser.tree.CreateSource;
@@ -43,7 +43,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlSchemaRegistryNotConfiguredException;
import io.confluent.ksql.util.KsqlStatementException;
-import io.confluent.ksql.util.PersistentQueryMetadata;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
@@ -111,26 +110,28 @@ private void registerForCreateSource(final ConfiguredStatement<? extends CreateS
}

private void registerForCreateAs(final ConfiguredStatement<? extends CreateAsSelect> cas) {
-final ServiceContext sandboxServiceContext = SandboxedServiceContext.create(serviceContext);
-final ExecuteResult executeResult = executionContext
-.createSandbox(sandboxServiceContext)
-.execute(sandboxServiceContext, cas);
-
-final PersistentQueryMetadata queryMetadata = (PersistentQueryMetadata) executeResult
-.getQuery()
-.orElseThrow(() -> new KsqlStatementException(
-"Could not determine output schema for query due to error: "
-+ executeResult.getCommandResult(),
-cas.getStatementText()
-));
+final CreateSourceCommand createSourceCommand;
+
+try {
+final ServiceContext sandboxServiceContext = SandboxedServiceContext.create(serviceContext);
+createSourceCommand = (CreateSourceCommand)
+executionContext.createSandbox(sandboxServiceContext)
+.plan(sandboxServiceContext, cas)
+.getDdlCommand()
+.get();
+} catch (final Exception e) {
+throw new KsqlStatementException(
+"Could not determine output schema for query due to error: "
++ e.getMessage(), cas.getStatementText(), e);
+}

registerSchemas(
-queryMetadata.getLogicalSchema(),
-queryMetadata.getResultTopic().getKafkaTopicName(),
-queryMetadata.getResultTopic().getKeyFormat().getFormatInfo(),
-queryMetadata.getPhysicalSchema().keySchema().features(),
-queryMetadata.getResultTopic().getValueFormat().getFormatInfo(),
-queryMetadata.getPhysicalSchema().valueSchema().features(),
+createSourceCommand.getSchema(),
+createSourceCommand.getTopicName(),
+createSourceCommand.getFormats().getKeyFormat(),
+createSourceCommand.getFormats().getKeyFeatures(),
+createSourceCommand.getFormats().getValueFormat(),
+createSourceCommand.getFormats().getValueFeatures(),
Review comment (member) on lines +129 to +134:
I had to build the query information from the plan instead of executing it. This avoids failing with an error when the IF NOT EXISTS keyword is used in the CREATE_AS statement. Also, this seems a better approach than executing, because this injector only registers the schema in Schema Registry.

cas.getSessionConfig().getConfig(false),
cas.getStatementText(),
true