
feat: Add support for IN clause to pull queries #6409

Merged
merged 18 commits into from
Oct 22, 2020

Conversation

AlanConfluent
Member

@AlanConfluent AlanConfluent commented Oct 12, 2020

Description

Adds support for the IN clause to pull queries so that multiple keys can be fetched at once. This can be used in conjunction with all of the existing window comparisons (e.g. windowstart, windowend).

An example of such a query is:

CREATE STREAM INPUT (ID STRING KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');
CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ID;

SELECT * FROM AGGREGATE WHERE ID IN ('12345', '39874', '45450');

And you might get a response like this:

+----------------------------------+----------------+
|ID                                |COUNT           |
+----------------------------------+----------------+
|12345                             |8               |
|45450                             |10              |

Fixes #6115

Testing done

Ran unit tests and rest query validation tests.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@AlanConfluent AlanConfluent requested a review from a team as a code owner October 12, 2020 21:29
@AlanConfluent AlanConfluent changed the title Pull query in clause feat: Add support for IN clause to pull query Oct 12, 2020
final List<List<?>> tableRows = new ArrayList<>();
final List<LogicalSchema> schemas = new ArrayList<>();
final List<Future<PullQueryResult>> futures = new ArrayList<>();
final List<List<Struct>> keysByLocation = mat.locator().groupByLocation(
Member

Why don't you also return, besides the keys, the <active, standby> pair per group? This way you wouldn't need to do the locate twice.

Member

Actually, I think currently the routing is wrong and you have to return the <active,standby> per group

Member Author

@AlanConfluent AlanConfluent Oct 13, 2020

Yeah, I had thought that a given set of partitions were grouped together at each active and standby, but I think you're right that this isn't the case. I'll change it to groupByActiveStandbyList or something similar. In practice, there aren't many standbys, so this should be a lot better than grouping by partition or just fetching by key.

Member Author

I reworked this so that it now groups keys together only if they share the same list of nodes, including the active and all standbys. Most of the time, if there are 1 or 2 standbys and lots of keys fetched, this should reduce unnecessary calls.
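The grouping described here can be sketched roughly as follows (names and string-based hosts are hypothetical; the real code works with KsqlNode hosts and Struct keys). Keys land in the same group only when their partitions share the exact ordered host list, active first:

```java
import java.util.*;

public class GroupByReplicas {
  // Sketch: group keys by the full ordered list of hosts
  // (active first, then standbys) serving each key's partition.
  static Map<List<String>, List<String>> groupKeysByHostList(
      Map<String, List<String>> hostsByKey) {
    Map<List<String>, List<String>> grouped = new LinkedHashMap<>();
    for (Map.Entry<String, List<String>> e : hostsByKey.entrySet()) {
      // List equality makes keys with identical <active, standby...> lists collide.
      grouped.computeIfAbsent(e.getValue(), h -> new ArrayList<>()).add(e.getKey());
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<String, List<String>> hostsByKey = new LinkedHashMap<>();
    hostsByKey.put("12345", List.of("host1", "host2")); // active host1, standby host2
    hostsByKey.put("39874", List.of("host2", "host1"));
    hostsByKey.put("45450", List.of("host1", "host2"));
    Map<List<String>, List<String>> grouped = groupKeysByHostList(hostsByKey);
    // Keys 12345 and 45450 share the same <active, standby> list, so one call covers both.
    System.out.println(grouped.get(List.of("host1", "host2"))); // [12345, 45450]
  }
}
```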

WINDOWSTART,
WINDOWEND
}

private static Map<ComparisonTarget, List<ComparisonExpression>> extractComparisons(
private static class KeyAndWindowBounds {
Member

What's the benefit of having this class versus adding one more entry in the ComparisonTarget? It seems like a lot of work to achieve the same functionality?

Member Author

@AlanConfluent AlanConfluent Oct 13, 2020

InPredicate isn't a ComparisonExpression. Here, I can have a different type for each target type.

Alternatively, I could have changed the map to point to an Expression and cast back to the subtype, but that's not great practice.

pullQueryMetrics);
}
for (Future<PullQueryResult> future : futures) {
final PullQueryResult result = future.get();
Member

Shouldn't there be some error handling here? What happens if a thread dies due to an uncaught exception? We would like to fail the entire query and not return partial results, right? Maybe handle something like this:

        try {
            future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // Extract the actual exception from its wrapper
            Throwable t = e.getCause();
            System.err.println("Uncaught exception is detected! " + t
                    + " st: " + Arrays.toString(t.getStackTrace()));
            // ... Handle the exception
        }

Member Author

Yeah, I agree that we don't want partial results, so I wouldn't want to catch each individual future.

Also, calling interrupt is what causes the InterruptedException and it sends that exception to other threads. To have the current thread have it thrown, all we have to do is not catch it.

I pretty much just let the existing handler catch Exception in the wider scope. I added a small special case to unwrap ExecutionException so we get the same error message.
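The unwrapping described here might look roughly like this sketch (the helper name is hypothetical; the real code integrates with the wider exception handler). ExecutionException is unwrapped so the original cause propagates with its real message, while InterruptedException is deliberately left uncaught:

```java
import java.util.*;
import java.util.concurrent.*;

public class UnwrapExecutionException {
  // Sketch: wait on all futures and fail the whole query on the first
  // error, unwrapping ExecutionException so the caller sees the original
  // cause rather than the wrapper. No partial results are returned.
  static <T> List<T> getAll(List<Future<T>> futures) throws Exception {
    List<T> results = new ArrayList<>();
    for (Future<T> future : futures) {
      try {
        results.add(future.get());
      } catch (ExecutionException e) {
        Throwable cause = e.getCause();
        // Rethrow the underlying failure; InterruptedException is not
        // caught here, so it propagates on its own.
        throw cause instanceof Exception ? (Exception) cause : e;
      }
    }
    return results;
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<Integer>> futures = List.of(
        pool.submit(() -> 1),
        pool.submit(() -> { throw new IllegalStateException("host down"); }));
    try {
      getAll(futures);
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage()); // host down
    } finally {
      pool.shutdown();
    }
  }
}
```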

Contributor

calling interrupt is what causes the InterruptedException

I think you probably meant this, but just a small clarification here - Thread.currentThread().interrupt() does not directly cause an InterruptedException. It just sets an interrupt flag on the thread's state, and any blocking code is expected to check that flag and raise an InterruptedException when it notices it. There is no guarantee that calling interrupt will actually do anything (especially if there's misbehaving client code). It's a cooperative strategy.
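A minimal demonstration of the cooperative behavior described above: interrupting only sets a flag, and a blocking call like Thread.sleep is what turns the flag into an InterruptedException (clearing it in the process):

```java
public class InterruptFlagDemo {
  public static void main(String[] args) {
    // Interrupting a thread only sets a flag; CPU-bound code must check
    // it explicitly via Thread.interrupted() or isInterrupted().
    Thread.currentThread().interrupt();
    System.out.println(Thread.currentThread().isInterrupted()); // true
    // Blocking calls like Thread.sleep notice the flag and throw
    // InterruptedException immediately, clearing the flag as they do so.
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      System.out.println("interrupted while sleeping");
    }
    System.out.println(Thread.currentThread().isInterrupted()); // false: flag was cleared
  }
}
```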

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, you're right that a thread may have to explicitly check for being interrupted if it's doing a lot of computation. If it's sleeping in some manner, there's a good chance that it will be thrown for the thread, e.g. Thread.sleep.

@vpapavas
Member

We are missing testing for the case where keys belong to different partitions that are on the same active host but have different standbys. I think this must be done manually; I don't think there is any automatic test currently for multiple partitions per host, right?

@AlanConfluent
Member Author

We are missing testing for the case where keys belong to different partitions that are on the same active host but have different standbys. I think this must be done manually; I don't think there is any automatic test currently for multiple partitions per host, right?

@vpapavas Yeah, there's currently no test of this sort with multiple partitions where we also have failures (since those are required to resort to different standbys). Muckrake tests actually might do the job, but I'll also try to test this manually.

I addressed all of your above comments and reworked the code to not only find the location once, but also do it based on the whole list of active and standbys, so this should cover the different partitions case.

@AlanConfluent AlanConfluent changed the title feat: Add support for IN clause to pull query feat: Add support for IN clause to pull queries Oct 13, 2020
Contributor

@agavra agavra left a comment

Thanks @AlanConfluent! This feature is going to be a huge improvement over the existing functionality. I'm mostly aligned with the approach, but some implementation concerns inline.

case FLOAT64:
return new DoubleLiteral((double) value);
default:
throw new KsqlException("Unknown key type " + schema.type());
Contributor

very soon we're going to support all expression types as keys (including structs) and we already support DECIMAL keys. I'm not entirely sure that this approach will work then since it's not always possible to take a java object and convert it into an expression as the inverse conversion is lossy. Is it possible to extract the original expression when we construct the List<Struct> keys instead of mapping it to Java objects?

Member Author

As we discussed elsewhere, I entirely got rid of this rewriter and instead use partitions to ensure I only read the correct data, rather than relying on the keys alone. I also added partition to the read calls on streams so that this is enforced both for this change and when range queries are implemented soon.

routingOptions
);
}
for (Future<PullQueryResult> future : futures) {
Contributor

is there any requirement that the pull query result is reproducible? it seems like we add rows to the result based on the order of the nodes that we request, but those nodes can host different partitions at different times. I think it's reasonable to not guarantee ordering, but food for thought (and probably should be documented)

Member Author

Yes, you're right that there are no guarantees about ordering. I'll document that in the code and consider documenting elsewhere as well.

return new PullQueryResult(
routeQuery(node, statement, executionContext, serviceContext, pullQueryContext),
debugNode);
final List<Optional<KsqlNode>> debugNodes = rows.getRows().stream()
Contributor

nit: probably makes sense to make this Optional<List<KsqlNode>> instead of List<Optional<KsqlNode>> to prevent creating a list of empty optionals in the non-debug case. And then you can also use Collections#nCopies to make it a little more readable

Member Author

I had been thinking that maybe this was necessary because different machines can have different configs, but I now remember that this is a request config, so it should always work. Ok, made it Optional<List<KsqlNode>>
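The reviewer's suggestion can be illustrated with a small sketch (the method name and String hosts are hypothetical; the real code returns KsqlNode objects). In the non-debug case an empty Optional avoids allocating a list of empty Optionals, and Collections.nCopies builds the per-row node list compactly:

```java
import java.util.*;

public class DebugNodesSketch {
  // Sketch: Optional<List<String>> instead of List<Optional<String>>.
  // Non-debug requests carry no list at all; debug requests repeat the
  // serving node once per returned row via Collections.nCopies.
  static Optional<List<String>> debugNodes(boolean debug, String node, int rowCount) {
    return debug
        ? Optional.of(Collections.nCopies(rowCount, node))
        : Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(debugNodes(false, "host1:8088", 3)); // Optional.empty
    System.out.println(debugNodes(true, "host1:8088", 2));  // Optional[[host1:8088, host1:8088]]
  }
}
```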

Comment on lines 444 to 445
// Rewrite the expression to only query for the particular keys we care about for this node.
// Otherwise, we'll risk reading standby data for other partitions.
Contributor

I'm worried this approach doesn't scale well for things other than IN queries (not to mention that it feels hacky). Instead, it probably makes sense to have a way to specify that a pull query (internally routed only) should only read from certain partitions. Otherwise, how would we handle things like range queries? I can't "rewrite" the range query and avoid the risk of reading standby data.

Generally, I think it might make sense to think about communicating pull queries internally using something other than just the SQL statement.

Member Author

Yeah, that's fair. We actually were discussing range queries as well and it's probably easier to use a unified approach. I think that it should be sufficient to specify partition when reading from the table (to ensure you're only reading from the actives and standbys you intend) and also on the second hop, avoid reading keys that are not hosted on this machine or that don't agree with those specified partitions. That same approach will work on range queries as well.

Member Author

Ok, implemented that other approach where partition is specified. No rewriting necessary.
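A rough illustration of the partition-scoped idea, under the assumption that each internal request names the partitions it should serve (all names here are hypothetical, not ksqlDB APIs). Restricting the read to those partitions means no SQL rewriting is needed, and standby data for other partitions cannot leak in — which also works unchanged for range queries:

```java
import java.util.*;
import java.util.stream.*;

public class PartitionScopedRead {
  static class Row {
    final int partition; final String key; final long count;
    Row(int partition, String key, long count) {
      this.partition = partition; this.key = key; this.count = count;
    }
  }

  // Sketch: restrict a table read to exactly the partitions the internal
  // request was routed for, regardless of what predicate the query carries.
  static List<Row> readPartitions(List<Row> store, Set<Integer> partitions) {
    return store.stream()
        .filter(r -> partitions.contains(r.partition))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Row> store = List.of(
        new Row(0, "12345", 8), new Row(1, "39874", 3), new Row(2, "45450", 10));
    // A node asked to serve partitions {0, 2} never returns partition 1's
    // rows, even if it happens to host them as a standby.
    System.out.println(readPartitions(store, Set.of(0, 2)).size()); // 2
  }
}
```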

Contributor

@big-andy-coates big-andy-coates left a comment

Haven't looked in depth at the code - just scanned the test cases. They LGTM!

Comment on lines +1663 to +1674
"name": "non-windowed IN lookup on wrong type",
"statements": [
"CREATE STREAM INPUT (ID INT KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT GROUP BY ID;",
"SELECT * FROM AGGREGATE WHERE ID IN ('10', 8);"
],
"expectedError": {
"type": "io.confluent.ksql.rest.entity.KsqlStatementErrorMessage",
"message": "'10' can not be converted to the type of the key column: ID INTEGER KEY",
"status": 400
}
},
Contributor

It's normal for the IN operator to support some level of coercion of types. I see you've handled INT -> BIGINT. But, strangely, it's also normal to support STRING -> . Of course, we don't have to support this. But those used to sql may expect it...

Member Author

Interesting. I find it a bit funny that a IN (v1) isn't just syntactic sugar for a = v1. I may add that as a followup to try to keep from adding much more to this PR.

// If one of the partitions required is out of nodes, then we cannot continue.
if (round >= location.getNodes().size()) {
throw new MaterializationException(String.format(
"Unable to execute pull query: %s", statement.getStatementText()));
Member

Consider adding a more descriptive error message so that we know at which point in the code the query failed and why.

Member Author

Ok, changed it to "Unable to execute pull query: %s. Exhausted standby hosts to try."

"Unable to execute pull query: %s", statement.getStatementText()));
}

private static Map<KsqlNode, List<KsqlLocation>> groupByHost(
Member

What this method does is group all locations by the same host, which, if round=0, will be the active. So all locations (all keys) that have the same host as active will be grouped together. Then, in the second round, for any keys whose active failed, we will get the standby that is second in the ordering.

Member Author

That's correct. I added this example to make it clearer:

    // For example, locations might be:
    // [ Partition 0 <Host 1, Host 2>,
    //   Partition 1 <Host 2, Host 1>,
    //   Partition 2 <Host 1, Host 2> ]
    // In Round 0, fetch from Host 1: [Partition 0, Partition 2], from Host 2: [Partition 1]
    // If everything succeeds, we're done.  If Host 1 failed, then we'd have a Round 1:
    // In Round 1, fetch from Host 2: [Partition 0, Partition 2].
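That round-based grouping can be sketched as follows (hypothetical types; the real code uses KsqlLocation and KsqlNode). Each location carries its ordered host list, active first; in round r every still-unserved location is grouped under its r-th host, and exhausting the list is an error:

```java
import java.util.*;

public class GroupByHostSketch {
  // Sketch: group partitions by the host that serves them in this round.
  // hostsByPartition maps partition -> ordered hosts (active, standbys...).
  static Map<String, List<Integer>> groupByHost(
      Map<Integer, List<String>> hostsByPartition, int round) {
    Map<String, List<Integer>> grouped = new LinkedHashMap<>();
    for (Map.Entry<Integer, List<String>> e : hostsByPartition.entrySet()) {
      if (round >= e.getValue().size()) {
        // If one of the partitions required is out of nodes, we cannot continue.
        throw new IllegalStateException("Exhausted standby hosts to try.");
      }
      grouped.computeIfAbsent(e.getValue().get(round), h -> new ArrayList<>())
          .add(e.getKey());
    }
    return grouped;
  }

  public static void main(String[] args) {
    Map<Integer, List<String>> locations = new LinkedHashMap<>();
    locations.put(0, List.of("host1", "host2"));
    locations.put(1, List.of("host2", "host1"));
    locations.put(2, List.of("host1", "host2"));
    // Round 0: host1 -> [0, 2], host2 -> [1]
    System.out.println(groupByHost(locations, 0));
    // If host1 fails, round 1 retries only its partitions on host2.
    locations.remove(1); // partition 1 already succeeded in round 0
    System.out.println(groupByHost(locations, 1)); // {host2=[0, 2]}
  }
}
```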

Member

@vpapavas vpapavas left a comment

Awesome job @AlanConfluent ! LGTM!

@AlanConfluent AlanConfluent merged commit d5fc365 into confluentinc:master Oct 22, 2020
Successfully merging this pull request may close these issues.

Pull Queries: Multiple key lookup using IN keyword in WHERE clause
4 participants