Skip to content

Commit

Permalink
Revert "[FLINK-22038][table-planner-blink] Update TopN to be without …
Browse files Browse the repository at this point in the history
…rowNumber if rowNumber field is never used by the successor Calc"

This reverts commit 362aadc.
  • Loading branch information
rmetzger committed Jun 3, 2021
1 parent 3763321 commit 214ade6
Show file tree
Hide file tree
Showing 13 changed files with 14 additions and 363 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ object FlinkBatchRuleSets {
// transpose calc past rank to reduce rank input fields
CalcRankTransposeRule.INSTANCE,
// remove output of rank number when it is a constant
ConstantRankNumberColumnRemoveRule.INSTANCE,
RankNumberColumnRemoveRule.INSTANCE,

// calc rules
CoreRules.FILTER_CALC_MERGE,
Expand Down Expand Up @@ -383,9 +383,7 @@ object FlinkBatchRuleSets {
PythonCalcSplitRule.EXPAND_PROJECT,
PythonCalcSplitRule.PUSH_CONDITION,
PythonCalcSplitRule.REWRITE_PROJECT,
PythonMapMergeRule.INSTANCE,
// remove output of rank number when it is not used by successor calc
RedundantRankNumberColumnRemoveRule.INSTANCE
PythonMapMergeRule.INSTANCE
)

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ object FlinkStreamRuleSets {
// transpose calc past rank to reduce rank input fields
CalcRankTransposeRule.INSTANCE,
// remove output of rank number when it is a constant
ConstantRankNumberColumnRemoveRule.INSTANCE,
RankNumberColumnRemoveRule.INSTANCE,
// split distinct aggregate to reduce data skew
SplitAggregateRule.INSTANCE,
// transpose calc past snapshot
Expand All @@ -374,8 +374,6 @@ object FlinkStreamRuleSets {
PythonCorrelateSplitRule.INSTANCE,
// merge calc after calc transpose
FlinkCalcMergeRule.INSTANCE,
// remove output of rank number when it is not used by successor calc
RedundantRankNumberColumnRemoveRule.INSTANCE,
// remove the trivial calc that is produced by PushWatermarkIntoTableSourceScanAcrossCalcRule.
// because [[PushWatermarkIntoTableSourceScanAcrossCalcRule]] will push the rowtime computed
// column into the source. After FlinkCalcMergeRule applies, it may produces a trivial calc.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@ import java.math.{BigDecimal => JBigDecimal}
* Planner rule that removes the output column of rank number
* iff there is a equality condition for the rank column.
*/
class ConstantRankNumberColumnRemoveRule
class RankNumberColumnRemoveRule
extends RelOptRule(
operand(classOf[FlinkLogicalRank], any()),
"ConstantRankNumberColumnRemoveRule") {
"RankFunctionColumnRemoveRule") {

override def matches(call: RelOptRuleCall): Boolean = {
val rank: FlinkLogicalRank = call.rel(0)
Expand Down Expand Up @@ -79,6 +79,6 @@ class ConstantRankNumberColumnRemoveRule
}
}

object ConstantRankNumberColumnRemoveRule {
val INSTANCE = new ConstantRankNumberColumnRemoveRule
object RankNumberColumnRemoveRule {
val INSTANCE = new RankNumberColumnRemoveRule
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[a, b, 2:BIGINT AS $2])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], global=[true], select=[a, b, c])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], global=[true], select=[a, b, c, w0$o0])
+- Sort(orderBy=[b ASC, a ASC, c ASC])
+- Exchange(distribution=[hash[b]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], global=[false], select=[a, b, c])
Expand Down Expand Up @@ -280,80 +280,6 @@ Calc(select=[CAST(rna) AS rn1, CAST(w0$o0) AS rn2], where=[(w0$o0 <= 200)])
+- Sort(orderBy=[a ASC, c ASC, b DESC])
+- Exchange(distribution=[hash[a, c]])
+- LegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
</TestCase>
<TestCase name="testRedundantRankNumberColumnRemove">
<Resource name="sql">
<![CDATA[
SELECT
CONCAT('http://txmov2.a.yximgs.com', uri) AS url,
reqcount AS download_count,
start_time AS `timestamp`
FROM
(
SELECT
uri,
reqcount,
rownum_2,
start_time
FROM
(
SELECT
uri,
reqcount,
start_time,
RANK() OVER (
PARTITION BY start_time
ORDER BY
reqcount DESC
) AS rownum_2
FROM
(
SELECT
uri,
reqcount,
start_time,
RANK() OVER (
PARTITION BY start_time, bucket_id
ORDER BY
reqcount DESC
) AS rownum_1
FROM MyTable1
)
WHERE
rownum_1 <= 100000
)
WHERE
rownum_2 <= 100000
)
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(url=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', $0)], download_count=[$1], timestamp=[$2])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_2=[RANK() OVER (PARTITION BY $2 ORDER BY $1 DESC NULLS LAST)])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_1=[RANK() OVER (PARTITION BY $2, $3 ORDER BY $1 DESC NULLS LAST)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', uri) AS url, reqcount AS download_count, start_time AS timestamp])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time], orderBy=[reqcount DESC], global=[true], select=[uri, reqcount, start_time])
+- Sort(orderBy=[start_time ASC, reqcount DESC])
+- Exchange(distribution=[hash[start_time]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time], orderBy=[reqcount DESC], global=[false], select=[uri, reqcount, start_time])
+- Sort(orderBy=[start_time ASC, reqcount DESC])
+- Calc(select=[uri, reqcount, start_time])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time, bucket_id], orderBy=[reqcount DESC], global=[true], select=[uri, reqcount, start_time, bucket_id])
+- Sort(orderBy=[start_time ASC, bucket_id ASC, reqcount DESC])
+- Exchange(distribution=[hash[start_time, bucket_id]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time, bucket_id], orderBy=[reqcount DESC], global=[false], select=[uri, reqcount, start_time, bucket_id])
+- Sort(orderBy=[start_time ASC, bucket_id ASC, reqcount DESC])
+- BoundedStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[uri, reqcount, start_time, bucket_id])
]]>
</Resource>
</TestCase>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -722,7 +722,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[a], select=[a, SUM(b) AS EXPR$1])
+- Calc(select=[a, b])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c, w0$o0])
+- Sort(orderBy=[a ASC, c ASC, b ASC])
+- Exchange(distribution=[hash[a]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[false], select=[a, b, c])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ LogicalProject(a=[$0], b=[$1], rk=[$2])
<Resource name="optimized rel plan">
<![CDATA[
FlinkLogicalCalc(select=[a, b, 2:BIGINT AS $2])
+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], select=[a, b, c])
+- FlinkLogicalRank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=2], partitionBy=[b], orderBy=[a ASC, c ASC], select=[a, b, c, w0$o0])
+- FlinkLogicalLegacyTableSourceScan(table=[[default_catalog, default_database, MyTable, source: [TestTableSource(a, b, c)]]], fields=[a, b, c])
]]>
</Resource>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
<![CDATA[
HashAggregate(isMerge=[false], groupBy=[a], select=[a, SUM(b) AS EXPR$1])
+- Calc(select=[a, b])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[true], select=[a, b, c, w0$o0])
+- Sort(orderBy=[a ASC, c ASC, b ASC])
+- Exchange(distribution=[hash[a]])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=10], partitionBy=[a, c], orderBy=[b ASC], global=[false], select=[a, b, c])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ LogicalProject(a=[$0])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1])
+- Rank(rankType=[RANK], rankRange=[rankStart=1, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1, w0$o0])
+- Sort(orderBy=[a ASC, $f1 ASC])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS $f1])
+- Exchange(distribution=[hash[a]])
Expand Down Expand Up @@ -68,7 +68,7 @@ LogicalProject(a=[$0])
<Resource name="optimized rel plan">
<![CDATA[
Calc(select=[a])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1])
+- Rank(rankType=[RANK], rankRange=[rankStart=2, rankEnd=5], partitionBy=[a], orderBy=[$f1 ASC], global=[true], select=[a, $f1, w0$o0])
+- Sort(orderBy=[a ASC, $f1 ASC])
+- HashAggregate(isMerge=[true], groupBy=[a], select=[a, Final_SUM(sum$0) AS $f1])
+- Exchange(distribution=[hash[a]])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,74 +277,6 @@ Calc(select=[a, b, c])
+- Exchange(distribution=[hash[a]])
+- Calc(select=[a, b, c, proctime])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable]], fields=[a, b, c, proctime, rowtime])
]]>
</Resource>
</TestCase>
<TestCase name="testRedundantRankNumberColumnRemove">
<Resource name="sql">
<![CDATA[
SELECT
CONCAT('http://txmov2.a.yximgs.com', uri) AS url,
reqcount AS download_count,
start_time AS `timestamp`
FROM
(
SELECT
uri,
reqcount,
rownum_2,
start_time
FROM
(
SELECT
uri,
reqcount,
start_time,
ROW_NUMBER() OVER (
PARTITION BY start_time
ORDER BY
reqcount DESC
) AS rownum_2
FROM
(
SELECT
uri,
reqcount,
start_time,
ROW_NUMBER() OVER (
PARTITION BY start_time, bucket_id
ORDER BY
reqcount DESC
) AS rownum_1
FROM MyTable1
)
WHERE
rownum_1 <= 100000
)
WHERE
rownum_2 <= 100000
)
]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(url=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', $0)], download_count=[$1], timestamp=[$2])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_2=[ROW_NUMBER() OVER (PARTITION BY $2 ORDER BY $1 DESC NULLS LAST)])
+- LogicalFilter(condition=[<=($3, 100000)])
+- LogicalProject(uri=[$0], reqcount=[$1], start_time=[$2], rownum_1=[ROW_NUMBER() OVER (PARTITION BY $2, $3 ORDER BY $1 DESC NULLS LAST)])
+- LogicalTableScan(table=[[default_catalog, default_database, MyTable1]])
]]>
</Resource>
<Resource name="optimized exec plan">
<![CDATA[
Calc(select=[CONCAT(_UTF-16LE'http://txmov2.a.yximgs.com', uri) AS url, reqcount AS download_count, start_time AS timestamp])
+- Rank(strategy=[RetractStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time], orderBy=[reqcount DESC], select=[uri, reqcount, start_time])
+- Exchange(distribution=[hash[start_time]])
+- Calc(select=[uri, reqcount, start_time])
+- Rank(strategy=[AppendFastStrategy], rankType=[ROW_NUMBER], rankRange=[rankStart=1, rankEnd=100000], partitionBy=[start_time, bucket_id], orderBy=[reqcount DESC], select=[uri, reqcount, start_time, bucket_id])
+- Exchange(distribution=[hash[start_time, bucket_id]])
+- DataStreamScan(table=[[default_catalog, default_database, MyTable1]], fields=[uri, reqcount, start_time, bucket_id])
]]>
</Resource>
</TestCase>
Expand Down
Loading

0 comments on commit 214ade6

Please sign in to comment.