
fix: reject mismatched decimals from avro topics #7544

Merged
merged 2 commits into confluentinc:master from avro-dec on May 19, 2021

Conversation

jzaralim
Contributor

Description

Fixes #7170

If you run the following statements:

CREATE STREAM A (dec DECIMAL(4,2)) WITH (kafka_topic='topic', value_format='AVRO');
CREATE STREAM B (dec DECIMAL(6,3)) WITH (kafka_topic='topic', value_format='AVRO');
CREATE STREAM C AS SELECT * FROM A;
INSERT INTO B VALUES (123.456);

You would end up with an org.apache.kafka.common.errors.SerializationException in the logs, instead of the value being rejected to begin with.

Querying A would also return the mismatched decimal:

SELECT * FROM A EMIT CHANGES;
+---------+
|DEC      |
+---------+
|123.456  |

This PR changes the behaviour so that when a topic decimal does not match the ksql schema, ksqlDB first tries to convert the topic decimal to the ksql schema; if that is impossible, it skips the value and logs an org.apache.kafka.common.errors.SerializationException in the streams log.
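
To make the conversion rule concrete, here is a minimal sketch of the widening step using java.math.BigDecimal. The helper name fitToSchema and its parameters are hypothetical, not the code in this PR; RoundingMode.UNNECESSARY makes the rescale throw whenever digits would have to be rounded away.

import java.math.BigDecimal;
import java.math.RoundingMode;

// Hypothetical helper: widen a decimal read from the topic to the declared
// ksqlDB schema, or throw if the value cannot be represented without rounding.
static BigDecimal fitToSchema(final BigDecimal value, final int precision, final int scale) {
  // setScale with RoundingMode.UNNECESSARY throws ArithmeticException
  // if any fractional digits would be lost.
  final BigDecimal rescaled = value.setScale(scale, RoundingMode.UNNECESSARY);
  if (rescaled.precision() > precision) {
    throw new ArithmeticException(
        "Value " + value + " does not fit into DECIMAL(" + precision + ", " + scale + ")");
  }
  return rescaled;
}

In the example above, 123.456 written as DECIMAL(6,3) cannot be rescaled to stream A's DECIMAL(4,2), so the record is skipped and logged rather than surfaced as a bad row.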

Testing done

Manually tested; added a unit test.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g. if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@jzaralim jzaralim requested a review from a team as a code owner May 18, 2021 00:23
@abbccdda abbccdda self-requested a review May 18, 2021 16:51
}

@Test
public void shouldForceUnmatchingDecimalSchemaIfPossible() {


Maybe a dumb question, but could we support a conversion from 1234.5 to a decimal with precision 5 and scale 0? Are there any other invalid cases we could test?

jzaralim (Contributor, Author)


No, we don't support rounding for conversions now. Added a test for that case.
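
As a sketch of what such a test can assert (JUnit 4 style; the class and test names here are illustrative, not the PR's actual test):

import static org.junit.Assert.assertThrows;

import java.math.BigDecimal;
import java.math.RoundingMode;
import org.junit.Test;

public class DecimalConversionTest {

  @Test
  public void shouldRejectConversionThatRequiresRounding() {
    // 1234.5 fits DECIMAL(5, 0) only if it is rounded, and rounding is not
    // supported, so the rescale must throw instead of silently truncating.
    assertThrows(ArithmeticException.class,
        () -> new BigDecimal("1234.5").setScale(0, RoundingMode.UNNECESSARY));
  }
}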

@@ -117,7 +119,9 @@ private static Object replaceSchema(final Schema schema, final Object object) {

case STRUCT:
return convertStruct((Struct) object, schema);

case BYTES:


How did we reach the assumption that a value of type BYTES would require a decimal conversion?

jzaralim (Contributor, Author)


Decimals in Avro are represented as bytes. This function is called earlier on to reject any BYTES that don't represent decimals. Right now, we don't have a BYTES type in ksqlDB, so all incoming bytes must be decimals; this logic will change once we implement one.
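
For context, a minimal sketch of how a BYTES schema can be recognized as a decimal (illustrative; not the code in this PR). Connect carries Avro's decimal logical type as Schema.Type.BYTES tagged with the Decimal logical name:

import org.apache.kafka.connect.data.Decimal;
import org.apache.kafka.connect.data.Schema;

// A BYTES schema represents a decimal when it carries the Decimal
// logical name ("org.apache.kafka.connect.data.Decimal").
static boolean isDecimal(final Schema schema) {
  return schema.type() == Schema.Type.BYTES
      && Decimal.LOGICAL_NAME.equals(schema.name());
}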


@abbccdda abbccdda left a comment


LGTM, thanks for the PR! @jzaralim

@jzaralim jzaralim merged commit 85ba0f1 into confluentinc:master May 19, 2021
@jzaralim jzaralim deleted the avro-dec branch May 19, 2021 01:50
Successfully merging this pull request may close these issues: avro serdes don't correctly handle precision/scale mismatch between ksql and registered schema