-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Adding support for Beam Schema Rows with BQ DIRECT_READ #22926
Conversation
Run Java PostCommit |
r: @svetakvsundhar This adds only schema support for table reads, not for query reads but it may not be difficult to add it for query reads. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks, this looks helpful, will wait for @TheNeuralBit 's feedback :)
Because you are defining fromBeamRow
, is this also looking to be supported in WriteToBQ
?
|
||
beamSchema = Schema.builder().addFields(flds).build(); | ||
SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema); | ||
SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is the purpose of this here when we wan't to convert something from a BeamRow
type into another type? (e.g. the WriteToBQ case)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes right! It relies on BQUtils to generate a converter function.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have this logic in BigQueryIO.expand
(only triggered in the non direct read path). I wonder if we can refactor and re-use that path for both paths? Similar to what Svetak did in Python.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done!
Assigning reviewers. If you would like to opt out of this review, comment R: @robertwb for label java. Available commands:
The PR bot will only process comments in the main thread (not review comments). |
43ed68b
to
42ce95f
Compare
@TheNeuralBit PTAL : ) |
PTAL? : D I would like to have this in snapshots to build and test the Syndeo template that I'm working on |
Ah shoot sorry. I will take a look today. One high-level question - I wonder if there's some duplication between this and the BQ TableProvider? It could be nice to have one defer to the other if possible. |
Run Java PostCommit |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM overall, but I have a few suggestions. Thank you!
@@ -320,7 +320,9 @@ public Job getJob(JobReference jobRef) { | |||
"Job %s failed: %s", job.job.getConfiguration(), e.toString()))); | |||
List<ResourceId> sourceFiles = | |||
filesForLoadJobs.get(jobRef.getProjectId(), jobRef.getJobId()); | |||
FileSystems.delete(sourceFiles); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You might consider enabling the nullness checker in this file.
Row.withSchema(beamSchema).addValue("C").build(), | ||
Row.withSchema(beamSchema).addValue("D").build())); | ||
|
||
p.run(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also test with an integration test?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added
.from("foo.com:project:dataset.table") | ||
.withMethod(Method.DIRECT_READ) | ||
.withSelectedFields(Lists.newArrayList("name")) | ||
.withFormat(DataFormat.AVRO) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will this work with Arrow format as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, it does (I've tested it on the syndeo template... and I've added an integration test)
|
||
beamSchema = Schema.builder().addFields(flds).build(); | ||
SerializableFunction<T, Row> toBeamRow = getToBeamRowFn().apply(beamSchema); | ||
SerializableFunction<Row, T> fromBeamRow = getFromBeamRowFn().apply(beamSchema); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We already have this logic in BigQueryIO.expand
(only triggered in the non direct read path). I wonder if we can refactor and re-use that path for both paths? Similar to what Svetak did in Python.
Run Java PostCommit |
Run Java PreCommit |
* Adding support for Beam Schema Rows with BQ DIRECT_READ * Fixing for trimmed-out fields * refactor * Fix NPE in FakeJobService * Addressing comments
* Adding support for Beam Schema Rows with BQ DIRECT_READ * Fixing for trimmed-out fields * refactor * Fix NPE in FakeJobService * Addressing comments
* Adding support for Beam Schema Rows with BQ DIRECT_READ * Fixing for trimmed-out fields * refactor * Fix NPE in FakeJobService * Addressing comments
…2926 and apache/beam/apache#22942 in snapshots
Please add a meaningful description for your change here
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
R: @username
).addresses #123
), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>
instead.CHANGES.md
with noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI.