Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: terminate persistent query on DROP command #6143

Merged
merged 6 commits into from
Nov 6, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
refactor: call getQueriesWithSink() from KsqlEngine instead of Metastore
  • Loading branch information
spena committed Nov 5, 2020
commit de62bb3e8e0be6cd63b11498be646ae47e4c08f5
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
Expand All @@ -35,6 +36,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
* The context in which statements can be executed.
Expand Down Expand Up @@ -80,6 +82,14 @@ public interface KsqlExecutionContext {
*/
List<PersistentQueryMetadata> getPersistentQueries();

/**
* Retrieves the list of all queries writing to this {@code SourceName}.
*
* @param sourceName the sourceName of the queries to retrieve.
* @return the list of queries.
*/
Set<String> getQueriesWithSink(SourceName sourceName);

/**
* Retrieves the list of all running queries.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,10 @@ Map<QueryId, PersistentQueryMetadata> getPersistentQueries() {
return Collections.unmodifiableMap(persistentQueries);
}

Set<String> getQueriesWithSink(final SourceName sourceName) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason not to make this a Set<QueryId>? I feel that keeping this strongly typed is a benefit, especially if we want to leverage QueryId more heavily in the future

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No reason. It's actually better your suggestion.
Done.

return metaStore.getQueriesWithSink(sourceName);
}

MutableMetaStore getMetaStore() {
return metaStore;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ private ExecutorPlans planQuery(
Optional.of(outputNode)
);
final QueryId queryId = QueryIdUtil.buildId(
engineContext.getMetaStore(),
engineContext,
engineContext.idGenerator(),
outputNode,
ksqlConfig.getBoolean(KsqlConfig.KSQL_CREATE_OR_REPLACE_ENABLED)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.MetaStoreImpl;
import io.confluent.ksql.metastore.MutableMetaStore;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.ExecutableDdlStatement;
Expand All @@ -46,6 +47,7 @@
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -134,6 +136,11 @@ public List<PersistentQueryMetadata> getPersistentQueries() {
return ImmutableList.copyOf(primaryContext.getPersistentQueries().values());
}

@Override
public Set<String> getQueriesWithSink(final SourceName sourceName) {
return primaryContext.getQueriesWithSink(sourceName);
}

@Override
public List<QueryMetadata> getAllLiveQueries() {
return primaryContext.getAllLiveQueries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package io.confluent.ksql.engine;

import com.google.common.collect.Iterables;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
Expand All @@ -39,14 +38,14 @@ private QueryIdUtil() {
/**
* Builds a {@link QueryId} for a physical plan specification.
*
* @param metaStore the meta store representing the current state of the engine
* @param engineContext the context representing the current state of the engine
* @param idGenerator generates query ids
* @param outputNode the logical plan
* @param createOrReplaceEnabled whether or not the queryID can replace an existing one
* @return the {@link QueryId} to be used
*/
static QueryId buildId(
final MetaStore metaStore,
final EngineContext engineContext,
final QueryIdGenerator idGenerator,
final OutputNode outputNode,
final boolean createOrReplaceEnabled) {
Expand All @@ -60,7 +59,7 @@ static QueryId buildId(
}

final SourceName sink = outputNode.getSinkName().get();
final Set<String> queriesForSink = metaStore.getQueriesWithSink(sink);
final Set<String> queriesForSink = engineContext.getQueriesWithSink(sink);
if (queriesForSink.size() > 1) {
throw new KsqlException("REPLACE for sink " + sink + " is not supported because there are "
+ "multiple queries writing into it: " + queriesForSink);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.name.SourceName;
import io.confluent.ksql.parser.KsqlParser.ParsedStatement;
import io.confluent.ksql.parser.KsqlParser.PreparedStatement;
import io.confluent.ksql.parser.tree.Query;
Expand All @@ -34,6 +35,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/**
* An execution context that can execute statements without changing the core engine's state
Expand Down Expand Up @@ -81,6 +83,11 @@ public List<PersistentQueryMetadata> getPersistentQueries() {
return ImmutableList.copyOf(engineContext.getPersistentQueries().values());
}

@Override
public Set<String> getQueriesWithSink(final SourceName sourceName) {
return engineContext.getQueriesWithSink(sourceName);
}

@Override
public List<QueryMetadata> getAllLiveQueries() {
return ImmutableList.of();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class QueryIdUtilTest {
@Mock
private QueryIdGenerator idGenerator;
@Mock
private MetaStore metaStore;
private EngineContext engineContext;

@Test
public void shouldGenerateUniqueRandomIdsForTransientQueries() {
Expand All @@ -59,7 +59,7 @@ public void shouldGenerateUniqueRandomIdsForTransientQueries() {

// When:
long numUniqueIds = IntStream.range(0, 100)
.mapToObj(i -> QueryIdUtil.buildId(metaStore, idGenerator, transientPlan, false))
.mapToObj(i -> QueryIdUtil.buildId(engineContext, idGenerator, transientPlan, false))
.distinct()
.count();

Expand All @@ -74,7 +74,7 @@ public void shouldComputeQueryIdCorrectlyForInsertInto() {
when(idGenerator.getNext()).thenReturn("1");

// When:
final QueryId queryId = QueryIdUtil.buildId(metaStore, idGenerator, plan, false);
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, false);

// Then:
assertThat(queryId, is(new QueryId("INSERTQUERY_1")));
Expand All @@ -88,10 +88,10 @@ public void shouldComputeQueryIdCorrectlyForNewStream() {
when(plan.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
when(plan.createInto()).thenReturn(true);
when(idGenerator.getNext()).thenReturn("1");
when(metaStore.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of());
when(engineContext.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of());

// When:
final QueryId queryId = QueryIdUtil.buildId(metaStore, idGenerator, plan, false);
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, false);

// Then:
assertThat(queryId, is(new QueryId("CSAS_FOO_1")));
Expand All @@ -105,10 +105,10 @@ public void shouldComputeQueryIdCorrectlyForNewTable() {
when(plan.getNodeOutputType()).thenReturn(DataSourceType.KTABLE);
when(plan.createInto()).thenReturn(true);
when(idGenerator.getNext()).thenReturn("1");
when(metaStore.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of());
when(engineContext.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of());

// When:
final QueryId queryId = QueryIdUtil.buildId(metaStore, idGenerator, plan, false);
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, false);

// Then:
assertThat(queryId, is(new QueryId("CTAS_FOO_1")));
Expand All @@ -119,10 +119,10 @@ public void shouldReuseExistingQueryId() {
// Given:
when(plan.getSinkName()).thenReturn(Optional.of(SINK));
when(plan.createInto()).thenReturn(true);
when(metaStore.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of("CTAS_FOO_10"));
when(engineContext.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of("CTAS_FOO_10"));

// When:
final QueryId queryId = QueryIdUtil.buildId(metaStore, idGenerator, plan, true);
final QueryId queryId = QueryIdUtil.buildId(engineContext, idGenerator, plan, true);

// Then:
assertThat(queryId, is(new QueryId("CTAS_FOO_10")));
Expand All @@ -134,21 +134,21 @@ public void shouldThrowOnReuseIfCreateOrReplacedIsDisabled() {
when(plan.getSinkName()).thenReturn(Optional.of(SINK));
when(plan.createInto()).thenReturn(true);
when(plan.getNodeOutputType()).thenReturn(DataSourceType.KSTREAM);
when(metaStore.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of("CTAS_FOO_10"));
when(engineContext.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of("CTAS_FOO_10"));

// When:
QueryIdUtil.buildId(metaStore, idGenerator, plan, false);
QueryIdUtil.buildId(engineContext, idGenerator, plan, false);
}

@Test
public void shouldThrowIfMultipleQueriesExist() {
// Given:
when(plan.getSinkName()).thenReturn(Optional.of(SINK));
when(plan.createInto()).thenReturn(true);
when(metaStore.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of("CTAS_FOO_1", "INSERTQUERY_1"));
when(engineContext.getQueriesWithSink(SINK)).thenReturn(ImmutableSet.of("CTAS_FOO_1", "INSERTQUERY_1"));

// When:
final KsqlException e = assertThrows(KsqlException.class, () -> QueryIdUtil.buildId(metaStore, idGenerator, plan, false));
final KsqlException e = assertThrows(KsqlException.class, () -> QueryIdUtil.buildId(engineContext, idGenerator, plan, false));

// Then:
assertThat(e.getMessage(), containsString("there are multiple queries writing"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import io.confluent.ksql.execution.transform.select.SelectValueMapperFactory;
import io.confluent.ksql.execution.util.ExpressionTypeManager;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import io.confluent.ksql.metastore.MetaStore;
import io.confluent.ksql.metastore.model.DataSource;
import io.confluent.ksql.metastore.model.DataSource.DataSourceType;
import io.confluent.ksql.model.WindowType;
Expand Down Expand Up @@ -1239,11 +1238,9 @@ private static PersistentQueryMetadata findMaterializingQuery(
final KsqlExecutionContext executionContext,
final ImmutableAnalysis analysis
) {
final MetaStore metaStore = executionContext.getMetaStore();

final SourceName sourceName = getSourceName(analysis);

final Set<String> queries = metaStore.getQueriesWithSink(sourceName);
final Set<String> queries = executionContext.getQueriesWithSink(sourceName);
if (queries.isEmpty()) {
throw notMaterializedException(sourceName);
}
Expand Down