Skip to content

Commit

Permalink
[FLINK-24501][table-runtime] Stores window progress into state in ord…
Browse files Browse the repository at this point in the history
…er to check whether an input element is late or not for window-tvf aggregate operator

This closes apache#17509
  • Loading branch information
beyond1920 committed Nov 10, 2021
1 parent 4dd04bd commit cba6b2c
Show file tree
Hide file tree
Showing 6 changed files with 83 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,13 @@ public void open(Context<Long> context) throws Exception {
this.nextTriggerProgress = Long.MIN_VALUE;
}

/**
 * Seeds this processor's progress with the given watermark.
 *
 * <p>The value is only taken over when {@code isEventTime} is set; otherwise the call leaves
 * the current progress untouched.
 *
 * @param watermark the initial watermark
 */
@Override
public void initializeWatermark(long watermark) {
    if (!isEventTime) {
        return;
    }
    currentProgress = watermark;
}

@Override
public boolean processElement(RowData key, RowData element) throws Exception {
long sliceEnd = sliceAssigner.assignSliceEnd(element, clockService);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,11 @@ public void open(Context<Long> context) throws Exception {
this.currentProgress = Long.MIN_VALUE;
}

/**
 * Seeds this processor's progress with the given watermark.
 *
 * @param watermark the initial watermark
 */
@Override
public void initializeWatermark(long watermark) {
    this.currentProgress = watermark;
}

@Override
public boolean processElement(RowData key, RowData element) throws Exception {
long sliceEnd = element.getLong(windowEndIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@

import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;

/** An window rank processor. */
/** A rowtime window rank processor. */
public final class WindowRankProcessor implements SlicingWindowProcessor<Long> {
private static final long serialVersionUID = 1L;

Expand Down Expand Up @@ -140,6 +140,11 @@ public void open(Context<Long> context) throws Exception {
this.currentProgress = Long.MIN_VALUE;
}

/**
 * Seeds this rank processor's progress with the given watermark.
 *
 * @param watermark the initial watermark
 */
@Override
public void initializeWatermark(long watermark) {
    this.currentProgress = watermark;
}

@Override
public boolean processElement(RowData key, RowData element) throws Exception {
long sliceEnd = element.getLong(windowEndIndex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,17 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Meter;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimer;
import org.apache.flink.streaming.api.operators.InternalTimerService;
Expand Down Expand Up @@ -116,6 +121,9 @@ public final class SlicingWindowOperator<K, W> extends TableStreamOperator<RowDa
/** The tracked processing time triggered last time. */
private transient long lastTriggeredProcessingTime;

/** The operator state to store watermark. */
private transient ListState<Long> watermarkState;

// ------------------------------------------------------------------------
// Metrics
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -150,6 +158,8 @@ public void open() throws Exception {
getKeyedStateBackend(),
collector,
getRuntimeContext()));
// initialize watermark
windowProcessor.initializeWatermark(currentWatermark);

// metrics
this.numLateRecordsDropped = metrics.counter(LATE_ELEMENTS_DROPPED_METRIC_NAME);
Expand All @@ -170,6 +180,33 @@ public void open() throws Exception {
});
}

/**
 * Registers the operator state that stores the watermark and, when the operator is being
 * restored, re-initializes {@code currentWatermark} from it.
 *
 * <p>The state is requested as union list state, so the restored list may contain more than
 * one entry; the smallest restored value is used. If nothing is restored, {@code
 * currentWatermark} keeps its previous value.
 *
 * @param context the state initialization context
 * @throws Exception if the parent initialization or the state access fails
 */
@Override
public void initializeState(StateInitializationContext context) throws Exception {
    super.initializeState(context);
    ListStateDescriptor<Long> watermarkStateDesc =
            new ListStateDescriptor<>("watermark", LongSerializer.INSTANCE);
    this.watermarkState = context.getOperatorStateStore().getUnionListState(watermarkStateDesc);
    if (!context.isRestored()) {
        return;
    }

    Iterable<Long> watermarks = watermarkState.get();
    if (watermarks == null) {
        return;
    }

    // Primitive accumulator: avoids unboxing and re-boxing a Long on every iteration.
    long minWatermark = Long.MAX_VALUE;
    for (long watermark : watermarks) {
        minWatermark = Math.min(watermark, minWatermark);
    }

    // Long.MAX_VALUE doubles as the "no entry restored" sentinel, so it is never applied.
    if (minWatermark != Long.MAX_VALUE) {
        this.currentWatermark = minWatermark;
    }
}

/**
 * Writes the current watermark into the operator state as part of a snapshot.
 *
 * <p>The previous content of the state is discarded first, so the state holds exactly one
 * entry — the value of {@code currentWatermark} at snapshot time.
 *
 * @param context the snapshot context
 * @throws Exception if the parent snapshot or the state update fails
 */
@Override
public void snapshotState(StateSnapshotContext context) throws Exception {
    super.snapshotState(context);
    final ListState<Long> state = this.watermarkState;
    state.clear();
    state.add(currentWatermark);
}

@Override
public void close() throws Exception {
super.close();
Expand All @@ -190,8 +227,12 @@ public void processElement(StreamRecord<RowData> element) throws Exception {

/**
 * Handles an incoming watermark by advancing the window processor and forwarding a watermark
 * to the parent operator.
 *
 * <p>NOTE(review): this hunk appears to be rendered without +/- markers, so the two
 * unconditional calls at the top of the body are presumably the pre-change lines and the
 * if/else below them their replacement — confirm against the actual file.
 *
 * @param mark the incoming watermark
 * @throws Exception if advancing the processor or the parent handling fails
 */
@Override
public void processWatermark(Watermark mark) throws Exception {
windowProcessor.advanceProgress(mark.getTimestamp());
super.processWatermark(mark);
// Only a watermark strictly greater than currentWatermark advances the window processor.
if (mark.getTimestamp() > currentWatermark) {
windowProcessor.advanceProgress(mark.getTimestamp());
super.processWatermark(mark);
} else {
// Otherwise a watermark carrying currentWatermark is passed on instead of the incoming one.
super.processWatermark(new Watermark(currentWatermark));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,14 @@ public interface SlicingWindowProcessor<W> extends Serializable {
/** Initialization method for the function. It is called before the actual working methods. */
void open(Context<W> context) throws Exception;

/**
 * Initializes the processor with the watermark that was restored from state. This method is
 * called after {@link #open(Context)} and before the actual working methods.
 *
 * @param watermark the initial watermark
 */
void initializeWatermark(long watermark);

/**
* Process an element with associated key from the input stream. Returns true if this element is
* dropped because of late arrival.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,13 +391,6 @@ public void testEventTimeCumulativeWindows() throws Exception {
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());

testHarness.processWatermark(new Watermark(2999));
expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(3000L)));
expectedOutput.add(insertRecord("key2", 4L, 4L, localMills(0L), localMills(3000L)));
expectedOutput.add(new Watermark(2999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());

// do a snapshot, close and restore again
testHarness.prepareSnapshotPreBarrier(0L);
OperatorSubtaskState snapshot = testHarness.snapshot(0L, 0);
Expand All @@ -410,6 +403,20 @@ public void testEventTimeCumulativeWindows() throws Exception {
testHarness.setup();
testHarness.initializeState(snapshot);
testHarness.open();
// the late event would not trigger window [0, 2000L) again even if the job is restored from
// a savepoint
testHarness.processElement(insertRecord("key2", 1, 1000L));
testHarness.processWatermark(new Watermark(1999));

expectedOutput.add(new Watermark(1999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());
testHarness.processWatermark(new Watermark(2999));
expectedOutput.add(insertRecord("key1", 3L, 3L, localMills(0L), localMills(3000L)));
expectedOutput.add(insertRecord("key2", 5L, 5L, localMills(0L), localMills(3000L)));
expectedOutput.add(new Watermark(2999));
ASSERTER.assertOutputEqualsSorted(
"Output was not correct.", expectedOutput, testHarness.getOutput());

testHarness.processWatermark(new Watermark(3999));
expectedOutput.add(insertRecord("key2", 1L, 1L, localMills(3000L), localMills(4000L)));
Expand Down

0 comments on commit cba6b2c

Please sign in to comment.