feat: Make pull queries streamed asynchronously #6813

Merged

Conversation

AlanConfluent (Member)

Description

Before this PR, pull query results were gathered in memory as a TableRows object containing a List<List<?>> of rows, across all of the endpoints where pull queries are exposed.

This change adds the class PullQueryQueue, a queue of result rows meant to decouple the producers and consumers of row data, allowing partially complete producer calls to enqueue rows from both local and remote sources of data. This largely follows the methodology used by push queries. This allows:

  • Fetches of large batches of data to begin returning results immediately, rather than waiting until the batch from every partition is complete before returning the first row.
  • Fetches of amounts of data that would be prohibitively expensive, or impossible, to hold entirely in memory under the old methodology; this change keeps relatively few rows in memory at a time and can apply back-pressure to the producers if the queue fills up.
  • Queries like table scans and range queries, which may return many more rows than past pull queries allowed for.

Specifically, on each end of the queue:

  • New rows are produced and enqueued by PullPhysicalPlan if the request is being handled locally, or by HARouting if the request must be forwarded to another node.
  • Rows are consumed by the request thread of the endpoint, as sketched below.
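
As a rough mental model of the queue's contract, here is an illustrative sketch (not the PR's actual PullQueryQueue: the capacity and signatures are assumptions, though acceptRow and drainTo mirror method names visible in the diff):

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Illustrative sketch of a bounded row queue that applies back-pressure to producers. */
final class RowQueueSketch {

  // A bounded capacity is what provides back-pressure; the real capacity is a guess here.
  private final BlockingQueue<List<?>> rows = new ArrayBlockingQueue<>(50);

  /** Producer side (local plan or forwarded fetch): blocks while the queue is full. */
  void acceptRow(final List<?> row) throws InterruptedException {
    rows.put(row);
  }

  /** Consumer side (endpoint request thread): moves buffered rows into the sink. */
  int drainTo(final Collection<? super List<?>> sink) {
    return rows.drainTo(sink);
  }

  /** Consumer side: wait briefly for a single row; returns null if nothing arrives. */
  List<?> pollRow(final long timeoutMs) throws InterruptedException {
    return rows.poll(timeoutMs, TimeUnit.MILLISECONDS);
  }
}
```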

For each of the endpoints:

  • StreamedQueryResource: This returns a chunked-encoding response via the class PullQueryStreamWriter, a StreamingOutput response type that periodically reads from the queue and writes a chunk to the response (see the sketch after this list).
  • QueryEndpoint: This uses a KsqlPullQueryHandle with the existing BlockingQueryPublisher to connect a publisher to the queued data.
  • PullQueryPublisher: This uses PullQuerySubscription, a new PollingSubscription, rather than the former block of logic that fed raw rows to the subscriber.
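
To make the StreamedQueryResource flow concrete, here is a minimal sketch of a StreamingOutput that drains a queue and flushes chunks. It is illustrative only: toString() stands in for real JSON serialization, and the completion signalling is an assumption, so it is not the PR's PullQueryStreamWriter.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.ws.rs.core.StreamingOutput;

/** Rough shape of a StreamingOutput that drains a row queue and flushes chunks. */
final class ChunkedRowWriterSketch implements StreamingOutput {
  private final BlockingQueue<List<?>> queue;  // fed by the local plan or by forwarding
  private volatile boolean complete;           // flipped once all rows have been enqueued

  ChunkedRowWriterSketch(final BlockingQueue<List<?>> queue) {
    this.queue = queue;
  }

  void markComplete() {
    complete = true;
  }

  @Override
  public void write(final OutputStream output) throws IOException {
    try {
      while (true) {
        // Wait briefly for the next row; null means nothing arrived in time.
        final List<?> row = queue.poll(100, TimeUnit.MILLISECONDS);
        if (row == null) {
          if (complete && queue.isEmpty()) {
            break;                             // producer finished and queue drained
          }
          continue;
        }
        output.write((row.toString() + "\n").getBytes(StandardCharsets.UTF_8));
        output.flush();                        // each flush ends up as an HTTP chunk
      }
    } catch (final InterruptedException e) {
      Thread.currentThread().interrupt();      // stop streaming if the thread is interrupted
    }
  }
}
```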

Testing done

Ran unit and integration tests. Also manually experimented with batches of rows being streamed back to the user by introducing artificial delays.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@AlanConfluent AlanConfluent requested a review from a team as a code owner December 22, 2020 03:46
@agavra (Contributor) left a comment

Starting to lose steam, so I'll revisit in a bit. Note to self: I've reviewed up until PullQueryStreamWriter and will continue later today!

// If the queue has been closed, we stop adding rows and cleanup.
break;
}
pullQueryQueue.acceptRow(rowFactory.apply(row, schema));
Contributor:

with this asynchronous model, how would we handle operators that require sorting? e.g. SELECT * FROM foo ORDER BY date

One option might be to block at the operator node itself, but I'm not sure what the canonical way of handling this is.

Member Author:

I think what you're describing is likely how this would be done. Obviously any operation that cannot be done on a per-row basis complicates streaming since it requires caching many rows from the lower layer before returning even a single one.

Probably the easiest thing to do would be to allow sorting up to N entries (or M bytes of memory) and throw an error if that limit is hit. If there were sufficient interest, we might consider something more complex: you can do disk-based sorts if memory is limited. Conventional DBs handle exactly this case; I believe they sort in memory up to a point before spilling to disk and doing a kind of merge sort from disk.
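
As an illustration of the "sort up to N buffered rows, otherwise fail" idea (a hypothetical sketch, not anything in the PR):

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

/** Sketch of "buffer and sort up to N rows from a streaming source, otherwise fail". */
final class BoundedSortSketch {

  static <T> List<T> sortOrThrow(final Iterator<T> rows,
                                 final Comparator<T> comparator,
                                 final int maxRows) {
    final List<T> buffered = new ArrayList<>();
    while (rows.hasNext()) {
      buffered.add(rows.next());
      if (buffered.size() > maxRows) {
        // Beyond this point a real implementation might spill to disk and merge-sort;
        // the simple version just refuses rather than exhausting memory.
        throw new IllegalStateException(
            "ORDER BY requires buffering more than " + maxRows + " rows");
      }
    }
    buffered.sort(comparator);
    return buffered;
  }
}
```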

Contributor:

Just throwing out some more ideas here: if a query touches multiple partitions, we can let the sorting happen first on each partition itself, and then at the operator we can do a merge sort over the returned streams of sorted rows, which would take only constant space.

More generally, if we allow aggregations in the future, similar push-downs can be applied to them as well.
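
A sketch of that constant-space merge (illustrative only; the row and operator types here are placeholders):

```java
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Constant-space k-way merge of per-partition streams that are already sorted. */
final class MergeSortedStreamsSketch {

  static <T> Iterator<T> merge(final List<Iterator<T>> sortedPartitions,
                               final Comparator<T> comparator) {
    // Holds at most one buffered element per partition, so space is O(#partitions).
    final PriorityQueue<Entry<T>> heap =
        new PriorityQueue<>((a, b) -> comparator.compare(a.value, b.value));
    for (final Iterator<T> it : sortedPartitions) {
      if (it.hasNext()) {
        heap.add(new Entry<>(it.next(), it));
      }
    }
    return new Iterator<T>() {
      @Override
      public boolean hasNext() {
        return !heap.isEmpty();
      }

      @Override
      public T next() {
        final Entry<T> smallest = heap.poll();   // the overall next row in sort order
        if (smallest.source.hasNext()) {
          heap.add(new Entry<>(smallest.source.next(), smallest.source));
        }
        return smallest.value;
      }
    };
  }

  private static final class Entry<T> {
    final T value;
    final Iterator<T> source;

    Entry(final T value, final Iterator<T> source) {
      this.value = value;
      this.source = source;
    }
  }
}
```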

@@ -1797,22 +1797,23 @@
"statements": [
"CREATE STREAM INPUT (ID DOUBLE KEY, IGNORED INT) WITH (kafka_topic='test_topic', value_format='JSON');",
"CREATE TABLE AGGREGATE AS SELECT ID, COUNT(1) AS COUNT FROM INPUT WINDOW TUMBLING(SIZE 1 SECOND) GROUP BY ID;",
"SELECT * FROM AGGREGATE WHERE ID IN (10.1, 8.1);",
"SELECT * FROM AGGREGATE WHERE ID IN (10.5, 8.5);",
Contributor:

Out of interest, why did you make this change?

Member Author:

Because in the CPU's floating-point (i.e. binary) representation, 0.1 cannot be represented exactly: https://www.exploringbinary.com/why-0-point-1-does-not-exist-in-floating-point

Doing a key lookup using equality with such a value sometimes creates weird flaky results: https://floating-point-gui.de/errors/comparison/

I was banging my head against the wall over these tests not returning the row I thought they should until I tried this. Look at the result and you'll see that the existing answer is wrong! 0.5 can be represented exactly in binary. Given all this, it probably doesn't make sense to do double key lookups at all, but I guess we allow it.

User beware!
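
A standalone illustration of the pitfall (not from the PR): values like 10.1 have no exact binary representation, while halves such as 10.5 do, so equality checks on the former are fragile.

```java
import java.math.BigDecimal;

public class DoubleKeyPitfall {
  public static void main(String[] args) {
    // 0.1 has no exact binary representation, so arithmetic drifts:
    System.out.println(0.1 + 0.2 == 0.3);       // false
    System.out.println(0.1 + 0.2);              // 0.30000000000000004

    // Halves are exact sums of powers of two, so they compare cleanly:
    System.out.println(0.25 + 0.25 == 0.5);     // true

    // The exact binary values actually stored for the literals in the test:
    System.out.println(new BigDecimal(10.1));   // 10.0999999999999996...
    System.out.println(new BigDecimal(10.5));   // 10.5
  }
}
```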

Contributor:

🙀

Comment on lines 71 to 74
// This allows us to hit the limit without having to queue one last row
if (queue.isEmpty()) {
ctx.runOnContext(v -> sendComplete());
} else {
ctx.runOnContext(v -> doSend());
}
Contributor:

I'm confused. Why is this necessary? Why would we otherwise have to enqueue another row?

Member Author:

With push queries, the only thing that would stop a stream was a limit being hit. The way the code was set up, a row was enqueued and then, if that made the query hit the limit, it would fire off the limit handler. Look below in doSend(). Effectively, the only way to trigger sendComplete was to push one more row into the queue. In the pull query case, when we push a row onto the queue we don't know if it's the last one until we try to fetch another.

To fix this, I could have either changed pull query behavior to match push queries in that respect, or made this publisher do a sendComplete without having to enqueue another row; the latter is simpler and makes a fair amount of sense.


PullQueryExecutionUtil.checkRateLimit(rateLimiter);

final PullQueryResult result = ksqlEngine.executePullQuery(
Contributor:

Previously we were calling this inside the Subscription#request method; now we're doing it before calling the subscriber. I guess this is intentional because of the new asynchronous nature of pull queries? Just wanted to confirm that I understand correctly (and to confirm that it is OK to do this here in the subscribe method instead of on the first poll call).

Member Author:

That's a good question. I think it was largely done that way so that we could have a big try/catch around it and ensure we called subscriber.onError(e); if we hit an exception. In this case, even though it's kicked off immediately, it still sets result.onException(this::setError); which should allow us to handle errors correctly.

I roughly patterned this off of push queries in PushQueryPublisher, and they also start the query immediately, so as far as I can tell there shouldn't be a big difference.

@agavra (Contributor) left a comment

A few more comments; all that's left now are the tests :) I'll get to them tomorrow morning.

writeRow(toProcess, head, sb);
if (sb.length() >= FLUSH_SIZE_BYTES || (clock.millis() - lastFlush) >= MAX_FLUSH_MS) {
output.write(sb.toString().getBytes(StandardCharsets.UTF_8));
output.flush();
Contributor:

I'm wondering how this works from a client perspective. It looks like we'd be flushing incomplete JSON, right? Is that what was happening previously for pull queries? Would clients know how to handle that properly?

Member Author:

Correct, we flush incomplete JSON. This is how push queries work at the moment, so I just copied that behavior (check out QueryStreamWriter). If you look at KsqlTarget in this PR, you'll see how we handle these partial results for forwarded requests. I assume the CLI does something similar.

Contributor:

For my own clarification: were you referring to the JSON format of the HTTP response, or the JSON format of a single record?

}

@Override
public void write(final OutputStream output) {
@agavra (Contributor), Jan 14, 2021:

There are a lot of different levels going on here, and wrapping my head around the passing around of different heads/string buffers and flushing in the middle (etc.) is a struggle. At a minimum, I think we could do with inline comments and javadoc (including on the private methods), though I urge you to take another stab and see if you can refactor this to make it cleaner and reduce the cyclomatic complexity (this is one of those times where I think checkstyle got it right; it needs improvement).

There's a lot of complexity in producing proper JSON (commas in the right places, the header row coming first, etc.). I wonder if there's a library we can use to make that simpler.

Member Author:

I added a ton of comments and simplified the methods. Hopefully you agree it's easier to understand.

I am using objectMapper.writeValueAsString to do the heavy lifting of JSON serialization. The comma handling is a bit of a pain that I think I have to do myself, simply because of the partial flushing: I can't rely on a JSON library to write partial JSON (or at least I don't know of any that do). This is exactly what's done for push queries today. I'll see if some internet searching turns up any ideas.
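
For illustration only, a stripped-down version of the "serialize each row with Jackson, handle array commas by hand, flush partial JSON" approach. Note this sketch prepends the separator to each subsequent row; the PR instead keeps the comma at the end of the previous line, which is why it tracks a one-row lookahead.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;

/** Streams a JSON array one row at a time, flushing a valid prefix of the final document. */
final class PartialJsonWriterSketch {
  private static final ObjectMapper MAPPER = new ObjectMapper();

  static void streamRows(final Iterable<List<?>> rows, final OutputStream out)
      throws IOException {
    out.write("[".getBytes(StandardCharsets.UTF_8));
    boolean first = true;
    for (final List<?> row : rows) {
      final StringBuilder sb = new StringBuilder();
      if (!first) {
        sb.append(",");                       // manual comma handling between elements
      }
      sb.append("\n").append(MAPPER.writeValueAsString(row));
      out.write(sb.toString().getBytes(StandardCharsets.UTF_8));
      out.flush();                            // the client sees incomplete-but-growing JSON
      first = false;
    }
    out.write("\n]".getBytes(StandardCharsets.UTF_8));
    out.flush();
  }
}
```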

Contributor:

💯 this is way cleaner and easier for me to reason about. Thanks for the heavy refactoring!

@agavra (Contributor) left a comment

Well... morning quickly turned into afternoon and then evening, but I finally got around to the tests for this PR.

}
times[0]++;
return null;
}).when(pullQueryQueue).drainTo(any());
Contributor:

I wonder if it makes sense to have a TestPullQueryQueue, because I see this pattern (mocking what gets returned on which calls) come up in the tests quite often. It would make tests a little easier to write going forward.

Member Author:

I played around a bit with trying to create one, and it's a little hard to do given how I'm using the queue in tests. In some cases a real queue is fine, and I create one in some tests. In others, I want to verify that the queue was used in a certain way, so a mock is required. In still others, I want to test surrounding logic and call various callbacks, which I often do from the queue's "answer" methods to simulate things happening while waiting for new rows. That could be done with a test queue and methods like completeAfterEmpty, which would implicitly know to call the completion method, but I'm not sure how reusable it would be given that the completion scenario differs between tests. What do you think?
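
For context, the mocking pattern in question looks roughly like this (a simplified, hypothetical sketch; the real PullQueryQueue interface and test setup differ):

```java
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;

import java.util.Collection;
import java.util.List;

final class QueueMockingSketch {

  // Hypothetical stand-in for the real PullQueryQueue used by the tests.
  interface RowQueue {
    void drainTo(Collection<List<?>> sink);
  }

  static RowQueue queueThatReturnsOneBatchThenCompletes(
      final List<List<?>> batch, final Runnable onQueryComplete) {
    final RowQueue queue = mock(RowQueue.class);
    final int[] times = {0};
    doAnswer(invocation -> {
      final Collection<List<?>> sink = invocation.getArgument(0);
      if (times[0] == 0) {
        sink.addAll(batch);          // first drain hands back a batch of rows
      } else {
        onQueryComplete.run();       // later drains simulate the query finishing
      }
      times[0]++;
      return null;
    }).when(queue).drainTo(any());
    return queue;
  }
}
```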

Contributor:

I trust that if you played around with it and it wasn't clear then it would probably require more work to retrofit it than it's worth. I'm happy leaving it as is

@agavra (Contributor) commented Jan 15, 2021

Overall, the PR seems like a huge improvement. I have some questions inline (amongst a less-important small army of nits) that I'd like addressed before giving the green light. I'll mark those 3/4 comments with the 🚀 emoji so that we can focus on the important ones

@AlanConfluent (Member Author) left a comment

@agavra Followed up on some of your comments. Still working through them.

@AlanConfluent (Member Author) left a comment

@agavra I believe I've addressed your comments. PTAL

@agavra (Contributor) left a comment

Thanks @AlanConfluent! Big step forward. I'm still having a little trouble wrapping my head around the limit handler / reached-end code, but I don't want to block this PR on that, as you've clearly thought it out and the longer it sits the more chance there is of nasty merge conflicts. I'll dig into that offline 🙂

final boolean hasAnotherRow
) {
// Send for a comma after the header
if (!sentAtLeastOneRow) {
Contributor:

I think this can be simplified by always prepending ,\n when we write a row that's not the header (as opposed to handling the comma only in the case of the header). Then, when we're done, just append \n] (so we don't need to check hasAnotherRow).

@AlanConfluent (Member Author), Jan 22, 2021:

You're actually right, and that would honestly simplify my code a bit. The only issue is that the code parsing this (specifically the partial responses, i.e. chunks) in both KsqlTarget and the CLI looks for "\n" and also assumes that if there's a "," it's at the end of the line. I could potentially fix that, but I don't want to make a big change to the PR at this point. Also, the current format is consistent with push queries. I'd rather do this as a follow-up, if we want to go that route.

// The head becomes the next thing to process and the newly polled row becomes the head.
final PullQueryRow toProcess = head;
head = row;
return toProcess;
Contributor:

I guess your algorithm above handles this gracefully, but the first time you call pollNextRow, won't it always return null? Might just want to add a little comment there as well so that the next person doesn't scratch their head 😄

Member Author:

Yes, that's right. I'll add a comment.
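
For readers following the thread, the lookahead pattern under discussion is roughly this (a simplified sketch, not the PR's actual code): keep the most recently polled row as a buffered head, so that when emitting a row you already know whether another one follows (and therefore whether to append a trailing comma).

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** Simplified one-row lookahead over a blocking queue of rows. */
final class LookaheadSketch<T> {
  private final BlockingQueue<T> queue;
  private T head;                       // most recently polled, not yet emitted row

  LookaheadSketch(final BlockingQueue<T> queue) {
    this.queue = queue;
  }

  /**
   * Returns the previously polled row (null on the very first call, since there is
   * no lookahead yet) and stores the newly polled row as the new head.
   */
  T pollNextRow() throws InterruptedException {
    final T row = queue.poll(100, TimeUnit.MILLISECONDS);
    final T toProcess = head;           // emit the old head...
    head = row;                         // ...and remember the new row for next time
    return toProcess;
  }

  /** True if a row is already buffered, i.e. the emitted row is not the last one. */
  boolean hasAnotherRow() {
    return head != null;
  }
}
```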


@AlanConfluent AlanConfluent merged commit b69e3f8 into confluentinc:master Jan 28, 2021
@agavra agavra mentioned this pull request May 27, 2021