diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index dd9f1674e6c6f..09c6de557de11 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -436,10 +436,11 @@ public CompletableFuture submitJob(@Nonnull JobGraph jobGraph) { @Override public CompletableFuture cancel(JobID jobID) { - JobCancellationMessageParameters params = new JobCancellationMessageParameters(); - params.jobPathParameter.resolve(jobID); - params.terminationModeQueryParameter.resolve( - Collections.singletonList(TerminationModeQueryParameter.TerminationMode.CANCEL)); + JobCancellationMessageParameters params = + new JobCancellationMessageParameters() + .resolveJobId(jobID) + .resolveTerminationMode( + TerminationModeQueryParameter.TerminationMode.CANCEL); CompletableFuture responseFuture = sendRequest(JobCancellationHeaders.getInstance(), params); return responseFuture.thenApply(ignore -> Acknowledge.get()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java index 59a2136bcd8d5..2902f77851315 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobCancellationMessageParameters.java @@ -18,6 +18,8 @@ package org.apache.flink.runtime.rest.messages; +import org.apache.flink.api.common.JobID; + import java.util.Collection; import java.util.Collections; @@ -28,8 +30,8 @@ */ public class JobCancellationMessageParameters extends MessageParameters { - public final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); - public final TerminationModeQueryParameter terminationModeQueryParameter = + private final JobIDPathParameter jobPathParameter = new JobIDPathParameter(); + private final TerminationModeQueryParameter terminationModeQueryParameter = new TerminationModeQueryParameter(); @Override @@ -41,4 +43,15 @@ public Collection> getPathParameters() { public Collection> getQueryParameters() { return Collections.singleton(terminationModeQueryParameter); } + + public JobCancellationMessageParameters resolveJobId(JobID jobId) { + jobPathParameter.resolve(jobId); + return this; + } + + public JobCancellationMessageParameters resolveTerminationMode( + TerminationModeQueryParameter.TerminationMode terminationMode) { + terminationModeQueryParameter.resolve(Collections.singletonList(terminationMode)); + return this; + } }