Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add support for IN clause to pull queries #6409

Merged
merged 18 commits into from
Oct 22, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
mvn validate
  • Loading branch information
AlanConfluent committed Oct 16, 2020
commit 41f34c1a8c14713ab3df1320e63087c224c4f757
Original file line number Diff line number Diff line change
Expand Up @@ -233,11 +233,11 @@ public PullQueryResult execute(
.getMaterialization(queryId, contextStacker)
.orElseThrow(() -> notMaterializedException(getSourceName(analysis)));

List<Optional<KsqlNode>> sourceNodes = new ArrayList<>();
List<List<?>> tableRows = new ArrayList<>();
List<LogicalSchema> schemas = new ArrayList<>();
List<Future<PullQueryResult>> futures = new ArrayList<>();
List<List<Struct>> keysByLocation = mat.locator().groupByLocation(
final List<Optional<KsqlNode>> sourceNodes = new ArrayList<>();
final List<List<?>> tableRows = new ArrayList<>();
final List<LogicalSchema> schemas = new ArrayList<>();
final List<Future<PullQueryResult>> futures = new ArrayList<>();
final List<List<Struct>> keysByLocation = mat.locator().groupByLocation(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't you return besides they keys also the active, standby per group? This way you wouldn't need to do locate twice basically.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I think currently the routing is wrong and you have to return the <active,standby> per group

Copy link
Member Author

@AlanConfluent AlanConfluent Oct 13, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I had thought that a given set of partitions were grouped together at each active and standby, but I think you're right this isn't the case. I'll change it to groupByActiveStandyList or something similar. In practice, there aren't too many standbys, so this is likely to be a lot better than grouping by partition or just fetching by key.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reworked this so that it now groups key together only if they share the same list of nodes, including active and all standbys. Most of the time if there's 1 or 2 standbys and lots of keys fetched, this will hopefully reduce unnecessary calls.

whereInfo.keysBound.stream()
.map(keyBound -> asKeyStruct(keyBound, query.getPhysicalSchema()))
.collect(Collectors.toList()));
Expand Down Expand Up @@ -286,7 +286,7 @@ public PullQueryResult execute(
}

static void validateSchemas(final List<LogicalSchema> schemas) {
LogicalSchema schema = Iterables.getLast(schemas);
final LogicalSchema schema = Iterables.getLast(schemas);
for (LogicalSchema s : schemas) {
if (!schema.equals(s)) {
throw new KsqlException("Schemas from different hosts should be identical");
Expand Down Expand Up @@ -334,7 +334,7 @@ private PullQueryResult handlePullQuery(
= routeQuery(node, statement, executionContext, serviceContext, pullQueryContext);
final Optional<KsqlNode> debugNode = Optional.ofNullable(
routingOptions.isDebugRequest() ? node : null);
List<Optional<KsqlNode>> debugNodes = rows.getRows().stream()
final List<Optional<KsqlNode>> debugNodes = rows.getRows().stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: probably makes sense to make this Optional<List<KsqlNode>> instead of List<Optional<KsqlNode>> to prevent creating a list of empty optionals in the non-debug case. And then you can also use Collections#nCopies to make it a little more readable

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had been thinking that maybe this was necessary because different machines can have different configs, but I now remember that this is a request config, so it should always work. Ok, made it Optional<List<KsqlNode>>

.map(r -> debugNode)
.collect(Collectors.toList());
return new PullQueryResult(rows, debugNodes);
Expand Down Expand Up @@ -381,15 +381,15 @@ private static TableRows queryRowsLocally(
if (pullQueryContext.whereInfo.windowBounds.isPresent()) {
final WindowBounds windowBounds = pullQueryContext.whereInfo.windowBounds.get();

ImmutableList.Builder<TableRow> allRows = ImmutableList.builder();
final ImmutableList.Builder<TableRow> allRows = ImmutableList.builder();
for (Struct key : pullQueryContext.keys) {
final List<? extends TableRow> rows = pullQueryContext.mat.windowed()
.get(key, windowBounds.start, windowBounds.end);
allRows.addAll(rows);
}
result = new Result(pullQueryContext.mat.schema(), allRows.build());
} else {
ImmutableList.Builder<TableRow> allRows = ImmutableList.builder();
final ImmutableList.Builder<TableRow> allRows = ImmutableList.builder();
for (Struct key : pullQueryContext.keys) {
final List<? extends TableRow> rows = pullQueryContext.mat.nonWindowed()
.get(key)
Expand Down Expand Up @@ -622,8 +622,8 @@ private static List<Object> extractKeysFromInPredicate(
final boolean windowed,
final LogicalSchema schema
) {
InPredicate inPredicate = Iterables.getLast(inPredicates);
List<Object> result = new ArrayList<>();
final InPredicate inPredicate = Iterables.getLast(inPredicates);
final List<Object> result = new ArrayList<>();
for (Expression expression : inPredicate.getValueList().getValues()) {
if (!(expression instanceof Literal)) {
throw new KsqlException("Ony comparison to literals is currently supported: "
agavra marked this conversation as resolved.
Show resolved Hide resolved
Expand Down Expand Up @@ -826,22 +826,6 @@ private static Instant asInstant(final Expression other) {
);
}

private static KeyAndWindowBounds extractWhereClauseTarget(
final InPredicate inPredicate,
final PersistentQueryMetadata query
) {
UnqualifiedColumnReferenceExp column = (UnqualifiedColumnReferenceExp) inPredicate.getValue();
final ColumnName keyColumn = Iterables.getOnlyElement(query.getLogicalSchema().key()).name();
if (column.getColumnName().equals(keyColumn)) {
return new KeyAndWindowBounds().addInPredicate(inPredicate);
}

throw invalidWhereClauseException(
"IN expression on unsupported column: " + column.getColumnName().text(),
false
);
}

private enum ComparisonTarget {
WINDOWSTART,
WINDOWEND
Expand All @@ -853,30 +837,32 @@ private static class KeyAndWindowBounds {
private List<ComparisonExpression> windowEndExpression = new ArrayList<>();
private List<InPredicate> inPredicate = new ArrayList<>();

public KeyAndWindowBounds() {
KeyAndWindowBounds() {
}

public KeyAndWindowBounds addKeyColExpression(ComparisonExpression keyColExpression) {
public KeyAndWindowBounds addKeyColExpression(final ComparisonExpression keyColExpression) {
this.keyColExpression.add(keyColExpression);
return this;
}

public KeyAndWindowBounds addWindowStartExpression(ComparisonExpression windowStartExpression) {
public KeyAndWindowBounds addWindowStartExpression(
final ComparisonExpression windowStartExpression) {
this.windowStartExpression.add(windowStartExpression);
return this;
}

public KeyAndWindowBounds addWindowEndExpression(ComparisonExpression windowEndExpression) {
public KeyAndWindowBounds addWindowEndExpression(
final ComparisonExpression windowEndExpression) {
this.windowEndExpression.add(windowEndExpression);
return this;
}

public KeyAndWindowBounds addInPredicate(InPredicate inPredicate) {
public KeyAndWindowBounds addInPredicate(final InPredicate inPredicate) {
this.inPredicate.add(inPredicate);
return this;
}

public KeyAndWindowBounds merge(KeyAndWindowBounds other) {
public KeyAndWindowBounds merge(final KeyAndWindowBounds other) {
keyColExpression.addAll(other.keyColExpression);
windowStartExpression.addAll(other.windowStartExpression);
windowEndExpression.addAll(other.windowEndExpression);
Expand Down Expand Up @@ -962,6 +948,23 @@ private static KeyAndWindowBounds extractWhereClauseTarget(
);
}

private static KeyAndWindowBounds extractWhereClauseTarget(
final InPredicate inPredicate,
final PersistentQueryMetadata query
) {
final UnqualifiedColumnReferenceExp column
= (UnqualifiedColumnReferenceExp) inPredicate.getValue();
final ColumnName keyColumn = Iterables.getOnlyElement(query.getLogicalSchema().key()).name();
if (column.getColumnName().equals(keyColumn)) {
return new KeyAndWindowBounds().addInPredicate(inPredicate);
}

throw invalidWhereClauseException(
"IN expression on unsupported column: " + column.getColumnName().text(),
false
);
}

private static boolean isSelectStar(final Select select) {
final boolean someStars = select.getSelectItems().stream()
.anyMatch(s -> s instanceof AllColumns);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import io.confluent.ksql.services.ServiceContext;
import io.confluent.ksql.statement.ConfiguredStatement;
import io.confluent.ksql.util.Pair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,11 @@
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -122,15 +124,18 @@ public List<KsqlNode> locate(
}

@Override
public List<List<Struct>> groupByLocation(List<Struct> keys) {
Map<String, List<Struct>> groups = new HashMap<>();
public List<List<Struct>> groupByLocation(final List<Struct> keys) {
final Map<String, List<Struct>> groups = new HashMap<>();
for (Struct key : keys) {
AlanConfluent marked this conversation as resolved.
Show resolved Hide resolved
final KeyQueryMetadata metadata = kafkaStreams
.queryMetadataForKey(stateStoreName, key, keySerializer);
groups.computeIfAbsent(metadata.activeHost().toString(), active -> new ArrayList<>());
groups.get(metadata.activeHost().toString()).add(key);
}
return ImmutableList.copyOf(groups.values());
return groups.entrySet().stream()
.sorted(Comparator.comparing(Entry::getKey))
.map(Entry::getValue)
.collect(Collectors.toList());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableList;
Expand All @@ -43,6 +44,7 @@
import java.util.List;
import java.util.Optional;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.streams.KafkaStreams;
Expand All @@ -60,7 +62,10 @@ public class KsLocatorTest {
private static final String APPLICATION_ID = "app_id";
private static final String STORE_NAME = "someStoreName";
private static final URL LOCAL_HOST_URL = localHost();
private static final Struct SOME_KEY = new Struct(SchemaBuilder.struct().build());
private static final Schema SCHEMA = SchemaBuilder.struct().field("a", SchemaBuilder.int32());
private static final Struct SOME_KEY = new Struct(SCHEMA).put("a", 1);
private static final Struct SOME_KEY1 = new Struct(SCHEMA).put("a", 2);
private static final Struct SOME_KEY2 = new Struct(SCHEMA).put("a", 3);

@Mock
private KafkaStreams kafkaStreams;
Expand Down Expand Up @@ -156,7 +161,7 @@ public void shouldThrowIfMetadataNotAvailable() {

// Then:
assertThat(e.getMessage(), containsString(
"KeyQueryMetadata not available for state store someStoreName and key Struct{}"));
"KeyQueryMetadata not available for state store someStoreName and key Struct{a=1}"));
}

@Test
Expand Down Expand Up @@ -334,6 +339,23 @@ public void shouldReturnOneStandByWhenActiveAndOtherStandByDown() {
assertThat(result.stream().findFirst().get(), is(standByNode2));
}

@Test
public void shouldGroupKeysByLocation() {
// Given:
getActiveMetadata(SOME_KEY, new HostInfo("host", 123));
getActiveMetadata(SOME_KEY1, new HostInfo("host2", 345));
getActiveMetadata(SOME_KEY2, new HostInfo("host", 123));

// When:
final List<List<Struct>> result = locator.groupByLocation(ImmutableList.of(
SOME_KEY, SOME_KEY1, SOME_KEY2));

// Then:
assertThat(result.size(), is(2));
assertThat(result.get(0), is(ImmutableList.of(SOME_KEY, SOME_KEY2)));
assertThat(result.get(1), is(ImmutableList.of(SOME_KEY1)));
}

@SuppressWarnings("unchecked")
private void getEmtpyMetadata() {
when(kafkaStreams.queryMetadataForKey(any(), any(), any(Serializer.class)))
Expand All @@ -357,6 +379,14 @@ private void getActiveAndStandbyMetadata(final HostInfo activeHostInfo) {
.thenReturn(keyQueryMetadata);
}

@SuppressWarnings("unchecked")
private void getActiveMetadata(final Struct key, final HostInfo activeHostInfo) {
KeyQueryMetadata keyQueryMetadata = mock(KeyQueryMetadata.class);
when(keyQueryMetadata.activeHost()).thenReturn(activeHostInfo);
when(kafkaStreams.queryMetadataForKey(any(), eq(key), any(Serializer.class)))
.thenReturn(keyQueryMetadata);
}

private static URL localHost() {
try {
return new URL("http://somehost:1234");
Expand Down