docs(klip-50): partition and offset in ksqlDB #7505
Conversation
> ### Query Semantics
>
> The behavior of `ROWPARTITION` and `ROWOFFSET` in persistent queries and push queries
From the section below, it seems you were proposing that we do not add these two to push queries, right? If so, maybe it's better to remove "and push queries" here.
Which section are you referring to? My intention is to make the new pseudocolumns available in push queries. If I've made a typo elsewhere I'll have to amend that.
My bad, I misread it as pull queries.
> The first reason is that selecting `ROWPARTITION` or `ROWOFFSET` for a table
> is semantically dubious, especially for tables that are the result of other
> computation such as an aggregation over a stream. Should `ROWPARTITION` and `ROWOFFSET`
This argument may apply to persistent CSAS queries as well: should the partition/offset come from the resulting stream, or from the source stream record that generated the result record? Personally I feel it's fine to just say "it is always the resulting stream/table", but that also means we still don't provide any way for users to check the partition/offset of the source topics.
Just commented the same thing, but I felt that it should come from the source, not the result, topic partition/offset 😆 - cc @colinhicks for some product insight 🔮
EDIT: otherwise you won't be able to do things like meaningfully filter based on partition/offset
I'm +1 to @agavra -- I'll clarify in the KLIP that the intention is for `ROWPARTITION` and `ROWOFFSET` to always represent the source partition and offset.
I also agree they should be relative to the source.
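To make the agreed-upon semantics concrete, here is a sketch of the kind of filtering mentioned above (the stream name is hypothetical, and the semantics assume the KLIP's source-metadata interpretation):

```sql
-- Hypothetical example: because ROWPARTITION and ROWOFFSET refer to the
-- *source* record's metadata, this push query inspects only the messages
-- that were read from partition 0 of the source topic, past offset 100.
SELECT ROWPARTITION, ROWOFFSET, *
  FROM my_stream
  WHERE ROWPARTITION = 0 AND ROWOFFSET > 100
  EMIT CHANGES;
```

If the pseudocolumns instead reflected downstream repartition topics, a filter like this would not be meaningful to the user.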
> intermediate topics (and state stores) and break running queries as a result.
>
> In light of the above, we have two approaches for implementation:
> 1. Update ksqlDB to never write pseudocolumns into repartition and changelog topics
I thought the ultimate solution would be #6374. More specifically, since the partition/offset are only for the sink topics, conceptually the intermediate topics would never need to encode the partition/offset at all, assuming we have a clear separation between physical and logical schemas, no?
> since the partition/offset are only for the sink topics

As discussed above, the intention was for `ROWPARTITION` and `ROWOFFSET` to represent the source partition and offset, rather than the sink partition and offset. Let's resolve that discussion first, and return to this if it's still relevant.
Hmm, got it. Then we'd indeed need to make sure the source topic's metadata is always preserved throughout the topology when necessary; in addition, if the physical plan contains punctuations (which would have null partition/offset) in the future, we should have some sentinel values for that case.
Another side note: as we move toward vector clocks throughout the topology for consistency guarantees, the source topic partition/offset may well be preserved in the future anyway.
> if the physical plan contains punctuations (which would have null partition/offset) in the future, we should have some sentinel values for that case.
How do punctuations affect our ability to carry forward the source partition and offset throughout the topology? The way ksqlDB carries this metadata forward today is by copying it into the value in the first processor after the source. Will that no longer work with punctuation?
> as we move toward vector clocks throughout the topology for consistency guarantees, the source topic partition/offset may well be preserved in the future anyway.

Is this still in the context of punctuations, or with regards to something else?
(I'm going to merge this PR for now but would love to continue this discussion as it seems I stand to learn :) )
When punctuation is triggered, we may send records downstream which do not have partition/offset/etc. metadata, since they are triggered not by data but by time. This means we may need to be able to set some sentinel values in the corresponding columns for records generated by punctuations.
Yes, vector clocks are in the context of punctuations; they can cover both data-driven and time-driven (a.k.a. punctuation) events, but are flushed across chains of query topologies with the source topic metadata (partition/offset) in them. E.g., say we have a chain of queries: `source -> query1 -> topic1 -> query2 -> topic2 -> query3 -> sink`. Then the metadata would always represent the `source` topic, flowing through topic1/2 and sink. For this KLIP we may need the metadata to represent `source`, `topic1`, and `topic2` respectively for query1/2/3, so for now we can just use the context metadata to achieve this. I'm just thinking that for time-driven results we could maybe piggyback on the same mechanism (a.k.a. control messages) in the longer term.
ksqlDB and the KS DSL don't use punctuations -- so we should not need to worry about it? I also think that vector clocks won't help, as we need the partition/offset information on a per-record basis while vector-clock data would only be written periodically.
Excellent KLIP - thanks @vcrfxia :)
> This will allow users to more easily inspect data and debug ksqlDB queries:
>
> ```
> SELECT *, ROWPARTITION, ROWOFFSET FROM my_stream EMIT CHANGES;
> ```
nit: thoughts on `ROW_PARTITION` or just `PARTITION`?
I don't feel strongly. I went with `ROWPARTITION` and `ROWOFFSET` to follow the pattern of `ROWTIME`, which I presume was inspired by pseudocolumn names such as `ROWID` and `ROWNUM` from traditional DBs, but if `ROW_PARTITION` and `ROW_OFFSET` are more readable I'm fine with that too. I worry that shorter names such as simply `PARTITION` and `OFFSET` would feel too magical and have a greater chance of clashing with users' own column names, but I don't feel strongly.
> (Existing persistent queries that already use these names will continue to run undisrupted.
> See ["Compatibility Implications"](#compatibility-implications) for more.)
>
> Consistent with `ROWTIME`, the new `ROWPARTITION` and `ROWOFFSET` pseudocolumns will
should we discuss `INSERT INTO` and `INSERT VALUES` here as well? (For the former, what happens if you `INSERT INTO` a stream that selected partition/offset? I don't think there's a problem, you might just produce garbage data. For the latter, it's probably OK to prohibit inserting those fields, unlike `ROWTIME`.)
EDIT: I see you discuss this below for `INSERT VALUES` :)
+1 to what you wrote. `INSERT INTO` compares sink schemas, which would have to have aliases other than `ROWPARTITION` and `ROWOFFSET` even if those columns are selected, which means the schema check has no way to detect whether `ROWPARTITION` or `ROWOFFSET` is being piped in under an alias. I don't think it makes sense to add an additional check besides the existing sink schema check, and therefore I don't think we should bother blocking these pseudocolumns in `INSERT INTO`.
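A sketch of the scenario discussed above (stream and column names are hypothetical): once a pseudocolumn enters the sink under an alias, the sink schema check cannot distinguish it from an ordinary column.

```sql
-- Hypothetical example: the materializing query aliases ROWPARTITION,
-- so the sink schema just sees an ordinary INT column named SRC_PARTITION.
CREATE STREAM enriched AS
  SELECT id, ROWPARTITION AS src_partition
  FROM my_stream
  EMIT CHANGES;

-- A later INSERT INTO only compares sink schemas; SRC_PARTITION looks like
-- any other column, so there is no way to block the aliased pseudocolumn here.
INSERT INTO enriched
  SELECT id, ROWPARTITION AS src_partition
  FROM other_stream
  EMIT CHANGES;
```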
> ### Query Semantics
>
> The behavior of `ROWPARTITION` and `ROWOFFSET` in persistent queries and push queries
can we be extra clear about which partition/offset it chooses when there are repartition topics involved? (I assume it would be the source)
Good call. My intention is indeed to always use the source partition and offset. Will clarify in the KLIP.
> Similar to `ROWTIME`, the new `ROWPARTITION` and `ROWOFFSET` pseudocolumns will not
> be included in the list of columns returned as part of a source description.
>
> `ROWPARTITION` and `ROWOFFSET` will also not be added to the output of `PRINT TOPIC` commands.
The partition is already there (#6641) - it might make sense to add the offset as well, even if it isn't part of the "print" schema.
Fascinating, and good to know! I'll move this to be out of scope for this KLIP since it's a bit orthogonal, but I agree it could be nice to add.
> 2. Defer on updating ksqlDB to not write pseudocolumns into intermediate topics and
> instead modify the stream and table source execution steps to include something
> to indicate the set of pseudocolumns that should be included with the source.
> Specifically, we can add a "pseudocolumn version number" field into the execution
instead of a version number, why not just list out all the pseudocolumns to include? Magic version numbers can be tough to reason about.
I considered this but felt that a version number would allow for more evolvability in the future. For example, if we wanted to change the implementation of a pseudocolumn, that would be possible with a version number but not with a list of column names. (A bit contrived, but maybe we'll introduce `ROWHEADERS` in the future with type `ARRAY<STRUCT<key STRING, header BYTES>>` and later want to change it to a different type.)
I don't feel strongly, though. Happy to keep it readable if introducing a version number feels like over-engineering.
> compatibility, while new queries will have new pseudocolumns.
>
> Given that we'd like to start work on exposing partition and offset within the next month,
> my preference is for Option 2. The drawback is that intermediate topics for pre-join
+1
> If a user really wants to make partition and/or offset information available for pull
> queries, they can choose to explicitly include the source partition and offset as
> part of the query that materializes their table and query for partition/offset as
> value columns as part of their pull query.
I think we should plan to illustrate this in the documentation with an example materializing query. Might also make sense to add the example query in the KLIP as well.
Sure, added an example to the KLIP. Out of curiosity, what use cases do you envision for including partition/offset as part of a pull query? It seems less natural to be interested in partition and offset for table records compared to messages in a stream.
Yeah, I agree it's less natural. I could see it being educationally useful for folks checking their understanding about how streams and tables relate, for example to see how the aggregate row corresponds to the source stream's partition and offset. Thanks for adding it.
Partition/offset for pull queries would be super useful for debugging purposes (answering "why is a certain piece of data there?").
LGTM.
> `ROWPARTITION` and `ROWOFFSET` in pull queries would require adding this information
> to state stores. (I think there might be a way to get the partition by other means,
> from the replica queried in order to serve the pull query, but offset information
> definitely wouldn't be available.)
I actually think this also applies to partitions. If we have two input records with the same grouping attributes, but from different input partitions (and we do a repartitioning), it's not defined which partition we should write into the result table (as we don't want to use the partition of the repartition topic).
> If a user really wants to make partition and/or offset information available for pull
> queries, they can choose to explicitly include the source partition and offset as
> part of the query that materializes their table and query for partition/offset as
> value columns as part of their pull query:
We need to make sure to treat both columns as any other columns (i.e., for aggregations, they can only be included if they are part of the `GROUP BY` clause, or an aggregation function is applied to them).
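One way to sketch the materializing query discussed above (names are hypothetical) is to apply an aggregation function to the pseudocolumns, consistent with the constraint that they be treated like any other column in a `GROUP BY` query:

```sql
-- Hypothetical example: materialize the latest source partition/offset per key
-- so a pull query can read them back as ordinary value columns.
CREATE TABLE my_table AS
  SELECT id,
         LATEST_BY_OFFSET(ROWPARTITION) AS src_partition,
         LATEST_BY_OFFSET(ROWOFFSET) AS src_offset
  FROM my_stream
  GROUP BY id
  EMIT CHANGES;

-- Pull query reading the materialized metadata back as value columns:
SELECT id, src_partition, src_offset FROM my_table WHERE id = 'some_key';
```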
Description
KLIP 50: proposal for exposing partition and offset in ksqlDB. Here's a rendered version for easier viewing: https://github.com/vcrfxia/ksql/blob/klip-50/design-proposals/klip-50-partition-and-offset-in-ksqldb.md
Testing done
Docs-only change.
Reviewer checklist