diff --git a/docs/content.zh/docs/concepts/flink-architecture.md b/docs/content.zh/docs/concepts/flink-architecture.md index ddbf6b374f4da..0336f20f4fa6c 100644 --- a/docs/content.zh/docs/concepts/flink-architecture.md +++ b/docs/content.zh/docs/concepts/flink-architecture.md @@ -101,7 +101,7 @@ Flink 应用程序的作业可以被提交到长期运行的 [Flink Session 集 ### Flink Session 集群 -* **集群生命周期**:在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个作业提交。即使所有作业完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 作业寿命的约束。 +* **集群生命周期**:在 Flink Session 集群中,客户端连接到一个预先存在的、长期运行的集群,该集群可以接受多个应用程序提交。即使所有应用程序完成后,集群(和 JobManager)仍将继续运行直到手动停止 session 为止。因此,Flink Session 集群的寿命不受任何 Flink 应用程序寿命的约束。 * **资源隔离**:TaskManager slot 由 ResourceManager 在提交作业时分配,并在作业完成时释放。由于所有作业都共享同一集群,因此在集群资源方面存在一些竞争 — 例如提交工作阶段的网络带宽。此共享设置的局限性在于,如果 TaskManager 崩溃,则在此 TaskManager 上运行 task 的所有作业都将失败;类似的,如果 JobManager 上发生一些致命错误,它将影响集群中正在运行的所有作业。 diff --git a/docs/content.zh/docs/concepts/glossary.md b/docs/content.zh/docs/concepts/glossary.md index 63a294cdc3274..79c477b97e520 100644 --- a/docs/content.zh/docs/concepts/glossary.md +++ b/docs/content.zh/docs/concepts/glossary.md @@ -84,6 +84,11 @@ JobMaster 是在 [Flink JobManager](#flink-jobmanager) 运行中的组件之一 JobResultStore 是一个 Flink 组件,它将全局终止(已完成的、已取消的或失败的)作业的结果保存到文件系统中,从而使结果比已完成的作业更长久。 这些结果然后被 Flink 用来确定作业是否应该在高可用集群中被恢复。 +#### ApplicationResultStore + +ApplicationResultStore 是一个 Flink 组件,它将全局终止(已完成的、已取消的或失败的)应用程序的结果保存到文件系统中,从而使结果比已完成的应用程序更长久。 +这些结果然后被 Flink 用来确定应用程序是否应该在高可用集群中被恢复。 + #### Logical Graph 逻辑图是一种有向图,其中顶点是 [算子](#operator),边定义算子的输入/输出关系,并对应于数据流或数据集。通过从 [Flink Application](#flink-application) 提交作业来创建逻辑图。 diff --git a/docs/content.zh/docs/deployment/advanced/historyserver.md b/docs/content.zh/docs/deployment/advanced/historyserver.md index bef18431585e3..721ac7190f011 100644 --- a/docs/content.zh/docs/deployment/advanced/historyserver.md +++ b/docs/content.zh/docs/deployment/advanced/historyserver.md @@ -27,7 +27,7 @@ under the License. 
# History Server -Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查询已完成作业的统计信息。 +Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查询已完成作业和应用程序的统计信息。 此外,它暴露了一套 REST API,该 API 接受 HTTP 请求并返回 JSON 格式的数据。 @@ -37,7 +37,7 @@ Flink 提供了 history server,可以在相应的 Flink 集群关闭之后查 ## 概览 -HistoryServer 允许查询 JobManager 存档的已完成作业的状态和统计信息。 +HistoryServer 允许查询 JobManager 存档的已完成作业和应用程序的状态和统计信息。 在配置 HistoryServer *和* JobManager 之后,你可以使用相应的脚本来启动和停止 HistoryServer: @@ -58,20 +58,24 @@ bin/historyserver.sh (start|start-foreground|stop) **JobManager** -已完成作业的存档在 JobManager 上进行,将已存档的作业信息上传到文件系统目录中。你可以在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}})中通过 `jobmanager.archive.fs.dir` 设置一个目录存档已完成的作业。 +已完成作业和应用程序的存档在 JobManager 上进行,将已存档的作业和应用程序信息上传到文件系统目录中。你可以在 [Flink 配置文件]({{< ref "docs/deployment/config#flink-配置文件" >}})中通过 `jobmanager.archive.fs.dir` 设置一个目录存档已完成的作业和应用程序。 ```yaml -# 上传已完成作业信息的目录 -jobmanager.archive.fs.dir: hdfs:///completed-jobs +# 上传已完成作业和应用程序信息的目录 +jobmanager.archive.fs.dir: hdfs:///archives ``` +{{< hint info >}} +如需了解具体的目录结构,请参阅 [FLIP-549: Support Application Management](https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management)。 +{{< /hint >}} + **HistoryServer** 可以通过 `historyserver.archive.fs.dir` 设置 HistoryServer 监视以逗号分隔的目录列表。定期轮询已配置的目录以查找新的存档;轮询间隔可以通过 `historyserver.archive.fs.refresh-interval` 来配置。 ```yaml -# 监视以下目录中已完成的作业 -historyserver.archive.fs.dir: hdfs:///completed-jobs +# 监视以下目录中已完成的作业和应用程序 +historyserver.archive.fs.dir: hdfs:///archives # 每 10 秒刷新一次 historyserver.archive.fs.refresh-interval: 10000 @@ -105,6 +109,15 @@ historyserver.log.taskmanager.url-pattern: http://my.log-browsing.url///exceptions` 请求须写为 `http://hostname:port/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions`。 +**应用程序相关请求** + + - `/applications/overview` + - `/applications/` + - `/applications//jobmanager/config` + - `/applications//exceptions` + +**作业相关请求** + - `/config` - `/jobs/overview` - `/jobs/` diff --git a/docs/content.zh/docs/deployment/config.md 
b/docs/content.zh/docs/deployment/config.md index 3665530e89915..5ab7f81ae96ac 100644 --- a/docs/content.zh/docs/deployment/config.md +++ b/docs/content.zh/docs/deployment/config.md @@ -258,6 +258,10 @@ The JobManager ensures consistency during recovery across TaskManagers. For the {{< generated/common_high_availability_jrs_section >}} +**Options for the ApplicationResultStore in high-availability setups** + +{{< generated/common_high_availability_ars_section >}} + **Options for high-availability setups with ZooKeeper** {{< generated/common_high_availability_zk_section >}} diff --git a/docs/content.zh/docs/deployment/ha/overview.md b/docs/content.zh/docs/deployment/ha/overview.md index 8ac115f5fee71..c0b685432e934 100644 --- a/docs/content.zh/docs/deployment/ha/overview.md +++ b/docs/content.zh/docs/deployment/ha/overview.md @@ -29,7 +29,14 @@ under the License. # 高可用 JobManager 高可用(HA)模式加强了 Flink 集群防止 JobManager 故障的能力。 -此特性确保 Flink 集群将始终持续执行你提交的作业。 +此特性确保 Flink 集群将始终重新执行在故障发生时正在运行的应用程序。 + +{{< hint warning >}} +恢复后,应用程序在故障前提交的作业可能会继续执行或被弃用,具体取决于应用程序 main() 方法采取的执行路径。 + +故障前后的作业按照名称进行匹配,相同名称的作业按照提交顺序进一步匹配。 +为避免匹配错误——尤其是在作业提交顺序不确定的情况下——建议通过 execute(jobName) 为每个作业指定唯一名称。 +{{< /hint >}} ## JobManager 高可用 @@ -70,7 +77,18 @@ Flink 提供了两种高可用服务实现: ## 高可用数据生命周期 -为了恢复提交的作业,Flink 持久化元数据和 job 组件。高可用数据将一直保存,直到相应的作业执行成功、被取消或最终失败。当这些情况发生时,将删除所有高可用数据,包括存储在高可用服务中的元数据。 +为了恢复提交的应用程序,Flink 持久化应用程序的元数据。 +高可用数据将一直保存,直到相应的应用程序执行成功、被取消或最终失败。 +当这些情况发生时,将删除所有高可用数据,包括存储在高可用服务中的元数据。 +类似的生命周期也适用于单个作业的高可用数据。 + +{{< top >}} + +## 应用程序结果存储 + +应用程序结果存储用于归档达到终止状态(即完成、取消或失败)的应用程序的最终结果,其数据存储在文件系统上(请参阅 [application-result-store.storage-path]({{< ref "docs/deployment/config#application-result-store-storage-path" >}}))。 +只要没有正确清理相应的应用程序,此数据条目就是脏数据(数据位于应用程序的子文件夹中 [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}}))。 +脏数据将被清理,即相应的应用程序要么在当前时刻被清理,要么在应用程序恢复过程中被清理。一旦清理成功,这些脏数据条目将被删除。请参阅 [HA configuration options]({{< ref "docs/deployment/config#high-availability" 
>}}) 下应用程序结果存储的配置参数以获取有关如何调整行为的更多详细信息。 {{< top >}} @@ -78,5 +96,5 @@ Flink 提供了两种高可用服务实现: 作业结果存储用于归档达到全局结束状态作业(即完成、取消或失败)的最终结果,其数据存储在文件系统上 (请参阅[job-result-store.storage-path]({{< ref "docs/deployment/config#job-result-store-storage-path" >}}))。 只要没有正确清理相应的作业,此数据条目就是脏数据 (数据位于作业的子文件夹中 [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}}))。 -脏数据将被清理,即相应的作业要么在当前时刻被清理,要么在作业恢复过程中被清理。一旦清理成功,这些脏数据条目将被删除。请参阅 [HA configuration options]({{< ref "docs/deployment/config#high-availability" >}}) 下作业结果存储的配置参数以获取有关如何调整行为的更多详细信息。 +脏数据将被清理,即相应的作业要么在当前时刻被清理,要么在作业恢复过程中被清理。这些条目将在清理成功且对应的应用程序已创建脏条目后被删除。 {{< top >}} diff --git a/docs/content.zh/docs/deployment/overview.md b/docs/content.zh/docs/deployment/overview.md index 4c6fc3d809dd1..290569c8c1e4d 100644 --- a/docs/content.zh/docs/deployment/overview.md +++ b/docs/content.zh/docs/deployment/overview.md @@ -73,8 +73,8 @@ When deploying Flink, there are often multiple options available for each buildi JobManager is the name of the central work coordination component of Flink. It has implementations for different resource providers, which differ on high-availability, resource allocation behavior and supported job submission modes.
 JobManager modes for job submissions:
-  * Application Mode: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.
-  * Session Mode: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers
+  * Application Mode: runs the cluster exclusively for one application. The application's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.
+  * Session Mode: one JobManager instance manages multiple applications (and all jobs within them) sharing the same cluster of TaskManagers
@@ -168,6 +168,10 @@ while subsuming them as part of the usual CompletedCheckpoint management. These not covered by the repeatable cleanup, i.e. they have to be deleted manually, still. This is covered by [FLINK-26606](https://issues.apache.org/jira/browse/FLINK-26606). +The application resource cleanup is similar (see the +[High Availability Services / ApplicationResultStore]({{< ref "docs/deployment/ha/overview#applicationresultstore" >}}) +section for further details). + ## Deployment Modes Flink can execute applications in two modes: @@ -184,14 +188,14 @@ Flink can execute applications in two modes: #### Application Mode -In all the other modes, the application's `main()` method is executed on the client side. This process +If the application's `main()` method is executed on the client side, this process includes downloading the application's dependencies locally, executing the `main()` to extract a representation of the application that Flink's runtime can understand (i.e. the `JobGraph`) and ship the dependencies and the `JobGraph(s)` to the cluster. This makes the Client a heavy resource consumer as it may need substantial network bandwidth to download dependencies and ship binaries to the cluster, and CPU cycles to execute the `main()`. This problem can be more pronounced when the Client is shared across users. -Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time, +Building on this observation, the *Application Mode* creates a cluster per submitted application, and the `main()` method of the application is executed on the JobManager. Creating a cluster per application can be seen as creating a session cluster shared only among the jobs of a particular application, and torn down when the application finishes. 
With this architecture, the *Application Mode* provides the application granularity resource isolation @@ -213,12 +217,14 @@ execution of the "next" job being postponed until "this" job finishes. Using `e non-blocking, will lead to the "next" job starting before "this" job finishes. {{< hint warning >}} -The Application Mode allows for multi-`execute()` applications but -High-Availability is not supported in these cases. High-Availability in Application Mode is only -supported for single-`execute()` applications. - -Additionally, when any of multiple running jobs in Application Mode (submitted for example using -`executeAsync()`) gets cancelled, all jobs will be stopped and the JobManager will shut down. +The Application Mode allows for multi-job applications (by calling `execute()` or `executeAsync()` multiple times in the `main()` method) but +High-Availability is limited in these cases. High-Availability in Application Mode is only +supported for applications with a single streaming job or multiple batch jobs. +For more details, see [FLIP-560](https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement). + +Additionally, when any of multiple running jobs in Application Mode (submitted for example using +`executeAsync()`) gets cancelled, all jobs will be stopped and the JobManager will shut down by default. +This behavior can be configured through the [`execution.terminate-application-on-any-job-terminated-exceptionally`]({{< ref "docs/deployment/config" >}}#execution-terminate-application-on-any-job-terminated-exceptionally) option. Regular job completions (by the sources shutting down) are supported. {{< /hint >}} @@ -234,13 +240,21 @@ restarting jobs accessing the filesystem concurrently and making it unavailable Additionally, having a single cluster running multiple jobs implies more load for the JobManager, who is responsible for the book-keeping of all the jobs in the cluster. 
+In Session Mode, the application's `main()` method can be executed either on the client or on the cluster. +When submitting applications via Command-Line Interface (CLI) or the SQL Client, the `main()` method is executed on the client. +However, when submitting applications via the REST API `/jars/:jarid/run-application`, +the `main()` method is executed on the cluster. +This provides the same benefits as Application Mode in terms of resource usage and network bandwidth for the client, +while still maintaining the shared cluster resource model of Session Mode. #### Summary -In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster -and the resources are shared across all jobs. The -*Application Mode* creates a session cluster per application and executes the application's `main()` +In *Session Mode*, the cluster lifecycle is independent of that of any application running on the cluster +and the resources are shared across all applications. The application's `main()` method can be executed either on the client or on the cluster. +*Application Mode* creates a session cluster per application and executes the application's `main()` method on the cluster. +It thus comes with better resource isolation as the resources are only used by the job(s) launched from a single `main()` method. +This comes at the price of spinning up a dedicated cluster for each application. 
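The shutdown-on-failure behavior discussed in the hint above is governed by the `execution.terminate-application-on-any-job-terminated-exceptionally` option named in this diff. A minimal flink-conf sketch (assuming `true` is the default, as the text implies) that keeps the remaining jobs running instead:

```yaml
# Keep the application alive when one of its jobs terminates exceptionally,
# instead of stopping all jobs and shutting down the JobManager (the default).
execution.terminate-application-on-any-job-terminated-exceptionally: false
```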
diff --git a/docs/content.zh/docs/internals/application_lifecycle.md b/docs/content.zh/docs/internals/application_lifecycle.md new file mode 100644 index 0000000000000..1ac309d80a0bb --- /dev/null +++ b/docs/content.zh/docs/internals/application_lifecycle.md @@ -0,0 +1,78 @@ +--- +title: "应用程序生命周期" +weight: 9 +type: docs +aliases: + - /internals/application_lifecycle.html +--- + + +# 应用程序生命周期 + +Flink 中的应用程序代表一段用于执行的用户自定义逻辑。它提供了统一的抽象来跟踪用户 `main()` 方法的执行状态并管理其关联的作业。更多详情请参阅 [FLIP-549](https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management) +和 [FLIP-560](https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement)。 + +## 集群-应用程序-作业架构 + +Flink 现在使用三层结构:**集群-应用程序-作业**。该结构统一了不同的部署模式,并提供了用户逻辑执行的可观测性和可管理性。 + +集群可以在两种模式下运行: +- **应用程序模式**:每个应用程序一个集群 +- **会话模式**:一个集群用于多个应用程序 + +一个应用程序可以包含 0 到 N 个作业,每个作业与一个应用程序关联。 + +## 应用程序实现 + +{{< gh_link file="/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java" name="AbstractApplication" >}} 是所有应用程序的基类。Flink 提供了两个具体实现: + +- **PackagedProgramApplication**:封装用户 JAR 并执行其 `main()` 方法。适用于**应用程序模式**或**通过 REST API `/jars/:jarid/run-application` 提交的会话模式**。应用程序的生命周期与用户 `main()` 方法的执行绑定。 + +- **SingleJobApplication**:将单个作业的提交封装为一个轻量级的 `main()` 方法。适用于提交单个作业的情况,例如**通过 CLI 提交的会话模式**。应用程序的生命周期与作业的执行状态绑定。 + +## 应用程序状态 + +应用程序从 *已创建* (created) 状态开始,一旦执行开始则切换到 *运行中* (running) 状态。 +当执行正常完成且与该应用程序关联的所有作业都已达到终止状态时,应用程序转换到 *已结束* (finished) 状态。 +如果发生故障,应用程序首先切换到 *失败中* (failing) 状态,在此状态下它会取消所有非终止状态的作业。所有作业达到终止状态后,应用程序转换到 *已失败* (failed) 状态。 + +如果用户取消应用程序,它将进入 *取消中* (canceling) 状态。 +这也意味着取消其所有非终止状态的作业。 +一旦所有作业达到终止状态,应用程序转换到 *已取消* (canceled) 状态。 + +状态 *已结束*、*已取消* 和 *已失败* 是终止状态,会触发应用程序的归档和清理操作。 + +{{< img src="/fig/application_status.png" alt="应用程序的状态和转换" width="50%" >}} + +## 应用程序提交 + +应用程序提交到集群并通过不同的机制开始执行,具体取决于部署模式和提交方式。 + +在**应用程序模式**中,集群是为单个应用程序专门启动的。在集群启动期间,会自动从用户的 JAR 文件生成一个 `PackagedProgramApplication`。集群就绪后应用程序立即开始执行,其生命周期与用户 `main()` 方法的执行绑定。 
+ +在**会话模式**中,多个应用程序可以共享同一个集群。应用程序可以通过各种接口提交: + +- **REST API (`/jars/:jarid/run-application`)**:此端点从上传的用户 JAR 创建一个 `PackagedProgramApplication` 并开始执行。与应用程序模式类似,应用程序的生命周期与用户的 `main()` 方法绑定。 + +- **REST API (`/jars/:jarid/run`)** 和 **CLI 提交**:这些接口直接执行用户的 `main()` 方法。当方法调用 `execute()` 提交作业时,该作业被封装为 `SingleJobApplication`。在这种情况下,应用程序的生命周期与作业的执行状态绑定,使其成为单作业提交的轻量级封装。 + +{{< gh_link file="/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java" name="Dispatcher" >}} 管理集群中的所有应用程序。它提供查询应用程序状态、管理应用程序生命周期以及处理取消和恢复等操作的接口。 + +{{< top >}} diff --git a/docs/content/docs/concepts/flink-architecture.md b/docs/content/docs/concepts/flink-architecture.md index 31792c36285f9..dec7bb56c9904 100644 --- a/docs/content/docs/concepts/flink-architecture.md +++ b/docs/content/docs/concepts/flink-architecture.md @@ -192,10 +192,10 @@ isolation guarantees. ### Flink Session Cluster * **Cluster Lifecycle**: in a Flink Session Cluster, the client connects to a - pre-existing, long-running cluster that can accept multiple job submissions. - Even after all jobs are finished, the cluster (and the JobManager) will + pre-existing, long-running cluster that can accept multiple application submissions. + Even after all applications are finished, the cluster (and the JobManager) will keep running until the session is manually stopped. The lifetime of a Flink - Session Cluster is therefore not bound to the lifetime of any Flink Job. + Session Cluster is therefore not bound to the lifetime of any Flink Application. * **Resource Isolation**: TaskManager slots are allocated by the ResourceManager on job submission and released once the job is finished. 
diff --git a/docs/content/docs/concepts/glossary.md b/docs/content/docs/concepts/glossary.md index 6d38d5acb6a77..7224bb455356c 100644 --- a/docs/content/docs/concepts/glossary.md +++ b/docs/content/docs/concepts/glossary.md @@ -111,6 +111,13 @@ The JobResultStore is a Flink component that persists the results of globally te a finished job. These results are then used by Flink to determine whether jobs should be subject to recovery in highly-available clusters. +#### ApplicationResultStore + +The ApplicationResultStore is a Flink component that persists the results of terminated +(i.e. finished, cancelled or failed) applications to a filesystem, allowing the results to outlive +a terminated application. These results are then used by Flink to determine whether applications should +be subject to recovery in highly-available clusters. + #### Logical Graph A logical graph is a directed graph where the nodes are [Operators](#operator) diff --git a/docs/content/docs/deployment/advanced/historyserver.md b/docs/content/docs/deployment/advanced/historyserver.md index 7d0053bb23025..174af7c1f59c2 100644 --- a/docs/content/docs/deployment/advanced/historyserver.md +++ b/docs/content/docs/deployment/advanced/historyserver.md @@ -27,13 +27,13 @@ under the License. # History Server -Flink has a history server that can be used to query the statistics of completed jobs after the corresponding Flink cluster has been shut down. +Flink has a history server that can be used to query the statistics of completed jobs and applications after the corresponding Flink cluster has been shut down. Furthermore, it exposes a REST API that accepts HTTP requests and responds with JSON data. ## Overview -The HistoryServer allows you to query the status and statistics of completed jobs that have been archived by a JobManager. +The HistoryServer allows you to query the status and statistics of completed jobs and applications that have been archived by a JobManager. 
After you have configured the HistoryServer *and* JobManager, you start and stop the HistoryServer via its corresponding startup script: @@ -52,20 +52,24 @@ The configuration keys `jobmanager.archive.fs.dir` and `historyserver.archive.fs **JobManager** -The archiving of completed jobs happens on the JobManager, which uploads the archived job information to a file system directory. You can configure the directory to archive completed jobs in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}) by setting a directory via `jobmanager.archive.fs.dir`. +The archiving of completed jobs and applications happens on the JobManager, which uploads the archived job and application information to a file system directory. You can configure the directory to archive completed jobs and applications in [Flink configuration file]({{< ref "docs/deployment/config#flink-configuration-file" >}}) by setting a directory via `jobmanager.archive.fs.dir`. ```yaml # Directory to upload completed job information -jobmanager.archive.fs.dir: hdfs:///completed-jobs +jobmanager.archive.fs.dir: hdfs:///archives ``` +{{< hint info >}} +For details on the specific directory structure, please refer to [FLIP-549: Support Application Management](https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management). +{{< /hint >}} + **HistoryServer** The HistoryServer can be configured to monitor a comma-separated list of directories in via `historyserver.archive.fs.dir`. The configured directories are regularly polled for new archives; the polling interval can be configured via `historyserver.archive.fs.refresh-interval`. ```yaml # Monitor the following directories for completed jobs -historyserver.archive.fs.dir: hdfs:///completed-jobs +historyserver.archive.fs.dir: hdfs:///archives # Refresh every 10 seconds historyserver.archive.fs.refresh-interval: 10000 @@ -97,6 +101,15 @@ Below is a list of available requests, with a sample JSON response. 
All requests Values in angle brackets are variables, for example `http://hostname:port/jobs//exceptions` will have to requested for example as `http://hostname:port/jobs/7684be6004e4e955c2a558a9bc463f65/exceptions`. +**Application-related requests** + + - `/applications/overview` + - `/applications/` + - `/applications//jobmanager/config` + - `/applications//exceptions` + +**Job-related requests** + - `/config` - `/jobs/overview` - `/jobs/` diff --git a/docs/content/docs/deployment/config.md b/docs/content/docs/deployment/config.md index 9095155dc1cc5..6d0ef4116c8bc 100644 --- a/docs/content/docs/deployment/config.md +++ b/docs/content/docs/deployment/config.md @@ -259,6 +259,10 @@ The JobManager ensures consistency during recovery across TaskManagers. For the {{< generated/common_high_availability_jrs_section >}} +**Options for the ApplicationResultStore in high-availability setups** + +{{< generated/common_high_availability_ars_section >}} + **Options for high-availability setups with ZooKeeper** {{< generated/common_high_availability_zk_section >}} diff --git a/docs/content/docs/deployment/ha/overview.md b/docs/content/docs/deployment/ha/overview.md index 1939474680a26..b877a33340fce 100644 --- a/docs/content/docs/deployment/ha/overview.md +++ b/docs/content/docs/deployment/ha/overview.md @@ -29,7 +29,16 @@ under the License. # High Availability JobManager High Availability (HA) hardens a Flink cluster against JobManager failures. -This feature ensures that a Flink cluster will always continue executing your submitted jobs. +This feature ensures that a Flink cluster will always re-execute your submitted applications that were running at the time of a failure. + +{{< hint warning >}} +After recovery, The jobs submitted by the application before the failure may either resume execution or be deprecated, +depending on the execution path taken in the application's main() method. 
+ +Jobs before and after a failure are matched by name, and those with identical names are further matched based on their submission order. +To avoid mismatches—especially when job submission order is non-deterministic—we recommend assigning each job a unique name via `execute(jobName)`. +{{< /hint >}} + ## JobManager High Availability @@ -74,9 +83,26 @@ Kubernetes HA services only work when running on Kubernetes. ## High Availability data lifecycle -In order to recover submitted jobs, Flink persists metadata and the job artifacts. -The HA data will be kept until the respective job either succeeds, is cancelled or fails terminally. -Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted. +In order to recover submitted applications, Flink persists metadata for the applications. +The HA data will be kept until the respective application either succeeds, is cancelled or fails terminally. +Once this happens, all the HA data, including the metadata stored in the HA services, will be deleted. +Similar lifecycle applies to the HA data for individual jobs. + +{{< top >}} + +## ApplicationResultStore + +The ApplicationResultStore is used to archive the final result of an application that reached a terminal +state (i.e. finished, cancelled or failed). The data is stored on a file system (see +[application-result-store.storage-path]({{< ref "docs/deployment/config#application-result-store-storage-path" >}})). +Entries in this store are marked as dirty as long as the corresponding application wasn't cleaned up properly +(artifacts are found in the application's subfolder in [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}})). + +Dirty entries are subject to cleanup, i.e. the corresponding application is either cleaned up by Flink at +the moment or will be picked up for cleanup as part of a recovery. The entries will be deleted as +soon as the cleanup succeeds. 
Check the ApplicationResultStore configuration parameters under +[HA configuration options]({{< ref "docs/deployment/config#high-availability" >}}) for further +details on how to adapt the behavior. {{< top >}} @@ -89,9 +115,7 @@ Entries in this store are marked as dirty as long as the corresponding job wasn' (artifacts are found in the job's subfolder in [high-availability.storageDir]({{< ref "docs/deployment/config#high-availability-storagedir" >}})). Dirty entries are subject to cleanup, i.e. the corresponding job is either cleaned up by Flink at -the moment or will be picked up for cleanup as part of a recovery. The entries will be deleted as -soon as the cleanup succeeds. Check the JobResultStore configuration parameters under -[HA configuration options]({{< ref "docs/deployment/config#high-availability" >}}) for further -details on how to adapt the behavior. +the moment or will be picked up for cleanup as part of a recovery. The entries will be deleted +once the cleanup succeeds and the corresponding application has created a dirty entry. {{< top >}} diff --git a/docs/content/docs/deployment/overview.md b/docs/content/docs/deployment/overview.md index aa606f3e177ae..d5004ea087a98 100644 --- a/docs/content/docs/deployment/overview.md +++ b/docs/content/docs/deployment/overview.md @@ -73,8 +73,8 @@ When deploying Flink, there are often multiple options available for each buildi JobManager is the name of the central work coordination component of Flink. It has implementations for different resource providers, which differ on high-availability, resource allocation behavior and supported job submission modes.
 JobManager modes for job submissions:
-  * Application Mode: runs the cluster exclusively for one application. The job's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.
-  * Session Mode: one JobManager instance manages multiple jobs sharing the same cluster of TaskManagers
+  * Application Mode: runs the cluster exclusively for one application. The application's main method (or client) gets executed on the JobManager. Calling `execute`/`executeAsync` multiple times in an application is supported.
+  * Session Mode: one JobManager instance manages multiple applications (and all jobs within them) sharing the same cluster of TaskManagers.
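When one application submits several jobs, the HA overview changes in this diff state that jobs before and after a failure are matched by name, with submission order as the tie-breaker. A self-contained sketch of that matching rule (an illustrative assumption, not Flink's actual implementation):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Sketch (assumption): pair pre-failure jobs with re-submitted jobs by name;
// jobs with identical names fall back to submission order.
public class JobMatcher {

    /** Returns a map from post-failure job index to the matched pre-failure job index. */
    public static Map<Integer, Integer> match(List<String> before, List<String> after) {
        Map<String, Deque<Integer>> pendingByName = new HashMap<>();
        for (int i = 0; i < before.size(); i++) {
            pendingByName.computeIfAbsent(before.get(i), k -> new ArrayDeque<>()).addLast(i);
        }
        Map<Integer, Integer> matches = new LinkedHashMap<>();
        for (int j = 0; j < after.size(); j++) {
            Deque<Integer> candidates = pendingByName.get(after.get(j));
            if (candidates != null && !candidates.isEmpty()) {
                // Same name: the earliest unmatched submission wins.
                matches.put(j, candidates.removeFirst());
            }
        }
        return matches;
    }
}
```

The ambiguity this sketch resolves by submission order is exactly why the docs recommend unique names via `execute(jobName)`.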
@@ -169,6 +169,10 @@ while subsuming them as part of the usual CompletedCheckpoint management. These not covered by the repeatable cleanup, i.e. they have to be deleted manually, still. This is covered by [FLINK-26606](https://issues.apache.org/jira/browse/FLINK-26606). +The application resource cleanup is similar (see the +[High Availability Services / ApplicationResultStore]({{< ref "docs/deployment/ha/overview#applicationresultstore" >}}) +section for further details). + ## Deployment Modes Flink can execute applications in two modes: @@ -184,14 +188,14 @@ Flink can execute applications in two modes: ### Application Mode -In all the other modes, the application's `main()` method is executed on the client side. This process +If the application's `main()` method is executed on the client side, this process includes downloading the application's dependencies locally, executing the `main()` to extract a representation of the application that Flink's runtime can understand (i.e. the `JobGraph`) and ship the dependencies and the `JobGraph(s)` to the cluster. This makes the Client a heavy resource consumer as it may need substantial network bandwidth to download dependencies and ship binaries to the cluster, and CPU cycles to execute the `main()`. This problem can be more pronounced when the Client is shared across users. -Building on this observation, the *Application Mode* creates a cluster per submitted application, but this time, +Building on this observation, the *Application Mode* creates a cluster per submitted application, and the `main()` method of the application is executed by the *JobManager*. Creating a cluster per application can be seen as creating a session cluster shared only among the jobs of a particular application, and turning down when the application finishes. 
With this architecture, the *Application Mode* provides the application granularity resource isolation @@ -216,12 +220,14 @@ execution of the "next" job being postponed until "this" job finishes. Using `e non-blocking, will lead to the "next" job starting before "this" job finishes. {{< hint warning >}} -The Application Mode allows for multi-`execute()` applications but -High-Availability is not supported in these cases. High-Availability in Application Mode is only -supported for single-`execute()` applications. +The Application Mode allows for multi-job applications (by calling `execute()` or `executeAsync()` multiple times in the `main()` method) but +High-Availability is limited in these cases. High-Availability in Application Mode is only +supported for applications with a single streaming job or multiple batch jobs. +For more details, see [FLIP-560](https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement). Additionally, when any of multiple running jobs in Application Mode (submitted for example using -`executeAsync()`) gets cancelled, all jobs will be stopped and the JobManager will shut down. +`executeAsync()`) gets cancelled, all jobs will be stopped and the JobManager will shut down by default. +This behavior can be configured through the [`execution.terminate-application-on-any-job-terminated-exceptionally`]({{< ref "docs/deployment/config" >}}#execution-terminate-application-on-any-job-terminated-exceptionally) option. Regular job completions (by the sources shutting down) are supported. {{< /hint >}} @@ -237,14 +243,21 @@ restarting jobs accessing the filesystem concurrently and making it unavailable Additionally, having a single cluster running multiple jobs implies more load for the JobManager, who is responsible for the book-keeping of all the jobs in the cluster. +In Session Mode, the application's `main()` method can be executed either on the client or on the cluster. 
+When submitting applications via Command-Line Interface (CLI) or the SQL Client, the `main()` method is executed on the client. +However, when submitting applications via the REST API `/jars/:jarid/run-application`, +the `main()` method is executed on the cluster. +This provides the same benefits as Application Mode in terms of resource usage and network bandwidth for the client, +while still maintaining the shared cluster resource model of Session Mode. + ### Summary -In *Session Mode*, the cluster lifecycle is independent of that of any job running on the cluster -and the resources are shared across all jobs. -*Application Mode* creates a session cluster per application and executes the application's `main()` +In *Session Mode*, the cluster lifecycle is independent of that of any application running on the cluster +and the resources are shared across all applications. The application's `main()` method can be executed either on the client or on the cluster. +*Application Mode* creates a session cluster per application and executes the application's `main()` method on the cluster. -It thus comes with better resource isolation as the resources are only used by the job(s) launched from a single `main()` method. -This comes at the price of spining up a dedicated cluster for each application. +It thus comes with better resource isolation as the resources are only used by the job(s) launched from a single `main()` method. +This comes at the price of spinning up a dedicated cluster for each application. 
## Vendor Solutions diff --git a/docs/content/docs/internals/application_lifecycle.md b/docs/content/docs/internals/application_lifecycle.md new file mode 100644 index 0000000000000..f27a3f7957aaf --- /dev/null +++ b/docs/content/docs/internals/application_lifecycle.md @@ -0,0 +1,81 @@ +--- +title: "Application Lifecycle" +weight: 9 +type: docs +aliases: + - /internals/application_lifecycle.html +--- + + +# Application Lifecycle + +An application in Flink represents a piece of user-defined logic for execution. +It provides a unified abstraction for tracking the execution status of user `main()` methods and managing their associated jobs. +For more details, see [FLIP-549](https://cwiki.apache.org/confluence/display/FLINK/FLIP-549%3A+Support+Application+Management) +and [FLIP-560](https://cwiki.apache.org/confluence/display/FLINK/FLIP-560%3A+Application+Capability+Enhancement). + +## Cluster-Application-Job Architecture + +Flink now uses a three-tier structure: **Cluster-Application-Job**. This structure unifies different deployment modes and provides +observability and manageability of user logic execution. + +A cluster can operate in two modes: +- **Application Mode**: One cluster per application +- **Session Mode**: One cluster for multiple applications + +An application can contain 0 to N jobs, with each job associated with exactly one application. + +## Application Implementations + +The {{< gh_link file="/flink-runtime/src/main/java/org/apache/flink/runtime/application/AbstractApplication.java" name="AbstractApplication" >}} is the base class for all applications. Flink provides two concrete implementations: + +- **PackagedProgramApplication**: Wraps a user JAR and executes its `main()` method. Suitable for **Application Mode** or **Session Mode with REST submission via `/jars/:jarid/run-application`**. The application's lifecycle is tied to the execution of the user's `main()` method. 
+ +- **SingleJobApplication**: Wraps the submission of a single job as a lightweight `main()` method. Suitable for cases where a single job is submitted, such as **Session Mode with CLI submission**. The application lifecycle is tied to the job's execution status. + +## Application Status + +An application starts in the *created* state, then switches to *running* once execution begins. +When the execution completes normally and all jobs associated with the application have reached a terminal state, the application transitions to *finished*. +In case of failures, an application first switches to *failing*, where it cancels all its non-terminal jobs. After all jobs have reached a terminal state, the application transitions to *failed*. + +If the user cancels the application, it goes into the *canceling* state. +This also entails the cancellation of all its non-terminal jobs. +Once all jobs have reached a terminal state, the application transitions to the *canceled* state. + +The states *finished*, *canceled*, and *failed* are terminal states and trigger archiving and cleanup operations for the application. + +{{< img src="/fig/application_status.png" alt="States and Transitions of Application" width="50%" >}} + +## Application Submission + +Applications are submitted to the cluster and begin execution through different mechanisms depending on the deployment mode and submission method. + +In **Application Mode**, the cluster is started specifically for a single application. During cluster startup, a `PackagedProgramApplication` is automatically generated from the user's JAR file. The application begins executing immediately after the cluster is ready, with the lifecycle tied to the execution of the user's `main()` method. + +In **Session Mode**, multiple applications can share the same cluster.
Applications can be submitted through various interfaces: + +- **REST API (`/jars/:jarid/run-application`)**: This endpoint creates a `PackagedProgramApplication` from the uploaded user JAR and begins execution. As in Application Mode, the application's lifecycle is tied to the user's `main()` method. + +- **REST API (`/jars/:jarid/run`)** and **CLI submission**: These interfaces directly execute the user's `main()` method. When the method calls `execute()` to submit a job, the job is wrapped as a `SingleJobApplication`. In this case, the application lifecycle is tied to the job's execution status, making it a lightweight wrapper for single-job submissions. + +The {{< gh_link file="/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java" name="Dispatcher" >}} manages all applications in the cluster. It provides interfaces for querying application status, managing the application lifecycle, and handling operations such as cancellation and recovery. + +{{< top >}} diff --git a/docs/static/fig/application_status.png b/docs/static/fig/application_status.png new file mode 100644 index 0000000000000..e15f4c6d5fd0e Binary files /dev/null and b/docs/static/fig/application_status.png differ
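The status transitions described in the Application Status section above can be summarized as a small transition table. The sketch below is illustrative only — the class, enum, and member names are hypothetical and are not part of Flink's runtime:

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Illustrative model of the application status transitions described above.
// NOT Flink's actual implementation; all names here are hypothetical.
public class ApplicationStatusSketch {

    public enum Status { CREATED, RUNNING, FINISHED, FAILING, FAILED, CANCELING, CANCELED }

    // Legal transitions, following the prose: created -> running; running ->
    // finished, failing, or canceling; failing -> failed and canceling ->
    // canceled once all jobs of the application have reached a terminal state.
    public static final Map<Status, Set<Status>> TRANSITIONS = new EnumMap<>(Status.class);
    static {
        TRANSITIONS.put(Status.CREATED,   EnumSet.of(Status.RUNNING));
        TRANSITIONS.put(Status.RUNNING,   EnumSet.of(Status.FINISHED, Status.FAILING, Status.CANCELING));
        TRANSITIONS.put(Status.FAILING,   EnumSet.of(Status.FAILED));
        TRANSITIONS.put(Status.CANCELING, EnumSet.of(Status.CANCELED));
        // finished, failed, and canceled are terminal states and trigger
        // archiving and cleanup for the application.
        TRANSITIONS.put(Status.FINISHED,  EnumSet.noneOf(Status.class));
        TRANSITIONS.put(Status.FAILED,    EnumSet.noneOf(Status.class));
        TRANSITIONS.put(Status.CANCELED,  EnumSet.noneOf(Status.class));
    }

    public static boolean isTerminal(Status s) {
        return TRANSITIONS.get(s).isEmpty();
    }

    public static void main(String[] args) {
        // A cancellation cannot jump straight to canceled: non-terminal jobs
        // are cancelled first while the application is in canceling.
        System.out.println(TRANSITIONS.get(Status.RUNNING).contains(Status.CANCELED));   // false
        System.out.println(TRANSITIONS.get(Status.CANCELING).contains(Status.CANCELED)); // true
        System.out.println(isTerminal(Status.FINISHED));                                 // true
    }
}
```

The table makes explicit why *failing* and *canceling* are needed as intermediate states: an application cannot reach a terminal state until every one of its jobs has.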