Skip to content

Commit

Permalink
Update based on JingsongLi's comments
Browse files Browse the repository at this point in the history
  • Loading branch information
beyond1920 committed Apr 30, 2021
1 parent e7a99f5 commit 7fadefe
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import org.apache.flink.table.runtime.generated.JoinCondition;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.operators.join.JoinConditionWithNullFilters;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerService;
import org.apache.flink.table.runtime.operators.window.slicing.WindowTimerServiceImpl;
import org.apache.flink.table.runtime.operators.window.state.WindowListState;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.types.RowKind;
Expand All @@ -51,13 +53,14 @@
import java.util.List;

import static org.apache.flink.table.runtime.util.TimeWindowUtil.isWindowFired;
import static org.apache.flink.table.runtime.util.TimeWindowUtil.toEpochMillsForTimer;

/**
* Streaming window join operator.
*
* <p>Note: currently, {@link WindowJoinOperator} doesn't support early-fire and late-arrival. Thus
* late elements (elements belong to emitted windows) will be simply dropped.
*
* <p>Note: currently, {@link WindowJoinOperator} doesn't support DELETE or UPDATE_BEFORE input row.
*/
public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
implements TwoInputStreamOperator<RowData, RowData, RowData>,
Expand Down Expand Up @@ -91,7 +94,7 @@ public abstract class WindowJoinOperator extends TableStreamOperator<RowData>
/** Flag to prevent duplicate function.close() calls in close() and dispose(). */
private transient boolean functionsClosed = false;

private transient InternalTimerService<Long> internalTimerService;
private transient WindowTimerService<Long> windowTimerService;

// ------------------------------------------------------------------------
protected transient JoinConditionWithNullFilters joinCondition;
Expand Down Expand Up @@ -139,7 +142,9 @@ public void open() throws Exception {

final LongSerializer windowSerializer = LongSerializer.INSTANCE;

internalTimerService = getInternalTimerService("window-timers", windowSerializer, this);
InternalTimerService<Long> internalTimerService =
getInternalTimerService("window-timers", windowSerializer, this);
this.windowTimerService = new WindowTimerServiceImpl(internalTimerService, shiftTimeZone);

// init join condition
JoinCondition condition =
Expand Down Expand Up @@ -178,11 +183,11 @@ public void open() throws Exception {
metrics.gauge(
WATERMARK_LATENCY_METRIC_NAME,
() -> {
long watermark = internalTimerService.currentWatermark();
long watermark = windowTimerService.currentWatermark();
if (watermark < 0) {
return 0L;
} else {
return internalTimerService.currentProcessingTime() - watermark;
return windowTimerService.currentProcessingTime() - watermark;
}
});
}
Expand Down Expand Up @@ -227,19 +232,20 @@ private void processElement(
throws Exception {
RowData inputRow = element.getValue();
long windowEnd = inputRow.getLong(windowEndIndex);
if (isWindowFired(windowEnd, internalTimerService.currentWatermark(), shiftTimeZone)) {
if (isWindowFired(windowEnd, windowTimerService.currentWatermark(), shiftTimeZone)) {
// element is late and should be dropped
lateRecordsDroppedRate.markEvent();
return;
}
if (RowDataUtil.isAccumulateMsg(inputRow)) {
recordState.add(windowEnd, inputRow);
} else {
recordState.delete(windowEnd, inputRow);
// Window join could not handle retraction input stream
throw new UnsupportedOperationException(
"This is a bug and should not happen. Please file an issue.");
}
// always register time for every element
internalTimerService.registerEventTimeTimer(
windowEnd, toEpochMillsForTimer(windowEnd - 1, shiftTimeZone));
windowTimerService.registerEventTimeWindowTimer(windowEnd);
}

@Override
Expand Down Expand Up @@ -372,6 +378,8 @@ public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords)

private abstract static class AbstractOuterJoinOperator extends WindowJoinOperator {

private static final long serialVersionUID = 1L;

private transient RowData leftNullRow;
private transient RowData rightNullRow;
private transient JoinedRowData outRow;
Expand Down Expand Up @@ -431,6 +439,8 @@ protected void output(RowData inputRow, RowData otherRow, boolean inputIsLeft) {

static class LeftOuterJoinOperator extends AbstractOuterJoinOperator {

private static final long serialVersionUID = 1L;

LeftOuterJoinOperator(
InternalTypeInfo leftType,
InternalTypeInfo rightType,
Expand Down Expand Up @@ -476,6 +486,8 @@ public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords)

static class RightOuterJoinOperator extends AbstractOuterJoinOperator {

private static final long serialVersionUID = 1L;

RightOuterJoinOperator(
InternalTypeInfo leftType,
InternalTypeInfo rightType,
Expand Down Expand Up @@ -520,6 +532,8 @@ public void join(Iterable<RowData> leftRecords, Iterable<RowData> rightRecords)

static class FullOuterJoinOperator extends AbstractOuterJoinOperator {

private static final long serialVersionUID = 1L;

FullOuterJoinOperator(
InternalTypeInfo leftType,
InternalTypeInfo rightType,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,4 @@ public void add(W window, RowData value) throws Exception {
windowState.setCurrentNamespace(window);
windowState.add(value);
}

/**
* Updates the operator state accessible by {@link #get(W)} )} by delete the given value to the
* list of values. The next time {@link #get(W)} is called (for the same state partition) the
* returned state will represent the updated list.
*
* <p>If null is passed in, the state value will remain unchanged.
*
* <p>The performance is not well, first get complete list by calling {@link
* InternalListState#getInternal()})}, then remove the value from list, finally update state by
* calling {@link InternalListState#update(List)}.
*
* @param window The namespace for the state.
* @param value The new value for the state.
* @throws Exception Thrown if the system cannot access the state.
*/
public boolean delete(W window, RowData value) throws Exception {
List<RowData> completeData = get(window);
boolean flag = completeData.remove(value);
windowState.updateInternal(completeData);
return flag;
}
}

0 comments on commit 7fadefe

Please sign in to comment.