Skip to content

Commit

Permalink
[FLINK-22714][table-planner-blink] Simplify window TVF to a simple wi…
Browse files Browse the repository at this point in the history
…ndow assigner if successor

node is WindowRank or WindowJoin

This closes apache#16025
  • Loading branch information
beyond1920 authored and godfreyhe committed Jul 6, 2021
1 parent dfb4394 commit f61d9af
Show file tree
Hide file tree
Showing 16 changed files with 3,785 additions and 576 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -104,13 +104,17 @@ public StreamExecWindowRank(
InputProperty inputProperty,
RowType outputType,
String description) {
super(Collections.singletonList(inputProperty), outputType, description);
this.rankType = rankType;
this.rankRange = rankRange;
this.sortSpec = sortSpec;
this.partitionSpec = partitionSpec;
this.outputRankNumber = outputRankNumber;
this.windowing = windowing;
this(
rankType,
partitionSpec,
sortSpec,
rankRange,
outputRankNumber,
windowing,
getNewNodeId(),
Collections.singletonList(inputProperty),
outputType,
description);
}

@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,33 +21,70 @@
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.planner.delegation.PlannerBase;
import org.apache.flink.table.planner.plan.logical.WindowingStrategy;
import org.apache.flink.table.planner.plan.logical.TimeAttributeWindowingStrategy;
import org.apache.flink.table.planner.plan.nodes.exec.ExecEdge;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
import org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
import org.apache.flink.table.planner.plan.nodes.exec.SingleTransformationTranslator;
import org.apache.flink.table.types.logical.RowType;

import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;

import java.util.Collections;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Stream {@link ExecNode} which acts as a table-valued function to assign a window for each row of
* the input relation. The return value of the new relation includes all the original columns as
* well additional 3 columns named {@code window_start}, {@code window_end}, {@code window_time} to
* indicate the assigned window.
*/
@JsonIgnoreProperties(ignoreUnknown = true)
public class StreamExecWindowTableFunction extends ExecNodeBase<RowData>
implements StreamExecNode<RowData> {
implements StreamExecNode<RowData>, SingleTransformationTranslator<RowData> {

public static final String FIELD_NAME_WINDOWING = "windowing";
public static final String FIELD_NAME_EMIT_PER_RECORD = "emitPerRecord";

@JsonProperty(FIELD_NAME_WINDOWING)
private final TimeAttributeWindowingStrategy windowingStrategy;

private final WindowingStrategy windowingStrategy;
@JsonProperty(FIELD_NAME_EMIT_PER_RECORD)
private final Boolean emitPerRecord;

public StreamExecWindowTableFunction(
WindowingStrategy windowingStrategy,
InputProperty inputEdge,
TimeAttributeWindowingStrategy windowingStrategy,
Boolean emitPerRecord,
InputProperty inputProperty,
RowType outputType,
String description) {
super(Collections.singletonList(inputEdge), outputType, description);
this.windowingStrategy = windowingStrategy;
this(
windowingStrategy,
emitPerRecord,
getNewNodeId(),
Collections.singletonList(inputProperty),
outputType,
description);
}

@JsonCreator
public StreamExecWindowTableFunction(
@JsonProperty(FIELD_NAME_WINDOWING) TimeAttributeWindowingStrategy windowingStrategy,
@JsonProperty(FIELD_NAME_EMIT_PER_RECORD) Boolean emitPerRecord,
@JsonProperty(FIELD_NAME_ID) int id,
@JsonProperty(FIELD_NAME_INPUT_PROPERTIES) List<InputProperty> inputProperties,
@JsonProperty(FIELD_NAME_OUTPUT_TYPE) RowType outputType,
@JsonProperty(FIELD_NAME_DESCRIPTION) String description) {
super(id, inputProperties, outputType, description);
checkArgument(inputProperties.size() == 1);
this.windowingStrategy = checkNotNull(windowingStrategy);
this.emitPerRecord = checkNotNull(emitPerRecord);
}

@Override
Expand All @@ -57,6 +94,7 @@ protected Transformation<RowData> translateToPlanInternal(PlannerBase planner) {
String[] inputFieldNames = inputRowType.getFieldNames().toArray(new String[0]);
String windowSummary = windowingStrategy.toSummaryString(inputFieldNames);

// TODO support emitPerRecord later
throw new UnsupportedOperationException(
String.format(
"Currently Flink doesn't support individual window table-valued function %s.\n "
Expand Down
Loading

0 comments on commit f61d9af

Please sign in to comment.