Skip to content

Commit

Permalink
Merge pull request apache#9152: [BEAM-7816] [BEAM-7817] Support Avro …
Browse files Browse the repository at this point in the history
…dates and enums in Schemas
  • Loading branch information
kanterov committed Aug 13, 2019
2 parents c950d30 + 5d41100 commit e4b268e
Show file tree
Hide file tree
Showing 4 changed files with 180 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.avro.Schema.Type;
import org.apache.avro.data.TimeConversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
Expand All @@ -62,6 +61,8 @@
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.CaseFormat;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Days;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;

Expand Down Expand Up @@ -410,6 +411,8 @@ private static Schema.FieldType toFieldType(TypeWithNullability type) {
// TODO: There is a desire to move Beam schema DATETIME to a micros representation. When
// this is done, this logical type needs to be changed.
fieldType = FieldType.DATETIME;
} else if (logicalType instanceof LogicalTypes.Date) {
fieldType = FieldType.DATETIME;
}
}

Expand Down Expand Up @@ -599,8 +602,16 @@ private static Object genericFromBeamField(
return new Conversions.DecimalConversion().toBytes(decimal, null, logicalType);

case DATETIME:
ReadableInstant instant = (ReadableInstant) value;
return instant.getMillis();
if (typeWithNullability.type.getType() == Type.INT) {
ReadableInstant instant = (ReadableInstant) value;
return (int) Days.daysBetween(Instant.EPOCH, instant).getDays();
} else if (typeWithNullability.type.getType() == Type.LONG) {
ReadableInstant instant = (ReadableInstant) value;
return (long) instant.getMillis();
} else {
throw new IllegalArgumentException(
"Can't represent " + fieldType + " as " + typeWithNullability.type.getType());
}

case BYTES:
return ByteBuffer.wrap((byte[]) value);
Expand Down Expand Up @@ -686,6 +697,13 @@ public static Object convertAvroFieldStrict(
} else {
return convertDateTimeStrict((Long) value, fieldType);
}
} else if (logicalType instanceof LogicalTypes.Date) {
if (value instanceof ReadableInstant) {
int epochDays = Days.daysBetween(Instant.EPOCH, (ReadableInstant) value).getDays();
return convertDateStrict(epochDays, fieldType);
} else {
return convertDateStrict((Integer) value, fieldType);
}
}
}

Expand Down Expand Up @@ -718,7 +736,9 @@ public static Object convertAvroFieldStrict(
return convertRecordStrict((GenericRecord) value, fieldType);

case ENUM:
return convertEnumStrict((GenericEnumSymbol) value, fieldType);
// Enums are either Java enums or GenericEnumSymbol instances. They don't share a common
// interface, but both override toString().
return convertEnumStrict(value, fieldType);

case ARRAY:
return convertArrayStrict((List<Object>) value, type.type.getElementType(), fieldType);
Expand Down Expand Up @@ -778,6 +798,11 @@ private static Object convertDecimal(BigDecimal value, Schema.FieldType fieldTyp
return value;
}

/**
 * Converts an Avro {@code date} value, given as a number of days relative to {@link
 * Instant#EPOCH}, into a Joda {@link Instant}.
 *
 * @param epochDays day offset from the epoch; unboxed, so a {@code null} fails with an NPE
 * @param fieldType the Beam field type the caller expects; validated against {@code DATETIME}
 *     via {@code checkTypeName} before any conversion happens
 * @return the {@link Instant} lying {@code epochDays} standard days away from the epoch
 */
private static Object convertDateStrict(Integer epochDays, Schema.FieldType fieldType) {
  checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "date");
  Duration offsetFromEpoch = Duration.standardDays(epochDays);
  return Instant.EPOCH.plus(offsetFromEpoch);
}

private static Object convertDateTimeStrict(Long value, Schema.FieldType fieldType) {
checkTypeName(fieldType.getTypeName(), TypeName.DATETIME, "dateTime");
return new Instant(value);
Expand All @@ -798,7 +823,7 @@ private static Object convertBooleanStrict(Boolean value, Schema.FieldType field
return value;
}

private static Object convertEnumStrict(GenericEnumSymbol value, Schema.FieldType fieldType) {
/**
 * Converts an Avro enum value into its String representation.
 *
 * <p>The value is accepted as a plain {@code Object} and is only ever used through
 * {@code toString()}, so any enum representation whose {@code toString()} yields the symbol works.
 *
 * @param value the enum value to convert; must not be null, since {@code toString()} is invoked
 * @param fieldType the Beam field type the caller expects; its type name is passed to
 *     {@code checkTypeName} together with {@code STRING}
 * @return {@code value.toString()}
 */
private static Object convertEnumStrict(Object value, Schema.FieldType fieldType) {
checkTypeName(fieldType.getTypeName(), Schema.TypeName.STRING, "enum");
return value.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.lang.reflect.Parameter;
import java.lang.reflect.Type;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -58,6 +59,7 @@
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.collection.ArrayAccess;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.collection.ArrayFactory;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.constant.IntegerConstant;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.FieldAccess;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodInvocation;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodReturn;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.implementation.bytecode.member.MethodVariableAccess;
Expand All @@ -69,6 +71,7 @@
import org.apache.commons.lang3.ClassUtils;
import org.joda.time.Instant;
import org.joda.time.ReadableInstant;
import org.joda.time.ReadablePartial;

class ByteBuddyUtils {
private static final ForLoadedType ARRAYS_TYPE = new ForLoadedType(Arrays.class);
Expand All @@ -80,6 +83,9 @@ class ByteBuddyUtils {
private static final ForLoadedType LIST_TYPE = new ForLoadedType(List.class);
private static final ForLoadedType READABLE_INSTANT_TYPE =
new ForLoadedType(ReadableInstant.class);
private static final ForLoadedType READABLE_PARTIAL_TYPE =
new ForLoadedType(ReadablePartial.class);
private static final ForLoadedType OBJECT_TYPE = new ForLoadedType(Object.class);

/**
* A naming strategy for ByteBuddy classes.
Expand Down Expand Up @@ -151,6 +157,8 @@ public T convert(TypeDescriptor typeDescriptor) {
return convertMap(typeDescriptor);
} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadableInstant.class))) {
return convertDateTime(typeDescriptor);
} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ReadablePartial.class))) {
return convertDateTime(typeDescriptor);
} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(ByteBuffer.class))) {
return convertByteBuffer(typeDescriptor);
} else if (typeDescriptor.isSubtypeOf(TypeDescriptor.of(GenericFixed.class))) {
Expand All @@ -160,6 +168,8 @@ public T convert(TypeDescriptor typeDescriptor) {
return convertCharSequence(typeDescriptor);
} else if (typeDescriptor.getRawType().isPrimitive()) {
return convertPrimitive(typeDescriptor);
} else if (typeDescriptor.getRawType().isEnum()) {
return convertEnum(typeDescriptor);
} else {
return convertDefault(typeDescriptor);
}
Expand All @@ -181,6 +191,8 @@ public T convert(TypeDescriptor typeDescriptor) {

protected abstract T convertPrimitive(TypeDescriptor<?> type);

/**
 * Produces the conversion result for an enum type. {@code convert} dispatches here when the
 * descriptor's raw type reports {@code isEnum()}.
 */
protected abstract T convertEnum(TypeDescriptor<?> type);

protected abstract T convertDefault(TypeDescriptor<?> type);
}

Expand Down Expand Up @@ -250,6 +262,11 @@ protected Type convertPrimitive(TypeDescriptor<?> type) {
return ClassUtils.primitiveToWrapper(type.getRawType());
}

@Override
protected Type convertEnum(TypeDescriptor<?> type) {
// Every enum type maps to String; the concrete enum class passed in is not consulted.
return String.class;
}

@Override
protected Type convertDefault(TypeDescriptor<?> type) {
return returnRawTypes ? type.getRawType() : type.getType();
Expand Down Expand Up @@ -328,29 +345,73 @@ protected StackManipulation convertDateTime(TypeDescriptor<?> type) {
if (Instant.class.isAssignableFrom(type.getRawType())) {
return readValue;
}

// Otherwise, generate the following code:
//
// for ReadableInstant:
// return new Instant(value.getMillis());
//
// for ReadablePartial:
// return new Instant((value.toDateTime(Instant.EPOCH)).getMillis());

List<StackManipulation> stackManipulations = new ArrayList<>();

// Create a new instance of the target type.
stackManipulations.add(TypeCreation.of(INSTANT_TYPE));
stackManipulations.add(Duplication.SINGLE);

// if value is ReadablePartial, convert it to ReadableInstant first
if (ReadablePartial.class.isAssignableFrom(type.getRawType())) {
// Generate the following code: .toDateTime(Instant.EPOCH)

// Load the parameter and cast it to ReadablePartial.
stackManipulations.add(readValue);
stackManipulations.add(TypeCasting.to(READABLE_PARTIAL_TYPE));

// Get Instant.EPOCH
stackManipulations.add(
FieldAccess.forField(
INSTANT_TYPE
.getDeclaredFields()
.filter(ElementMatchers.named("EPOCH"))
.getOnly())
.read());

// Call ReadablePartial.toDateTime
stackManipulations.add(
MethodInvocation.invoke(
READABLE_PARTIAL_TYPE
.getDeclaredMethods()
.filter(
ElementMatchers.named("toDateTime")
.and(ElementMatchers.takesArguments(READABLE_INSTANT_TYPE)))
.getOnly()));
} else {
// Otherwise, parameter is already ReadableInstant.
// Load the parameter and cast it to ReadableInstant.
stackManipulations.add(readValue);
stackManipulations.add(TypeCasting.to(READABLE_INSTANT_TYPE));
}

return new StackManipulation.Compound(
// Create a new instance of the target type.
TypeCreation.of(INSTANT_TYPE),
Duplication.SINGLE,
readValue,
TypeCasting.to(READABLE_INSTANT_TYPE),
// Call ReadableInstant.getMillis to extract the millis since the epoch.
// Call ReadableInstant.getMillis to extract the millis since the epoch.
stackManipulations.add(
MethodInvocation.invoke(
READABLE_INSTANT_TYPE
.getDeclaredMethods()
.filter(ElementMatchers.named("getMillis"))
.getOnly()),
// Construct a DateTime object containing the millis.
.getOnly()));

// Construct an Instant object containing the millis.
stackManipulations.add(
MethodInvocation.invoke(
INSTANT_TYPE
.getDeclaredMethods()
.filter(
ElementMatchers.isConstructor()
.and(ElementMatchers.takesArguments(ForLoadedType.of(long.class))))
.getOnly()));

return new StackManipulation.Compound(stackManipulations);
}

@Override
Expand Down Expand Up @@ -402,7 +463,7 @@ protected StackManipulation convertCharSequence(TypeDescriptor<?> type) {
MethodInvocation.invoke(
CHAR_SEQUENCE_TYPE
.getDeclaredMethods()
.filter(ElementMatchers.named("toString"))
.filter(ElementMatchers.named("toString").and(ElementMatchers.takesArguments(0)))
.getOnly()));
}

Expand All @@ -416,6 +477,17 @@ protected StackManipulation convertPrimitive(TypeDescriptor<?> type) {
loadedType.asGenericType(), loadedType.asBoxed().asGenericType(), Typing.STATIC));
}

@Override
protected StackManipulation convertEnum(TypeDescriptor<?> type) {
  // Generate the following code:
  //   return value.toString();

  // Resolve the zero-argument Object.toString(); the arity filter keeps getOnly() unambiguous.
  StackManipulation invokeToString =
      MethodInvocation.invoke(
          OBJECT_TYPE
              .getDeclaredMethods()
              .filter(ElementMatchers.named("toString").and(ElementMatchers.takesArguments(0)))
              .getOnly());

  // Load the value, then call toString() on it.
  return new StackManipulation.Compound(readValue, invokeToString);
}

@Override
protected StackManipulation convertDefault(TypeDescriptor<?> type) {
return readValue;
Expand Down Expand Up @@ -518,8 +590,8 @@ protected StackManipulation convertDateTime(TypeDescriptor<?> type) {
.getDeclaredMethods()
.filter(ElementMatchers.named("getMillis"))
.getOnly()),
// All subclasses of ReadableInstant contain a ()(long) constructor that takes in a millis
// argument. Call that constructor of the field to initialize it.
// All subclasses of ReadableInstant and ReadablePartial contain a constructor that takes a
// single long millis argument. Call that constructor of the field to initialize it.
MethodInvocation.invoke(
loadedType
.getDeclaredMethods()
Expand Down Expand Up @@ -551,13 +623,14 @@ protected StackManipulation convertByteBuffer(TypeDescriptor<?> type) {
@Override
protected StackManipulation convertGenericFixed(TypeDescriptor<?> type) {
// Generate the following code:
// return T((byte[]) value);
// return new T((byte[]) value);

// TODO: Refactor AVRO-specific code out of this class.
ForLoadedType loadedType = new ForLoadedType(type.getRawType());
return new Compound(
TypeCreation.of(loadedType),
Duplication.SINGLE,
// Load the parameter and cast it to a byte[].
readValue,
TypeCasting.to(BYTE_ARRAY_TYPE),
// Create a new instance that wraps this byte[].
Expand Down Expand Up @@ -609,6 +682,23 @@ protected StackManipulation convertPrimitive(TypeDescriptor<?> type) {
Typing.STATIC));
}

@Override
protected StackManipulation convertEnum(TypeDescriptor<?> type) {
  // Generate the following code:
  //   return T.valueOf(value);

  ForLoadedType enumType = new ForLoadedType(type.getRawType());

  // Select the static valueOf(String) declared on the enum class itself.
  StackManipulation invokeValueOf =
      MethodInvocation.invoke(
          enumType
              .getDeclaredMethods()
              .filter(
                  ElementMatchers.named("valueOf")
                      .and(
                          ElementMatchers.isStatic()
                              .and(ElementMatchers.takesArguments(String.class))))
              .getOnly());

  // Load the value, then hand it to valueOf.
  return new StackManipulation.Compound(readValue, invokeValueOf);
}

@Override
protected StackManipulation convertDefault(TypeDescriptor<?> type) {
return readValue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
{ "name": "string", "type": ["string", "null"]},
{ "name": "bytes", "type": ["bytes", "null"]},
{ "name": "fixed", "type": {"type": "fixed", "size": 4, "name": "fixed4"} },
{ "name": "timestampMillis", "type":
[ {"type": "long", "logicalType": "timestamp-millis"}, "null"]},
{ "name": "date", "type": {"type": "int", "logicalType": "date"} },
{ "name": "timestampMillis", "type": {"type": "long", "logicalType": "timestamp-millis"} },
{ "name": "testEnum", "type": {"name": "TestEnum", "type": "enum", "symbols": ["abc","cde"] } },
{ "name": "row", "type": ["null", {
"type": "record",
"name": "TestAvroNested",
Expand Down
Loading

0 comments on commit e4b268e

Please sign in to comment.