-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-7802] Expose a method to make an Schema coder from an Avro coder #9130
[BEAM-7802] Expose a method to make an Schema coder from an Avro coder #9130
Conversation
Very cool!
…On Tue, Jul 23, 2019 at 12:34 PM Ismaël Mejía ***@***.***> wrote:
This is just my first PR while playing a bit with Schemas and SQL. I found
that it was not straight forward to transform an Avro-based PCollection
into a Beam schema one so did this. It also does some minor clean ups in
AvroUtils and the SQL example.
R: @reuvenlax <https://github.com/reuvenlax>
------------------------------
You can view, comment on, or merge this pull request online at:
#9130
Commit Summary
- [BEAM-7802] Make SQL example slightly simpler
- [BEAM-7802] Inline AvroUtils methods to have only one public
AvroUtils class in core SDK
- [BEAM-7802] Expose a method to make an Avro-based PCollection into
an Schema-based one
- [BEAM-7802] Fix minor issues (access modifiers + static) in AvroUtils
File Changes
- *M* sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
<https://github.com/apache/beam/pull/9130/files#diff-0> (55)
- *D*
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroUtils.java
<https://github.com/apache/beam/pull/9130/files#diff-1> (40)
- *M*
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
<https://github.com/apache/beam/pull/9130/files#diff-2> (28)
- *M*
sdks/java/core/src/test/java/org/apache/beam/sdk/schemas/utils/AvroUtilsTest.java
<https://github.com/apache/beam/pull/9130/files#diff-3> (62)
- *M*
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlExample.java
<https://github.com/apache/beam/pull/9130/files#diff-4> (15)
- *M*
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/example/BeamSqlPojoExample.java
<https://github.com/apache/beam/pull/9130/files#diff-5> (17)
Patch Links:
- https://github.com/apache/beam/pull/9130.patch
- https://github.com/apache/beam/pull/9130.diff
—
You are receiving this because you were mentioned.
Reply to this email directly, view it on GitHub
<#9130?email_source=notifications&email_token=AFAYJVMG43LNYQ3KSZNZ6RLQA5MMXA5CNFSM4IGIIBM2YY3PNVWWK3TUL52HS4DFUVEXG43VMWVGG33NNVSW45C7NFSM4HBAPOHA>,
or mute the thread
<https://github.com/notifications/unsubscribe-auth/AFAYJVJ67DOAFUJSA3GSK33QA5MMXANCNFSM4IGIIBMQ>
.
|
1d12b70
to
ae3c28c
Compare
Rebased to fix a merge issue PTAL when you have some time. Notice that there are other Avro / Schema / SQL improvements as part of this but all are isolated in its own commits to ease the review. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've left a few comments. I see your point that we need a way to use Avro SchemaCoder without changing each Source like KafkaIO, PubSubIO, etc. What I'm not certain about is the approach of having a static method that takes PCollection in favor of using existing APIs such as PCollection#setCoder
and SchemaRegistry
.
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers; | ||
|
||
/** Helpers for working with Avro. */ | ||
class AvroUtils { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is one more similar class in AvroCoder
, but I guess changing it will introduce incompatible change into AvroCoder
and break streaming pipelines, so let's keep it for now.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok just for curiosity which class is it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
AvroCoder.SerializableSchemaSupplier
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But we can't change it, or it will break Java serialization.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually, I don't know how it works now, but it uses classes from vendored guava, that changes namespace each time we change guava version. I'm wondering if we already broke it without noticing.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I asked on dev, but probably we already broke it a couple of times
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/utils/AvroUtils.java
Outdated
Show resolved
Hide resolved
if (!pc.hasSchema()) { | ||
Schema beamSchema = getSchema(clazz, schema); | ||
if (beamSchema != null) { | ||
pc.setSchema(beamSchema, getToRowFunction(clazz, schema), getFromRowFunction(clazz)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pc.setSchema
is equivalent to pc.setCoder(SchemaCoder.of(...))
I was thinking, what if we just create a static method to create SchemaCoder
instead, then user-facing API would be like:
KafkaIO.read(MyRecord.class)
.setCoder(AvroUtils.schemaCoder(MyRecord.class))
or, it can be done implicitly by registering classes in advance:
p.getSchemaRegistry().registerSchemaProvider(MyRecord.class, new AvroRecordSchema());
or for every SpecificRecord
(didn't try this, but it should work as well):
p.getSchemaRegistry().registerSchemaProvider(SpecificRecord.class, new AvroRecordSchema());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes good idea, that looks cleaner I will add the schemaCoder
method to AvroUtils, it should cover all the mentioned cases. will ping you back when done. Thanks for the review ideas.
ae3c28c
to
050e296
Compare
Did suggested updates PTAL @kanterov I added |
I was starting to think if we should somehow make every AvroCoder PCollection a SchemaCoder for user friendliness, but I have my doubts on taking this 'implicit' approach, I am a bit worried about breaking backwards compatibility, but somehow it makes sense too, mmm... hard to decide/know. |
Changing AvroCoder will definitely break compatibility, especially streaming pipelines reading from PubSub or Kafka. In addition, SchemaCoder for Avro isn't as good (yet) as AvroCoder. As an example, it would serialize enums as strings, that is very inefficient when shuffling data. Another source of problems is that it doesn't support all Avro features. I believe once it matures we it could be the default, but we aren't there. In any case, I think it's a good exercise to think where we want to put SchemaCoder and how we are going to evolve AvroCoder, so, probably we should start a threat on dev@. The code looks good. I agree and support your motivation on making fewer things private, but I don't find it practical to break it now given that we know for sure that there are codebases relying on it being public to avoid limitations of existing APIs, so I propose to postpone this before things stabilize. |
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Outdated
Show resolved
Hide resolved
sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
Outdated
Show resolved
Hide resolved
…s class in core SDK Even if this class is not public, it is barely used so better to make it part of AvroIO.
050e296
to
6978655
Compare
@kanterov I restored the access modifiers and let everything as suggested. I think we should merge this as it is and open the discussion in dev@ since the classes are still experimental we can still adapt the changes we conclude from the discussion and ‘experimental’ users can benefit of having this available in the meantime. WDYT? |
@iemejia great, agree, please feel free to merge when PreCommit check passes |
Merging since it is green now. Thanks for the review @kanterov and @RyanSkraba |
This is just my first PR around Schema/SQL. I found that it was not straight forward to transform an Avro-based PCollection into a Beam schema one so did this. It also does some minor clean ups in AvroUtils and the SQL example.
R: @reuvenlax @kanterov