Skip to content

Commit c2b9dca

Browse files
committed
IGNITE-14070 Snapshot listeners management.
1 parent caaffd5 commit c2b9dca

8 files changed

Lines changed: 219 additions & 84 deletions

File tree

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteSnapshotManager.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101
import org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
102102
import org.apache.ignite.internal.processors.cache.persistence.partstate.GroupPartitionId;
103103
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener;
104+
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotListeners;
104105
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPageIO;
105106
import org.apache.ignite.internal.processors.cache.persistence.tree.io.DataPagePayload;
106107
import org.apache.ignite.internal.processors.cache.persistence.tree.io.PageIO;
@@ -303,7 +304,7 @@ public class IgniteSnapshotManager extends GridCacheSharedManagerAdapter
303304
private ClusterSnapshotFuture clusterSnpFut;
304305

305306
// todo something like pipeline
306-
private final List<SnapshotLifecycleListener> lifecycleListeners = new ArrayList<>();
307+
private SnapshotListeners snpLsnrs;
307308

308309
/** Current snapshot operation on local node. */
309310
private volatile SnapshotOperationRequest clusterSnpReq;
@@ -385,12 +386,10 @@ public static String partDeltaFileName(int partId) {
385386
U.ensureDirectory(locSnpDir, "snapshot work directory", log);
386387
U.ensureDirectory(tmpWorkDir, "temp directory for snapshot creation", log);
387388

388-
lifecycleListeners.add(new SnapshotLifeCycleListenerImpl(ctx));
389+
snpLsnrs = new SnapshotListeners(ctx.plugins());
389390

390-
SnapshotLifecycleListener[] lsnrs = cctx.kernalContext().plugins().extensions(SnapshotLifecycleListener.class);
391-
392-
if (lsnrs != null)
393-
Collections.addAll(lifecycleListeners, lsnrs);
391+
// Register default.
392+
snpLsnrs.register(new SnapshotRestoreConsistencyCheck(ctx));
394393

395394
MetricRegistry mreg = cctx.kernalContext().metric().registry(SNAPSHOT_METRICS);
396395

@@ -549,8 +548,8 @@ public void deleteSnapshot(File snpDir, String folderName) {
549548
}
550549
}
551550

552-
public Collection<SnapshotLifecycleListener> lifeCycleListeners() {
553-
return lifecycleListeners;
551+
public SnapshotListeners listeners() {
552+
return snpLsnrs;
554553
}
555554

556555
/**
@@ -691,7 +690,7 @@ private IgniteInternalFuture<SnapshotOperationResponse> initLocalSnapshotStartSt
691690
log.info("Snapshot metafile has been created: " + smf.getAbsolutePath());
692691
}
693692

694-
for (SnapshotLifecycleListener lsnr : lifecycleListeners)
693+
for (SnapshotLifecycleListener lsnr : listeners().list())
695694
lsnr.postCreate(req.snapshotName());
696695

697696
return new SnapshotOperationResponse();

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotLifeCycleListenerImpl.java renamed to modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreConsistencyCheck.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,14 +45,18 @@
4545
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.calculatePartitionHash;
4646
import static org.apache.ignite.internal.processors.cache.verify.IdleVerifyUtility.checkPartitionsPageCrcSum;
4747

48-
public class SnapshotLifeCycleListenerImpl implements SnapshotLifecycleListener {
48+
public class SnapshotRestoreConsistencyCheck implements SnapshotLifecycleListener {
4949
/** */
5050
private final GridKernalContext ctx;
5151

5252
/** */
5353
private final IgniteLogger log;
5454

55-
public SnapshotLifeCycleListenerImpl(GridKernalContext ctx) {
55+
@Override public int priority() {
56+
return Integer.MIN_VALUE;
57+
}
58+
59+
public SnapshotRestoreConsistencyCheck(GridKernalContext ctx) {
5660
this.ctx = ctx;
5761

5862
log = ctx.log(getClass());

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/SnapshotRestoreProcess.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,6 @@
5555
import org.apache.ignite.internal.processors.cache.persistence.snapshot.IgniteSnapshotManager.ClusterSnapshotFuture;
5656
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.RestoreHandleTask;
5757
import org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle.SnapshotLifecycleListener;
58-
import org.apache.ignite.internal.processors.cache.verify.IdleVerifyResultV2;
5958
import org.apache.ignite.internal.processors.cluster.DiscoveryDataClusterState;
6059
import org.apache.ignite.internal.util.distributed.DistributedProcess;
6160
import org.apache.ignite.internal.util.future.GridFinishedFuture;
@@ -156,7 +155,7 @@ protected void cleanup() throws IgniteCheckedException {
156155
* @return Future that will be completed when the restore operation is complete and the cache groups are started.
157156
*/
158157
public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cacheGrpNames) {
159-
ClusterSnapshotFuture fut0;
158+
ClusterSnapshotFuture fut0 = null;
160159

161160
try {
162161
if (ctx.clientNode())
@@ -189,7 +188,9 @@ public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cac
189188
Map<ClusterNode, List<SnapshotMetadata>> metas =
190189
ctx.cache().context().snapshotMgr().collectSnapshotMetadata(snpName).get();
191190

192-
Collection<SnapshotLifecycleListener> lsnrs = ctx.cache().context().snapshotMgr().lifeCycleListeners();
191+
Collection<SnapshotLifecycleListener> lsnrs = ctx.cache().context().snapshotMgr().listeners().list();
192+
193+
assert !lsnrs.isEmpty();
193194

194195
if (!lsnrs.isEmpty()) {
195196
ctx.security().authorize(ADMIN_SNAPSHOT);
@@ -202,7 +203,6 @@ public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cac
202203

203204
ctx.task().execute(RestoreHandleTask.class, new SnapshotPartitionsVerifyTaskArg(cacheGrpNames, metas)).get();
204205
}
205-
//
206206

207207
// Map<ClusterNode, List<SnapshotMetadata>> metas = f.result().metas();
208208
Set<UUID> dataNodes = new HashSet<>();
@@ -258,6 +258,9 @@ public IgniteFuture<Void> start(String snpName, @Nullable Collection<String> cac
258258
prepareRestoreProc.start(req.requestId(), req);
259259
}
260260
catch (IgniteException | IgniteCheckedException e) {
261+
if (fut0 != null)
262+
finishProcess(fut0.rqId, e);
263+
261264
return new IgniteFinishedFutureImpl<>(e);
262265
}
263266

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/RestoreHandleTask.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle;
219

320
import java.util.ArrayList;
@@ -109,7 +126,7 @@ public class RestoreHandleTask extends ComputeTaskAdapter<SnapshotPartitionsVeri
109126
}
110127
}
111128

112-
for (SnapshotLifecycleListener lsnr : ignite.context().cache().context().snapshotMgr().lifeCycleListeners()) {
129+
for (SnapshotLifecycleListener lsnr : ignite.context().cache().context().snapshotMgr().listeners().list()) {
113130
List<ComputeJobResult> nodeResults = resMap.get(lsnr.name());
114131

115132
try {
@@ -149,12 +166,12 @@ public RestoreHandleJob(String snapshotName, String consistentId, Collection<Str
149166
/** {@inheritDoc} */
150167
@Override public Map<String, Object> execute() throws IgniteException {
151168
// todo expand restore hadnler separately
152-
Collection<SnapshotLifecycleListener> lsnrs = ignite.context().cache().context().snapshotMgr().lifeCycleListeners();
153-
154169
Map<String, Object> resMap = new HashMap<>();
155170

171+
SnapshotListeners lsnrs = ignite.context().cache().context().snapshotMgr().listeners();
172+
156173
try {
157-
for (SnapshotLifecycleListener lsnr : lsnrs) {
174+
for (SnapshotLifecycleListener lsnr : lsnrs.list()) {
158175
Object res = lsnr.handleRestore(snapshotName, consistentId, groupNames);
159176

160177
resMap.put(lsnr.name(), res);

modules/core/src/main/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/lifecycle/SnapshotLifecycleListener.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,20 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
118
package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle;
219

320
import java.util.Collection;
@@ -7,8 +24,14 @@
724
import org.apache.ignite.plugin.Extension;
825

926
public interface SnapshotLifecycleListener extends Extension {
27+
public static final int DEFAULT_PRIORITY = 0;
28+
1029
public String name();
1130

31+
public default int priority() {
32+
return DEFAULT_PRIORITY;
33+
}
34+
1235
public default void postCreate(String snpName) throws IgniteCheckedException {
1336
// No-op.
1437
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.ignite.internal.processors.cache.persistence.snapshot.lifecycle;
19+
20+
import java.util.ArrayList;
21+
import java.util.Collection;
22+
import java.util.Collections;
23+
import java.util.Comparator;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import org.apache.ignite.internal.processors.plugin.IgnitePluginProcessor;
28+
import org.jetbrains.annotations.Nullable;
29+
30+
public class SnapshotListeners {
31+
private final IgnitePluginProcessor plugins;
32+
private final Map<String, SnapshotLifecycleListener> lsnrsByName = new HashMap<>();
33+
private final List<SnapshotLifecycleListener> lsnrsByPriority = new ArrayList<>();
34+
35+
public SnapshotListeners(IgnitePluginProcessor plugins) {
36+
this.plugins = plugins;
37+
38+
SnapshotLifecycleListener[] lsnrs = plugins.extensions(SnapshotLifecycleListener.class);
39+
40+
if (lsnrs == null)
41+
return;
42+
43+
for (SnapshotLifecycleListener lsnr : lsnrs)
44+
register(lsnr);
45+
}
46+
47+
public Collection<SnapshotLifecycleListener> list() {
48+
return lsnrsByPriority;
49+
}
50+
51+
public @Nullable SnapshotLifecycleListener find(String name) {
52+
return lsnrsByName.get(name);
53+
}
54+
55+
public void register(SnapshotLifecycleListener lsnr) {
56+
if (lsnrsByName.putIfAbsent(lsnr.name(), lsnr) != null)
57+
throw new IllegalArgumentException("Listener named " + lsnr.name() + " is already registered.");
58+
59+
int idx = Collections.binarySearch(lsnrsByPriority, lsnr, Comparator.comparingInt(SnapshotLifecycleListener::priority));
60+
61+
if (idx < 0)
62+
lsnrsByPriority.add(-idx - 1, lsnr);
63+
else
64+
lsnrsByPriority.add(idx, lsnr);
65+
}
66+
67+
public boolean disable(String name) {
68+
if (lsnrsByPriority.removeIf(v -> name.equals(v.name()))) {
69+
lsnrsByName.remove(name);
70+
71+
return true;
72+
}
73+
74+
return false;
75+
}
76+
77+
public boolean enable(String name) {
78+
SnapshotLifecycleListener[] lsnrs = plugins.extensions(SnapshotLifecycleListener.class);
79+
80+
for (SnapshotLifecycleListener lsnr : lsnrs) {
81+
if (name.equals(lsnr.name())) {
82+
register(lsnr);
83+
84+
return true;
85+
}
86+
}
87+
88+
return false;
89+
}
90+
}

modules/core/src/test/java/org/apache/ignite/internal/processors/cache/persistence/snapshot/IgniteClusterSnapshotRestoreBaseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ public abstract class IgniteClusterSnapshotRestoreBaseTest extends AbstractSnaps
3636

3737
/** {@inheritDoc} */
3838
@Override protected <K, V> CacheConfiguration<K, V> txCacheConfig(CacheConfiguration<K, V> ccfg) {
39-
return super.txCacheConfig(ccfg).setAffinity(new RendezvousAffinityFunction(false, 8));
39+
return super.txCacheConfig(ccfg).setAffinity(new RendezvousAffinityFunction(false, PARTS_NUMBER));
4040
}
4141

4242
/**

0 commit comments

Comments
 (0)