[BEAM-7802] Expose a method to make an Schema coder from an Avro coder #9130

Merged
iemejia merged 5 commits into apache:master from BEAM-7802-avro-schema-pcollection
Aug 16, 2019

iemejia
Member

@iemejia iemejia commented Jul 23, 2019

This is just my first PR around Schema/SQL. I found that it was not straightforward to transform an Avro-based PCollection into a Beam schema one, so I did this. It also does some minor cleanups in AvroUtils and the SQL example.

R: @reuvenlax @kanterov

@iemejia iemejia requested a review from reuvenlax July 23, 2019 19:34
@reuvenlax
Contributor

reuvenlax commented Jul 23, 2019 via email

@iemejia iemejia force-pushed the BEAM-7802-avro-schema-pcollection branch from 1d12b70 to ae3c28c Compare August 2, 2019 13:31
@iemejia iemejia requested a review from kanterov August 2, 2019 13:31
@iemejia
Member Author

iemejia commented Aug 2, 2019

Rebased to fix a merge issue, PTAL when you have some time. Note that there are other Avro / Schema / SQL improvements as part of this, but each is isolated in its own commit to ease the review.

Member

@kanterov kanterov left a comment

I've left a few comments. I see your point that we need a way to use Avro SchemaCoder without changing each Source like KafkaIO, PubSubIO, etc. What I'm not certain about is the approach of having a static method that takes a PCollection instead of using existing APIs such as PCollection#setCoder and SchemaRegistry.

import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Suppliers;

/** Helpers for working with Avro. */
class AvroUtils {
Member

There is one more similar class in AvroCoder, but I guess changing it would introduce an incompatible change into AvroCoder and break streaming pipelines, so let's keep it for now.

Member Author

OK, just out of curiosity, which class is it?

Member

AvroCoder.SerializableSchemaSupplier

Member

But we can't change it, or it will break Java serialization.

Member

Actually, I don't know how it works now, but it uses classes from vendored Guava, whose namespace changes each time we change the Guava version. I'm wondering if we already broke it without noticing.

Member

I asked on dev@, but we have probably already broken it a couple of times.
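
For context on the concern in this thread: standard Java serialization records the fully qualified class name of each object in the byte stream, so relocating a class to a new package (as happens when a vendored Guava namespace changes) can leave previously serialized bytes unreadable. The sketch below illustrates only that general mechanism using the JDK; `SchemaHolder` is a hypothetical stand-in and not the actual AvroCoder.SerializableSchemaSupplier.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

class SerializedClassNameDemo {

  /** Hypothetical stand-in for a serializable helper captured by a coder. */
  static class SchemaHolder implements Serializable {
    private static final long serialVersionUID = 1L;
    final String schemaJson;

    SchemaHolder(String schemaJson) {
      this.schemaJson = schemaJson;
    }
  }

  /** Serializes a value with standard Java serialization. */
  static byte[] serialize(Serializable value) {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
      out.writeObject(value);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  /** Returns true if the serialized stream contains the given class name. */
  static boolean streamMentions(byte[] stream, String className) {
    // ISO-8859-1 maps every byte to one char, so ASCII class names stay searchable.
    String text = new String(stream, StandardCharsets.ISO_8859_1);
    return text.contains(className);
  }

  public static void main(String[] args) {
    byte[] stream = serialize(new SchemaHolder("{\"type\":\"string\"}"));
    // The class name is embedded in the stream, so a renamed package would no longer match.
    System.out.println(streamMentions(stream, SchemaHolder.class.getName()));
  }
}
```

Because the name is part of the stream, bytes written under one package name cannot be resolved once the class only exists under another, which is the kind of breakage being discussed here.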

if (!pc.hasSchema()) {
  Schema beamSchema = getSchema(clazz, schema);
  if (beamSchema != null) {
    pc.setSchema(beamSchema, getToRowFunction(clazz, schema), getFromRowFunction(clazz));
Member

pc.setSchema is equivalent to pc.setCoder(SchemaCoder.of(...))

I was thinking, what if we just create a static method to create a SchemaCoder instead? Then the user-facing API would be like:

KafkaIO.read(MyRecord.class)
  .setCoder(AvroUtils.schemaCoder(MyRecord.class))

or, it can be done implicitly by registering classes in advance:

p.getSchemaRegistry().registerSchemaProvider(MyRecord.class, new AvroRecordSchema());

or for every SpecificRecord (didn't try this, but it should work as well):

p.getSchemaRegistry().registerSchemaProvider(SpecificRecord.class, new AvroRecordSchema());

Member Author

Yes, good idea, that looks cleaner. I will add the schemaCoder method to AvroUtils; it should cover all the mentioned cases. I will ping you back when done. Thanks for the review ideas.

@iemejia iemejia force-pushed the BEAM-7802-avro-schema-pcollection branch from ae3c28c to 050e296 Compare August 6, 2019 13:38
@iemejia
Member Author

iemejia commented Aug 6, 2019

Made the suggested updates, PTAL @kanterov. I added schemaCoder methods to match those of AvroCoder.of, plus one additional method that creates it from an existing AvroCoder. That one is relatively simple compared to the others and I can remove it if you prefer, but I ended up rewriting these two lines multiple times.

@iemejia
Member Author

iemejia commented Aug 6, 2019

I was starting to wonder whether we should somehow make every AvroCoder PCollection use a SchemaCoder for user-friendliness, but I have my doubts about taking this 'implicit' approach. I am a bit worried about breaking backwards compatibility, but somehow it makes sense too. Hmm... hard to decide.

@kanterov
Member

Changing AvroCoder will definitely break compatibility, especially for streaming pipelines reading from PubSub or Kafka. In addition, SchemaCoder for Avro isn't as good (yet) as AvroCoder. For example, it serializes enums as strings, which is very inefficient when shuffling data. Another source of problems is that it doesn't support all Avro features. I believe that once it matures it could be the default, but we aren't there yet. In any case, I think it's a good exercise to think about where we want to put SchemaCoder and how we are going to evolve AvroCoder, so we should probably start a thread on dev@.

The code looks good. I agree with and support your motivation for making fewer things public, but I don't find it practical to break it now, given that we know for sure that there are codebases relying on it being public to work around limitations of existing APIs. I propose to postpone this until things stabilize.
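
As a rough illustration of the enum point above: Avro's binary encoding writes an enum as the integer index of its symbol, while a string carries a length prefix plus its UTF-8 bytes. The sketch below compares the two sizes; the `Color` enum is hypothetical, and the string layout (an unsigned varint length followed by UTF-8 bytes) is an assumption about a typical length-prefixed encoding rather than a statement of what SchemaCoder emits.

```java
import java.nio.charset.StandardCharsets;

class EnumEncodingSizeDemo {

  /** Hypothetical enum used only for the size comparison. */
  enum Color { RED, GREEN, ULTRAMARINE }

  /** Bytes needed for an unsigned variable-length integer (7 payload bits per byte). */
  static int varintSize(long value) {
    int size = 1;
    while ((value >>>= 7) != 0) {
      size++;
    }
    return size;
  }

  /** Size when the enum is written as a zig-zag varint of its symbol index. */
  static int sizeAsIndex(Enum<?> value) {
    // The ordinal is never negative, so its zig-zag form is simply ordinal * 2.
    long zigZag = ((long) value.ordinal()) << 1;
    return varintSize(zigZag);
  }

  /** Size when the enum is written as a length-prefixed UTF-8 string of its name (assumed layout). */
  static int sizeAsString(Enum<?> value) {
    int utf8Length = value.name().getBytes(StandardCharsets.UTF_8).length;
    return varintSize(utf8Length) + utf8Length;
  }

  public static void main(String[] args) {
    for (Color c : Color.values()) {
      System.out.println(c + ": index=" + sizeAsIndex(c) + " bytes, string=" + sizeAsString(c) + " bytes");
    }
  }
}
```

Under these assumptions every symbol of a small enum fits in one byte as an index, while the string form grows with the length of the symbol name, and that difference is paid for every record that is shuffled.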

@iemejia iemejia force-pushed the BEAM-7802-avro-schema-pcollection branch from 050e296 to 6978655 Compare August 15, 2019 21:37
@iemejia
Member Author

iemejia commented Aug 15, 2019

@kanterov I restored the access modifiers and left everything as suggested.

I think we should merge this as it is and open the discussion on dev@. Since the classes are still experimental, we can still adapt to whatever we conclude from the discussion, and ‘experimental’ users can benefit from having this available in the meantime. WDYT?

@kanterov
Member

@iemejia great, agreed, please feel free to merge when the PreCommit check passes.

@iemejia
Member Author

iemejia commented Aug 16, 2019

Merging since it is green now. Thanks for the review @kanterov and @RyanSkraba

@iemejia iemejia merged commit c825d71 into apache:master Aug 16, 2019
@iemejia iemejia changed the title [BEAM-7802] Expose a method to make an Avro-based PCollection into an Schema-based one [BEAM-7802] Expose a method to make an Schema coder from an Avro coder Aug 16, 2019
@iemejia iemejia deleted the BEAM-7802-avro-schema-pcollection branch August 16, 2019 20:47