Skip to content

Commit

Permalink
[FLINK-24440][source] Announce and combine latest watermarks across S…
Browse files Browse the repository at this point in the history
…ourceOperators
  • Loading branch information
pnowojski committed Feb 10, 2022
1 parent f839f12 commit 10d6d4f
Show file tree
Hide file tree
Showing 7 changed files with 412 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ Licensed to the Apache Software Foundation (ASF) under one

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.connector.source.ReaderInfo;
import org.apache.flink.api.connector.source.Source;
import org.apache.flink.api.connector.source.SourceEvent;
Expand All @@ -32,8 +33,10 @@ Licensed to the Apache Software Foundation (ASF) under one
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.source.event.ReaderRegistrationEvent;
import org.apache.flink.runtime.source.event.ReportedWatermarkEvent;
import org.apache.flink.runtime.source.event.RequestSplitEvent;
import org.apache.flink.runtime.source.event.SourceEventWrapper;
import org.apache.flink.runtime.source.event.WatermarkAlignmentEvent;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TemporaryClassLoaderContext;
Expand All @@ -49,10 +52,15 @@ Licensed to the Apache Software Foundation (ASF) under one
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import static org.apache.flink.runtime.source.coordinator.SourceCoordinatorSerdeUtils.readAndVerifyCoordinatorSerdeVersion;
Expand All @@ -76,13 +84,19 @@ Licensed to the Apache Software Foundation (ASF) under one
@Internal
public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>
implements OperatorCoordinator {
public static final WatermarkAlignmentParams WATERMARK_ALIGNMENT_DISABLED =
new WatermarkAlignmentParams(Long.MAX_VALUE, "", 0);

private static final Logger LOG = LoggerFactory.getLogger(SourceCoordinator.class);

private final WatermarkAggregator<Integer> combinedWatermark = new WatermarkAggregator<>();

private final WatermarkAlignmentParams watermarkAlignmentParams;

/** The name of the operator this SourceCoordinator is associated with. */
private final String operatorName;
/** A single-thread executor to handle all the changes to the coordinator. */
private final ExecutorService coordinatorExecutor;
private final ScheduledExecutorService coordinatorExecutor;
/** The Source that is associated with this SourceCoordinator. */
private final Source<?, SplitT, EnumChkT> source;
/** The serializer that handles the serde of the SplitEnumerator checkpoints. */
Expand All @@ -101,16 +115,70 @@ public class SourceCoordinator<SplitT extends SourceSplit, EnumChkT>

public SourceCoordinator(
String operatorName,
ExecutorService coordinatorExecutor,
ScheduledExecutorService coordinatorExecutor,
Source<?, SplitT, EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore) {
this(
operatorName,
coordinatorExecutor,
source,
context,
coordinatorStore,
WATERMARK_ALIGNMENT_DISABLED);
}

public SourceCoordinator(
String operatorName,
ScheduledExecutorService coordinatorExecutor,
Source<?, SplitT, EnumChkT> source,
SourceCoordinatorContext<SplitT> context,
CoordinatorStore coordinatorStore,
WatermarkAlignmentParams watermarkAlignmentParams) {
this.operatorName = operatorName;
this.coordinatorExecutor = coordinatorExecutor;
this.source = source;
this.enumCheckpointSerializer = source.getEnumeratorCheckpointSerializer();
this.context = context;
this.coordinatorStore = coordinatorStore;
this.watermarkAlignmentParams = watermarkAlignmentParams;

if (watermarkAlignmentParams.isEnabled()) {
coordinatorStore.putIfAbsent(
watermarkAlignmentParams.watermarkGroup, new WatermarkAggregator<>());
coordinatorExecutor.scheduleAtFixedRate(
this::announceCombinedWatermark,
watermarkAlignmentParams.updateInterval,
watermarkAlignmentParams.updateInterval,
TimeUnit.MILLISECONDS);
}
}

@VisibleForTesting
void announceCombinedWatermark() {
checkState(watermarkAlignmentParams != WATERMARK_ALIGNMENT_DISABLED);

Watermark globalCombinedWatermark =
coordinatorStore.apply(
watermarkAlignmentParams.watermarkGroup,
(value) -> {
WatermarkAggregator aggregator = (WatermarkAggregator) value;
return new Watermark(
aggregator.getAggregatedWatermark().getTimestamp());
});

long maxAllowedWatermark =
globalCombinedWatermark.getTimestamp()
+ watermarkAlignmentParams.maxAllowedWatermarkDrift;
Set<Integer> subTaskIds = combinedWatermark.keySet();
LOG.info(
"Distributing maxAllowedWatermark={} to subTaskIds={}",
maxAllowedWatermark,
subTaskIds);
for (Integer subtaskId : subTaskIds) {
context.sendEventToSourceOperator(
subtaskId, new WatermarkAlignmentEvent(maxAllowedWatermark));
}
}

@Override
Expand Down Expand Up @@ -194,6 +262,10 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) {
subtask,
registrationEvent.location());
handleReaderRegistrationEvent(registrationEvent);
} else if (event instanceof ReportedWatermarkEvent) {
handleReportedWatermark(
subtask,
new Watermark(((ReportedWatermarkEvent) event).getWatermark()));
} else {
throw new FlinkException("Unrecognized Operator Event: " + event);
}
Expand Down Expand Up @@ -440,9 +512,80 @@ private void handleReaderRegistrationEvent(ReaderRegistrationEvent event) {
enumerator.addReader(event.subtaskId());
}

private void handleReportedWatermark(int subtask, Watermark watermark) {
LOG.debug("New reported watermark={} from subTaskId={}", watermark, subtask);

checkState(watermarkAlignmentParams.isEnabled());

combinedWatermark
.aggregate(subtask, watermark)
.ifPresent(
newCombinedWatermark ->
coordinatorStore.computeIfPresent(
watermarkAlignmentParams.watermarkGroup,
(key, oldValue) -> {
WatermarkAggregator<String> watermarkAggregator =
(WatermarkAggregator<String>) oldValue;
watermarkAggregator.aggregate(
operatorName, newCombinedWatermark);
return watermarkAggregator;
}));
}

private void ensureStarted() {
if (!started) {
throw new IllegalStateException("The coordinator has not started yet.");
}
}

private static class WatermarkAggregator<T> {
private final Map<T, Watermark> watermarks = new HashMap<>();
private Watermark aggregatedWatermark = new Watermark(Long.MIN_VALUE);

/**
* Update the {@link Watermark} for the given {@code key)}.
*
* @return the new updated combined {@link Watermark} if the value has changed. {@code
* Optional.empty()} otherwise.
*/
public Optional<Watermark> aggregate(T key, Watermark watermark) {
watermarks.put(key, watermark);
Watermark newMinimum =
watermarks.values().stream()
.min(Comparator.comparingLong(Watermark::getTimestamp))
.orElseThrow(IllegalStateException::new);
if (newMinimum.equals(aggregatedWatermark)) {
return Optional.empty();
} else {
aggregatedWatermark = newMinimum;
return Optional.of(aggregatedWatermark);
}
}

public Set<T> keySet() {
return watermarks.keySet();
}

public Watermark getAggregatedWatermark() {
return aggregatedWatermark;
}
}

/** Configuration parameters for watermark alignemnt. */
public static class WatermarkAlignmentParams {
private final long maxAllowedWatermarkDrift;
private final String watermarkGroup;
private final long updateInterval;

public WatermarkAlignmentParams(
long maxAllowedWatermarkDrift, String watermarkGroup, long updateInterval) {
this.maxAllowedWatermarkDrift = maxAllowedWatermarkDrift;
this.watermarkGroup = watermarkGroup;
this.updateInterval = updateInterval;
}

public boolean isEnabled() {
return maxAllowedWatermarkDrift < Long.MAX_VALUE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,19 @@ public void sendEventToSourceReader(int subtaskId, SourceEvent event) {
String.format("Failed to send event %s to subtask %d", event, subtaskId));
}

void sendEventToSourceOperator(int subtaskId, OperatorEvent event) {
checkSubtaskIndex(subtaskId);

callInCoordinatorThread(
() -> {
final OperatorCoordinator.SubtaskGateway gateway =
getGatewayAndCheckReady(subtaskId);
gateway.sendEvent(event);
return null;
},
String.format("Failed to send event %s to subtask %d", event, subtaskId));
}

@Override
public int currentParallelism() {
return operatorCoordinatorContext.currentParallelism();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ Licensed to the Apache Software Foundation (ASF) under one
import javax.annotation.Nullable;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.BiConsumer;

Expand Down Expand Up @@ -71,8 +71,8 @@ public OperatorCoordinator getCoordinator(OperatorCoordinator.Context context) {
CoordinatorExecutorThreadFactory coordinatorThreadFactory =
new CoordinatorExecutorThreadFactory(
coordinatorThreadName, context.getUserCodeClassloader());
ExecutorService coordinatorExecutor =
Executors.newSingleThreadExecutor(coordinatorThreadFactory);
ScheduledExecutorService coordinatorExecutor =
Executors.newScheduledThreadPool(1, coordinatorThreadFactory);

SimpleVersionedSerializer<SplitT> splitSerializer = source.getSplitSerializer();
SourceCoordinatorContext<SplitT> sourceCoordinatorContext =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.source.event;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;

import java.util.Objects;

/**
* Reports last emitted {@link Watermark} from a subtask to the {@link
* org.apache.flink.runtime.source.coordinator.SourceCoordinator}.
*/
public class ReportedWatermarkEvent implements OperatorEvent {

private static final long serialVersionUID = 1L;

private final long watermark;

public ReportedWatermarkEvent(long watermark) {
this.watermark = watermark;
}

public long getWatermark() {
return watermark;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
ReportedWatermarkEvent that = (ReportedWatermarkEvent) o;
return watermark == that.watermark;
}

@Override
public int hashCode() {
return Objects.hash(watermark);
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" + "watermark=" + watermark + '}';
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.source.event;

import org.apache.flink.runtime.operators.coordination.OperatorEvent;

import java.util.Objects;

/** Signals source operators the maximum watermark that emitted records can have. */
public class WatermarkAlignmentEvent implements OperatorEvent {

private static final long serialVersionUID = 1L;

private final long maxWatermark;

public WatermarkAlignmentEvent(long maxWatermark) {
this.maxWatermark = maxWatermark;
}

public long getMaxWatermark() {
return maxWatermark;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
WatermarkAlignmentEvent that = (WatermarkAlignmentEvent) o;
return maxWatermark == that.maxWatermark;
}

@Override
public int hashCode() {
return Objects.hash(maxWatermark);
}

@Override
public String toString() {
return "WatermarkAlignmentEvent{" + "maxWatermark=" + maxWatermark + '}';
}
}
Loading

0 comments on commit 10d6d4f

Please sign in to comment.