[FLINK-25999] Deprecate Per-Job Mode
knaufk committed Feb 11, 2022
1 parent 314f91d commit 47c5aae
Showing 20 changed files with 128 additions and 117 deletions.
52 changes: 27 additions & 25 deletions docs/content/docs/concepts/flink-architecture.md
@@ -167,10 +167,28 @@ the outside world (see [Anatomy of a Flink Program]({{< ref "docs/dev/datastream

The jobs of a Flink Application can either be submitted to a long-running
[Flink Session Cluster]({{< ref "docs/concepts/glossary" >}}#flink-session-cluster), a dedicated [Flink Job
Cluster]({{< ref "docs/concepts/glossary" >}}#flink-job-cluster), or a
Cluster (deprecated)]({{< ref "docs/concepts/glossary" >}}#flink-job-cluster), or a
[Flink Application Cluster]({{< ref "docs/concepts/glossary" >}}#flink-application-cluster). The difference between these options is mainly related to the cluster’s lifecycle and to resource
isolation guarantees.

### Flink Application Cluster

* **Cluster Lifecycle**: a Flink Application Cluster is a dedicated Flink
cluster that only executes jobs from one Flink Application and where the
``main()`` method runs on the cluster rather than the client. The job
submission is a one-step process: you don’t need to start a Flink cluster
first and then submit a job to the existing cluster session; instead, you
package your application logic and dependencies into an executable job JAR and
the cluster entrypoint (``ApplicationClusterEntryPoint``)
is responsible for calling the ``main()`` method to extract the JobGraph.
This allows you to deploy a Flink Application like any other application on
Kubernetes, for example. The lifetime of a Flink Application Cluster is
therefore bound to the lifetime of the Flink Application.

* **Resource Isolation**: in a Flink Application Cluster, the ResourceManager
and Dispatcher are scoped to a single Flink Application, which provides a
better separation of concerns than the Flink Session Cluster.
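To make the one-step submission concrete, here is a sketch of such a deployment on YARN; the target and JAR path follow the YARN example used elsewhere in these docs, and the guard simply prints the command when no Flink distribution is present:

```shell
# Illustrative sketch only: Application Mode is a one-step submission, so a
# single client call both creates the cluster and runs the job's main() there.
FLINK_BIN=./bin/flink                                 # assumed distribution root
JOB_JAR=./examples/streaming/TopSpeedWindowing.jar    # example job JAR

CMD="$FLINK_BIN run-application -t yarn-application $JOB_JAR"

if [ -x "$FLINK_BIN" ]; then
    $CMD                          # real submission against a YARN cluster
else
    echo "dry run: $CMD"          # no distribution here; just show the call
fi
```

There is no separate "start the cluster" step; shutting the application down also tears the cluster down.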

### Flink Session Cluster

* **Cluster Lifecycle**: in a Flink Session Cluster, the client connects to a
@@ -199,7 +217,13 @@ isolation guarantees.
Formerly, a Flink Session Cluster was also known as a Flink Cluster in `session mode`.
{{< /hint >}}

### Flink Job Cluster
### Flink Job Cluster (deprecated)

{{< hint danger >}}
Per-job mode is only supported by YARN and has been deprecated in Flink 1.15.
It will be dropped in [FLINK-26000](https://issues.apache.org/jira/browse/FLINK-26000).
Please consider using Application Mode to launch a dedicated cluster per job on YARN.
{{< /hint >}}

* **Cluster Lifecycle**: in a Flink Job Cluster, the available cluster manager
(like YARN) is used to spin up a cluster for each submitted job
@@ -221,29 +245,7 @@ Formerly, a Flink Session Cluster was also known as a Flink Cluster in `session
Formerly, a Flink Job Cluster was also known as a Flink Cluster in `job (or per-job) mode`.
{{< /hint >}}
{{< hint info >}}
Kubernetes doesn't support Flink Job Cluster. See details in [Standalone Kubernetes]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#per-job-cluster-mode) and [Native Kubernetes]({{< ref "docs/deployment/resource-providers/native_kubernetes" >}}#per-job-cluster-mode).
{{< /hint >}}

### Flink Application Cluster

* **Cluster Lifecycle**: a Flink Application Cluster is a dedicated Flink
cluster that only executes jobs from one Flink Application and where the
``main()`` method runs on the cluster rather than the client. The job
submission is a one-step process: you don’t need to start a Flink cluster
first and then submit a job to the existing cluster session; instead, you
package your application logic and dependencies into an executable job JAR and
the cluster entrypoint (``ApplicationClusterEntryPoint``)
is responsible for calling the ``main()`` method to extract the JobGraph.
This allows you to deploy a Flink Application like any other application on
Kubernetes, for example. The lifetime of a Flink Application Cluster is
therefore bound to the lifetime of the Flink Application.

* **Resource Isolation**: in a Flink Application Cluster, the ResourceManager
and Dispatcher are scoped to a single Flink Application, which provides a
better separation of concerns than the Flink Session Cluster.

{{< hint info >}}
A Flink Job Cluster can be seen as a “run-on-client” alternative to Flink Application Clusters.
Flink Job Clusters are only supported with YARN.
{{< /hint >}}

{{< top >}}
3 changes: 2 additions & 1 deletion docs/content/docs/concepts/glossary.md
@@ -40,7 +40,8 @@ Cluster](#flink-cluster) is bound to the lifetime of the Flink Application.

A Flink Job Cluster is a dedicated [Flink Cluster](#flink-cluster) that only
executes a single [Flink Job](#flink-job). The lifetime of the
[Flink Cluster](#flink-cluster) is bound to the lifetime of the Flink Job.
[Flink Cluster](#flink-cluster) is bound to the lifetime of the Flink Job.
This deployment mode has been deprecated since Flink 1.15.

#### Flink Cluster

8 changes: 4 additions & 4 deletions docs/content/docs/deployment/cli.md
@@ -81,8 +81,8 @@ this action individually as it works similarly to the `run` action in terms of t
The `run` and `run-application` commands support passing additional configuration parameters via the
`-D` argument. For example, setting the [maximum parallelism]({{< ref "docs/deployment/config" >}}#pipeline-max-parallelism)
for a job can be done by setting `-Dpipeline.max-parallelism=120`. This argument is very useful for
configuring per-job or application mode clusters, because you can pass any configuration parameter
to the cluster, without changing the configuration file.
configuring application mode clusters, because you can pass any configuration parameter
to the cluster without changing the configuration file.
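As a sketch of how `-D` overrides compose into a submission (the option values and JAR path below are arbitrary examples, not recommendations; the guard prints the command when no Flink distribution is available):

```shell
# Illustrative sketch: each -Dkey=value pair overrides conf/flink-conf.yaml
# for this one submission only; the configuration file on disk is untouched.
FLINK_BIN=./bin/flink        # assumed distribution root
CMD="$FLINK_BIN run-application -t yarn-application -Dpipeline.max-parallelism=120 -Dtaskmanager.memory.process.size=2g ./my-app.jar"

if [ -x "$FLINK_BIN" ]; then
    $CMD                      # real submission; ./my-app.jar is hypothetical
else
    echo "dry run: $CMD"
fi
```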

When submitting a job to an existing session cluster, only [execution configuration parameters]({{< ref "docs/deployment/config#execution" >}}) are supported.

@@ -335,12 +335,12 @@ The parameterization of a job submission differs based on the underlying framewo

`bin/flink` offers a parameter `--target` to handle the different options. In addition to that, jobs
have to be submitted using either `run` (for [Session]({{< ref "docs/deployment/overview" >}}#session-mode)
and [Per-Job Mode]({{< ref "docs/deployment/overview" >}}#per-job-mode)) or `run-application` (for
and [Per-Job Mode (deprecated)]({{< ref "docs/deployment/overview" >}}#per-job-mode)) or `run-application` (for
[Application Mode]({{< ref "docs/deployment/overview" >}}#application-mode)). See the following summary of
parameter combinations:
* YARN
* `./bin/flink run --target yarn-session`: Submission to an already running Flink on YARN cluster
* `./bin/flink run --target yarn-per-job`: Submission spinning up a Flink on YARN cluster in Per-Job Mode
* `./bin/flink run --target yarn-per-job`: Submission spinning up a Flink on YARN cluster in Per-Job Mode (deprecated)
* `./bin/flink run-application --target yarn-application`: Submission spinning up Flink on YARN cluster in Application Mode
* Kubernetes
* `./bin/flink run --target kubernetes-session`: Submission to an already running Flink on Kubernetes cluster
40 changes: 21 additions & 19 deletions docs/content/docs/deployment/overview.md
@@ -158,8 +158,8 @@ When deploying Flink, there are often multiple options available for each buildi

Flink can execute applications in one of three ways:
- in Application Mode,
- in a Per-Job Mode,
- in Session Mode.
- in Session Mode,
- in a Per-Job Mode (deprecated).

The above modes differ in:
- the cluster lifecycle and resource isolation guarantees
@@ -196,7 +196,7 @@ Executing the `main()` method on the cluster may have other implications for you
in your environment using the `registerCachedFile()` must be accessible by the JobManager of your application.
{{< /hint >}}

Compared to the *Per-Job* mode, the *Application Mode* allows the submission of applications consisting of
Compared to the *Per-Job (deprecated)* mode, the *Application Mode* allows the submission of applications consisting of
multiple jobs. The order of job execution is not affected by the deployment mode but by the call used
to launch the job. Using `execute()`, which is blocking, establishes an order and it will lead to the
execution of the "next" job being postponed until "this" job finishes. Using `executeAsync()`, which is
@@ -212,16 +212,6 @@ Additionally, when any of multiple running jobs in Application Mode (submitted f
Regular job completions (by the sources shutting down) are supported.
{{< /hint >}}

### Per-Job Mode

Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available resource provider
framework (e.g. YARN, Kubernetes) to spin up a cluster for each submitted job. This cluster is available to
that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc) are
cleared up. This provides better resource isolation, as a misbehaving job can only bring down its own
TaskManagers. In addition, it spreads the load of book-keeping across multiple JobManagers, as there is
one per job. For these reasons, the *Per-Job* resource allocation model is the preferred mode in many
production setups.

### Session Mode

*Session mode* assumes an already running cluster and uses the resources of that cluster to execute any
@@ -234,17 +224,29 @@ restarting jobs accessing the filesystem concurrently and making it unavailable
Additionally, having a single cluster running multiple jobs implies more load for the JobManager, who
is responsible for the book-keeping of all the jobs in the cluster.

### Per-Job Mode (deprecated)

{{< hint danger >}}
Per-job mode is only supported by YARN and has been deprecated in Flink 1.15.
It will be dropped in [FLINK-26000](https://issues.apache.org/jira/browse/FLINK-26000).
Please consider using Application Mode to launch a dedicated cluster per job on YARN.
{{< /hint >}}

Aiming at providing better resource isolation guarantees, the *Per-Job* mode uses the available resource provider
framework (e.g. YARN) to spin up a cluster for each submitted job. This cluster is available to
that job only. When the job finishes, the cluster is torn down and any lingering resources (files, etc) are
cleared up. This provides better resource isolation, as a misbehaving job can only bring down its own
TaskManagers. In addition, it spreads the load of book-keeping across multiple JobManagers, as there is
one per job.
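Since per-job mode is deprecated, the practical migration is a change of action and target in the submission command. A sketch (the JAR path is illustrative; remember that in Application Mode `main()` runs on the cluster rather than the client, so files loaded in `main()` must be reachable from the JobManager):

```shell
# Illustrative sketch of the per-job -> application mode migration on YARN.
JOB_JAR=./examples/streaming/TopSpeedWindowing.jar   # example job JAR

OLD_CMD="./bin/flink run -t yarn-per-job --detached $JOB_JAR"        # deprecated
NEW_CMD="./bin/flink run-application -t yarn-application $JOB_JAR"   # replacement

echo "before: $OLD_CMD"
echo "after:  $NEW_CMD"
```

Both variants launch a dedicated cluster per job; the difference is where the `main()` method executes.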

### Summary

In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster
and the resources are shared across all jobs. The *Per-Job* mode pays the price of spinning up a cluster
for every submitted job, but this comes with better isolation guarantees as the resources are not shared
across jobs. In this case, the lifecycle of the cluster is bound to that of the job. Finally, the
and the resources are shared across all jobs.
*Application Mode* creates a session cluster per application and executes the application's `main()`
method on the cluster.
It thus comes with better resource isolation as the resources are only used by the job(s) launched from a single `main()` method.
This comes at the price of spinning up a dedicated cluster for each application.

## Vendor Solutions

@@ -126,14 +126,6 @@ $ ./bin/flink cancel --target kubernetes-application -Dkubernetes.cluster-id=my-

You can override configurations set in `conf/flink-conf.yaml` by passing key-value pairs `-Dkey=value` to `bin/flink`.

### Per-Job Mode

{{< hint info >}}
For high-level intuition behind the per-job mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#per-job-mode" >}}).
{{< /hint >}}

Flink on Kubernetes does not support Per-Job Cluster Mode.

### Session Mode

{{< hint info >}}
@@ -89,7 +89,7 @@ To shut down the cluster, either terminate (e.g. with `CTRL-C`) the JobManager a
The Flink image contains a regular Flink distribution with its default configuration and a standard entry point script.
You can run its entry point in the following modes:
* [JobManager]({{< ref "docs/concepts/glossary" >}}#flink-jobmanager) for [a Session cluster](#starting-a-session-cluster-on-docker)
* [JobManager]({{< ref "docs/concepts/glossary" >}}#flink-jobmanager) for [a Application cluster](#application-mode-on-docker)
* [JobManager]({{< ref "docs/concepts/glossary" >}}#flink-jobmanager) for [an Application cluster](#application-mode-on-docker)
* [TaskManager]({{< ref "docs/concepts/glossary" >}}#flink-taskmanager) for any cluster

This allows you to deploy a standalone cluster (Session or Application Mode) in any containerised environment, for example:
@@ -202,14 +202,6 @@ You can provide the following additional command line arguments to the cluster e
If the main function of the user job main class accepts arguments, you can also pass them at the end of the `docker run` command.
### Per-Job Mode
{{< hint info >}}
For high-level intuition behind the per-job mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#per-job-mode" >}}).
{{< /hint >}}
Per-Job Mode is not supported by Flink on Docker.
### Session Mode
{{< hint info >}}
@@ -133,14 +133,6 @@ with the `kubectl` command:
$ kubectl delete -f jobmanager-job.yaml
```

### Per-Job Mode

{{< hint info >}}
For high-level intuition behind the per-job mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#per-job-mode" >}}).
{{< /hint >}}

Flink on Standalone Kubernetes does not support the Per-Job Cluster Mode.

### Session Mode

{{< hint info >}}
@@ -111,15 +111,6 @@ $ ./bin/taskmanager.sh stop
$ ./bin/standalone-job.sh stop
```


### Per-Job Mode

{{< hint info >}}
For high-level intuition behind the per-job mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#per-job-mode" >}}).
{{< /hint >}}

Per-Job Mode is not supported by the Standalone Cluster.

### Session Mode

{{< hint info >}}
60 changes: 32 additions & 28 deletions docs/content/docs/deployment/resource-providers/yarn.md
@@ -83,7 +83,7 @@ Congratulations! You have successfully run a Flink application by deploying Flin

## Deployment Modes Supported by Flink on YARN

For production use, we recommend deploying Flink Applications in the [Per-job or Application Mode]({{< ref "docs/deployment/overview" >}}#deployment-modes), as these modes provide a better isolation for the Applications.
For production use, we recommend deploying Flink Applications in [Application Mode]({{< ref "docs/deployment/overview" >}}#deployment-modes), as it provides better isolation between applications.

### Application Mode

@@ -98,7 +98,6 @@ The cluster will shut down as soon as the application has finished. You can manu
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
```


Once an Application Mode cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint.

```bash
@@ -124,32 +123,6 @@ The above will allow the job submission to be extra lightweight as the needed Fl
are going to be picked up by the specified remote locations rather than be shipped to the cluster by the
client.

### Per-Job Mode

{{< hint info >}}
For high-level intuition behind the per-job mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#per-job-mode" >}}).
{{< /hint >}}

The Per-job Cluster mode will launch a Flink cluster on YARN, then run the provided application jar locally and finally submit the JobGraph to the JobManager on YARN. If you pass the `--detached` argument, the client will stop once the submission is accepted.

The YARN cluster will stop once the job has stopped.

```bash
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
```

Once a Per-Job Cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint.

```bash
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
```

Note that cancelling your job on a Per-Job Cluster will stop the cluster.


### Session Mode

{{< hint info >}}
@@ -184,6 +157,37 @@ The YARN session client also has a few "shortcut arguments" for commonly used se

{{< top >}}

### Per-Job Mode (deprecated)

{{< hint danger >}}
Per-job mode is only supported by YARN and has been deprecated in Flink 1.15.
It will be dropped in [FLINK-26000](https://issues.apache.org/jira/browse/FLINK-26000).
Please consider using Application Mode to launch a dedicated cluster per job on YARN.
{{< /hint >}}

{{< hint info >}}
For high-level intuition behind the per-job mode, please refer to the [deployment mode overview]({{< ref "docs/deployment/overview#per-job-mode" >}}).
{{< /hint >}}

The Per-job Cluster mode will launch a Flink cluster on YARN, then run the provided application jar locally and finally submit the JobGraph to the JobManager on YARN. If you pass the `--detached` argument, the client will stop once the submission is accepted.

The YARN cluster will stop once the job has stopped.

```bash
./bin/flink run -t yarn-per-job --detached ./examples/streaming/TopSpeedWindowing.jar
```

Once a Per-Job Cluster is deployed, you can interact with it for operations like cancelling or taking a savepoint.

```bash
# List running job on the cluster
./bin/flink list -t yarn-per-job -Dyarn.application.id=application_XXXX_YY
# Cancel running job
./bin/flink cancel -t yarn-per-job -Dyarn.application.id=application_XXXX_YY <jobId>
```

Note that cancelling your job on a Per-Job Cluster will stop the cluster.

## Flink on YARN Reference

### Configuring Flink on YARN
@@ -48,7 +48,7 @@
<td><h5>execution.target</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>The deployment target for the execution. This can take one of the following values when calling <code class="highlighter-rouge">bin/flink run</code>:<ul><li>remote</li><li>local</li><li>yarn-per-job</li><li>yarn-session</li><li>kubernetes-session</li></ul>And one of the following values when calling <code class="highlighter-rouge">bin/flink run-application</code>:<ul><li>yarn-application</li><li>kubernetes-application</li></ul></td>
<td>The deployment target for the execution. This can take one of the following values when calling <code class="highlighter-rouge">bin/flink run</code>:<ul><li>remote</li><li>local</li><li>yarn-per-job (deprecated)</li><li>yarn-session</li><li>kubernetes-session</li></ul>And one of the following values when calling <code class="highlighter-rouge">bin/flink run-application</code>:<ul><li>yarn-application</li><li>kubernetes-application</li></ul></td>
</tr>
</tbody>
</table>
@@ -132,9 +132,17 @@ private static String getExecutorFactoryNames() {
return new DefaultExecutorServiceLoader()
.getExecutorNames()
.map(name -> String.format("\"%s\"", name))
.map(name -> addDeprecationNoticeToYarnPerJobMode(name))
.collect(Collectors.joining(", "));
}

private static String addDeprecationNoticeToYarnPerJobMode(String name) {
if (name.contains("yarn-per-job")) {
return name + " (deprecated)";
}
return name;
}

private static String getApplicationModeTargetNames() {
return new DefaultClusterClientServiceLoader()
.getApplicationModeTargetNames()
@@ -79,8 +79,11 @@ ClusterClientProvider<T> deployApplicationCluster(
* @param detached true if the cluster should be stopped after the job completion without
* serving the result, otherwise false
* @return Cluster client to talk to the Flink cluster
* @deprecated Per-job mode has been deprecated in Flink 1.15 and will be removed in the future.
* Please use application mode instead.
* @throws ClusterDeploymentException if the cluster could not be deployed
*/
@Deprecated
ClusterClientProvider<T> deployJobCluster(
final ClusterSpecification clusterSpecification,
final JobGraph jobGraph,
@@ -49,6 +49,7 @@
* client to the target cluster.
*/
@Internal
@Deprecated
public class AbstractJobClusterExecutor<
ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>>
implements PipelineExecutor {