Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Make pull queries streamed asynchronously #6813

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Makes pull query interface async
  • Loading branch information
AlanConfluent committed Jan 27, 2021
commit f9168bd62c15bf61d591fca6757f7a65af5b5c50
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

import io.confluent.ksql.GenericRow;
import io.confluent.ksql.api.impl.BlockingQueryPublisher;
import io.confluent.ksql.api.server.PushQueryHandle;
import io.confluent.ksql.api.server.QueryHandle;
import io.confluent.ksql.query.BlockingRowQueue;
import io.confluent.ksql.query.TransientQueryQueue;
import io.confluent.ksql.util.KeyValue;
Expand All @@ -27,6 +27,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
import org.reactivestreams.tck.PublisherVerification;
import org.reactivestreams.tck.TestEnvironment;
Expand All @@ -49,7 +50,7 @@ public Publisher<KeyValue<List<?>, GenericRow>> createPublisher(long elements) {
final Context context = vertx.getOrCreateContext();
BlockingQueryPublisher publisher = new BlockingQueryPublisher(context, workerExecutor);
final TestQueryHandle queryHandle = new TestQueryHandle(elements);
publisher.setQueryHandle(queryHandle);
publisher.setQueryHandle(queryHandle, false);
if (elements < Integer.MAX_VALUE) {
for (long l = 0; l < elements; l++) {
queryHandle.queue.acceptRow(null, generateRow(l));
Expand All @@ -71,7 +72,7 @@ private static GenericRow generateRow(long num) {
return GenericRow.fromList(l);
}

private static class TestQueryHandle implements PushQueryHandle {
private static class TestQueryHandle implements QueryHandle {

private final TransientQueryQueue queue;

Expand Down Expand Up @@ -105,5 +106,9 @@ public void stop() {
public BlockingRowQueue getQueue() {
return queue;
}

@Override
public void onException(Consumer<Throwable> onException) {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public abstract class BasePublisher<T> implements Publisher<T> {
private long demand;
private boolean cancelled;
private boolean sentComplete;
private volatile Exception failure;
private volatile Throwable failure;

public BasePublisher(final Context ctx) {
this.ctx = Objects.requireNonNull(ctx);
Expand Down Expand Up @@ -75,7 +75,7 @@ protected void checkContext() {
VertxUtils.checkContext(ctx);
}

protected final void sendError(final Exception e) {
protected final void sendError(final Throwable e) {
checkContext();
try {
if (subscriber != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import io.confluent.ksql.engine.KsqlEngine;
import io.confluent.ksql.engine.KsqlPlan;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.logging.processing.ProcessingLogContext;
Expand Down Expand Up @@ -145,18 +144,19 @@ TransientQueryMetadata executeQuery(
* plan. The physical plan is then traversed for every row in the state store.
* @param serviceContext The service context to execute the query in
* @param statement The pull query
* @param routingFilterFactory The filters used to route requests for HA routing
* @param routingOptions Configuration parameters used for routing requests
* @param pullQueryMetrics JMX metrics
* @param startImmediately Whether to start the pull query immediately. If not, the caller must
* call PullQueryResult.start to start the query.
* @return the rows that are the result of the query evaluation.
*/
PullQueryResult executePullQuery(
ServiceContext serviceContext,
ConfiguredStatement<Query> statement,
HARouting routing,
RoutingFilterFactory routingFilterFactory,
RoutingOptions routingOptions,
Optional<PullQueryExecutorMetrics> pullQueryMetrics
Optional<PullQueryExecutorMetrics> pullQueryMetrics,
boolean startImmediately
);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.confluent.ksql.config.SessionConfig;
import io.confluent.ksql.execution.ddl.commands.DdlCommand;
import io.confluent.ksql.execution.plan.ExecutionStep;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.metastore.model.DataSource;
Expand All @@ -41,6 +40,7 @@
import io.confluent.ksql.physical.pull.HARouting;
import io.confluent.ksql.physical.pull.PullPhysicalPlan;
import io.confluent.ksql.physical.pull.PullPhysicalPlanBuilder;
import io.confluent.ksql.physical.pull.PullQueryQueuePopulator;
import io.confluent.ksql.physical.pull.PullQueryResult;
import io.confluent.ksql.planner.LogicalPlanNode;
import io.confluent.ksql.planner.LogicalPlanner;
Expand All @@ -49,6 +49,7 @@
import io.confluent.ksql.planner.plan.KsqlStructuredDataOutputNode;
import io.confluent.ksql.planner.plan.OutputNode;
import io.confluent.ksql.planner.plan.PlanNode;
import io.confluent.ksql.query.PullQueryQueue;
import io.confluent.ksql.query.QueryExecutor;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
Expand Down Expand Up @@ -137,17 +138,16 @@ ExecuteResult execute(final KsqlPlan plan) {
* Evaluates a pull query by first analyzing it, then building the logical plan and finally
* the physical plan. The execution is then done using the physical plan in a pipelined manner.
* @param statement The pull query
* @param routingFilterFactory The filters used for HA routing
* @param routingOptions Configuration parameters used for HA routing
* @param pullQueryMetrics JMX metrics
* @return the rows that are the result of evaluating the pull query
*/
PullQueryResult executePullQuery(
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final boolean startImmediately
) {

if (!statement.getStatement().isPullQuery()) {
Expand All @@ -168,10 +168,17 @@ PullQueryResult executePullQuery(
logicalPlan,
analysis
);
return routing.handlePullQuery(
final PullQueryQueue pullQueryQueue = new PullQueryQueue();
final PullQueryQueuePopulator populator = () -> routing.handlePullQuery(
serviceContext,
physicalPlan, statement, routingOptions, physicalPlan.getOutputSchema(),
physicalPlan.getQueryId());
physicalPlan.getQueryId(), pullQueryQueue);
final PullQueryResult result = new PullQueryResult(physicalPlan.getOutputSchema(), populator,
physicalPlan.getQueryId(), pullQueryQueue, pullQueryMetrics);
if (startImmediately) {
result.start();
}
return result;
} catch (final Exception e) {
pullQueryMetrics.ifPresent(metrics -> metrics.recordErrorRate(1));
throw new KsqlStatementException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.ServiceInfo;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.function.FunctionRegistry;
import io.confluent.ksql.internal.KsqlEngineMetrics;
Expand Down Expand Up @@ -270,9 +269,9 @@ public PullQueryResult executePullQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final boolean startImmediately
) {
return EngineExecutor
.create(
Expand All @@ -283,9 +282,9 @@ public PullQueryResult executePullQuery(
.executePullQuery(
statement,
routing,
routingFilterFactory,
routingOptions,
pullQueryMetrics
pullQueryMetrics,
startImmediately
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.KsqlExecutionContext;
import io.confluent.ksql.execution.streams.RoutingFilter.RoutingFilterFactory;
import io.confluent.ksql.execution.streams.RoutingOptions;
import io.confluent.ksql.internal.PullQueryExecutorMetrics;
import io.confluent.ksql.logging.processing.NoopProcessingLogContext;
Expand Down Expand Up @@ -164,9 +163,9 @@ public PullQueryResult executePullQuery(
final ServiceContext serviceContext,
final ConfiguredStatement<Query> statement,
final HARouting routing,
final RoutingFilterFactory routingFilterFactory,
final RoutingOptions routingOptions,
final Optional<PullQueryExecutorMetrics> pullQueryMetrics
final Optional<PullQueryExecutorMetrics> pullQueryMetrics,
final boolean startImmediately
) {
return EngineExecutor.create(
engineContext,
Expand All @@ -175,9 +174,9 @@ public PullQueryResult executePullQuery(
).executePullQuery(
statement,
routing,
routingFilterFactory,
routingOptions,
pullQueryMetrics
pullQueryMetrics,
startImmediately
);
}
}
Loading