-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
fix: reject mismatched decimals from avro topics #7544
Conversation
ksqldb-serde/src/main/java/io/confluent/ksql/serde/avro/AvroDataTranslator.java
Outdated
Show resolved
Hide resolved
ksqldb-serde/src/test/java/io/confluent/ksql/serde/avro/AvroDataTranslatorTest.java
Outdated
Show resolved
Hide resolved
} | ||
|
||
@Test | ||
public void shouldForceUnmatchingDecimalSchemaIfPossible() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe a dumb question, could we support a conversion from 1234.5 to a 5 precision and 0 scale decimal number? Are there any other invalid cases we could test out?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, we don't support rounding for conversions now. Added a test for that case.
@@ -117,7 +119,9 @@ private static Object replaceSchema(final Schema schema, final Object object) { | |||
|
|||
case STRUCT: | |||
return convertStruct((Struct) object, schema); | |||
|
|||
case BYTES: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How did we reach the assumption that given type of BYTES
would require a decimal conversion?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Decimals in Avro are represented as bytes. This function is called earlier on to reject any BYTES
that don't represent decimals. Right now, we don't have a BYTES
type in ksqlDB so all incoming bytes must be decimals, but this logic will change once we implement it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for the PR! @jzaralim
Description
Fixes #7170
If you run the following statements
You would end up with a
org.apache.kafka.common.errors.SerializationException
in the logs, instead of rejecting the value to begin with.And querying A would return the mismatched decimal
This PR changes the behaviour so that if there is a mismatching decimal, it first tries to convert the topic decimal to the ksql schema, and if that is impossible, then it skips that value and the logs a
org.apache.kafka.common.errors.SerializationException
in the streams log.Testing done
Manually tested, added unit test
Reviewer checklist