Skip to content

Commit

Permalink
feat: Adds ScalablePushRegistry and peeking ability in a persistent q…
Browse files Browse the repository at this point in the history
…uery (#7424)

* feat: Adds ScalablePushRegistry and peeking ability in a persistent query
  • Loading branch information
AlanConfluent committed May 10, 2021
1 parent 344d36d commit 89c1588
Show file tree
Hide file tree
Showing 18 changed files with 887 additions and 23 deletions.
14 changes: 14 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,13 @@ public class KsqlConfig extends AbstractConfig {
+ " much faster for short-lived queries.";
public static final boolean KSQL_QUERY_PULL_INTERPRETER_ENABLED_DEFAULT = true;

public static final String KSQL_QUERY_PUSH_SCALABLE_ENABLED
= "ksql.query.push.scalable.enabled";
public static final String KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC =
"Enables whether scalable push queries are enabled. Scalable push queries require no window "
+ "functions, aggregations, or joins, but may include projections and filters.";
public static final boolean KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT = false;

public static final String KSQL_STRING_CASE_CONFIG_TOGGLE = "ksql.cast.strings.preserve.nulls";
public static final String KSQL_STRING_CASE_CONFIG_TOGGLE_DOC =
"When casting a SQLType to string, if false, use String.valueof(), else if true use"
Expand Down Expand Up @@ -864,6 +871,13 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_QUERY_PULL_INTERPRETER_ENABLED_DOC
)
.define(
KSQL_QUERY_PUSH_SCALABLE_ENABLED,
Type.BOOLEAN,
KSQL_QUERY_PUSH_SCALABLE_ENABLED_DEFAULT,
Importance.LOW,
KSQL_QUERY_PUSH_SCALABLE_ENABLED_DOC
)
.define(
KSQL_ERROR_CLASSIFIER_REGEX_PREFIX,
Type.STRING,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.scalablepush;

import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.query.QueryId;
import java.util.ArrayDeque;
import java.util.Deque;

/**
* A queue for storing pre-processed rows for a given scalable push query request. This queue
* starts dropping rows if they're past the capacity, and keeps track so it can be reported to the
* request.
*
* <p>The class is threadsafe since it's assumed that different threads are producing and consuming
* the data.
*/
public class ProcessingQueue {

static final int BLOCKING_QUEUE_CAPACITY = 100;

private final Deque<TableRow> rowQueue;
private final QueryId queryId;
private final int queueSizeLimit;
private boolean closed = false;
private boolean droppedRows = false;
private Runnable newRowCallback = () -> { };

public ProcessingQueue(final QueryId queryId) {
this(queryId, BLOCKING_QUEUE_CAPACITY);
}

public ProcessingQueue(final QueryId queryId, final int queueSizeLimit) {
this.queryId = queryId;
this.queueSizeLimit = queueSizeLimit;
this.rowQueue = new ArrayDeque<>();
}

/**
* Adds a {@link TableRow} to the queue. This is expected to be called from the processor streams
* thread when a new row arrives.
* @param tableRow The row to add
* @return if the row has been successfully added to the queue or if it's been dropped due to
* being at the size limit.
*/
public synchronized boolean offer(final TableRow tableRow) {
if (closed) {
return false;
} else if (rowQueue.size() < queueSizeLimit && !droppedRows) {
rowQueue.offer(tableRow);
newRowCallback.run();
return true;
}
droppedRows = true;
return false;
}

/**
* Reads a row from the queue. This is expected to be called from the plan's physical operator
* which is called from the Vertx context.
* @return The next row or null if either the queue is closed or there's no data to return.
*/
public synchronized TableRow poll() {
if (!closed) {
return rowQueue.poll();
}
return null;
}

/**
* Closes the queue which causes rows to stop being returned.
*/
public synchronized void close() {
closed = true;
}

public synchronized boolean isClosed() {
return closed;
}

/**
* Sets a callback which is invoked every time a new row has been enqueued.
* @param newRowCallback The callback to invoke
*/
public synchronized void setNewRowCallback(final Runnable newRowCallback) {
this.newRowCallback = newRowCallback;
}

/**
* Whether rows have been dropped due to hitting the queue limit.
*/
public synchronized boolean hasDroppedRows() {
return droppedRows;
}

public QueryId getQueryId() {
return queryId;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
/*
* Copyright 2021 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.physical.scalablepush;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.Row;
import io.confluent.ksql.execution.streams.materialization.TableRow;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.physical.scalablepush.locator.AllHostsLocator;
import io.confluent.ksql.physical.scalablepush.locator.PushLocator;
import io.confluent.ksql.query.QueryId;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.PersistentQueryMetadata;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.ProcessorSupplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This registry is kept with every persistent query, peeking at the stream which is the output
* of the topology. These rows are then fed to any registered ProcessingQueues where they are
* eventually passed on to scalable push queries.
*/
public class ScalablePushRegistry implements ProcessorSupplier<Object, GenericRow> {

private static final Logger LOG = LoggerFactory.getLogger(ScalablePushRegistry.class);

private final PushLocator pushLocator;
private final LogicalSchema logicalSchema;
private final boolean windowed;
// All mutable field accesses are protected with synchronized. The exception is when
// processingQueues is accessed to processed rows, in which case we want a weakly consistent
// view of the map, so we just iterate over the ConcurrentHashMap directly.
private final ConcurrentHashMap<QueryId, ProcessingQueue> processingQueues
= new ConcurrentHashMap<>();
private boolean closed = false;

public ScalablePushRegistry(
final PushLocator pushLocator,
final LogicalSchema logicalSchema,
final boolean windowed
) {
this.pushLocator = pushLocator;
this.logicalSchema = logicalSchema;
this.windowed = windowed;
}

public synchronized void close() {
for (ProcessingQueue queue : processingQueues.values()) {
queue.close();
}
processingQueues.clear();
closed = true;
}

public synchronized void register(final ProcessingQueue processingQueue) {
if (closed) {
throw new IllegalStateException("Shouldn't register after closing");
}
processingQueues.put(processingQueue.getQueryId(), processingQueue);
}

public synchronized void unregister(final ProcessingQueue processingQueue) {
if (closed) {
throw new IllegalStateException("Shouldn't unregister after closing");
}
processingQueues.remove(processingQueue.getQueryId());
}

public PushLocator getLocator() {
return pushLocator;
}

@VisibleForTesting
int numRegistered() {
return processingQueues.size();
}

@SuppressWarnings("unchecked")
private void handleRow(
final Object key, final GenericRow value, final long timestamp) {
for (ProcessingQueue queue : processingQueues.values()) {
try {
// The physical operators may modify the keys and values, so we make a copy to ensure
// that there's no cross-query interference.
final TableRow row;
if (!windowed) {
final GenericKey keyCopy = GenericKey.fromList(((GenericKey) key).values());
final GenericRow valueCopy = GenericRow.fromList(value.values());
row = Row.of(logicalSchema, keyCopy, valueCopy, timestamp);
} else {
final Windowed<GenericKey> windowedKey = (Windowed<GenericKey>) key;
final Windowed<GenericKey> keyCopy =
new Windowed<>(GenericKey.fromList(windowedKey.key().values()),
windowedKey.window());
final GenericRow valueCopy = GenericRow.fromList(value.values());
row = WindowedRow.of(logicalSchema, keyCopy, valueCopy, timestamp);
}
queue.offer(row);
} catch (final Throwable t) {
LOG.error("Error while offering row", t);
}
}
}

@Override
public Processor<Object, GenericRow> get() {
return new PeekProcessor();
}

private final class PeekProcessor implements Processor<Object, GenericRow> {

private ProcessorContext context;

private PeekProcessor() {
}

public void init(final ProcessorContext context) {
this.context = context;
}

public void process(final Object key, final GenericRow value) {
handleRow(key, value, this.context.timestamp());
this.context.forward(key, value);
}

@Override
public void close() {
}
}

public static Optional<ScalablePushRegistry> create(
final LogicalSchema logicalSchema,
final Supplier<List<PersistentQueryMetadata>> allPersistentQueries,
final boolean windowed,
final Map<String, Object> streamsProperties
) {
final Object appServer = streamsProperties.get(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (appServer == null) {
return Optional.empty();
}

if (!(appServer instanceof String)) {
throw new IllegalArgumentException(StreamsConfig.APPLICATION_SERVER_CONFIG + " not String");
}

final URL localhost;
try {
localhost = new URL((String) appServer);
} catch (final MalformedURLException e) {
throw new IllegalArgumentException(StreamsConfig.APPLICATION_SERVER_CONFIG + " malformed: "
+ "'" + appServer + "'");
}

final PushLocator pushLocator = new AllHostsLocator(allPersistentQueries, localhost);
return Optional.of(new ScalablePushRegistry(pushLocator, logicalSchema, windowed));
}
}
Loading

0 comments on commit 89c1588

Please sign in to comment.