Skip to content

Commit

Permalink
[hotfix][runtime] Refactor resource manager termination handling.
Browse files Browse the repository at this point in the history
  • Loading branch information
xintongsong committed Aug 14, 2020
1 parent dfd8823 commit 0fe2047
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Kubernetes specific implementation of the {@link ResourceManager}.
Expand Down Expand Up @@ -142,23 +141,23 @@ protected void initialize() throws ResourceManagerException {
}

@Override
public CompletableFuture<Void> onStop() {
public void terminate() throws Exception {
// shut down all components
Throwable throwable = null;
Exception exception = null;

try {
podsWatch.close();
} catch (Throwable t) {
throwable = t;
} catch (Exception e) {
exception = e;
}

try {
kubeClient.close();
} catch (Throwable t) {
throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

return getStopTerminationFutureOrCompletedExceptionally(throwable);
ExceptionUtils.tryRethrowException(exception);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -394,8 +394,8 @@ private CompletableFuture<Void> stopSupportingActorsAsync() {
}

@Override
public CompletableFuture<Void> onStop() {
return stopSupportingActorsAsync().thenCompose((ignored) -> super.onStop());
public void terminate() throws Exception {
stopSupportingActorsAsync().get();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ private void handleStartResourceManagerServicesException(Exception e) throws Exc
}

@Override
public CompletableFuture<Void> onStop() {
public final CompletableFuture<Void> onStop() {
try {
stopResourceManagerServices();
} catch (Exception exception) {
Expand All @@ -256,6 +256,12 @@ public CompletableFuture<Void> onStop() {
private void stopResourceManagerServices() throws Exception {
Exception exception = null;

try {
terminate();
} catch (Exception e) {
exception = new ResourceManagerException("Error while shutting down resource manager", e);
}

stopHeartbeatServices();

try {
Expand Down Expand Up @@ -1064,6 +1070,13 @@ public void handleError(final Exception exception) {
*/
protected abstract void initialize() throws ResourceManagerException;

/**
* Terminates the framework specific components.
*
* @throws Exception which occurs during termination.
*/
protected abstract void terminate() throws Exception;

/**
* This method can be overridden to add a (non-blocking) initialization routine to the
* ResourceManager that will be called when leadership is granted but before leadership is
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ protected void initialize() throws ResourceManagerException {
// nothing to initialize
}

@Override
protected void terminate() {
// noop
}

@Override
protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
Expand All @@ -36,13 +35,9 @@
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkException;

import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/**
* Base class for {@link ResourceManager} implementations which contains some common variables and methods.
Expand Down Expand Up @@ -105,17 +100,6 @@ public ActiveResourceManager(
allocatedNotRegisteredWorkerResourceSpecs = new HashMap<>();
}

protected CompletableFuture<Void> getStopTerminationFutureOrCompletedExceptionally(@Nullable Throwable exception) {
final CompletableFuture<Void> terminationFuture = super.onStop();

if (exception != null) {
return FutureUtils.completedExceptionally(new FlinkException(
"Error while shutting down resource manager", exception));
} else {
return terminationFuture;
}
}

protected abstract Configuration loadClientConfiguration();

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ protected void initialize() throws ResourceManagerException {
// noop
}

@Override
protected void terminate() {
// noop
}

@Override
protected void internalDeregisterApplication(ApplicationStatus finalStatus, @Nullable String diagnostics) throws ResourceManagerException {
// noop
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -284,27 +283,27 @@ protected void initialize() throws ResourceManagerException {
}

@Override
public CompletableFuture<Void> onStop() {
public void terminate() throws Exception {
// shut down all components
Throwable firstException = null;
Exception firstException = null;

if (resourceManagerClient != null) {
try {
resourceManagerClient.stop();
} catch (Throwable t) {
firstException = t;
} catch (Exception e) {
firstException = e;
}
}

if (nodeManagerClient != null) {
try {
nodeManagerClient.stop();
} catch (Throwable t) {
firstException = ExceptionUtils.firstOrSuppressed(t, firstException);
} catch (Exception e) {
firstException = ExceptionUtils.firstOrSuppressed(e, firstException);
}
}

return getStopTerminationFutureOrCompletedExceptionally(firstException);
ExceptionUtils.tryRethrowException(firstException);
}

@Override
Expand Down

0 comments on commit 0fe2047

Please sign in to comment.