
docs(klip-50): partition and offset in ksqlDB #7505

Merged 2 commits on May 18, 2021

Conversation

**vcrfxia** (Contributor) commented May 12, 2021:

Description

KLIP 50: proposal for exposing partition and offset in ksqlDB. Here's a rendered version for easier viewing: https://github.com/vcrfxia/ksql/blob/klip-50/design-proposals/klip-50-partition-and-offset-in-ksqldb.md

Testing done

Docs-only change.

Reviewer checklist

  • Ensure docs are updated if necessary (e.g., if a user-visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@vcrfxia vcrfxia requested a review from a team as a code owner May 12, 2021 14:33

### Query Semantics

The behavior of `ROWPARTITION` and `ROWOFFSET` in persistent queries and push queries
**Contributor:**

From the section below, it seems you're proposing that we not add these two to push queries, right? If so, it's maybe better to remove "and push queries" here.

**vcrfxia** (author):

Which section are you referring to? My intention is to make the new pseudocolumns available in push queries. If I've made a typo elsewhere I'll have to amend that.

**Contributor:**

My bad, I misread it as pull queries.


The first reason is that selecting `ROWPARTITION` or `ROWOFFSET` for a table
is semantically dubious, especially for tables that are the result of other
computation such as an aggregation over a stream. Should `ROWPARTITION` and `ROWOFFSET`
**Contributor:**

This argument may apply to persistent CSAS queries as well: should the partition/offset come from the resulting stream, or from the source stream record that generated the result record? Personally I'd just stick with "it is always the resulting stream/table", but that also means we still don't provide any way for users to check the partition/offset of the source topics.

**agavra** (Contributor) commented May 12, 2021:

Just commented the same thing, but I felt that it should come from the source not the result topic partition/offset 😆 - cc @colinhicks for some product insight 🔮

EDIT: otherwise you won't be able to do things like meaningfully filter based on partition/offset
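
To make that concrete: a filter on partition/offset only does what a user would expect if the pseudocolumns are resolved against the source record rather than the result topic. A hypothetical sketch (stream name and values are illustrative):

```sql
-- Replay-style debugging: read only records from source partition 2
-- past a known offset. This is only meaningful if ROWPARTITION and
-- ROWOFFSET refer to the source topic.
SELECT * FROM my_stream
WHERE ROWPARTITION = 2 AND ROWOFFSET > 1000
EMIT CHANGES;
```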

**vcrfxia** (author):

I'm +1 to @agavra -- I'll clarify in the KLIP that the intention is for ROWPARTITION and ROWOFFSET to always represent the source partition and offset.

**Contributor:**

I also agree they should be relative to the source.

intermediate topics (and state stores) and break running queries as a result.

In light of the above, we have two approaches for implementation:
1. Update ksqlDB to never write pseudocolumns into repartition and changelog topics
**Contributor:**

I thought the ultimate solution would be #6374. More specifically, since the partition/offset are only for the sink topics, conceptually the intermediate topics would never need to encode the partition/offset at all, assuming we have a clear separation between physical and logical schemas, no?

**vcrfxia** (author):

> since the partition/offset are only for the sink topics

As discussed above, the intention was for ROWPARTITION and ROWOFFSET to represent the source partition and offset, rather than the sink partition and offset. Let's resolve that discussion first, and return to this if it's still relevant.

**Contributor:**

Hmm, got it. Then we'd indeed need to make sure the source topic's metadata is always preserved throughout the topology when necessary. In addition, if the physical plan contains punctuations (which would have null partition/offset) in the future, we should have some sentinel values for that case.

Another side note: as we move toward vector clocks throughout the topology for consistency guarantees, the source topic partition/offset may well be preserved in the future anyway.

**vcrfxia** (author):

> if the physical plan contains punctuations (which would have null partition/offset) in the future, we should have some sentinel values for that case.

How do punctuations affect our ability to carry the source partition and offset forward through the topology? The way ksqlDB carries this metadata forward today is by copying it into the value in the first processor after the source. Will that no longer work with punctuation?

> as we move with vector clocks throughout the topology for consistency guarantees, the source topic partition / offset may well be preserved in the future anyways.

Is this still in the context of punctuations, or with regards to something else?

(I'm going to merge this PR for now but would love to continue this discussion, as it seems I stand to learn :) )

**Contributor:**

When punctuation is triggered, we may send records downstream that do not have partition/offset/etc. metadata, since they are triggered not by data but by time. That means we may need to be able to set sentinel values in the corresponding columns for records generated by punctuations.

Yes, vector clocks are in the context of punctuations. They can cover both data-driven and time-driven (a.k.a. punctuation) events, but they are flushed across chains of query topologies with the source topic metadata (partition/offset) in them. E.g., say we have a chain of queries:

source -> query1 -> topic1 -> query2 -> topic2 -> query3 -> sink

Then the metadata would always represent the source topic, flowing through topic1/topic2 and the sink. For this KLIP we may need the metadata to represent the source, topic1, and topic2 respectively for query1/2/3, so for now we can just use the record context metadata to achieve this. I'm just thinking that, for time-driven results, we could piggyback on the same mechanism (a.k.a. control messages) in the longer term.

**Member:**

ksqlDB and the KS DSL don't use punctuations, so we should not need to worry about it. I also think that vector clocks won't help, as we need the partition/offset information on a per-record basis, while vector-clock data would only be written periodically.

**agavra** (Contributor) left a comment:

Excellent KLIP - thanks @vcrfxia :)


This will allow users to more easily inspect data and debug ksqlDB queries:
```
SELECT *, ROWPARTITION, ROWOFFSET FROM my_stream EMIT CHANGES;
```
**Contributor:**

nit: thoughts on ROW_PARTITION or just PARTITION?

**vcrfxia** (author):

I don't feel strongly. I went with `ROWPARTITION` and `ROWOFFSET` to follow the pattern of `ROWTIME`, which I presume was inspired by pseudocolumn names such as `ROWID` and `ROWNUM` from traditional DBs, but if `ROW_PARTITION` and `ROW_OFFSET` are more readable I'm fine with that too. I worry that shorter names such as simply `PARTITION` and `OFFSET` would feel too magical and have a greater chance of clashing with users' own column names.

(Existing persistent queries that already use these names will continue to run undisrupted.
See ["Compatibility Implications"](#compatibility-implications) for more.)

Consistent with `ROWTIME`, the new `ROWPARTITION` and `ROWOFFSET` pseudocolumns will
**Contributor:**

should we discuss INSERT INTO and INSERT VALUES here as well? (For the former: what happens if you INSERT INTO a stream that selected partition/offset? I don't think there's a problem; you might just produce garbage data. For the latter, it's probably OK to prohibit inserting those fields, unlike ROWTIME.)

EDIT: I see you discuss this below for INSERT VALUES :)

**vcrfxia** (author):

+1 to what you wrote. INSERT INTO compares sink schemas, which would have to use aliases other than ROWPARTITION and ROWOFFSET even if those columns are selected, so the schema check has no way to detect whether ROWPARTITION or ROWOFFSET is being piped in under an alias. I don't think it makes sense to add a check beyond the existing sink schema check, and therefore I don't think we should bother blocking these pseudocolumns in INSERT INTO.
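
A hypothetical sketch of the aliasing scenario described above (stream and column names are illustrative):

```sql
-- The sink stream carries the pseudocolumns under ordinary aliases.
CREATE STREAM enriched AS
  SELECT id, ROWPARTITION AS src_partition, ROWOFFSET AS src_offset
  FROM source_stream
  EMIT CHANGES;

-- INSERT INTO only checks that sink schemas line up; it cannot tell
-- that src_partition/src_offset originated from pseudocolumns.
INSERT INTO enriched
  SELECT id, ROWPARTITION AS src_partition, ROWOFFSET AS src_offset
  FROM other_stream
  EMIT CHANGES;
```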


### Query Semantics

The behavior of `ROWPARTITION` and `ROWOFFSET` in persistent queries and push queries
**Contributor:**

can we be extra clear about which partition/offset it chooses when there are repartition topics involved? (I assume it would be the source)

**vcrfxia** (author):

Good call. My intention is indeed to always use the source partition and offset. Will clarify in the KLIP.

Similar to `ROWTIME`, the new `ROWPARTITION` and `ROWOFFSET` pseudocolumns will not
be included in the list of columns returned as part of a source description.

`ROWPARTITION` and `ROWOFFSET` will also not be added to the output of `PRINT TOPIC` commands.
**Contributor:**

partition is already there (#6641) - it might make sense to add offset as well, even if it isn't part of the "print" schema

**vcrfxia** (author):

Fascinating, and good to know! I'll move this to be out of scope for this KLIP since it's a bit orthogonal, but I agree it could be nice to add.

2. Defer on updating ksqlDB to not write pseudocolumns into intermediate topics and
instead modify the stream and table source execution steps to include something
to indicate the set of pseudocolumns that should be included with the source.
Specifically, we can add a "pseudocolumn version number" field into the execution
**Contributor:**

instead of a version number, why not just list out all the pseudocolumns to include? Magic version numbers can be tough to reason about.

**vcrfxia** (author):

I considered this but felt that a version number would allow for more evolvability in the future. For example, if we wanted to change the implementation of a pseudocolumn, that would be possible with a version number but not with a list of column names. (A bit contrived, but maybe we'll introduce `ROWHEADERS` in the future with type `ARRAY<STRUCT<key STRING, header BYTES>>` and later want to change it to a different type.)

I don't feel strongly though. Happy to keep it readable if introducing a version number feels like over-engineering.

compatibility, while new queries will have new pseudocolumns.

Given that we'd like to start work on exposing partition and offset within the next month,
my preference is for Option 2. The drawback is that intermediate topics for pre-join
**Contributor:**

+1

Comment on lines 114 to 117
If a user really wants to make partition and/or offset information available for pull
queries, they can choose to explicitly include the source partition and offset as
part of the query that materializes their table and query for partition/offset as
value columns as part of their pull query.
**Contributor:**

I think we should plan to illustrate this in the documentation with an example materializing query. Might also make sense to add the example query in the KLIP as well.

**vcrfxia** (author):

Sure, added an example to the KLIP. Out of curiosity, what use cases do you envision for including partition/offset as part of a pull query? It seems less natural to be interested in partition and offset for table records compared to messages in a stream.

**Contributor:**

Yeah, I agree it's less natural. I could see it being educationally useful for folks checking their understanding about how streams and tables relate, for example to see how the aggregate row corresponds to the source stream's partition and offset. Thanks for adding it.

**Contributor:**

Partition/offset for pull queries would be super useful for debugging purposes (answering "why is a certain piece of data there?").

**guozhangwang** (Contributor):

LGTM.

@vcrfxia vcrfxia merged commit 08e6f12 into confluentinc:master May 18, 2021
@vcrfxia vcrfxia deleted the klip-50 branch May 18, 2021 14:48
@vcrfxia vcrfxia restored the klip-50 branch May 18, 2021 14:49
`ROWPARTITION` and `ROWOFFSET` in pull queries would require adding this information
to state stores. (I think there might be a way to get the partition by other means,
from the replica queried in order to serve the pull query, but offset information
definitely wouldn't be available.)
**Member:**

I actually think this also applies to partitions. If we have two input records with the same grouping attributes, but from different input partitions (and we do a repartitioning), it's not defined which partition we should write into the result table (as we don't want to use the partition of the repartition topic).

If a user really wants to make partition and/or offset information available for pull
queries, they can choose to explicitly include the source partition and offset as
part of the query that materializes their table and query for partition/offset as
value columns as part of their pull query:
**Member:**

We need to make sure to treat both columns like any other columns (i.e., for aggregations, they can only be included if they are part of the GROUP BY clause or an aggregation function is applied to them).
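
For example, a materializing query along these lines would satisfy that rule by aggregating the pseudocolumns (a hypothetical sketch; table and column names are illustrative, and it assumes ksqlDB's `LATEST_BY_OFFSET` aggregate):

```sql
-- Materialize the source partition/offset as ordinary value columns.
CREATE TABLE user_latest AS
  SELECT user_id,
         LATEST_BY_OFFSET(ROWPARTITION) AS src_partition,
         LATEST_BY_OFFSET(ROWOFFSET)    AS src_offset
  FROM user_events
  GROUP BY user_id;

-- A pull query can then read them back as value columns:
SELECT src_partition, src_offset FROM user_latest WHERE user_id = 'alice';
```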

5 participants