Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adding support for Beam Schema Rows with BQ DIRECT_READ #22926

Merged
merged 5 commits into from
Sep 2, 2022

Conversation

pabloem
Copy link
Member

@pabloem pabloem commented Aug 26, 2022

Please add a meaningful description for your change here


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI.

@pabloem
Copy link
Member Author

pabloem commented Aug 27, 2022

Run Java PostCommit

@pabloem
Copy link
Member Author

pabloem commented Aug 27, 2022

r: @svetakvsundhar
r: @TheNeuralBit

This adds only schema support for table reads, not for query reads but it may not be difficult to add it for query reads.

Copy link
Contributor

@svetakvsundhar svetakvsundhar left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, this looks helpful, will wait for @TheNeuralBit 's feedback :)

Because you are defining fromBeamRow, is this also looking to be supported in WriteToBQ?


beamSchema = Schema.builder().addFields(flds).build();
SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is the purpose of this here when we wan't to convert something from a BeamRow type into another type? (e.g. the WriteToBQ case)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes right! It relies on BQUtils to generate a converter function.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have this logic in BigQueryIO.expand (only triggered in the non direct read path). I wonder if we can refactor and re-use that path for both paths? Similar to what Svetak did in Python.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done!

@pabloem pabloem changed the title [WIP] Adding support for Beam Schema Rows with BQ DIRECT_READ Adding support for Beam Schema Rows with BQ DIRECT_READ Aug 28, 2022
@github-actions
Copy link
Contributor

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @robertwb for label java.
R: @johnjcasey for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

@pabloem
Copy link
Member Author

pabloem commented Aug 29, 2022

@TheNeuralBit PTAL : )

@TheNeuralBit TheNeuralBit self-requested a review August 29, 2022 23:53
@pabloem
Copy link
Member Author

pabloem commented Sep 1, 2022

PTAL? : D I would like to have this in snapshots to build and test the Syndeo template that I'm working on

@TheNeuralBit
Copy link
Member

PTAL? : D I would like to have this in snapshots to build and test the Syndeo template that I'm working on

Ah shoot sorry. I will take a look today. One high-level question - I wonder if there's some duplication between this and the BQ TableProvider? It could be nice to have one defer to the other if possible.

@TheNeuralBit
Copy link
Member

Run Java PostCommit

Copy link
Member

@TheNeuralBit TheNeuralBit left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM overall, but I have a few suggestions. Thank you!

@@ -320,7 +320,9 @@ public Job getJob(JobReference jobRef) {
"Job %s failed: %s", job.job.getConfiguration(), e.toString())));
List<ResourceId> sourceFiles =
filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId());
FileSystems.delete(sourceFiles);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You might consider enabling the nullness checker in this file.

Row.withSchema(beamSchema).addValue("C").build(),
Row.withSchema(beamSchema).addValue("D").build()));

p.run();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you also test with an integration test?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

.from("foo.com:project:dataset.table")
.withMethod(Method.DIRECT_READ)
.withSelectedFields(Lists.newArrayList("name"))
.withFormat(DataFormat.AVRO)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will this work with Arrow format as well?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, it does (I've tested it on the syndeo template... and I've added an integration test)


beamSchema = Schema.builder().addFields(flds).build();
SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema);
SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We already have this logic in BigQueryIO.expand (only triggered in the non direct read path). I wonder if we can refactor and re-use that path for both paths? Similar to what Svetak did in Python.

@pabloem
Copy link
Member Author

pabloem commented Sep 2, 2022

Run Java PostCommit

@pabloem
Copy link
Member Author

pabloem commented Sep 2, 2022

Run Java PreCommit

@pabloem pabloem merged commit 948af30 into apache:master Sep 2, 2022
@pabloem pabloem deleted the beam-schema-bq-direct-read branch September 2, 2022 16:51
dedocibula pushed a commit to dedocibula/beam that referenced this pull request Sep 15, 2022
* Adding support for Beam Schema Rows with BQ DIRECT_READ

* Fixing for trimmed-out fields

* refactor

* Fix NPE in FakeJobService

* Addressing comments
kkdoon pushed a commit to twitter-forks/beam that referenced this pull request Sep 29, 2022
* Adding support for Beam Schema Rows with BQ DIRECT_READ

* Fixing for trimmed-out fields

* refactor

* Fix NPE in FakeJobService

* Addressing comments
cushon pushed a commit to cushon/beam that referenced this pull request Oct 17, 2022
* Adding support for Beam Schema Rows with BQ DIRECT_READ

* Fixing for trimmed-out fields

* refactor

* Fix NPE in FakeJobService

* Addressing comments
Abacn pushed a commit to Abacn/beam that referenced this pull request Jan 31, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants