From 0323e865f5a6c8314681f7f3a21df613072ff217 Mon Sep 17 00:00:00 2001
From: Jim Galasyn
Date: Tue, 14 Jul 2020 13:23:26 -0700
Subject: [PATCH] docs: doc timestamps for aggregations and joins (DOCS-4854) (#5824)

---
 .../time-and-windows-in-ksqldb-queries.md | 70 ++++++++++++-------
 1 file changed, 45 insertions(+), 25 deletions(-)

diff --git a/docs/concepts/time-and-windows-in-ksqldb-queries.md b/docs/concepts/time-and-windows-in-ksqldb-queries.md
index 1fadccec9d8a..9bcde160e9c5 100644
--- a/docs/concepts/time-and-windows-in-ksqldb-queries.md
+++ b/docs/concepts/time-and-windows-in-ksqldb-queries.md
@@ -20,7 +20,7 @@ may be out-of-order within the stream.
 
 Timestamps are used by time-dependent operations, like aggregations and joins.
 
-Time Semantics
+Time semantics
 --------------
 
 Timestamps have different meanings, depending on the implementation. A
@@ -70,10 +70,14 @@ processing-time in the analytics application might be many minutes or
 hours after the event-time, as cars can move out of mobile reception for
 periods of time and have to buffer records locally.
 
-!!! note
+### Stream-time
+
+The maximum timestamp seen over all processed records so far.
+
+!!! important
     Don't mix streams or tables that have different time semantics.
 
-### Timestamp Assignment
+### Timestamp assignment
 
 A record's timestamp is set either by the record's producer or by the
 {{ site.ak }} broker, depending on the topic's timestamp configuration. The
@@ -113,22 +117,38 @@ at least understood and traced — throughout your streaming data pipelines.
 It helps to agree on specifying time information in UTC or in Unix time,
 like seconds since the Unix epoch, everywhere in your system.
 
-### Timestamps of ksqlDB Output Streams
+### Timestamps of ksqlDB output streams
+
+When a ksqlDB application writes new records to {{ site.ak }}, timestamps
+are assigned to the records it creates. ksqlDB uses the underlying
+{{ site.kstreams }} implementation for computing timestamps. Timestamps are
+assigned based on context:
+
+- When new output records are generated by processing an input record
+  directly, output record timestamps are inherited from input record
+  timestamps.
+- When new output records are generated by a periodic function, the
+  output record timestamp is defined as the current internal time of
+  the stream task.
+- For stateless operations, the input record timestamp is passed through.
+  For `flatMap` and siblings that emit multiple records, all output records
+  inherit the timestamp from the corresponding input record.
+
+### Timestamps for aggregations and joins
 
-When a ksqlDB application writes new records to {{ site.ak }}, it assigns
-timestamps to the records it creates. Timestamps are assigned based on
-context:
+For aggregations and joins, timestamps are computed by using the following
+rules.
 
-- When new output records are generated by processing an input record
-  directly, output record timestamps are inherited from input record
-  timestamps.
-- When new output records are generated by a periodic function, the
-  output record timestamp is defined as the current internal time of
-  the stream task.
-- For aggregations, the timestamp of the resulting update record is
-  taken from the latest input record that triggered the update.
+* For joins (stream-stream, table-table) that have left and right input
+  records, the timestamp of the output record is assigned `max(left.ts, right.ts)`.
+* For stream-table joins, the output record is assigned the timestamp from the
+  stream record.
+* For aggregations, the timestamp of the resulting update record is
+  taken from the latest input record that triggered the update.
+* For aggregations, the `max` timestamp is computed over all
+  records, per key, either globally (for non-windowed) or per-window.
 
-### Producers and Timestamps
+### Producers and timestamps
 
 A producer application can set the timestamp on its records to any
 value, but usually, it choses a sensible event-time or the current
@@ -148,7 +168,7 @@ producer:
 
 In all three cases, the time semantics are considered to be event-time.
 
-### Timestamp Extractors
+### Timestamp extractors
 
 When ksqlDB imports a topic to create a stream, it gets the timestamp
 from the topic's messages by using a *timestamp extractor* class. Timestamp
@@ -168,7 +188,7 @@ different notions or semantics of time, depending on the requirements of
 your business logic. For more information see
 [default.timestamp.extractor](https://docs.confluent.io/current/streams/developer-guide/config-streams.html#default-timestamp-extractor).
 
-Windows in SQL Queries
+Windows in SQL queries
 ----------------------
 
 Representing time consistently enables aggregation operations on streams
@@ -203,7 +223,7 @@ that arrive out-of-order are handled as you might expect: although the
 window end time has passed, the out-of-order records are still
 associated with the correct window.
 
-### Window Types
+### Window types
 
 There are three ways to define time windows in ksqlDB: hopping windows,
 tumbling windows, and session windows. Hopping and tumbling windows are
@@ -219,7 +239,7 @@ and defined by periods of activity separated by gaps of inactivity.
 
 ![Diagram showing three types of time windows in ksqlDB streams: tumbling, hopping, and session](../img/ksql-window-aggregation.png)
 
-#### Hopping Window
+#### Hopping window
 
 Hopping windows are based on time intervals. They model fixed-sized,
 possibly overlapping windows. A hopping window is defined by two
@@ -251,7 +271,7 @@ The hopping window's start time is inclusive, but the end time is
 exclusive. This is important for non-overlapping windows, in which each
 record must be contained in exactly one window.
 
-#### Tumbling Window
+#### Tumbling window
 
 Tumbling windows are a special case of hopping windows. Like hopping
 windows, tumbling windows are based on time intervals. They model
@@ -292,7 +312,7 @@ The tumbling window's start time is inclusive, but the end time is
 exclusive. This is important for non-overlapping windows, in which each
 record must be contained in exactly one window.
 
-#### Session Window
+#### Session window
 
 A session window aggregates records into a session, which represents a
 period of activity separated by a specified gap of inactivity, or
@@ -346,7 +366,7 @@ earliest/oldest record's ROWTIME timestamp is identical to the window's
 start time, and the latest/newest record's ROWTIME timestamp is
 identical to the window's end time.
 
-### Windowed Joins
+### Windowed joins
 
 ksqlDB supports using windows in JOIN queries by using the WITHIN clause.
 
@@ -365,7 +385,7 @@ SELECT o.order_id, o.total_amount, o.customer_name, s.shipment_id, s.warehouse
 For more information on joins, see
 [Join Event Streams with ksqlDB](../developer-guide/joins/join-streams-and-tables.md).
 
-### Late Arriving Events
+### Late arriving events
 
 Frequently, events that belong to a window can arrive late, for example, over slow networks,
 and a grace period may be required to ensure the events are accepted into the window.
@@ -383,7 +403,7 @@ SELECT orderzip_code, TOPK(order_total, 5) FROM orders
 
 Events that arrive later than the grace period are dropped and not included in the aggregate result.
 
-### Window Retention
+### Window retention
 
 For each window type, you can configure the number of windows in the past that ksqlDB retains. This
 capability is very useful for interactive applications that use ksqlDB as their primary