
[BEAM-2660] Set PubsubIO batch size using builder #3619

Merged
5 commits merged on Aug 9, 2018

Conversation

cjmcgraw

BEAM-2660 asks for the ability to control the batch size through the PubsubIO.Write builder.

This PR adds two values configurable through the PubsubIO.Write builder:

  • maxBatchSize - controls the maximum number of messages in a single bulk publish request
  • maxBatchByteSize - controls the maximum total payload size, in bytes, of a single bulk publish request

This PR also modifies PubsubIO.Write.PubsubBoundedWriter: the writer now tracks the total number of bytes buffered across all pending messages, and if adding a message would exceed the threshold, it publishes the current batch before buffering more messages.

If a single message exceeds maxBatchByteSize, an exception is thrown.

An example use of the new parameters:

PubsubIO.writeMessages()
    .withMaxBatchSize(100)
    .withMaxBatchByteSize(100000)
    .to("my-topic")

@reuvenlax
Contributor

R: @reuvenlax

@coveralls

Coverage Status

Coverage decreased (-0.6%) to 69.968% when pulling c2abeb9 on cjmcgraw:update-pubsubIO into f398748 on apache:master.

@jkff
Contributor

jkff commented Dec 16, 2017

Is this PR still relevant, @reuvenlax?

@cjmcgraw
Author

I am still interested in merging this pull request if possible. My company still has a use case for Beam with tuples exceeding the default byte size.

@BenFradet

Any updates on moving this forward?

@kennknowles
Member

We have turned on autoformatting of the codebase, which causes small conflicts across the board. You can probably safely rebase and just keep your changes. Like this:

$ git rebase
... see some conflicts
$ git diff
... confirmed that the conflicts are just autoformatting
... so we can just keep our changes and do our own autoformat
$ git checkout --theirs --
$ git add -u
$ git rebase --continue
$ ./gradlew spotlessJavaApply

Please ping me if you run into any difficulty.

@@ -732,9 +734,20 @@ private PubsubIO() {}
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 1000000;
Contributor

I think, more precisely, it should be 10 MB, i.e. 10 * 1024 * 1024 bytes.
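
In code, the suggestion amounts to changing the default along these lines (a sketch of the reviewer's suggestion, not necessarily the value that was merged):

// Suggested default per the comment above: 10 MB rather than 1,000,000 bytes.
private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 10 * 1024 * 1024;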

@aromanenko-dev
Contributor

@reuvenlax @jkff could you take a look at this one?
This PR looks good to me and I would like to have it merged. I have updated it to resolve the merge conflicts.

@reuvenlax
Contributor

@aromanenko-dev which runner do you need this for?

@aromanenko-dev
Contributor

aromanenko-dev commented Jul 19, 2018

@reuvenlax Hmm, I guess it should work regardless of which runner is used. No?

@cjmcgraw
Author

@aromanenko-dev Pub/Sub is Google Cloud specific, but this change is not runner specific.

@reuvenlax
Contributor

@aromanenko-dev it somewhat is. It turns out that Dataflow has its own implementation of the Pub/Sub source, so this PR will not change any behavior for Dataflow - only for non-Dataflow runners. I believe this PR is still a good one, but I want to make sure that you know it will not affect the Dataflow runner.

@aromanenko-dev
Contributor

Run Dataflow ValidatesRunner

@dadrian

dadrian commented Aug 7, 2018

It turns out that Dataflow has its own implementation of the Pub/Sub source

What? For one, this PR doesn't touch the source, just the sink. Second, if that's the case, how do we get this fixed in the Dataflow runner? I currently have code running in prod that rolls its own Pub/Sub client to compensate for this size limitation, and I'd really like to get rid of it.

@reuvenlax
Contributor

@dadrian true of both the source and the sink, at least for Dataflow streaming. Dataflow's batch runner does use this code.

I can go ahead and merge this PR. If you have a use case where this is needed for Dataflow streaming, you will need to contact Google with a bug report.

@aromanenko-dev
Contributor

aromanenko-dev commented Aug 8, 2018

@cjmcgraw

pubsub is google cloud specific. But this change is not runner specific

Yes, that is why I was wondering how it's related to any specific runner, and @reuvenlax explained that the Dataflow runner happens to have its own implementation of Pub/Sub support.

@reuvenlax

however I want to make sure that you know it will not affect the Dataflow runner.

As @dadrian mentioned above, this PR affects only the sink part of PubsubIO, not the source. To be honest, I don't know whether the Dataflow runner uses the PubsubIO sink or not (I'm not familiar with this part of the code), so I can't guarantee this.
Do you think that running the Dataflow_ValidatesRunner job is not enough? Do we need to run any other tests for that?

In general, this LGTM except for the concerns about the Dataflow runner raised below. If it's covered by the Dataflow_ValidatesRunner job (and that passed), then I'd like to have this merged.


if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
if (payload.length > maxPublishBatchByteSize) {
Contributor

I think it's better to default to success here and simply publish this message. Otherwise this could be considered a backwards-incompatible change, as it changes IO semantics.

@reuvenlax
Contributor

I have one requested change on the code to make it backwards compatible, and then I will merge this PR. I just wanted to make sure @cjmcgraw understood that this PR will not affect the behavior of Dataflow streaming (in general, runners are allowed to replace IOs with their own internal versions).

@cjmcgraw
Author

cjmcgraw commented Aug 8, 2018

Currently my company is using this in batch mode for loading prediction tuples quickly. We are running it on Dataflow as we speak, and have been since this fork was created. Our use case most likely won't need streaming, so the change works for my problem.

That being said, I am not fully grokking the issue here. I'd like to get some clarity in case someone stumbles across this in the future.

@dadrian

What? For one, this PR doesn't touch the source, just the sink. Second, if that's the case, how do we get this fixed in the Dataflow runner? I currently have code running in prod that rolls its own Pub/Sub client to compensate for this size limitation, and I'd really like to get rid of it.

@reuvenlax

@dadrian true of both the source and the sink, at least for Dataflow streaming. Dataflow's batch runner does use this code.

@aromanenko-dev

Yes, that is why I was wondering how it's related to any specific runner, and @reuvenlax explained that the Dataflow runner happens to have its own implementation of Pub/Sub support.

If I recall correctly, the limitation with the sink was that it was using the gcloud SDK to submit a gRPC request. There was a hard-coded default for the maximum number of bytes that one bulk request could contain. I simply made that hard-coded value configurable.

Since the implementation was in the builder for the sink, I applied the values to both the bounded and unbounded sinks.

The source request didn't have a maximum message size API parameter, so that limit will be enforced by Pub/Sub instead of Beam.

If I am understanding this all correctly, this means it can be used in both the bounded and unbounded cases.
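
As a rough illustration of how such builder values typically flow through to the writers (the names below are hypothetical, reusing the BatchingWriterSketch from the earlier sketch, and are not the exact code in this PR):

// Illustrative sketch of wiring configured limits from a builder-style
// transform into the writer; not the actual PubsubIO.Write implementation.
class WriteSketch {
  private final int maxBatchSize;
  private final int maxBatchByteSize;

  WriteSketch(int maxBatchSize, int maxBatchByteSize) {
    this.maxBatchSize = maxBatchSize;
    this.maxBatchByteSize = maxBatchByteSize;
  }

  // Mirrors withMaxBatchSize / withMaxBatchByteSize: each call returns a new
  // instance carrying the updated limit.
  WriteSketch withMaxBatchSize(int n) {
    return new WriteSketch(n, maxBatchByteSize);
  }

  WriteSketch withMaxBatchByteSize(int n) {
    return new WriteSketch(maxBatchSize, n);
  }

  // Both the bounded and the unbounded writers are constructed with the same limits.
  BatchingWriterSketch newWriter() {
    return new BatchingWriterSketch(maxBatchSize, maxBatchByteSize);
  }
}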

@reuvenlax
Contributor

@cjmcgraw This PR will work for you then, if you are using batch. As I mentioned above, I only have one comment - throwing SizeLimitExceededException is backwards-incompatible behavior that might cause existing pipelines to stop working - and then I am happy to merge this PR. This change looks perfectly good to me otherwise.

@dadrian

dadrian commented Aug 9, 2018

It's not a backwards-incompatible change (assuming the default max size actually matches the max size of a request you can send to Pub/Sub). The behavior right now is to throw an exception from within the Google Cloud SDK internals, rather than from the Beam SDK.

The whole point of this PR is to prevent Beam from submitting batches that are larger than the underlying libraries support.

@reuvenlax
Contributor

@dadrian good point. I was having trouble finding Pub/Sub documentation about these limits, but I've validated that these are the limits. I'll go ahead and merge now.
