[AVRO] Generation of Avro schemas with logical types. #133

Closed
wants to merge 26 commits into from
6a3aa4c
WIP Generation of avro schemas with logical types.
jcustenborder May 8, 2018
d21e6f0
Decimal schema generation is working. Serialization should be working…
jcustenborder May 15, 2018
d316fa5
Working using the JacksonAvroParserImpl. Both fixed and byte based de…
jcustenborder May 21, 2018
c3eeb5a
Restructuring of the test cases.
jcustenborder May 22, 2018
d97d401
Added more around logicalTypes and java.time.
jcustenborder May 22, 2018
386a4f0
Refactored java8 classes out to it's own module. This will allow the …
jcustenborder May 24, 2018
f9c5c3b
Cleaned up logic for converting dates. It's going to be lossy but tha…
jcustenborder May 24, 2018
6e62edb
Moved decimals to use serializers like the other types.
jcustenborder May 24, 2018
7b55daf
Accidentally added java8 dependencies. Reverted.
jcustenborder May 24, 2018
f260229
Removed direct avro dependency. We're already getting this from `jack…
jcustenborder May 24, 2018
817d80a
Added support for java.util.Date for logical types `date`, `time-mill…
jcustenborder May 24, 2018
d4d13ba
Refactored to use a single attribute. Deprecated AvroFixedSize attrib…
jcustenborder May 31, 2018
1f8c05b
WIP Generation of avro schemas with logical types.
jcustenborder May 8, 2018
b166033
Decimal schema generation is working. Serialization should be working…
jcustenborder May 15, 2018
7cfa079
Working using the JacksonAvroParserImpl. Both fixed and byte based de…
jcustenborder May 21, 2018
b5706c3
Restructuring of the test cases.
jcustenborder May 22, 2018
39c689f
Added more around logicalTypes and java.time.
jcustenborder May 22, 2018
59fd5c5
Refactored java8 classes out to it's own module. This will allow the …
jcustenborder May 24, 2018
64c3497
Cleaned up logic for converting dates. It's going to be lossy but tha…
jcustenborder May 24, 2018
99b4770
Moved decimals to use serializers like the other types.
jcustenborder May 24, 2018
4597141
Removed direct avro dependency. We're already getting this from `jack…
jcustenborder May 24, 2018
41c0344
Added support for java.util.Date for logical types `date`, `time-mill…
jcustenborder May 24, 2018
17ff44a
Refactored to use a single attribute. Deprecated AvroFixedSize attrib…
jcustenborder May 31, 2018
9492fc4
Bump version to 2.10.0.pr2-SNAPSHOT.
jcustenborder Aug 23, 2019
b1d374d
Removed support for ZonedDateTime and OffsetDateTime because there ar…
jcustenborder Aug 23, 2019
9df02f5
Merge remote-tracking branch 'origin/issue-132' into issue-132
jcustenborder Aug 23, 2019
91 changes: 91 additions & 0 deletions avro-java8/pom.xml
@@ -0,0 +1,91 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformats-binary</artifactId>
<version>2.10.0.pr2-SNAPSHOT</version>
</parent>
<artifactId>jackson-dataformat-avro-java8</artifactId>
<name>Jackson dataformat: Avro Java 8</name>
<packaging>bundle</packaging>
<description>Support for reading and writing AVRO-encoded data via Jackson
abstractions.
</description>
<url>http://github.com/FasterXML/jackson-dataformats-binary</url>

<properties>
<!-- Generate PackageVersion.java into this directory. -->
<packageVersion.dir>com/fasterxml/jackson/dataformat/avro/java8</packageVersion.dir>
<packageVersion.package>${project.groupId}.avro.java8</packageVersion.package>
</properties>

<dependencies>
<!-- Hmmh. Need annotations for introspection... -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
</dependency>
<!-- and databind for Avro Schema generation... -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-avro</artifactId>
<version>${project.version}</version>
</dependency>

<!-- and for testing we need logback -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.1.3</version>
<scope>test</scope>
</dependency>
<!-- A bit of help to reduce boiler-plate in dummy test classes -->
<dependency>
Member

Quick note: no, unfortunately I will not allow Lombok as a dependency (some usage did sneak in, and I have been busy removing it). The original reason was that Lombok use prevented a clean build from a cloned repo (some component had to be installed locally).

<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.14</version>
<scope>test</scope>
</dependency>
<!-- For validating more complex comparisons -->
<!-- 27-Feb-2017, tatu: NOTE! Can NOT use 3.x as it requires Java 8
-->
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>2.5.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin><!-- Inherited from oss-base. Generate PackageVersion.java.-->
<groupId>com.google.code.maven-replacer-plugin</groupId>
<artifactId>replacer</artifactId>
<executions>
<execution>
<id>process-packageVersion</id>
<phase>generate-sources</phase>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>8</source>
<target>8</target>
</configuration>
</plugin>
</plugins>
</build>
</project>
@@ -0,0 +1,112 @@
package com.fasterxml.jackson.dataformat.avro.java8;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.databind.introspect.Annotated;
import com.fasterxml.jackson.dataformat.avro.AvroAnnotationIntrospector;
Member

This will need a different Java package if it remains a separate component, because the Java Module system does not allow split packages (classes for a single Java package coming from multiple modules, that is, jars with module-info).
But this is a work in progress, so it's fine until the packaging details are decided.

import com.fasterxml.jackson.dataformat.avro.AvroType;
import com.fasterxml.jackson.dataformat.avro.java8.deser.InstantDeserializer;
import com.fasterxml.jackson.dataformat.avro.java8.deser.LocalDateDeserializer;
import com.fasterxml.jackson.dataformat.avro.java8.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.dataformat.avro.java8.deser.LocalTimeDeserializer;
import com.fasterxml.jackson.dataformat.avro.java8.ser.InstantSerializer;
import com.fasterxml.jackson.dataformat.avro.java8.ser.LocalDateSerializer;
import com.fasterxml.jackson.dataformat.avro.java8.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.dataformat.avro.java8.ser.LocalTimeSerializer;

import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

class AvroJavaTimeAnnotationIntrospector extends AvroAnnotationIntrospector {
static final AvroJavaTimeAnnotationIntrospector INSTANCE = new AvroJavaTimeAnnotationIntrospector();

@Override
public Object findSerializer(Annotated a) {
AvroType logicalType = _findAnnotation(a, AvroType.class);
if (null != logicalType) {
switch (logicalType.logicalType()) {
case TIMESTAMP_MILLISECOND:

Timestamps have no timezone, explicit or implicit. It is more correct to model them using java.time.Instant. An Instant can be converted into any XXXDateTime class by providing a timezone, or by stating that it is implicit; this information is not available from the encoded timestamp itself.

Author

@lukejackson That makes sense, because Avro stores everything without a timezone as well. With Jackson I was under the impression that I had to create a serializer and deserializer for each of the java.time.* classes. Is there another approach that I can take?

My suggestion would be to add java.time.Instant support - this is arguably the canonical representation of timestamps in Java 8+.

As for the other XXXDateTime types, I think there is an argument for them not to be supported, given there is no corresponding type in Avro.

If you do introduce a conversion, it will introduce information that is not on the wire (e.g. timezone) and will also be lossy, given that a date-time with a time zone can map to two different timestamps during daylight saving time transitions.

I would argue that given these limitations, it should be left to the user to perform the conversion and understand their assumptions and associated limitations.

Note that the closest thing Avro has is its date and time-xxx logical types. Modeling a LocalDateTime in Avro requires two fields with these corresponding types. Modeling a ZonedDateTime requires a third field with the zone id.
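To make the daylight saving point concrete, here is a minimal sketch using only java.time. The zone (America/New_York) and the wall-clock time are illustrative choices, not taken from the PR; the class name `DstOverlapDemo` is hypothetical. It shows one local date-time in a zone resolving to two different instants during the autumn overlap.

```java
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

class DstOverlapDemo {
    // Illustrative zone and wall-clock time (assumptions, not from the PR).
    // On 2018-11-04 US clocks went back from 02:00 to 01:00, so 01:30 happened twice.
    static final ZoneId ZONE = ZoneId.of("America/New_York");
    static final LocalDateTime WALL_CLOCK = LocalDateTime.of(2018, 11, 4, 1, 30);

    /** Instant of the first 01:30, still on daylight time (UTC-4). */
    static Instant earlier() {
        return ZonedDateTime.of(WALL_CLOCK, ZONE).withEarlierOffsetAtOverlap().toInstant();
    }

    /** Instant of the second 01:30, back on standard time (UTC-5). */
    static Instant later() {
        return ZonedDateTime.of(WALL_CLOCK, ZONE).withLaterOffsetAtOverlap().toInstant();
    }

    public static void main(String[] args) {
        System.out.println(earlier());                           // 2018-11-04T05:30:00Z
        System.out.println(later());                             // 2018-11-04T06:30:00Z
        System.out.println(Duration.between(earlier(), later())); // PT1H
    }
}
```

A single timestamp-millis value can only carry one of these two instants, which is why a conversion from a zoned wall-clock value needs an explicit rule for the overlap.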

Author

I get what you are saying now. Not having Instant support was an oversight. In my last comment I thought you were telling me to convert to Instant and delegate that to a serializer. I'll add Instant support. LocalDateTime and LocalDate would be the closest thing to the Avro specification, given that neither Local* nor the Avro specification includes timezone information.

lukejackson, Aug 13, 2018

Yes, I was talking about a user of this library doing the conversion themselves, rather than Jackson itself.

I can see how LocalDate is the closest to the avro spec for date, but not sure I understand what you mean by LocalDateTime - I don't think there is anything in the avro spec that relates to that?

I don't want to sound like I'm nitpicking, and I appreciate your effort here. The concepts behind the date and time types are deceptively complicated. Given the apparent recent stagnation in the Avro project (and there is a long-standing PR to add java.time there), I am hoping that this Jackson module could be a workable alternative to core Avro for some of my use cases.

Author

This is what I was thinking when I started this PR. The Local* time classes do not have timezone information, so they translate perfectly to the corresponding Avro logical types. Not having Instant was just an oversight; I've already started adding it to this pull request. Here is what I'm thinking. Regarding your comments on OffsetDateTime and ZonedDateTime: you are correct. I was planning on doing some lossy conversions to support these types. Alternatively, I could create logical types for them. The point of logical types in Avro is to support things outside of the specification. To do this I could create logical types called zoned-date, zoned-time-millis, zoned-time-micros, zoned-timestamp-millis, and zoned-timestamp-micros. This would allow me to store the timezone information along with the date/timestamp information.

These were my original thoughts.

| Java Type | Avro Logical Type | Notes |
| --- | --- | --- |
| LocalDate | date | Lossless conversion |
| LocalTime | time-millis, time-micros | Lossless conversion |
| LocalDateTime | timestamp-millis, timestamp-micros | Lossless conversion |
| Instant | timestamp-millis, timestamp-micros | Lossless conversion |
| OffsetDateTime | timestamp-millis, timestamp-micros | Lossy conversion? Custom logical type? Timestamp adjusted to UTC. |
| ZonedDateTime | timestamp-millis, timestamp-micros | Lossy conversion? Custom logical type? Timestamp adjusted to UTC. |
| OffsetTime | time-millis, time-micros | Lossy conversion? Custom logical type? Timestamp adjusted to UTC. |
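As a rough illustration of the first four rows, the sketch below computes the numeric value each logical type would carry, using only java.time. The class and method names are hypothetical and are not the PR's serializers. Treating LocalDateTime as UTC is an assumption that mirrors the hard-coded UTC zone in the diff. Note that any precision finer than the target unit is truncated, so "lossless" holds only for values already at that precision.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.ZoneOffset;
import java.time.temporal.ChronoUnit;

class LogicalTypeValues {
    /** date: days since 1970-01-01 (an Avro int). */
    static int date(LocalDate value) {
        return Math.toIntExact(value.toEpochDay());
    }

    /** time-millis: milliseconds after midnight (an Avro int). */
    static int timeMillis(LocalTime value) {
        return Math.toIntExact(value.toNanoOfDay() / 1_000_000L);
    }

    /** time-micros: microseconds after midnight (an Avro long). */
    static long timeMicros(LocalTime value) {
        return value.toNanoOfDay() / 1_000L;
    }

    /** timestamp-millis: milliseconds since the epoch (an Avro long). */
    static long timestampMillis(Instant value) {
        return value.toEpochMilli();
    }

    /** timestamp-micros: microseconds since the epoch (an Avro long). */
    static long timestampMicros(Instant value) {
        return ChronoUnit.MICROS.between(Instant.EPOCH, value);
    }

    /** Assumption: a LocalDateTime is interpreted as a UTC wall-clock value. */
    static long timestampMillis(LocalDateTime value) {
        return value.toInstant(ZoneOffset.UTC).toEpochMilli();
    }
}
```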

lukejackson, Aug 16, 2018

I hadn't thought about adding new logical types to Avro.

I think the way forward depends on what you're looking to achieve. If it's a way to represent all the Java related time classes in Avro - i.e. the Java POJO is the canonical schema and the Avro representation is just derived from that - then I can understand you'd look for a way to encode them in Avro. There are some options:

  1. Reusing another type that is close in meaning (e.g. timestamp-millis for LocalDateTime), with any consumers, which may not be this library, knowing that they should convert the timestamp into a ZonedDateTime with the UTC time zone and then convert that to a LocalDateTime.
  2. Encoding the types as strings, and parsing them as such. I believe this is what Jackson does when encoding Java times to JSON.
  3. Adding your own logical types. This would mean that any consumer, which again may not be this library, would also have to implement them.
  4. Using the record type composed of types supported by Avro. This is what Jackson does when converting POJOs to JSON: they become JSON dictionaries. Avro records would be the equivalent here, e.g. LocalDateTime would be a record containing a date field and a time-millis/time-micros field.

If instead you are looking for a way to directly represent Avro records as Java classes - i.e. the Avro schema is the canonical schema and the Java POJOs are just derived from that - then you just need to support the Avro types and no more, and leave it to the user to define their schemas to model the data they want to encode (e.g. manually define a record representing local date and local time), or let them choose how to shoehorn the value into an existing Avro type (e.g. represent as a string, as a long, etc.)

For my use cases, the Avro schemas are the canonical schemas. As such I only use the Avro types. This is because I use Avro to communicate between applications written in multiple languages, such as Java, C++, Python, and I rely on Kafka tools that only support Avro types (e.g. Connect and its various sinks).

Where the Avro types don't support my use case I either use a different type (e.g. long for nano precision timestamps) or multiple fields (e.g. combination of date and time-millis for a local date time), and I'm happy to accept that these fields would appear as such in Java, C++, Python, my database tables, etc.
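The "multiple fields" approach for a local date-time could look roughly like the sketch below. `LocalDateTimeRecord` is a hypothetical holder class standing in for an Avro record with a `date` field and a `time-millis` field; it is not part of the PR, and it uses only java.time.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;

/** Hypothetical two-field representation: a `date` plus a `time-millis` value. */
final class LocalDateTimeRecord {
    final int date;        // days since 1970-01-01
    final int timeMillis;  // milliseconds after midnight

    LocalDateTimeRecord(int date, int timeMillis) {
        this.date = date;
        this.timeMillis = timeMillis;
    }

    /** Splits a LocalDateTime into the two fields; sub-millisecond precision is dropped. */
    static LocalDateTimeRecord from(LocalDateTime value) {
        return new LocalDateTimeRecord(
                Math.toIntExact(value.toLocalDate().toEpochDay()),
                Math.toIntExact(value.toLocalTime().toNanoOfDay() / 1_000_000L));
    }

    /** Recombines the two fields; no time zone is involved in either direction. */
    LocalDateTime toLocalDateTime() {
        return LocalDateTime.of(
                LocalDate.ofEpochDay(date),
                LocalTime.ofNanoOfDay(timeMillis * 1_000_000L));
    }
}
```

Because neither field refers to a zone, the round trip never has to pick an offset, which is the property the timestamp-based mapping cannot offer for local values.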

if (a.getRawType().isAssignableFrom(LocalDateTime.class)) {
return LocalDateTimeSerializer.MILLIS;
}
if (a.getRawType().isAssignableFrom(Instant.class)) {
return InstantSerializer.MILLIS;
}
break;
case TIMESTAMP_MICROSECOND:
if (a.getRawType().isAssignableFrom(LocalDateTime.class)) {
return LocalDateTimeSerializer.MICROS;
}
if (a.getRawType().isAssignableFrom(Instant.class)) {
return InstantSerializer.MICROS;
}
break;
case DATE:
if (a.getRawType().isAssignableFrom(LocalDate.class)) {
return LocalDateSerializer.INSTANCE;
}
break;
case TIME_MILLISECOND:
if (a.getRawType().isAssignableFrom(LocalTime.class)) {
return LocalTimeSerializer.MILLIS;
}
break;
case TIME_MICROSECOND:
if (a.getRawType().isAssignableFrom(LocalTime.class)) {
return LocalTimeSerializer.MICROS;
}
break;
}
}

return super.findSerializer(a);
}

@Override
public Object findDeserializer(Annotated a) {
AvroType logicalType = _findAnnotation(a, AvroType.class);
if (null != logicalType) {
switch (logicalType.logicalType()) {
case TIMESTAMP_MILLISECOND:
if (a.getRawType().isAssignableFrom(LocalDateTime.class)) {
Member

For serializers, the check should be isAssignableFrom (since subtype serialization is probably fine); for deserializers it is fine (and perhaps preferable) to do an equality check, because a base-type value cannot be substituted for a subtype property.

return LocalDateTimeDeserializer.MILLIS;
}
if (a.getRawType().isAssignableFrom(Instant.class)) {
return InstantDeserializer.MILLIS;
}
break;
case TIMESTAMP_MICROSECOND:
if (a.getRawType().isAssignableFrom(LocalDateTime.class)) {
return LocalDateTimeDeserializer.MICROS;
}
if (a.getRawType().isAssignableFrom(Instant.class)) {
return InstantDeserializer.MICROS;
}
break;
case DATE:
if (a.getRawType().isAssignableFrom(LocalDate.class)) {
return LocalDateDeserializer.INSTANCE;
}
break;
case TIME_MILLISECOND:
if (a.getRawType().isAssignableFrom(LocalTime.class)) {
return LocalTimeDeserializer.MILLIS;
}
break;
case TIME_MICROSECOND:
if (a.getRawType().isAssignableFrom(LocalTime.class)) {
return LocalTimeDeserializer.MICROS;
}
break;
}
}

return super.findDeserializer(a);
}

@Override
public Version version() {
return PackageVersion.VERSION;
}
}
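On the type checks discussed in the review of `findDeserializer`: `X.isAssignableFrom(Y)` is true when `X` is the same as, or a supertype of, `Y`. The sketch below (hypothetical class `RawTypeChecks`, plain JDK only) compares the check as written in the diff with an equality check and with the reversed subtype check. Whether the difference matters in practice depends on which property types carry the annotation.

```java
import java.time.LocalDateTime;

class RawTypeChecks {
    /** Same shape as the diff: true when raw is LocalDateTime or any supertype of it. */
    static boolean diffStyleCheck(Class<?> raw) {
        return raw.isAssignableFrom(LocalDateTime.class);
    }

    /** Equality check, as suggested for deserializers. */
    static boolean exactCheck(Class<?> raw) {
        return raw == LocalDateTime.class;
    }

    /** Reversed direction: true when raw is LocalDateTime or a subtype of it. */
    static boolean subtypeCheck(Class<?> raw) {
        return LocalDateTime.class.isAssignableFrom(raw);
    }

    public static void main(String[] args) {
        // A property declared as Object passes the diff-style check but not the others.
        System.out.println(diffStyleCheck(Object.class)); // true
        System.out.println(exactCheck(Object.class));     // false
        System.out.println(subtypeCheck(Object.class));   // false
    }
}
```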
@@ -0,0 +1,20 @@
package com.fasterxml.jackson.dataformat.avro.java8;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.dataformat.avro.AvroModule;

public class AvroJavaTimeModule extends AvroModule {


public AvroJavaTimeModule() {
withAnnotationIntrospector(AvroJavaTimeAnnotationIntrospector.INSTANCE);
}


@Override
public Version version() {
return PackageVersion.VERSION;
}


}
@@ -0,0 +1,20 @@
package @package@;

import com.fasterxml.jackson.core.Version;
import com.fasterxml.jackson.core.Versioned;
import com.fasterxml.jackson.core.util.VersionUtil;

/**
* Automatically generated from PackageVersion.java.in during
* packageVersion-generate execution of maven-replacer-plugin in
* pom.xml.
*/
public final class PackageVersion implements Versioned {
public final static Version VERSION = VersionUtil.parseVersion(
"@projectversion@", "@projectgroupid@", "@projectartifactid@");

@Override
public Version version() {
return VERSION;
}
}
@@ -0,0 +1,43 @@
package com.fasterxml.jackson.dataformat.avro.java8.deser;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

public abstract class BaseTimeJsonDeserializer<T> extends JsonDeserializer<T> {
Member

I'd suggest extending StdScalarDeserializer instead.

Member

Also, on naming: since this is Avro-specific, maybe BaseTimeAvroDeserializer (or something) instead.

private final TimeUnit resolution;
final ZoneId zoneId = ZoneId.of("UTC");
Member

static?


BaseTimeJsonDeserializer(TimeUnit resolution) {
this.resolution = resolution;
}

abstract T fromInstant(Instant input);

@Override
public T deserialize(JsonParser p, DeserializationContext ctxt) throws IOException {
final long input = p.getLongValue();
Member

Should probably add checks for other input types, to give a more meaningful error message in case the underlying physical token is not a 64-bit long (schema mismatch)? A unit test for the same would be useful as well.

final ChronoUnit chronoUnit;

switch (this.resolution) {
case MICROSECONDS:
chronoUnit = ChronoUnit.MICROS;
break;
case MILLISECONDS:
chronoUnit = ChronoUnit.MILLIS;
break;
default:
throw new UnsupportedOperationException(
String.format("%s is not supported", this.resolution)
);
}
final Instant instant = Instant.EPOCH.plus(input, chronoUnit);
return fromInstant(instant);
}
}
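The core of `deserialize` above is an offset from the epoch in a given unit. A standalone sketch of that conversion and its inverse, without any Jackson types, might look like this. `EpochOffsets` and its method names are hypothetical, and only the two resolutions handled in the diff are accepted.

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.TimeUnit;

class EpochOffsets {
    /** Maps the two supported resolutions to ChronoUnit; anything else is rejected. */
    static ChronoUnit toChronoUnit(TimeUnit resolution) {
        switch (resolution) {
            case MICROSECONDS:
                return ChronoUnit.MICROS;
            case MILLISECONDS:
                return ChronoUnit.MILLIS;
            default:
                throw new UnsupportedOperationException(resolution + " is not supported");
        }
    }

    /** Decodes an epoch offset, as the deserializer does. */
    static Instant fromEpochOffset(long value, TimeUnit resolution) {
        return Instant.EPOCH.plus(value, toChronoUnit(resolution));
    }

    /** Encodes an Instant as an epoch offset, truncating finer precision. */
    static long toEpochOffset(Instant instant, TimeUnit resolution) {
        return toChronoUnit(resolution).between(Instant.EPOCH, instant);
    }
}
```

For example, `fromEpochOffset(1_500L, TimeUnit.MILLISECONDS)` and `fromEpochOffset(1_500_000L, TimeUnit.MICROSECONDS)` both yield 1970-01-01T00:00:01.500Z.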
@@ -0,0 +1,20 @@
package com.fasterxml.jackson.dataformat.avro.java8.deser;

import com.fasterxml.jackson.databind.JsonDeserializer;

import java.time.Instant;
import java.util.concurrent.TimeUnit;

public class InstantDeserializer extends BaseTimeJsonDeserializer<Instant> {
public static JsonDeserializer<Instant> MILLIS = new InstantDeserializer(TimeUnit.MILLISECONDS);
public static JsonDeserializer<Instant> MICROS = new InstantDeserializer(TimeUnit.MICROSECONDS);

InstantDeserializer(TimeUnit resolution) {
super(resolution);
}

@Override
Instant fromInstant(Instant input) {
return input;
}
}
@@ -0,0 +1,18 @@
package com.fasterxml.jackson.dataformat.avro.java8.deser;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

import java.io.IOException;
import java.time.LocalDate;

public class LocalDateDeserializer extends JsonDeserializer<LocalDate> {
public static final JsonDeserializer<LocalDate> INSTANCE = new LocalDateDeserializer();

@Override
public LocalDate deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
return LocalDate.ofEpochDay(jsonParser.getLongValue());
}
}
@@ -0,0 +1,21 @@
package com.fasterxml.jackson.dataformat.avro.java8.deser;

import com.fasterxml.jackson.databind.JsonDeserializer;

import java.time.Instant;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class LocalDateTimeDeserializer extends BaseTimeJsonDeserializer<LocalDateTime> {
public static JsonDeserializer<LocalDateTime> MILLIS = new LocalDateTimeDeserializer(TimeUnit.MILLISECONDS);
public static JsonDeserializer<LocalDateTime> MICROS = new LocalDateTimeDeserializer(TimeUnit.MICROSECONDS);

LocalDateTimeDeserializer(TimeUnit resolution) {
super(resolution);
}

@Override
protected LocalDateTime fromInstant(Instant input) {
return LocalDateTime.ofInstant(input, this.zoneId);
}
}
@@ -0,0 +1,28 @@
package com.fasterxml.jackson.dataformat.avro.java8.deser;

import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationContext;
import com.fasterxml.jackson.databind.JsonDeserializer;

import java.io.IOException;
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;

public class LocalTimeDeserializer extends JsonDeserializer<LocalTime> {
public static JsonDeserializer<LocalTime> MILLIS = new LocalTimeDeserializer(TimeUnit.MILLISECONDS);
public static JsonDeserializer<LocalTime> MICROS = new LocalTimeDeserializer(TimeUnit.MICROSECONDS);

final TimeUnit resolution;

LocalTimeDeserializer(TimeUnit resolution) {
this.resolution = resolution;
}

@Override
public LocalTime deserialize(JsonParser jsonParser, DeserializationContext deserializationContext) throws IOException, JsonProcessingException {
long value = jsonParser.getLongValue();
long nanos = this.resolution.toNanos(value);
return LocalTime.ofNanoOfDay(nanos);
}
}
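`LocalTime.ofNanoOfDay` throws a `DateTimeException` when the value falls outside a single day, so a mismatched or corrupt input surfaces as a fairly generic error. In the spirit of the earlier review note about clearer error messages, a guard could be sketched as follows. `LocalTimeValues` is a hypothetical helper, not part of the PR, and the exception type is an arbitrary choice for illustration.

```java
import java.time.LocalTime;
import java.util.concurrent.TimeUnit;

class LocalTimeValues {
    private static final long NANOS_PER_DAY = TimeUnit.DAYS.toNanos(1);

    /** Converts an offset from midnight into a LocalTime, rejecting out-of-range input. */
    static LocalTime fromOffset(long value, TimeUnit resolution) {
        // TimeUnit.toNanos saturates on overflow, so the range check below still applies.
        long nanos = resolution.toNanos(value);
        if (nanos < 0 || nanos >= NANOS_PER_DAY) {
            throw new IllegalArgumentException(
                    "Value " + value + " " + resolution + " does not fit within a single day");
        }
        return LocalTime.ofNanoOfDay(nanos);
    }
}
```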