Skip to content

Commit

Permalink
refactor: re-use CreateSourceFactory to create streams/tables for CREATE_AS statements (confluentinc#6594)
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Nov 13, 2020
1 parent 2d92144 commit 1e46ff8
Show file tree
Hide file tree
Showing 14 changed files with 186 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public class Analysis implements ImmutableAnalysis {
private OptionalInt limitClause = OptionalInt.empty();
private CreateSourceAsProperties withProperties = CreateSourceAsProperties.none();
private final List<FunctionCall> tableFunctions = new ArrayList<>();
private boolean orReplace = false;

public Analysis(final Optional<RefinementInfo> refinementInfo) {
this(refinementInfo, SourceSchemas::new);
Expand Down Expand Up @@ -219,6 +220,15 @@ public AliasedDataSource getFrom() {
return allDataSources.get(0);
}

/**
 * Whether the analyzed statement was a {@code CREATE OR REPLACE} variant.
 *
 * @return {@code true} if the sink may replace an existing source.
 */
@Override
public boolean getOrReplace() {
  return this.orReplace;
}

/**
 * Records whether the statement being analyzed is a {@code CREATE OR REPLACE}.
 *
 * @param replaceExisting {@code true} if the sink may replace an existing source.
 */
public void setOrReplace(final boolean replaceExisting) {
  orReplace = replaceExisting;
}

void addDataSource(final SourceName alias, final DataSource dataSource) {
if (!(dataSource instanceof KsqlStream) && !(dataSource instanceof KsqlTable)) {
throw new IllegalArgumentException("Data source type not supported yet: " + dataSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ private void analyzeNonStdOutSink(final Sink sink) {

analysis
.setInto(Into.newSink(sink.getName(), topicName, windowInfo, keyFmtInfo, valueFmtInfo));

analysis.setOrReplace(sink.shouldReplace());
}

private FormatInfo buildKeyFormatInfo(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ public interface ImmutableAnalysis {
SourceSchemas getFromSourceSchemas(boolean postAggregate);

AliasedDataSource getFrom();

boolean getOrReplace();
}
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,11 @@ private <T extends Expression> Optional<T> rewriteOptional(final Optional<T> exp
return expression.map(this::rewrite);
}

/**
 * Delegates to the wrapped analysis: rewriting expressions does not affect
 * whether the statement was a {@code CREATE OR REPLACE}.
 */
@Override
public boolean getOrReplace() {
  return this.original.getOrReplace();
}

private <T extends Expression> List<T> rewriteList(final List<T> expressions) {
return expressions.stream()
.map(this::rewrite)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.execution.ddl.commands.DropTypeCommand;
import io.confluent.ksql.execution.ddl.commands.RegisterTypeCommand;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.parser.DropType;
import io.confluent.ksql.parser.tree.AlterSource;
import io.confluent.ksql.parser.tree.CreateStream;
Expand All @@ -33,6 +34,7 @@
import io.confluent.ksql.parser.tree.DropStream;
import io.confluent.ksql.parser.tree.DropTable;
import io.confluent.ksql.parser.tree.RegisterType;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.util.HandlerMaps;
import io.confluent.ksql.util.HandlerMaps.ClassHandlerMapR2;
Expand Down Expand Up @@ -111,6 +113,15 @@ public DdlCommand create(
ddlStatement);
}

/**
 * Builds the CREATE DDL command for the sink of a {@code CREATE ... AS SELECT}
 * statement, dispatching on the output node's type (stream vs. table).
 *
 * @param outputNode the logical plan's sink node.
 * @return the stream or table creation command.
 */
@Override
public DdlCommand create(final KsqlStructuredDataOutputNode outputNode) {
  final boolean isStream =
      outputNode.getNodeOutputType() == DataSource.DataSourceType.KSTREAM;
  return isStream
      ? createSourceFactory.createStreamCommand(outputNode)
      : createSourceFactory.createTableCommand(outputNode);
}

private CreateStreamCommand handleCreateStream(
final CallInfo callInfo,
final CreateStream statement
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.confluent.ksql.parser.tree.CreateStream;
import io.confluent.ksql.parser.tree.CreateTable;
import io.confluent.ksql.parser.tree.TableElements;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.schema.ksql.PhysicalSchema;
import io.confluent.ksql.schema.ksql.SystemColumns;
Expand Down Expand Up @@ -85,6 +86,18 @@ public CreateSourceFactory(final ServiceContext serviceContext) {
this.valueSerdeFactory = requireNonNull(valueSerdeFactory, "valueSerdeFactory");
}

/**
 * Builds a {@link CreateStreamCommand} for the sink of a CREATE STREAM AS SELECT,
 * taking all properties (name, schema, topic, formats) from the logical plan's
 * output node rather than from a parsed statement.
 *
 * @param outputNode the logical plan's sink node.
 * @return the command that registers the sink stream.
 */
public CreateStreamCommand createStreamCommand(final KsqlStructuredDataOutputNode outputNode) {
  // C*AS statements always know whether they are OR REPLACE, so the flag is
  // present (never Optional.empty(), which is reserved for older plans).
  final Optional<Boolean> orReplace = Optional.of(outputNode.getOrReplace());

  return new CreateStreamCommand(
      outputNode.getIntoSourceName(),
      outputNode.getSchema(),
      outputNode.getTimestampColumn(),
      outputNode.getKsqlTopic().getKafkaTopicName(),
      Formats.from(outputNode.getKsqlTopic()),
      outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
      orReplace
  );
}

public CreateStreamCommand createStreamCommand(
final CreateStream statement,
final KsqlConfig ksqlConfig
Expand All @@ -107,6 +120,18 @@ public CreateStreamCommand createStreamCommand(
);
}

/**
 * Builds a {@link CreateTableCommand} for the sink of a CREATE TABLE AS SELECT,
 * taking all properties (name, schema, topic, formats) from the logical plan's
 * output node rather than from a parsed statement.
 *
 * @param outputNode the logical plan's sink node.
 * @return the command that registers the sink table.
 */
public CreateTableCommand createTableCommand(final KsqlStructuredDataOutputNode outputNode) {
  // C*AS statements always know whether they are OR REPLACE, so the flag is
  // present (never Optional.empty(), which is reserved for older plans).
  final Optional<Boolean> orReplace = Optional.of(outputNode.getOrReplace());

  return new CreateTableCommand(
      outputNode.getIntoSourceName(),
      outputNode.getSchema(),
      outputNode.getTimestampColumn(),
      outputNode.getKsqlTopic().getKafkaTopicName(),
      Formats.from(outputNode.getKsqlTopic()),
      outputNode.getKsqlTopic().getKeyFormat().getWindowInfo(),
      orReplace
  );
}

public CreateTableCommand createTableCommand(
final CreateTable statement,
final KsqlConfig ksqlConfig
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,16 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.parser.tree.DdlStatement;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;

/**
 * Factory for turning statements or plan nodes into executable DDL commands.
 */
public interface DdlCommandFactory {

  /**
   * Creates a DDL command from a parsed DDL statement.
   *
   * @param sqlExpression the original SQL text of the statement.
   * @param ddlStatement  the parsed statement.
   * @param config        the session configuration in effect.
   * @return the executable DDL command.
   */
  DdlCommand create(String sqlExpression, DdlStatement ddlStatement, SessionConfig config);

  /**
   * Creates the sink-registration DDL command for a C*AS statement from the
   * logical plan's output node.
   *
   * @param outputNode the logical plan's sink node.
   * @return the executable DDL command.
   */
  DdlCommand create(KsqlStructuredDataOutputNode outputNode);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

package io.confluent.ksql.ddl.commands;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.execution.ddl.commands.DropSourceCommand;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
Expand All @@ -29,7 +28,6 @@
public final class DropSourceFactory {
private final MetaStore metaStore;

@VisibleForTesting
// Package-private: instances are created within this package (e.g. by the
// commands factory); the metastore is required to resolve sources to drop.
DropSourceFactory(final MetaStore metaStore) {
  this.metaStore = Objects.requireNonNull(metaStore, "metaStore");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.VariableSubstitutor;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.query.QueryExecutor;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.query.id.QueryIdGenerator;
Expand Down Expand Up @@ -246,6 +247,10 @@ DdlCommand createDdlCommand(
);
}

/**
 * Builds the DDL command that registers the sink of a C*AS statement,
 * delegating to the configured {@code DdlCommandFactory}.
 *
 * @param outputNode the logical plan's sink node.
 * @return the executable DDL command.
 */
DdlCommand createDdlCommand(final KsqlStructuredDataOutputNode outputNode) {
  return this.ddlCommandFactory.create(outputNode);
}

String executeDdl(
final String sqlExpression,
final DdlCommand command,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,8 @@

import io.confluent.ksql.KsqlExecutionContext.ExecuteResult;
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.ddl.commands.CreateSourceCommand;
import io.confluent.ksql.execution.ddl.commands.CreateStreamCommand;
import io.confluent.ksql.execution.ddl.commands.CreateTableCommand;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.plan.Formats;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.properties.with.SourcePropertiesUtil;
Expand All @@ -36,7 +32,6 @@
import io.confluent.ksql.parser.tree.Query;
import io.confluent.ksql.parser.tree.QueryContainer;
import io.confluent.ksql.parser.tree.Sink;
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.physical.PhysicalPlan;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.plan.DataSourceNode;
Expand Down Expand Up @@ -178,7 +173,6 @@ KsqlPlan plan(final ConfiguredStatement<?> statement) {
(KsqlStructuredDataOutputNode) plans.logicalPlan.getNode().get();

final Optional<DdlCommand> ddlCommand = maybeCreateSinkDdl(
statement,
outputNode
);

Expand Down Expand Up @@ -254,43 +248,14 @@ private ExecutorPlans(
}

/**
 * Builds the DDL command that registers the sink source of a C*AS statement,
 * if one is needed.
 *
 * <p>Command construction is delegated to the engine context (and ultimately
 * CreateSourceFactory), so the statement itself is no longer needed here.
 *
 * @param outputNode the logical plan's sink node.
 * @return the CREATE command for the sink, or empty for INSERT INTO (where the
 *     sink must already exist).
 */
private Optional<DdlCommand> maybeCreateSinkDdl(
    final KsqlStructuredDataOutputNode outputNode
) {
  if (!outputNode.createInto()) {
    // INSERT INTO an existing source: nothing to create, but the existing
    // sink must be compatible with the query's output.
    validateExistingSink(outputNode);
    return Optional.empty();
  }

  return Optional.of(engineContext.createDdlCommand(outputNode));
}

private void validateExistingSink(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,8 @@ private OutputNode buildOutputNode(final PlanNode sourcePlanNode) {
existingTopic,
analysis.getLimitClause(),
into.isCreate(),
into.getName()
into.getName(),
analysis.getOrReplace()
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class KsqlStructuredDataOutputNode extends OutputNode {
private final KsqlTopic ksqlTopic;
private final boolean doCreateInto;
private final SourceName intoSourceName;
private final boolean orReplace;

public KsqlStructuredDataOutputNode(
final PlanNodeId id,
Expand All @@ -46,13 +47,15 @@ public KsqlStructuredDataOutputNode(
final KsqlTopic ksqlTopic,
final OptionalInt limit,
final boolean doCreateInto,
final SourceName intoSourceName
final SourceName intoSourceName,
final boolean orReplace
) {
super(id, source, schema, limit, timestampColumn);

this.ksqlTopic = requireNonNull(ksqlTopic, "ksqlTopic");
this.doCreateInto = doCreateInto;
this.intoSourceName = requireNonNull(intoSourceName, "intoSourceName");
this.orReplace = orReplace;

validate(source, intoSourceName);
}
Expand All @@ -69,6 +72,10 @@ public SourceName getIntoSourceName() {
return intoSourceName;
}

/**
 * Whether this sink was created via {@code CREATE OR REPLACE}.
 *
 * @return {@code true} if an existing source of the same name may be replaced.
 */
public boolean getOrReplace() {
  return this.orReplace;
}

@Override
public Optional<SourceName> getSinkName() {
return Optional.of(intoSourceName);
Expand Down
Loading

0 comments on commit 1e46ff8

Please sign in to comment.