-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-19609][table-planner-blink] Support streaming window join in planner #15195
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit 8f844ec (Sun Mar 14 08:40:20 UTC 2021) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the great work @beyond1920 . I left some comments.
val fmq = FlinkRelMetadataQuery.reuseOrCreate(mq) | ||
val leftWindowProperties = fmq.getRelWindowProperties(rel.getLeft) | ||
val rightWindowProperties = fmq.getRelWindowProperties(rel.getRight) | ||
assert(leftWindowProperties.getWindowSpec == rightWindowProperties.getWindowSpec) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to use assert
here? I think the query is still valid, we just can't infer the window properties. I think we can return null in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The case will not happen because the node will not be translated to WindowJoin if WindowSpec
is different.
@@ -266,6 +266,12 @@ class FlinkChangelogModeInferenceProgram extends FlinkOptimizeProgram[StreamOpti | |||
} | |||
createNewNode(join, children, providedTrait, requiredTrait, requester) | |||
|
|||
case windowJoin: StreamPhysicalWindowJoin => | |||
// window join support all changes in input | |||
val children = visitChildren(rel, ModifyKindSetTrait.ALL_CHANGES) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest to use INSERT_ONLY
here (and we don't add any tests for this). We can
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
OK
val leftWindowing: WindowingStrategy, | ||
val rightWindowing: WindowingStrategy) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It seems that we do not allow different window strategy for left and right input. I think we can simplify the design to only have a single window strategy. We can evolve it in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left windowStrategy and Right windowStrategy only have same timeAttribute and WindowSpec. But window start index and window end index maybe different.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see. You are right. Would be better to check the two window strategies in the physical node by overriding isValid
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
def containsWindowStartEqualityOrEndEquality(join: FlinkLogicalJoin): Boolean = { | ||
val (windowStartEqualityLeftKeys, windowEndEqualityLeftKeys, _, _) = | ||
excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs(join) | ||
windowStartEqualityLeftKeys.nonEmpty || windowEndEqualityLeftKeys.nonEmpty |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to use &&
, because we don't allow start OR end for now in excludeWindowStartEqualityAndEndEqualityFromJoinInfoPairs
.
val rightFieldType = joinFieldsType.get(rightIndex).getType | ||
val rightInputRef = new RexInputRef(rightIndex, rightFieldType) | ||
val remainEqual = rexBuilder.makeCall( | ||
SqlStdOperatorTable.EQUALS, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the remainCondition
should contain the join key equality?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remainCondition contains two part:
- remain equal condition exclude window start equality and window end equality
- non-equal condition
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not put the join key equality (exclude window start & end) into join key paires? I mean construct a new JoinInfo
instead of a RexNode
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@wuchong Thanks, the solution you mentioned works well of course. However I prefer let StreamPhysicalWindowJoin
extends CommonPhysicalJoin
, so a complete condition which contains equal condition and non-equal condition is required, JoinInfo would be inferred in the Join
node. What do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the explanation. Sounds good to me.
<Resource name="optimized rel plan"> | ||
<![CDATA[ | ||
Calc(select=[a, b, c, a0, b0, c0]) | ||
+- WindowJoin(leftWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], rightWindow=[CUMULATE(win_start=[window_start], win_end=[window_end], max_size=[1 h], step=[10 min])], joinType=[InnerJoin], where=[=(a, a0)], select=[a, b, c, window_start, window_end, a0, b0, c0, window_start0, window_end0]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: when implementing window join operator, we can't support proctime window join for now, becuase the window assigner is in an separate operator, that means we don't know when to trigger the processing-time window (we don't have something like watermark for proctime). We need to merge window assigner into window join to support proctime window join, but this can be a future work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point! Do we need to throw UnsupportedException in planner, Or just when translate to ExecNode?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
throwing a readable exception sounds good to me.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Throwing in planner or when translating to exec node are both fine. But this can be done in the next PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. If I understand well, the problem also exists in WindowJoin
, WindowAggregate
, WindowJoin
when input window strategy is WindowAttachedWindowingStrategy
, and TimeAttribute
is proc-time at the same time. Right?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
<Resource name="optimized rel plan"> | ||
<![CDATA[ | ||
Calc(select=[window_start, window_end, a, cnt AS l_cnt, uv AS l_uv, cnt0 AS r_cnt, uv0 AS r_uv, rownum]) | ||
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=3], partitionBy=[window_start, window_end], orderBy=[cnt DESC], select=[a, window_start, window_end, window_time, cnt, uv, a0, window_start0, window_end0, window_time0, cnt0, uv0, rownum]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean window properties are not propagated? Because here is Rank
instead of WindowRank
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I missed windowProperties inference for FlinkLogicalJoin
in FlinkRelMdWindowProperties
.
// ---------------------------------------------------------------------------------------- | ||
|
||
/** Window type in left and right child should be same **/ | ||
@Test(expected = classOf[TableException]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use thrown.expect(...)
and thrown.expectMessage(..)
to assert exception and error message.
@wuchong Thanks for review. I've address all your comments. |
162ca63
to
0a79f7d
Compare
* @return True if join condition contains window starts equality of input tables and window | ||
* ends equality of input tables. Else false. | ||
*/ | ||
def containsWindowStartEqualityOrEndEquality(join: FlinkLogicalJoin): Boolean = { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
def containsWindowStartEqualityOrEndEquality(join: FlinkLogicalJoin): Boolean = { | |
def containsWindowStartEqualityAndEndEquality(join: FlinkLogicalJoin): Boolean = { |
|Currently, the windowing TVFs must be the same of left and right inputs. | ||
|In the future, we could support different window TVFs, for example, tumbling windows | ||
| join sliding windows with the same window size. | ||
|""".stripMargin) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should make it clear what's the current operator, and what's the difference.
Currently, window join doesn't support different window table function of left and right inputs. The left window table function is %s, the right window table function is %s.
|Currently, time attribute type of left and right inputs should be both row-time or | ||
|both proc-time. In the future, we could support different time attribute type. | ||
|""".stripMargin) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, window join doesn't support different time attribute type of left and right inputs. The left time attribute type is %s, the right time attribute type is %s.
|Currently, window starts equality and window ends equality are both required for | ||
|window join. In the future, we could support join clause which only includes window | ||
|starts equality or window ends equality for TUMBLE or HOP window. | ||
|""".stripMargin) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, window join requires JOIN ON condition must contain both window starts equality of input tables and window ends equality of input tables. But the current JOIN ON condition is %s.
@wuchong Thanks, I've updated based on your comment. |
|starts equality or window ends equality for TUMBLE or HOP window. | ||
|""".stripMargin) | ||
"Currently, window join requires JOIN ON condition must contain both window starts " + | ||
"equality of input tables and window ends equality of input tables.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also verify the full exception message.
|starts equality or window ends equality for TUMBLE or HOP window. | ||
|""".stripMargin) | ||
"Currently, window join requires JOIN ON condition must contain both window starts " + | ||
"equality of input tables and window ends equality of input tables.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please also verify the full exception message.
|""".stripMargin) | ||
"Currently, window join doesn't support different window table function of left and " + | ||
"right inputs.\n" + | ||
s"The left window table function is ${leftWindowProperties}.\n" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We just need to print the windowSpec instead of the total leftWindowProperties
. Besides, we need to implement toString
method for WindowSpecs.
|""".stripMargin) | ||
"Currently, window join requires JOIN ON condition must contain both window starts " + | ||
"equality of input tables and window ends equality of input tables.\n" + | ||
s"But the current JOIN ON condition is ${join.getCondition}.") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Currently, it prints:
But the current JOIN ON condition is AND(=($2, $7), =($0, $5)).
I think it unreadable and not helpful for users. Would be better to display field names.
@wuchong Thanks a lot, I've updated based on your comment. |
if (windowStartEqualityLeftKeys.nonEmpty && windowEndEqualityLeftKeys.nonEmpty) { | ||
if ( | ||
leftWindowProperties.getTimeAttributeType != rightWindowProperties.getTimeAttributeType) { | ||
|
||
def timeAttributeTypeStr(isRowTime: Boolean): String = { | ||
if (isRowTime) "ROWTIME" else "PROCTIME" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can just print the logical type, because we will support TIMESTAMP_LTZ as time attribute soon, so the logical type maybe different.
windowSpec: WindowSpec): String = { | ||
val windowing = s"win_start=[${inputFieldNames(windowStartIdx)}]" + | ||
s", win_end=[${inputFieldNames(windowEndIdx)}]" | ||
windowSpec.toSummaryString(windowing) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
WindowSpec doesn't contain window_start and window_end column information, so I think we don't need to print them.
@@ -182,26 +183,65 @@ object WindowJoinUtil { | |||
} | |||
|
|||
// Validate join | |||
def getLeftFieldNames() = join.getLeft.getRowType.getFieldNames.toList | |||
|
|||
def getRightFieldNames() = join.getRight.getRowType.getFieldNames.toList |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A local variable is enough?
I appended a commit to improve the exception message. And also rebased the branch. If there is no objections, I will merge it once the build is passed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@wuchong Thanks a lot. The update is good. |
What is the purpose of the change
This pull request aims to support streaming WindowJoin in planner.
Brief change log
StreamPhysicalWindowJoin
FlinkChangelogModeInterenceProgram
andFlinkRelMdWindowProperties
to takeStreamPhysicalWindowJoin
into consideration.StreamPhysicalWindowJoinRule
to convertFlinkLogicalJoin
toStreamPhysicalWindowJoin
if join condition contains window starts equality of input tables and window ends equality of input tables.Verifying this change
WindowJoinTest
.Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: (no)Documentation