Skip to content

Commit c82d81e

Browse files
committed
[FLINK-38901][runtime-web] Introduce the Rescales/Configuration sub-page for streaming jobs with the adaptive scheduler enabled
1 parent 49dfc16 commit c82d81e

File tree

12 files changed

+326
-2
lines changed

12 files changed

+326
-2
lines changed

flink-runtime-web/web-dashboard/src/app/interfaces/job-detail.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@ export interface JobDetail {
6161
'stream-graph': StreamGraph;
6262
'pending-operators': number;
6363
'application-id': string;
64+
schedulerType: string;
6465
}
6566

6667
interface Plan {
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
export interface RescalesConfig {
20+
rescaleHistoryMax: number;
21+
schedulerExecutionMode: string;
22+
submissionResourceWaitTimeoutInMillis: number;
23+
submissionResourceStabilizationTimeoutInMillis: number;
24+
slotIdleTimeoutInMillis: number;
25+
executingCooldownTimeoutInMillis: number;
26+
executingResourceStabilizationTimeoutInMillis: number;
27+
maximumDelayForTriggeringRescaleInMillis: number;
28+
rescaleOnFailedCheckpointCount: number;
29+
}

flink-runtime-web/web-dashboard/src/app/interfaces/public-api.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ export * from './job-vertex';
2727
export * from './job-checkpoint';
2828
export * from './job-backpressure';
2929
export * from './job-flamegraph';
30+
export * from './job-rescales';
3031
export * from './plan';
3132
export * from './overview';
3233
export * from './task-manager';

flink-runtime-web/web-dashboard/src/app/pages/job/job-detail/status/job-status.component.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@ export class JobStatusComponent implements OnInit, OnDestroy {
6363
urlLoading = true;
6464
readonly listOfNavigation: RouterTab[];
6565
private readonly checkpointIndexOfNavigation: number;
66+
private readonly rescalesIndexOfNavigation: number;
6667

6768
webCancelEnabled = this.statusService.configuration.features['web-cancel'];
6869
isHistoryServer = this.statusService.configuration.features['web-history'];
@@ -80,6 +81,7 @@ export class JobStatusComponent implements OnInit, OnDestroy {
8081
) {
8182
this.listOfNavigation = moduleConfig.routerTabs || JOB_MODULE_DEFAULT_CONFIG.routerTabs;
8283
this.checkpointIndexOfNavigation = this.checkpointIndexOfNav();
84+
this.rescalesIndexOfNavigation = this.rescalesIndexOfNav();
8385
}
8486

8587
ngOnInit(): void {
@@ -126,6 +128,10 @@ export class JobStatusComponent implements OnInit, OnDestroy {
126128
return this.listOfNavigation.findIndex(item => item.path === 'checkpoints');
127129
}
128130

131+
rescalesIndexOfNav(): number {
132+
return this.listOfNavigation.findIndex(item => item.path === 'rescales');
133+
}
134+
129135
private handleJobDetailChanged(data: JobDetailCorrect): void {
130136
this.jobDetail = data;
131137
const checkpointNavIndex = this.checkpointIndexOfNav();
@@ -137,6 +143,17 @@ export class JobStatusComponent implements OnInit, OnDestroy {
137143
} else if (data.plan.type == 'BATCH' && checkpointNavIndex > -1) {
138144
this.listOfNavigation.splice(checkpointNavIndex, 1);
139145
}
146+
147+
const rescalesNavIndex = this.rescalesIndexOfNav();
148+
const shouldShowRescales = data.plan.type == 'STREAMING' && data.schedulerType == 'Adaptive';
149+
if (!shouldShowRescales && rescalesNavIndex > -1) {
150+
this.listOfNavigation.splice(rescalesNavIndex, 1);
151+
} else if (shouldShowRescales && rescalesNavIndex == -1) {
152+
this.listOfNavigation.splice(this.rescalesIndexOfNavigation, 0, {
153+
path: 'rescales',
154+
title: 'Rescales'
155+
});
156+
}
140157
this.cdr.markForCheck();
141158
}
142159

flink-runtime-web/web-dashboard/src/app/pages/job/job.config.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,8 @@ export const JOB_MODULE_DEFAULT_CONFIG: Required<JobModuleConfig> = {
2929
{ title: 'Data Skew', path: 'dataskew' },
3030
{ title: 'TimeLine', path: 'timeline' },
3131
{ title: 'Checkpoints', path: 'checkpoints' },
32-
{ title: 'Configuration', path: 'configuration' }
32+
{ title: 'Configuration', path: 'configuration' },
33+
{ title: 'Rescales', path: 'rescales' }
3334
]
3435
};
3536

flink-runtime-web/web-dashboard/src/app/pages/job/modules/completed-job/routes.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ const OVERRIDE_JOB_MODULE_CONFIG_FACTORY = (statusService: StatusService): JobMo
5353
{ title: 'TimeLine', path: 'timeline' },
5454
{ title: 'Checkpoints', path: 'checkpoints' },
5555
{ title: 'Job Configuration', path: 'configuration' },
56+
{ title: 'Rescales', path: 'rescales' },
5657
{ title: 'Cluster Configuration', path: 'cluster_configuration' }
5758
]
5859
: JOB_MODULE_DEFAULT_CONFIG.routerTabs
@@ -129,6 +130,14 @@ export const COMPLETED_JOB_ROUES: Routes = [
129130
path: 'configuration'
130131
}
131132
},
133+
{
134+
path: 'rescales',
135+
loadComponent: () =>
136+
import('@flink-runtime-web/pages/job/rescales/job-rescales.component').then(m => m.JobRescalesComponent),
137+
data: {
138+
path: 'rescales'
139+
}
140+
},
132141
{
133142
path: 'cluster_configuration',
134143
loadComponent: () =>

flink-runtime-web/web-dashboard/src/app/pages/job/modules/running-job/routes.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,14 @@ export const RUNNING_JOB_ROUTES: Routes = [
8080
path: 'configuration'
8181
}
8282
},
83+
{
84+
path: 'rescales',
85+
loadComponent: () =>
86+
import('@flink-runtime-web/pages/job/rescales/job-rescales.component').then(m => m.JobRescalesComponent),
87+
data: {
88+
path: 'rescales'
89+
}
90+
},
8391
{ path: '**', redirectTo: 'overview', pathMatch: 'full' }
8492
]
8593
}
Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one
3+
~ or more contributor license agreements. See the NOTICE file
4+
~ distributed with this work for additional information
5+
~ regarding copyright ownership. The ASF licenses this file
6+
~ to you under the Apache License, Version 2.0 (the
7+
~ "License"); you may not use this file except in compliance
8+
~ with the License. You may obtain a copy of the License at
9+
~
10+
~ http://www.apache.org/licenses/LICENSE-2.0
11+
~
12+
~ Unless required by applicable law or agreed to in writing, software
13+
~ distributed under the License is distributed on an "AS IS" BASIS,
14+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
~ See the License for the specific language governing permissions and
16+
~ limitations under the License.
17+
-->
18+
19+
<nz-tabs
20+
*ngIf="rescalesConfig"
21+
[nzSize]="'small'"
22+
[nzAnimated]="{ inkBar: true, tabPane: false }"
23+
[nzTabBarExtraContent]="extraTemplate"
24+
>
25+
<nz-tab nzTitle="Configuration">
26+
<nz-table
27+
class="no-border small"
28+
[nzData]="rescalesConfig ? [''] : []"
29+
[nzSize]="'small'"
30+
[nzFrontPagination]="false"
31+
[nzShowPagination]="false"
32+
>
33+
<thead>
34+
<tr>
35+
<th><strong>Option</strong></th>
36+
<th><strong>Value</strong></th>
37+
</tr>
38+
</thead>
39+
<tbody>
40+
<ng-container *ngIf="rescalesConfig">
41+
<tr>
42+
<td>Scheduler Execution Mode</td>
43+
<td *ngIf="rescalesConfig['schedulerExecutionMode'] === 'REACTIVE'">REACTIVE</td>
44+
<td *ngIf="rescalesConfig['schedulerExecutionMode'] !== 'REACTIVE'"></td>
45+
</tr>
46+
<tr>
47+
<td>Submission Resource Wait Timeout</td>
48+
<td>
49+
{{ rescalesConfig['submissionResourceWaitTimeoutInMillis'] | humanizeDuration }}
50+
</td>
51+
</tr>
52+
<tr>
53+
<td>Submission ResourceStabilization Timeout</td>
54+
<td>
55+
{{
56+
rescalesConfig['submissionResourceStabilizationTimeoutInMillis'] | humanizeDuration
57+
}}
58+
</td>
59+
</tr>
60+
<tr>
61+
<td>Slot Idle Timeout</td>
62+
<td>{{ rescalesConfig['slotIdleTimeoutInMillis'] | humanizeDuration }}</td>
63+
</tr>
64+
<tr>
65+
<td>Executing Cooldown Timeout</td>
66+
<td>{{ rescalesConfig['executingCooldownTimeoutInMillis'] | humanizeDuration }}</td>
67+
</tr>
68+
<tr>
69+
<td>Executing Resource Stabilization Timeout</td>
70+
<td>
71+
{{
72+
rescalesConfig['executingResourceStabilizationTimeoutInMillis'] | humanizeDuration
73+
}}
74+
</td>
75+
</tr>
76+
<tr>
77+
<td>Maximum Delay For Triggering Rescale</td>
78+
<td>
79+
{{ rescalesConfig['maximumDelayForTriggeringRescaleInMillis'] | humanizeDuration }}
80+
</td>
81+
</tr>
82+
<tr>
83+
<td>Rescale On Failed Checkpoint Count</td>
84+
<td>{{ rescalesConfig['rescaleOnFailedCheckpointCount'] }}</td>
85+
</tr>
86+
<tr>
87+
<td>History Max</td>
88+
<td>{{ rescalesConfig['rescaleHistoryMax'] }}</td>
89+
</tr>
90+
</ng-container>
91+
</tbody>
92+
</nz-table>
93+
</nz-tab>
94+
</nz-tabs>
95+
96+
<ng-template #extraTemplate>
97+
<button nz-button nzType="primary" class="refresh" nzSize="small" (click)="refresh()">
98+
<i nz-icon nzType="sync"></i>
99+
Refresh
100+
</button>
101+
</ng-template>
102+
103+
<nz-empty *ngIf="!rescalesConfig"></nz-empty>
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
:host {
20+
::ng-deep {
21+
.ant-tabs-tabpane {
22+
position: relative;
23+
top: -16px;
24+
padding: 24px;
25+
}
26+
27+
.ant-tabs-nav-list {
28+
padding: 4px 16px;
29+
}
30+
}
31+
}
32+
33+
.refresh {
34+
margin-right: 12px;
35+
}
36+
37+
nz-empty {
38+
padding: 24px;
39+
}
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
import { NgIf } from '@angular/common';
20+
import { ChangeDetectionStrategy, ChangeDetectorRef, Component, OnDestroy, OnInit } from '@angular/core';
21+
import { forkJoin, of, Subject } from 'rxjs';
22+
import { catchError, distinctUntilChanged, switchMap, takeUntil } from 'rxjs/operators';
23+
24+
import { HumanizeDurationPipe } from '@flink-runtime-web/components/humanize-duration.pipe';
25+
import {RescalesConfig, JobDetail} from '@flink-runtime-web/interfaces';
26+
import { JobService } from '@flink-runtime-web/services';
27+
import { NzButtonModule } from 'ng-zorro-antd/button';
28+
import { NzCollapseModule } from 'ng-zorro-antd/collapse';
29+
import { NzDividerModule } from 'ng-zorro-antd/divider';
30+
import { NzEmptyModule } from 'ng-zorro-antd/empty';
31+
import { NzIconModule } from 'ng-zorro-antd/icon';
32+
import { NzTableModule } from 'ng-zorro-antd/table';
33+
import { NzTabsModule } from 'ng-zorro-antd/tabs';
34+
import { NzTooltipModule } from 'ng-zorro-antd/tooltip';
35+
36+
import { JobLocalService } from '../job-local.service';
37+
38+
@Component({
39+
selector: 'flink-job-rescales',
40+
templateUrl: './job-rescales.component.html',
41+
styleUrls: ['./job-rescales.component.less'],
42+
changeDetection: ChangeDetectionStrategy.OnPush,
43+
imports: [
44+
NgIf,
45+
NzTabsModule,
46+
NzDividerModule,
47+
HumanizeDurationPipe,
48+
NzTableModule,
49+
NzIconModule,
50+
NzButtonModule,
51+
NzEmptyModule,
52+
NzCollapseModule,
53+
NzTooltipModule
54+
]
55+
})
56+
export class JobRescalesComponent implements OnInit, OnDestroy {
57+
58+
public rescalesConfig?: RescalesConfig;
59+
public jobDetail: JobDetail;
60+
61+
private refresh$ = new Subject<void>();
62+
private destroy$ = new Subject<void>();
63+
64+
constructor(
65+
private readonly jobService: JobService,
66+
private readonly jobLocalService: JobLocalService,
67+
private readonly cdr: ChangeDetectorRef
68+
) {}
69+
70+
public ngOnInit(): void {
71+
this.refresh$
72+
.pipe(
73+
switchMap(() =>
74+
forkJoin([
75+
this.jobService.loadRescalesConfig(this.jobDetail.jid).pipe(
76+
catchError(() => {
77+
return of(undefined);
78+
})
79+
)
80+
])
81+
),
82+
takeUntil(this.destroy$)
83+
)
84+
.subscribe(([config]) => {
85+
this.rescalesConfig = config;
86+
this.cdr.markForCheck();
87+
});
88+
89+
this.jobLocalService
90+
.jobDetailChanges()
91+
.pipe(
92+
distinctUntilChanged((pre, next) => pre.jid === next.jid),
93+
takeUntil(this.destroy$)
94+
)
95+
.subscribe(data => {
96+
this.jobDetail = data;
97+
this.cdr.markForCheck();
98+
this.refresh$.next();
99+
});
100+
}
101+
102+
public ngOnDestroy(): void {
103+
this.destroy$.next();
104+
this.destroy$.complete();
105+
this.refresh$.complete();
106+
}
107+
108+
public refresh(): void {
109+
this.refresh$.next();
110+
}
111+
}

0 commit comments

Comments
 (0)