[BEAM-2660] Set PubsubIO batch size using builder #3619
Conversation
R: @reuvenlax
Is this PR still relevant, @reuvenlax?
I am still interested in merging this pull request if possible. My company still has a use case for Beam with tuples exceeding the default byte size.
Any updates on this moving forward?
We have turned on autoformatting of the codebase, which causes small conflicts across the board. You can probably safely rebase and just keep your changes. Like this:
Please ping me if you run into any difficulty.
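The commands elided after "Like this:" were presumably a standard rebase plus a re-run of the autoformatter; a sketch, assuming the main repo is configured as the `upstream` remote and that Beam's autoformat task is Gradle's `spotlessApply` (the branch name is a placeholder):

```shell
git fetch upstream                  # get the autoformatted mainline
git rebase upstream/master          # replay your commits on top of it
./gradlew spotlessApply             # re-run the formatter so only real changes remain
git push --force-with-lease origin your-branch   # update the PR branch
```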
Force-pushed from c2abeb9 to c83e853
```diff
@@ -732,9 +734,20 @@ private PubsubIO() {}
   /** Implementation of {@link #write}. */
   @AutoValue
   public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
+    private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 1000000;
```
I think, more precisely, it should be 10 MB, or 10 * 1024 * 1024 bytes.
@reuvenlax @jkff could you take a look at this one?
@aromanenko-dev which runner do you need this for?
@reuvenlax Hmm, I guess it should work regardless of which runner is used. No?
@aromanenko-dev Pub/Sub is Google Cloud specific, but this change is not runner specific.
@aromanenko-dev it somewhat is. It turns out that Dataflow has its own implementation of the Pub/Sub source, so this PR will not change any behavior for Dataflow, only for non-Dataflow runners. This PR is still a good one, I believe; however, I want to make sure that you know it will not affect the Dataflow runner.
Run Dataflow ValidatesRunner |
What? For one, this PR doesn't touch the source, just the sink. Second, if that's the case, how do we get this fixed in the Dataflow runner? I currently have code running in prod that rolls its own Pub/Sub client to compensate for this size limitation, and I'd really like to get rid of it.
@dadrian true of both the source and the sink, at least for Dataflow streaming. Dataflow's batch runner does use this code. I can go ahead and merge this PR. If you have a use case where this is needed for Dataflow streaming, you need to contact Google with a bug report.
Yes, that is why I was wondering how it's related to any specific runner, and @reuvenlax explained that the Dataflow runner happens to have its own implementation of Pub/Sub support.
As @dadrian mentioned above, this PR affects only the sink part of PubsubIO, not the source. To be honest, I don't know whether the Dataflow runner uses the PubsubIO sink or not (I'm not familiar with this part of the code), so I can't guarantee this. In general, this LGTM except for some concerns raised about the Dataflow runner below. If it's tested by
```java
if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
if (payload.length > maxPublishBatchByteSize) {
```
I think it's better to default to success here and simply publish this message. Otherwise this could be considered a backwards-incompatible change, as it changes IO semantics.
I have one change requested on the code to make it backwards compatible, and then I will merge this PR. I just wanted to make sure @cjmcgraw understood that this PR will not affect the behavior of Dataflow streaming (in general, runners are allowed to replace IOs with their own internal versions).
Currently my company is using this as a batch for loading prediction tuples in fast batches. We are using this in Dataflow as we speak, and have been since this fork was created. Our use case most likely won't need to be streaming, so the change is effective for my problem. That being said, I am not fully grokking the issue here. I'd like to get clarity for when/if someone stumbles across this in the future.
If I recall, the limitation with the sink was that it was using the gcloud SDK to submit a gRPC request. There was a hard-coded default for the maximum number of bytes one bulk request could be. I simply made the hard-coded value configurable. Since the implementation was in the builder for the sink, I applied the values to both the bounded and unbounded sinks. The source request didn't have a maximum message size API parameter, so it will be enforced by Pub/Sub instead of Beam. If I am understanding this all correctly, this means that it can be used in both the bounded and unbounded cases.
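The byte-tracking scheme described above can be sketched roughly as follows. This is a Beam-independent simulation, not the actual PubsubIO code: the class and field names and the message-count cap are assumptions based on this thread, `flushCount` exists only for illustration, and the oversized-message path (publish vs. throw, debated below) is simplified to "publish alone".

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the bounded writer's batching logic: accumulate payloads and
// publish the pending batch *before* adding a message that would push the
// batch past either the count cap or the byte threshold.
public class BatchSketch {
    static final int MAX_PUBLISH_BATCH_SIZE = 100;  // assumed per-batch message cap
    final int maxPublishBatchByteSize;              // the value this PR makes configurable
    final List<byte[]> output = new ArrayList<>();
    int currentOutputBytes = 0;
    int flushCount = 0;  // counts simulated publish calls, for illustration only

    BatchSketch(int maxPublishBatchByteSize) {
        this.maxPublishBatchByteSize = maxPublishBatchByteSize;
    }

    void add(byte[] payload) {
        // Flush first if adding this payload would cross either threshold.
        if (output.size() >= MAX_PUBLISH_BATCH_SIZE
                || currentOutputBytes + payload.length > maxPublishBatchByteSize) {
            flush();
        }
        output.add(payload);
        currentOutputBytes += payload.length;
    }

    void flush() {
        if (!output.isEmpty()) {
            flushCount++;  // a real writer would issue the publish request here
            output.clear();
            currentOutputBytes = 0;
        }
    }

    public static void main(String[] args) {
        BatchSketch b = new BatchSketch(10);
        b.add(new byte[6]);
        b.add(new byte[6]);  // 6 + 6 > 10, so the first batch is published here
        b.flush();
        System.out.println("publishes: " + b.flushCount);  // prints "publishes: 2"
    }
}
```

Note that under this scheme a single payload larger than `maxPublishBatchByteSize` would simply be published in a batch by itself; whether to throw instead is exactly the backwards-compatibility question raised in the review comments.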
@cjmcgraw This PR will work for you then if you are using batch. As I mentioned above, I have only one comment - throwing SizeLimitExceededException is backwards-incompatible behavior that might cause existing pipelines to stop working - and I am happy to merge this PR. This change looks perfectly good to me otherwise.
It's not a backwards incompatible change (assuming the default max size actually matches the max size of a request you can send to Pubsub). The behavior right now is to throw an exception in the Google Cloud SDK internals, rather than from the Beam SDK. The whole point of this PR is to prevent Beam from submitting batches that are larger than the underlying libraries support. |
@dadrian good point. I was having trouble finding Pub/Sub documentation about these limits, but I've validated that these are the limits. I'll go ahead and merge now.
BEAM-2660 asks for controlling batch size using the `PubsubIO.Write.Builder`.

This PR adds two values configurable through the `PubsubIO.Write.Builder`:

- `maxBatchSize` - controls the bulk batch request size
- `maxBatchByteSize` - controls the bulk batch request size in bytes

In this PR I have also made a modification to `PubsubIO.Write.PubsubBoundedWriter`. The writer now dynamically tracks the number of bytes allocated for all messages; if the number of bytes would exceed the threshold, it publishes before adding more messages. If a single message's size exceeds `maxBatchByteSize`, an exception will be thrown.

An example use case of the new parameters is: