Skip to content

Commit

Permalink
refactor: call new constructor on LogicalPlanner
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Aug 7, 2019
1 parent 1710927 commit 5b72f5f
Show file tree
Hide file tree
Showing 15 changed files with 52 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;

import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
Expand All @@ -33,19 +32,6 @@ public final class TimestampExtractionPolicyFactory {
private TimestampExtractionPolicyFactory() {
}

public static TimestampExtractionPolicy create(
final KsqlSchema schema,
final Optional<String> timestampColumnName,
final Optional<String> timestampFormat
) {
return create(
new KsqlConfig(Collections.emptyMap()),
schema,
timestampColumnName,
timestampFormat
);
}

public static TimestampExtractionPolicy create(
final KsqlConfig ksqlConfig,
final KsqlSchema schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,6 @@ private static OutputNode buildQueryLogicalPlan(
final Analysis analysis = queryAnalyzer.analyze(sqlExpression, query, sink);
final AggregateAnalysisResult aggAnalysis = queryAnalyzer.analyzeAggregate(query, analysis);

return new LogicalPlanner(analysis, aggAnalysis, metaStore).buildPlan();
return new LogicalPlanner(config, analysis, aggAnalysis, metaStore).buildPlan();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.schema.ksql.KsqlSchema;
import io.confluent.ksql.util.ExpressionTypeManager;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import io.confluent.ksql.util.timestamp.TimestampExtractionPolicy;
Expand All @@ -49,15 +50,18 @@
public class LogicalPlanner {
// CHECKSTYLE_RULES.ON: ClassDataAbstractionCoupling

private final KsqlConfig ksqlConfig;
private final Analysis analysis;
private final AggregateAnalysisResult aggregateAnalysis;
private final FunctionRegistry functionRegistry;

public LogicalPlanner(
final KsqlConfig ksqlConfig,
final Analysis analysis,
final AggregateAnalysisResult aggregateAnalysis,
final FunctionRegistry functionRegistry
) {
this.ksqlConfig = ksqlConfig;
this.analysis = analysis;
this.aggregateAnalysis = aggregateAnalysis;
this.functionRegistry = functionRegistry;
Expand Down Expand Up @@ -125,11 +129,12 @@ private OutputNode buildOutputNode(final PlanNode sourcePlanNode) {
);
}

private static TimestampExtractionPolicy getTimestampExtractionPolicy(
private TimestampExtractionPolicy getTimestampExtractionPolicy(
final KsqlSchema inputSchema,
final Analysis analysis
) {
return TimestampExtractionPolicyFactory.create(
ksqlConfig,
inputSchema,
analysis.getTimestampColumnName(),
analysis.getTimestampFormat());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,7 +228,7 @@ private PhysicalPlanBuilder buildPhysicalPlanBuilder(
}

private QueryMetadata buildPhysicalPlan(final String query) {
final OutputNode logical = AnalysisTestUtil.buildLogicalPlan(query, metaStore);;
final OutputNode logical = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);;
return physicalPlanBuilder.buildPhysicalPlan(new LogicalPlanNode(query, Optional.of(logical)));
}

Expand Down Expand Up @@ -682,7 +682,7 @@ public void shouldConfigureProducerErrorHandlerLogger() {
final ProcessingLogger logger = mock(ProcessingLogger.class);
when(processingLogContext.getLoggerFactory()).thenReturn(loggerFactory);
final OutputNode spyNode = spy(
AnalysisTestUtil.buildLogicalPlan(simpleSelectFilter, metaStore));
AnalysisTestUtil.buildLogicalPlan(ksqlConfig, simpleSelectFilter, metaStore));
doReturn(new QueryId("foo")).when(spyNode).getQueryId(any());
when(loggerFactory.getLogger("foo")).thenReturn(logger);
when(loggerFactory.getLogger(ArgumentMatchers.startsWith("foo.")))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.planner.plan.ProjectNode;
import io.confluent.ksql.testutils.AnalysisTestUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.MetaStoreFixture;

import java.util.Collections;
import java.util.Optional;
import org.apache.kafka.connect.data.Schema;
import org.junit.Assert;
Expand All @@ -44,10 +47,12 @@
public class LogicalPlannerTest {

private MetaStore metaStore;
private KsqlConfig ksqlConfig;

@Before
public void init() {
metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
ksqlConfig = new KsqlConfig(Collections.emptyMap());
}

@Test
Expand Down Expand Up @@ -249,6 +254,6 @@ public void shouldUpdateKeyToReflectProjectionAlias() {
}

private PlanNode buildLogicalPlan(final String query) {
return AnalysisTestUtil.buildLogicalPlan(query, metaStore);
return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.testutils.AnalysisTestUtil;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.MetaStoreFixture;

import java.util.Collections;
import java.util.Set;
import org.apache.kafka.common.utils.Utils;
import org.junit.Before;
Expand All @@ -31,10 +34,12 @@
public class PlanSourceExtractorVisitorTest {

private MetaStore metaStore;
private KsqlConfig ksqlConfig;

@Before
public void init() {
metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
ksqlConfig = new KsqlConfig(Collections.emptyMap());
}

@Test
Expand All @@ -61,6 +66,6 @@ public void shouldExtractCorrectSourceForJoinQuery() {
}

private PlanNode buildLogicalPlan(final String query) {
return AnalysisTestUtil.buildLogicalPlan(query, metaStore);
return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ private SchemaKStream buildQuery(final AggregateNode aggregateNode, final KsqlCo
private static AggregateNode buildAggregateNode(final String queryString) {
final MetaStore newMetaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
final KsqlBareOutputNode planNode = (KsqlBareOutputNode) AnalysisTestUtil
.buildLogicalPlan(queryString, newMetaStore);
.buildLogicalPlan(KSQL_CONFIG, queryString, newMetaStore);

return (AggregateNode) planNode.getSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ private void buildJoinNode(final String queryString) {
final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());

final KsqlBareOutputNode planNode =
(KsqlBareOutputNode) AnalysisTestUtil.buildLogicalPlan(queryString, metaStore);
(KsqlBareOutputNode) AnalysisTestUtil.buildLogicalPlan(ksqlConfig, queryString, metaStore);

joinNode = (JoinNode) ((ProjectNode) planNode.getSource()).getSource();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ public class KsqlBareOutputNodeTest {
private StreamsBuilder builder;
private final MetaStore metaStore = MetaStoreFixture.getNewMetaStore(new InternalFunctionRegistry());
private final QueryId queryId = new QueryId("output-test");
private final KsqlConfig ksqlConfig = new KsqlConfig(Collections.emptyMap());

@Mock
private KsqlQueryBuilder ksqlStreamBuilder;
Expand All @@ -87,7 +88,7 @@ public void before() {
.push(inv.getArgument(0).toString()));

final KsqlBareOutputNode planNode = (KsqlBareOutputNode) AnalysisTestUtil
.buildLogicalPlan(SIMPLE_SELECT_WITH_FILTER, metaStore);
.buildLogicalPlan(ksqlConfig, SIMPLE_SELECT_WITH_FILTER, metaStore);

stream = planNode.buildStream(ksqlStreamBuilder);
}
Expand Down Expand Up @@ -134,7 +135,7 @@ public void shouldComputeQueryIdCorrectly() {
// Given:
final KsqlBareOutputNode node
= (KsqlBareOutputNode) AnalysisTestUtil
.buildLogicalPlan("select col0 from test1;", metaStore);
.buildLogicalPlan(ksqlConfig, "select col0 from test1;", metaStore);
final QueryIdGenerator queryIdGenerator = mock(QueryIdGenerator.class);

// When:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ private SchemaKGroupedTable buildSchemaKGroupedTableFromQuery(
final String query,
final String...groupByColumns
) {
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(query, metaStore);
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
final SchemaKTable<?> initialSchemaKTable = new SchemaKTable<>(
logicalPlan.getTheSourceNode().getSchema(),
kTable,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,11 @@ private static KsqlSchema getJoinSchema(
}

private PlanNode givenInitialKStreamOf(final String selectQuery) {
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(selectQuery, metaStore);
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(
ksqlConfig,
selectQuery,
metaStore
);

initialSchemaKStream = new SchemaKStream(
logicalPlan.getTheSourceNode().getSchema(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,11 @@ private static KsqlSchema getJoinSchema(final KsqlSchema leftSchema,
}

private List<SelectExpression> givenInitialKTableOf(final String selectQuery) {
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(selectQuery, metaStore);
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(
ksqlConfig,
selectQuery,
metaStore
);

initialSchemaKTable = new SchemaKTable<>(
logicalPlan.getTheSourceNode().getSchema(),
Expand All @@ -659,6 +663,6 @@ private List<SelectExpression> givenInitialKTableOf(final String selectQuery) {
}

private PlanNode buildLogicalPlan(final String query) {
return AnalysisTestUtil.buildLogicalPlan(query, metaStore);
return AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void shouldWriteProcessingLogOnError() {
}

private SelectValueMapper givenSelectMapperFor(final String query) {
final PlanNode planNode = AnalysisTestUtil.buildLogicalPlan(query, metaStore);
final PlanNode planNode = AnalysisTestUtil.buildLogicalPlan(ksqlConfig, query, metaStore);
final ProjectNode projectNode = (ProjectNode) planNode.getSources().get(0);
final KsqlSchema schema = planNode.getTheSourceNode().getSchema();
final List<SelectExpression> selectExpressions = projectNode.getProjectSelectExpressions();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,11 @@ public void shouldWriteProcessingLogOnError() {
}

private SqlPredicate givenSqlPredicateFor(final String statement) {
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(statement, metaStore);
final PlanNode logicalPlan = AnalysisTestUtil.buildLogicalPlan(
ksqlConfig,
statement,
metaStore
);
final FilterNode filterNode = (FilterNode) logicalPlan.getSources().get(0).getSources().get(0);
return new SqlPredicate(
filterNode.getPredicate(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.confluent.ksql.parser.tree.Statement;
import io.confluent.ksql.planner.LogicalPlanner;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.util.KsqlConfig;

import java.util.List;
import java.util.Optional;

Expand All @@ -42,10 +44,15 @@ public static Analysis analyzeQuery(final String queryStr, final MetaStore metaS
return new Analyzer(queryStr, metaStore).analysis;
}

public static OutputNode buildLogicalPlan(final String queryStr, final MetaStore metaStore) {
public static OutputNode buildLogicalPlan(
final KsqlConfig ksqlConfig,
final String queryStr,
final MetaStore metaStore
) {
final Analyzer analyzer = new Analyzer(queryStr, metaStore);

final LogicalPlanner logicalPlanner = new LogicalPlanner(
ksqlConfig,
analyzer.analysis,
analyzer.aggregateAnalys(),
metaStore);
Expand Down

0 comments on commit 5b72f5f

Please sign in to comment.