Skip to content

Commit b44e558

Browse files
committed
[Improvement-17929] Make default worker group persistent and prevent workers from joining non-existent worker groups
1 parent 4ce723b commit b44e558

9 files changed

Lines changed: 44 additions & 58 deletions

File tree

docs/docs/en/guide/upgrade/incompatible.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,5 @@ This document records the incompatible updates between each version. You need to
4343
## 3.4.1
4444

4545
* Remove import and export of workflow definition. ([#17940](https://github.com/apache/dolphinscheduler/issues/17940))
46+
* Persist the `default` worker group in the database. ([#17929](https://github.com/apache/dolphinscheduler/issues/17929))
4647

docs/docs/zh/guide/upgrade/incompatible.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,4 +47,5 @@
4747
## 3.4.1
4848

4949
* 移除导入导出工作流 ([#17940](https://github.com/apache/dolphinscheduler/issues/17940))
50+
* 在数据库中持久化 `default` 工作组 ([#17929](https://github.com/apache/dolphinscheduler/issues/17929))
5051

dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_h2.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1051,6 +1051,8 @@ CREATE TABLE t_ds_worker_group
10511051
-- Records of t_ds_worker_group
10521052
-- ----------------------------
10531053

1054+
INSERT INTO `t_ds_worker_group` (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, current_timestamp, current_timestamp, 'default worker group');
1055+
10541056
-- ----------------------------
10551057
-- Table structure for t_ds_relation_project_worker_group
10561058
-- ----------------------------

dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_mysql.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1049,6 +1049,7 @@ CREATE TABLE `t_ds_worker_group` (
10491049
-- ----------------------------
10501050
-- Records of t_ds_worker_group
10511051
-- ----------------------------
1052+
INSERT INTO `t_ds_worker_group` (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, current_timestamp, current_timestamp, 'default worker group');
10521053

10531054
-- ----------------------------
10541055
-- Table structure for t_ds_version

dolphinscheduler-dao/src/main/resources/sql/dolphinscheduler_postgresql.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -966,6 +966,8 @@ CREATE TABLE t_ds_worker_group (
966966
CONSTRAINT name_unique UNIQUE (name)
967967
) ;
968968

969+
INSERT INTO t_ds_worker_group (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, now(), now(), 'default worker group');
970+
969971
--
970972
-- Table structure for table t_ds_relation_project_worker_group
971973
--

dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/mysql/dolphinscheduler_dml.sql

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,5 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
18+
INSERT IGNORE INTO t_ds_worker_group (name, addr_list, create_time, update_time, description) VALUES ('default', NULL, NOW(), NOW(), 'default worker group');

dolphinscheduler-dao/src/main/resources/sql/upgrade/3.4.1_schema/postgresql/dolphinscheduler_dml.sql

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,3 +14,8 @@
1414
* See the License for the specific language governing permissions and
1515
* limitations under the License.
1616
*/
17+
18+
-- Insert default worker group if not exists
19+
INSERT INTO t_ds_worker_group (name, addr_list, create_time, update_time, description)
20+
VALUES ('default', NULL, now(), now(), 'default worker group')
21+
ON CONFLICT (name) DO NOTHING;

dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClusters.java

Lines changed: 22 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@
3535
import java.util.concurrent.CopyOnWriteArrayList;
3636
import java.util.stream.Collectors;
3737

38+
import lombok.extern.slf4j.Slf4j;
39+
40+
@Slf4j
3841
public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServerMetadata>
3942
implements
4043
IClusters<WorkerServerMetadata>,
@@ -43,11 +46,8 @@ public class WorkerClusters extends AbstractClusterSubscribeListener<WorkerServe
4346
// WorkerIdentifier(workerAddress) -> worker
4447
private final Map<String, WorkerServerMetadata> workerMapping = new ConcurrentHashMap<>();
4548

46-
// WorkerGroup from db -> WorkerIdentifier(workerAddress)
47-
private final Map<String, List<String>> dbWorkerGroupMapping = new ConcurrentHashMap<>();
48-
49-
// WorkerGroup from config -> WorkerIdentifier(workerAddress)
50-
private final Map<String, List<String>> configWorkerGroupMapping = new ConcurrentHashMap<>();
49+
// WorkerGroup -> WorkerIdentifier(workerAddress)
50+
private final Map<String, List<String>> workerGroupMapping = new ConcurrentHashMap<>();
5151

5252
private final List<IClustersChangeListener<WorkerServerMetadata>> workerClusterChangeListeners =
5353
new CopyOnWriteArrayList<>();
@@ -62,44 +62,23 @@ public Optional<WorkerServerMetadata> getServer(final String address) {
6262
return Optional.ofNullable(workerMapping.get(address));
6363
}
6464

65-
public List<String> getDbWorkerServerAddressByGroup(String workerGroup) {
66-
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
67-
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
68-
}
69-
return dbWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
70-
}
71-
72-
public List<String> getConfigWorkerServerAddressByGroup(String workerGroup) {
73-
if (WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)) {
74-
return UnmodifiableList.unmodifiableList(new ArrayList<>(workerMapping.keySet()));
75-
}
76-
return configWorkerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
65+
public List<String> getWorkerServerAddressByGroup(String workerGroup) {
66+
return workerGroupMapping.getOrDefault(workerGroup, Collections.emptyList());
7767
}
7868

7969
public List<String> getNormalWorkerServerAddressByGroup(String workerGroup) {
80-
List<String> dbWorkerAddresses = getDbWorkerServerAddressByGroup(workerGroup)
81-
.stream()
82-
.map(workerMapping::get)
83-
.filter(Objects::nonNull)
84-
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
85-
.map(WorkerServerMetadata::getAddress)
86-
.collect(Collectors.toList());
87-
List<String> configWorkerAddresses = getConfigWorkerServerAddressByGroup(workerGroup)
70+
List<String> dbWorkerAddresses = getWorkerServerAddressByGroup(workerGroup)
8871
.stream()
8972
.map(workerMapping::get)
9073
.filter(Objects::nonNull)
9174
.filter(workerServer -> workerServer.getServerStatus() == ServerStatus.NORMAL)
9275
.map(WorkerServerMetadata::getAddress)
9376
.collect(Collectors.toList());
94-
dbWorkerAddresses.removeAll(configWorkerAddresses);
95-
dbWorkerAddresses.addAll(configWorkerAddresses);
9677
return UnmodifiableList.unmodifiableList(dbWorkerAddresses);
9778
}
9879

9980
public boolean containsWorkerGroup(String workerGroup) {
100-
return WorkerGroupUtils.getDefaultWorkerGroup().equals(workerGroup)
101-
|| dbWorkerGroupMapping.containsKey(workerGroup)
102-
|| configWorkerGroupMapping.containsKey(workerGroup);
81+
return workerGroupMapping.containsKey(workerGroup);
10382
}
10483

10584
@Override
@@ -109,9 +88,9 @@ public void registerListener(IClustersChangeListener<WorkerServerMetadata> liste
10988

11089
@Override
11190
public void onWorkerGroupDelete(List<WorkerGroup> workerGroups) {
112-
synchronized (dbWorkerGroupMapping) {
91+
synchronized (workerGroupMapping) {
11392
for (WorkerGroup workerGroup : workerGroups) {
114-
dbWorkerGroupMapping.remove(workerGroup.getName());
93+
workerGroupMapping.remove(workerGroup.getName());
11594
}
11695
}
11796
}
@@ -127,8 +106,8 @@ public void onWorkerGroupAdd(List<WorkerGroup> workerGroups) {
127106
public void onWorkerGroupChange(List<WorkerGroup> workerGroups) {
128107
for (WorkerGroup workerGroup : workerGroups) {
129108
List<String> workerAddresses = WorkerGroupUtils.getWorkerAddressListFromWorkerGroup(workerGroup);
130-
synchronized (dbWorkerGroupMapping) {
131-
dbWorkerGroupMapping.put(workerGroup.getName(), workerAddresses);
109+
synchronized (workerGroupMapping) {
110+
workerGroupMapping.put(workerGroup.getName(), workerAddresses);
132111
}
133112
}
134113
}
@@ -145,15 +124,12 @@ WorkerServerMetadata parseServerFromHeartbeat(String serverHeartBeatJson) {
145124
@Override
146125
public void onServerAdded(WorkerServerMetadata workerServer) {
147126
workerMapping.put(workerServer.getAddress(), workerServer);
148-
synchronized (configWorkerGroupMapping) {
149-
List<String> addWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
150-
if (addWorkerGroupAddrList == null) {
151-
List<String> newWorkerGroupAddrList = new ArrayList<>();
152-
newWorkerGroupAddrList.add(workerServer.getAddress());
153-
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), newWorkerGroupAddrList);
154-
} else if (!addWorkerGroupAddrList.contains(workerServer.getAddress())) {
155-
addWorkerGroupAddrList.add(workerServer.getAddress());
156-
configWorkerGroupMapping.put(workerServer.getWorkerGroup(), addWorkerGroupAddrList);
127+
synchronized (workerGroupMapping) {
128+
if (!workerGroupMapping.containsKey(workerServer.getWorkerGroup())) {
129+
log.warn("The group: {} of worker: {} is not defined, please define the workergroup first",
130+
workerServer.getWorkerGroup(), workerServer);
131+
} else {
132+
workerGroupMapping.get(workerServer.getWorkerGroup()).add(workerServer.getAddress());
157133
}
158134
}
159135
for (IClustersChangeListener<WorkerServerMetadata> listener : workerClusterChangeListeners) {
@@ -164,13 +140,9 @@ public void onServerAdded(WorkerServerMetadata workerServer) {
164140
@Override
165141
public void onServerRemove(WorkerServerMetadata workerServer) {
166142
workerMapping.remove(workerServer.getAddress(), workerServer);
167-
synchronized (configWorkerGroupMapping) {
168-
List<String> removeWorkerGroupAddrList = configWorkerGroupMapping.get(workerServer.getWorkerGroup());
169-
if (removeWorkerGroupAddrList != null && removeWorkerGroupAddrList.contains(workerServer.getAddress())) {
170-
removeWorkerGroupAddrList.remove(workerServer.getAddress());
171-
if (removeWorkerGroupAddrList.isEmpty()) {
172-
configWorkerGroupMapping.remove(workerServer.getWorkerGroup());
173-
}
143+
synchronized (workerGroupMapping) {
144+
if (workerGroupMapping.containsKey(workerServer.getWorkerGroup())) {
145+
workerGroupMapping.get(workerServer.getWorkerGroup()).remove(workerServer.getAddress());
174146
}
175147
}
176148
for (IClustersChangeListener<WorkerServerMetadata> listener : workerClusterChangeListeners) {

dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/cluster/WorkerClustersTest.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -40,12 +40,12 @@ void testOnWorkerGroupDelete() {
4040
.addrList(normalWorkerServerMetadata.getAddress())
4141
.build();
4242
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
43-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
43+
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
4444
.containsExactly(normalWorkerServerMetadata.getAddress());
4545

4646
workerClusters.onWorkerGroupDelete(Lists.newArrayList(workerGroup));
4747
Truth.assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isFalse();
48-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
48+
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
4949
}
5050

5151
@Test
@@ -59,7 +59,7 @@ void testOnWorkerGroupAdd() {
5959
.addrList(normalWorkerServerMetadata.getAddress())
6060
.build();
6161
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
62-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
62+
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
6363
.containsExactly(normalWorkerServerMetadata.getAddress());
6464
}
6565

@@ -74,15 +74,15 @@ void testOnWorkerGroupChange() {
7474
.addrList(normalWorkerServerMetadata.getAddress())
7575
.build();
7676
workerClusters.onWorkerGroupAdd(Lists.newArrayList(workerGroup));
77-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster"))
77+
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster"))
7878
.containsExactly(normalWorkerServerMetadata.getAddress());
7979

8080
WorkerGroup updatedWorkerGroup = WorkerGroup.builder()
8181
.name("flinkCluster")
8282
.addrList("")
8383
.build();
8484
workerClusters.onWorkerGroupChange(Lists.newArrayList(updatedWorkerGroup));
85-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("flinkCluster")).isEmpty();
85+
assertThat(workerClusters.getWorkerServerAddressByGroup("flinkCluster")).isEmpty();
8686
assertThat(workerClusters.containsWorkerGroup("flinkCluster")).isTrue();
8787
}
8888

@@ -94,7 +94,7 @@ void testOnServerAdded() {
9494
WorkerClusters workerClusters = new WorkerClusters();
9595
workerClusters.onServerAdded(normalWorkerServerMetadata);
9696
workerClusters.onServerAdded(busyWorkerServerMetadata);
97-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
97+
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
9898
.containsExactly(normalWorkerServerMetadata.getAddress(), busyWorkerServerMetadata.getAddress());
9999
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
100100
.containsExactly(normalWorkerServerMetadata.getAddress());
@@ -110,7 +110,7 @@ void testOnServerRemove() {
110110
workerClusters.onServerAdded(busyWorkerServerMetadata);
111111
workerClusters.onServerRemove(busyWorkerServerMetadata);
112112

113-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
113+
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
114114
.containsExactly(normalWorkerServerMetadata.getAddress());
115115
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
116116
.containsExactly(normalWorkerServerMetadata.getAddress());
@@ -137,7 +137,7 @@ void testOnServerUpdate() {
137137

138138
workerClusters.onServerUpdate(workerServerMetadata);
139139

140-
assertThat(workerClusters.getDbWorkerServerAddressByGroup("default"))
140+
assertThat(workerClusters.getWorkerServerAddressByGroup("default"))
141141
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());
142142
assertThat(workerClusters.getNormalWorkerServerAddressByGroup("default"))
143143
.containsExactly(normalWorkerServerMetadata.getAddress(), workerServerMetadata.getAddress());

0 commit comments

Comments
 (0)