fix: Ensures BaseSubscriber.makeRequest is called on context in PollableSubscriber #7212

Merged

Conversation

AlanConfluent
Member

@AlanConfluent AlanConfluent commented Mar 12, 2021

Description

Fixes a small bug where BaseSubscriber.makeRequest can be called from outside the Vert.x context, which should be disallowed.

makeRequest is called from checkRequestTokens, and there are currently two paths into checkRequestTokens:

  • One is afterSubscribe, which is called from the context.
  • The other is poll, which is inherently blocking and shouldn't be called from the context.

For this reason, this PR exposes runOnRightContext and invokes makeRequest using that.
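
A minimal sketch of the pattern described above, not the actual ksqlDB code. It assumes runOnRightContext runs a task inline when the caller is already on the context and dispatches it onto the context otherwise. SketchContext is a hypothetical single-threaded stand-in for the Vert.x context, and every name other than makeRequest and runOnRightContext is made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a Vert.x context: one thread that owns subscriber state.
final class SketchContext {
  private final ExecutorService executor;
  private volatile Thread thread;

  SketchContext() {
    executor = Executors.newSingleThreadExecutor(r -> {
      final Thread t = new Thread(r, "sketch-context");
      t.setDaemon(true);
      thread = t;
      return t;
    });
  }

  boolean isCurrent() {
    return Thread.currentThread() == thread;
  }

  void runOnContext(final Runnable task) {
    executor.execute(task);
  }
}

final class RunOnRightContextSketch {
  private final SketchContext context = new SketchContext();

  // Stand-in for makeRequest: only legal on the context thread.
  private void makeRequest(final long n, final CompletableFuture<String> ranOn) {
    if (!context.isCurrent()) {
      ranOn.completeExceptionally(new IllegalStateException("not on context"));
      return;
    }
    ranOn.complete(Thread.currentThread().getName());
  }

  // Assumed behaviour: run inline when already on the context, otherwise hop onto it.
  private void runOnRightContext(final Runnable task) {
    if (context.isCurrent()) {
      task.run();
    } else {
      context.runOnContext(task);
    }
  }

  // Called from an arbitrary thread, as poll() would be; returns the thread that ran makeRequest.
  String requestFromCallerThread() {
    final CompletableFuture<String> ranOn = new CompletableFuture<>();
    runOnRightContext(() -> makeRequest(100, ranOn));
    return ranOn.join();
  }
}
```

Calling new RunOnRightContextSketch().requestFromCallerThread() from any thread returns the name of the context thread, which is the property this change is after.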

Fixes #7115

Testing done

Ran tests and manually exercised this path by fetching more than the batch size.

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@AlanConfluent AlanConfluent requested a review from a team as a code owner March 12, 2021 00:04
Contributor

@agavra agavra left a comment

Thanks @AlanConfluent - I'm always impressed with how fast you can get to the root of an issue!

It LGTM (well, as much as I understand this code - which is not much at all, the Vert.x context and threading model always confuse me...), but I think we should add some test coverage to make sure #7115 doesn't come back.

@@ -112,7 +112,7 @@ synchronized boolean isClosed() {
   private void checkRequestTokens() {
     if (tokens == 0) {
       tokens += REQUEST_BATCH_SIZE;
-      makeRequest(REQUEST_BATCH_SIZE);
+      runOnRightContext(() -> makeRequest(REQUEST_BATCH_SIZE));
Contributor

fascinating. so without this we always hit the batch limit and then stopped? can we add some tests to make sure we can exceed the BATCH_SIZE - this bug is a little embarrassing! 😳

Member Author

Added unit tests for this case and verified it fails without my change.

@@ -147,7 +147,7 @@ protected final void checkContext() {
     VertxUtils.checkContext(context);
   }

-  private void runOnRightContext(final Runnable runnable) {
+  protected void runOnRightContext(final Runnable runnable) {
Contributor

I'm a little confused, the first line in makeRequest checks the context. if that doesn't ensure our property, then what does it do?

is there any reason not to just always call makeRequest wrapped in a runOnRightContext? If that's the case could we keep this private and just make makeRequest do that?

Member Author

> I'm a little confused, the first line in makeRequest checks the context. if that doesn't ensure our property, then what does it do?

The call to checkContext only asserts that we're on the context; it doesn't move us onto it. That assertion is the source of the exceptions we've seen, because in some call paths we're not in fact on the context.
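
To make the distinction concrete, here is a small sketch of an assert-only check. It is not the real VertxUtils.checkContext: the context is a hypothetical single named thread, and the exception type and message are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class CheckContextSketch {
  // Hypothetical stand-in for the Vert.x context: a single named thread.
  private static final String CONTEXT_THREAD = "sketch-context";

  private final ExecutorService context = Executors.newSingleThreadExecutor(r -> {
    final Thread t = new Thread(r, CONTEXT_THREAD);
    t.setDaemon(true);
    return t;
  });

  // Asserts only: it never moves the caller onto the context.
  void checkContext() {
    if (!CONTEXT_THREAD.equals(Thread.currentThread().getName())) {
      throw new IllegalStateException("On wrong context");
    }
  }

  // True if checkContext() passes on whichever thread calls this method.
  boolean checkPassesOnCurrentThread() {
    try {
      checkContext();
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  // True if checkContext() passes when the call is dispatched onto the context thread.
  boolean checkPassesOnContext() {
    return CompletableFuture.supplyAsync(this::checkPassesOnCurrentThread, context).join();
  }
}
```

From an ordinary caller thread the check fails, and it only passes once the caller has explicitly dispatched onto the context.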

Contributor

@vcrfxia vcrfxia left a comment

Hey @AlanConfluent thanks for tracking down this bug! I'm +1 to Almog's comment on adding testing to prevent future regressions, and also left a comment inline regarding the fix.

@@ -147,7 +147,7 @@ protected final void checkContext() {
     VertxUtils.checkContext(context);
   }

-  private void runOnRightContext(final Runnable runnable) {
+  protected void runOnRightContext(final Runnable runnable) {
Contributor

I don't think we should expose this method. Let's just update the problematic line in PollableSubscriber#poll() to call context.runOnContext(v -> checkRequestTokens()); instead of checkRequestTokens().

The purpose of having strict context checks is to ensure that implementers of BaseSubscriber are cognizant of which methods are being run where. When additional, user-facing methods outside the public methods of BaseSubscriber are exposed, it's preferable to have the implementer intentionally use the correct context, rather than throwing a blanket of "use this context if needed" at the method. If you're curious, there's precedent here:

context.runOnContext(v -> handleResult(result));

Member Author

@AlanConfluent AlanConfluent Mar 12, 2021

> I don't think we should expose this method. Let's just update the problematic line in PollableSubscriber#poll() to call context.runOnContext(v -> checkRequestTokens()); instead of checkRequestTokens().

Ok, sounds reasonable. Rather than use synchronization with tokens and use an AtomicInteger, which you'd have to do if it was updated in the context, I just changed this within checkRequestTokens to:

context.runOnContext(v -> makeRequest(REQUEST_BATCH_SIZE));

This fits the invariant you describe about makeRequest being called on context as it is everywhere else. Or do all non-public methods generally get called on the context?

Contributor

> Or do all non-public methods generally get called on the context?

Are you asking whether the entire poll() call should be run on the context? If so, the answer is no. Blocking operations are not meant to be performed on event loops.

> Rather than use synchronization with tokens and use an AtomicInteger, which you'd have to do if it was updated in the context, I just changed this within checkRequestTokens to: context.runOnContext(v -> makeRequest(REQUEST_BATCH_SIZE));

My only potential concern with this is that because checkRequestTokens() is called from afterSubscribe() which is already wrapped in a call to context.runOnContext(...);, we now have two layers of wrapping in this case. I think this is fine but it'd be good to double-check. (I hope we have automated testing that would fail if this was disallowed/blocking!)
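
A rough way to picture the double-wrapping case, under the assumption that dispatching from the context thread simply queues the task to run after the current one finishes. The sketch below uses a single-threaded executor as a hypothetical stand-in for the context; the event labels are illustrative and nothing here is the real ksqlDB or Vert.x code.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class NestedDispatchSketch {
  // Runs an outer task on the "context" that dispatches an inner task onto the same context.
  static List<String> run() {
    final ExecutorService context = Executors.newSingleThreadExecutor();
    final List<String> events = new CopyOnWriteArrayList<>();
    final CountDownLatch done = new CountDownLatch(1);
    try {
      context.execute(() -> {
        events.add("afterSubscribe: start");
        // Second layer of wrapping: queued behind the task that is currently running.
        context.execute(() -> {
          events.add("makeRequest");
          done.countDown();
        });
        events.add("afterSubscribe: end");
      });
      if (!done.await(5, TimeUnit.SECONDS)) {
        throw new IllegalStateException("inner task never ran");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      context.shutdown();
    }
    return events;
  }
}
```

In this stand-in the inner task neither blocks nor deadlocks; it runs once the outer task has returned, so makeRequest is deferred rather than executed inline.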

Member Author

@AlanConfluent AlanConfluent Mar 12, 2021

> Are you asking whether the entire poll() call should be run on the context? If so, the answer is no. Blocking operations are not meant to be performed on event loops.

I definitely understand that that's not allowed on the context. Poll is a public method, so I wasn't meaning to include that. I was more generally asking about the convention about when to ensure code is running on the context vs potentially some other thread. "Do the onramps to the context only happen at a few well defined public API functions in these publishers and subscribers?" was what I meant.

> My only potential concern with this is that because checkRequestTokens() is called from afterSubscribe() which is already wrapped in a call to context.runOnContext(...);, we now have two layers of wrapping in this case. I think this is fine but it'd be good to double-check. (I hope we have automated testing that would fail if this was disallowed/blocking!)

Ah, you're right. Didn't realize that about afterSubscribe. Ok, tokens was already being accessed by multiple threads, so I just made it an AtomicInteger and took your original suggestion.
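
A hedged sketch of what that combination might look like, not the merged code: poll() blocks off the context, decrements an AtomicInteger, and hops onto the context to run checkRequestTokens. The fake publisher inside makeRequest, the tiny batch size, and the drain helper are all assumptions made so the example runs on its own.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class PollTokensSketch {
  static final int REQUEST_BATCH_SIZE = 3; // deliberately tiny for the example

  // Hypothetical stand-in for the Vert.x context.
  private final ExecutorService context = Executors.newSingleThreadExecutor(r -> {
    final Thread t = new Thread(r, "sketch-context");
    t.setDaemon(true);
    return t;
  });
  private final BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
  // Read and written from both the polling thread and the context thread.
  private final AtomicInteger tokens = new AtomicInteger();
  private final int totalRows;
  private int nextRow; // only touched on the context thread

  PollTokensSketch(final int totalRows) {
    this.totalRows = totalRows;
    context.execute(this::checkRequestTokens); // plays the role of afterSubscribe
  }

  // Runs on the context: when the tokens are used up, tops them up and requests another batch.
  private void checkRequestTokens() {
    if (tokens.compareAndSet(0, REQUEST_BATCH_SIZE)) {
      makeRequest(REQUEST_BATCH_SIZE);
    }
  }

  // Fake publisher: delivers up to n of the remaining rows straight into the queue.
  private void makeRequest(final int n) {
    for (int i = 0; i < n && nextRow < totalRows; i++) {
      queue.add(nextRow++);
    }
  }

  // Blocking, so it is called off the context and dispatches the refill check onto it.
  Integer poll(final long timeoutMs) {
    try {
      final Integer row = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
      if (row != null) {
        tokens.decrementAndGet();
        context.execute(this::checkRequestTokens);
      }
      return row;
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  // Polls until the source runs dry and returns how many rows were received.
  static int drain(final int totalRows) {
    final PollTokensSketch subscriber = new PollTokensSketch(totalRows);
    int count = 0;
    while (subscriber.poll(500) != null) {
      count++;
    }
    return count;
  }
}
```

With a batch size of 3, PollTokensSketch.drain(10) only returns 10 if the refill path works, which mirrors the "more than the batch size" scenario from the linked issue.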

Contributor

> I was more generally asking about the convention about when to ensure code is running on the context vs potentially some other thread. "Do the onramps to the context only happen at a few well defined public API functions in these publishers and subscribers?" was what I meant.

In general or in how the ksql code in particular is set up? I don't know the general answer but the ksql setup makes sense to me: BaseSubscriber ensures all the subscriber methods are run on the correct context, since otherwise all subscriber implementations would have to do that separately. For all other methods (not included in BaseSubscriber), the subscriber implementation must ensure the relevant calls are run on the correct context, while minimizing the code that is run on the context to what must be run on the context, in order to not consume event loop resources unnecessarily.

Not sure if I actually answered your question. LMK if I'm still misunderstanding.

Member Author

I think that answers my question. I was kind of wanting to know if in our codebase we had some kind of philosophy, since reasoning at a glance about whether some method in one of these classes is run on or off the context is hard. In this case, it seems like BaseSubscriber handles most of that, which makes it easier. Looking through the code, PollableSubscriber seems like a bit of an exception since it provides another public method that we know is run off the context, whereas the other uses are all async by nature.

@AlanConfluent
Member Author

> Hey @AlanConfluent thanks for tracking down this bug! I'm +1 to Almog's comment on adding testing to prevent future regressions, and also left a comment inline regarding the fix.

Added some tests that trigger the bug.

Contributor

@agavra agavra left a comment

:) LGTM!

publisher.subscribe(pollableSubscriber);

Row row = pollableSubscriber.poll(POLL_DURATION);
for (int i = 0; row != null; i++) {
Contributor

This does not actually verify we received the expected number of rows?

Member Author

@AlanConfluent AlanConfluent Mar 12, 2021

I forgot to check the index at the end. Pulled it out so I can check it against numRows after the loop.
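
The shape of that fix can be sketched as follows. This is not the actual test: a plain queue stands in for pollableSubscriber.poll(...), the ordering check inside the loop is an illustrative assumption, and the helper names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class RowCountCheckSketch {
  // Hypothetical stand-in for the subscriber: holds rows 0..numRows-1, then poll() returns null.
  static Queue<Integer> rowsOf(final int numRows) {
    final Queue<Integer> rows = new ArrayDeque<>();
    for (int i = 0; i < numRows; i++) {
      rows.add(i);
    }
    return rows;
  }

  // Drains the source, checks ordering, and returns how many rows were seen.
  static int drainAndCount(final Queue<Integer> source) {
    int i = 0; // declared outside the loop so it is still in scope for the final check
    Integer row = source.poll();
    for (; row != null; i++) {
      if (row != i) {
        throw new AssertionError("expected row " + i + " but got " + row);
      }
      row = source.poll();
    }
    return i;
  }
}
```

A test would then compare the returned count with numRows after the loop, so that a stream which stops early at the batch size fails instead of passing silently.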

Contributor

@vcrfxia vcrfxia left a comment

Thanks @AlanConfluent ! LGTM.

@AlanConfluent AlanConfluent merged commit 31127b2 into confluentinc:master Mar 13, 2021
AlanConfluent added a commit that referenced this pull request Mar 13, 2021
…bleSubscriber (#7212)

* fix: Ensures BaseSubscriber.makeRequest is called on context
Development

Successfully merging this pull request may close these issues.

ksqldb-api-client: Cannot retrieve more than 100 rows in pull/push queries using StreamedQueryResult
3 participants