diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts index 21724a6b41074..e178fd6f0eaa1 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts @@ -61,6 +61,7 @@ export interface JobDetail { 'stream-graph': StreamGraph; 'pending-operators': number; 'application-id': string; + schedulerType: string; } interface Plan { diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts new file mode 100644 index 0000000000000..b5db266ddb6a3 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/job-rescales.ts @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +export interface RescalesConfig { + rescaleHistoryMax: number; + schedulerExecutionMode: string; + submissionResourceWaitTimeoutInMillis: number; + submissionResourceStabilizationTimeoutInMillis: number; + slotIdleTimeoutInMillis: number; + executingCooldownTimeoutInMillis: number; + executingResourceStabilizationTimeoutInMillis: number; + maximumDelayForTriggeringRescaleInMillis: number; + rescaleOnFailedCheckpointCount: number; +} diff --git a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts index 38d072557e173..86a6116d2235a 100644 --- a/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts +++ b/flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts @@ -27,6 +27,7 @@ export * from './job-vertex'; export * from './job-checkpoint'; export * from './job-backpressure'; export * from './job-flamegraph'; +export * from './job-rescales'; export * from './plan'; export * from './overview'; export * from './task-manager'; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts index 8d7ded6d29881..b243d136360a4 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts @@ -63,6 +63,7 @@ export class JobStatusComponent implements OnInit, OnDestroy { urlLoading = true; readonly listOfNavigation: RouterTab[]; private readonly checkpointIndexOfNavigation: number; + private readonly rescalesIndexOfNavigation: number; webCancelEnabled = this.statusService.configuration.features['web-cancel']; isHistoryServer = this.statusService.configuration.features['web-history']; @@ -80,6 +81,7 @@ export class JobStatusComponent implements OnInit, OnDestroy { ) { this.listOfNavigation = moduleConfig.routerTabs || JOB_MODULE_DEFAULT_CONFIG.routerTabs; this.checkpointIndexOfNavigation = this.checkpointIndexOfNav(); + this.rescalesIndexOfNavigation = this.rescalesIndexOfNav(); } ngOnInit(): void { @@ -126,16 +128,31 @@ export class JobStatusComponent implements OnInit, OnDestroy { return this.listOfNavigation.findIndex(item => item.path === 'checkpoints'); } + rescalesIndexOfNav(): number { + return this.listOfNavigation.findIndex(item => item.path === 'rescales'); + } + private handleJobDetailChanged(data: JobDetailCorrect): void { this.jobDetail = data; - const index = this.checkpointIndexOfNav(); - if (data.plan.type == 'STREAMING' && index == -1) { + const checkpointNavIndex = this.checkpointIndexOfNav(); + if (data.plan.type == 'STREAMING' && checkpointNavIndex == -1) { this.listOfNavigation.splice(this.checkpointIndexOfNavigation, 0, { path: 'checkpoints', title: 'Checkpoints' }); - } else if (data.plan.type == 'BATCH' && index > -1) { - this.listOfNavigation.splice(index, 1); + } else if (data.plan.type == 'BATCH' && checkpointNavIndex > -1) { + this.listOfNavigation.splice(checkpointNavIndex, 1); + } + + const rescalesNavIndex = this.rescalesIndexOfNav(); + const shouldShowRescales = data.plan.type == 'STREAMING' && data.schedulerType == 'Adaptive'; + if (!shouldShowRescales && rescalesNavIndex > -1) { + this.listOfNavigation.splice(rescalesNavIndex, 1); + } else if (shouldShowRescales && rescalesNavIndex == -1) { + this.listOfNavigation.splice(this.rescalesIndexOfNavigation, 0, { + path: 'rescales', + title: 'Rescales' + }); } this.cdr.markForCheck(); } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts index 59ef102751895..4995f1773ffc4 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts @@ -29,7 +29,8 @@ export const JOB_MODULE_DEFAULT_CONFIG: Required = { { title: 'Data Skew', path: 'dataskew' }, { title: 'TimeLine', path: 'timeline' }, { title: 'Checkpoints', path: 'checkpoints' }, - { title: 'Configuration', path: 'configuration' } + { title: 'Configuration', path: 'configuration' }, + { title: 'Rescales', path: 'rescales' } ] }; diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts index 496acbaf2e218..1d70929daf838 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts @@ -53,6 +53,7 @@ const OVERRIDE_JOB_MODULE_CONFIG_FACTORY = (statusService: StatusService): JobMo { title: 'TimeLine', path: 'timeline' }, { title: 'Checkpoints', path: 'checkpoints' }, { title: 'Job Configuration', path: 'configuration' }, + { title: 'Rescales', path: 'rescales' }, { title: 'Cluster Configuration', path: 'cluster_configuration' } ] : JOB_MODULE_DEFAULT_CONFIG.routerTabs @@ -129,6 +130,14 @@ export const COMPLETED_JOB_ROUES: Routes = [ path: 'configuration' } }, + { + path: 'rescales', + loadComponent: () => + import('@flink-runtime-web/pages/job/rescales/job-rescales.component').then(m => m.JobRescalesComponent), + data: { + path: 'rescales' + } + }, { path: 'cluster_configuration', loadComponent: () => diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts index c09afabf9ab13..cf4e2a2722bf8 100644 --- a/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts @@ -80,6 +80,14 @@ export const RUNNING_JOB_ROUTES: Routes = [ path: 'configuration' } }, + { + path: 'rescales', + loadComponent: () => + import('@flink-runtime-web/pages/job/rescales/job-rescales.component').then(m => m.JobRescalesComponent), + data: { + path: 'rescales' + } + }, { path: '**', redirectTo: 'overview', pathMatch: 'full' } ] } diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html new file mode 100644 index 0000000000000..2e90f2fbb7f61 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.html @@ -0,0 +1,103 @@ + + + + + + + + Option + Value + + + + + + Scheduler Execution Mode + REACTIVE + + + + Submission Resource Wait Timeout + + {{ rescalesConfig['submissionResourceWaitTimeoutInMillis'] | humanizeDuration }} + + + + Submission ResourceStabilization Timeout + + {{ + rescalesConfig['submissionResourceStabilizationTimeoutInMillis'] | humanizeDuration + }} + + + + Slot Idle Timeout + {{ rescalesConfig['slotIdleTimeoutInMillis'] | humanizeDuration }} + + + Executing Cooldown Timeout + {{ rescalesConfig['executingCooldownTimeoutInMillis'] | humanizeDuration }} + + + Executing Resource Stabilization Timeout + + {{ + rescalesConfig['executingResourceStabilizationTimeoutInMillis'] | humanizeDuration + }} + + + + Maximum Delay For Triggering Rescale + + {{ rescalesConfig['maximumDelayForTriggeringRescaleInMillis'] | humanizeDuration }} + + + + Rescale On Failed Checkpoint Count + {{ rescalesConfig['rescaleOnFailedCheckpointCount'] }} + + + History Max + {{ rescalesConfig['rescaleHistoryMax'] }} + + + + + + + + + + + + diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less new file mode 100644 index 0000000000000..6b78072948107 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.less @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +:host { + ::ng-deep { + .ant-tabs-tabpane { + position: relative; + top: -16px; + padding: 24px; + } + + .ant-tabs-nav-list { + padding: 4px 16px; + } + } +} + +.refresh { + margin-right: 12px; +} + +nz-empty { + padding: 24px; +} diff --git a/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts new file mode 100644 index 0000000000000..4b80254bedfc7 --- /dev/null +++ b/flink-runtime-web/web-dashboard/src/app/pages/job/rescales/job-rescales.component.ts @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import { NgIf } from '@angular/common'; +import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core'; +import { forkJoin, of, Subject } from 'rxjs'; +import { catchError, distinctUntilChanged, switchMap, takeUntil } from 'rxjs/operators'; + +import { HumanizeDurationPipe } from '@flink-runtime-web/components/humanize-duration.pipe'; +import { RescalesConfig, JobDetail } from '@flink-runtime-web/interfaces'; +import { JobService } from '@flink-runtime-web/services'; +import { NzButtonModule } from 'ng-zorro-antd/button'; +import { NzCollapseModule } from 'ng-zorro-antd/collapse'; +import { NzDividerModule } from 'ng-zorro-antd/divider'; +import { NzEmptyModule } from 'ng-zorro-antd/empty'; +import { NzIconModule } from 'ng-zorro-antd/icon'; +import { NzTableModule } from 'ng-zorro-antd/table'; +import { NzTabsModule } from 'ng-zorro-antd/tabs'; +import { NzTooltipModule } from 'ng-zorro-antd/tooltip'; + +import { JobLocalService } from '../job-local.service'; + +@Component({ + selector: 'flink-job-rescales', + templateUrl: './job-rescales.component.html', + styleUrls: ['./job-rescales.component.less'], + changeDetection: ChangeDetectionStrategy.OnPush, + imports: [ + NgIf, + NzTabsModule, + NzDividerModule, + HumanizeDurationPipe, + NzTableModule, + NzIconModule, + NzButtonModule, + NzEmptyModule, + NzCollapseModule, + NzTooltipModule + ] +}) +export class JobRescalesComponent implements OnInit, OnDestroy { + public rescalesConfig?: RescalesConfig; + public jobDetail: JobDetail; + + private refresh$ = new Subject(); + private destroy$ = new Subject(); + + constructor( + private readonly jobService: JobService, + private readonly jobLocalService: JobLocalService, + private readonly cdr: ChangeDetectorRef + ) {} + + public ngOnInit(): void { + this.refresh$ + .pipe( + switchMap(() => + forkJoin([ + this.jobService.loadRescalesConfig(this.jobDetail.jid).pipe( + catchError(() => { + return of(undefined); + }) + ) + ]) + ), + takeUntil(this.destroy$) + ) + .subscribe(([config]) => { + this.rescalesConfig = config; + this.cdr.markForCheck(); + }); + + this.jobLocalService + .jobDetailChanges() + .pipe( + distinctUntilChanged((pre, next) => pre.jid === next.jid), + takeUntil(this.destroy$) + ) + .subscribe(data => { + this.jobDetail = data; + this.cdr.markForCheck(); + this.refresh$.next(); + }); + } + + public ngOnDestroy(): void { + this.destroy$.next(); + this.destroy$.complete(); + this.refresh$.complete(); + } + + public refresh(): void { + this.refresh$.next(); + } +} diff --git a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts index bd4a0d9bdbbd9..313a3e9308ea4 100644 --- a/flink-runtime-web/web-dashboard/src/app/services/job.service.ts +++ b/flink-runtime-web/web-dashboard/src/app/services/job.service.ts @@ -26,6 +26,7 @@ import { CheckpointConfig, CheckpointDetail, CheckpointSubTask, + RescalesConfig, JobAccumulators, JobBackpressure, JobConfig, @@ -178,6 +179,10 @@ export class JobService { ); } + public loadRescalesConfig(jobId: string): Observable { + return this.httpClient.get(`${this.configService.BASE_URL}/jobs/${jobId}/rescales/config`); + } + public loadJobResourceRequirements(jobId: string): Observable { return this.httpClient.get( `${this.configService.BASE_URL}/jobs/${jobId}/resource-requirements`