From 4d31e21e528c8f7af182c67cf60f2a14abdf9dab Mon Sep 17 00:00:00 2001 From: Mencho Menchev Date: Thu, 9 Apr 2026 14:33:31 +0300 Subject: [PATCH] Optimise import registrations and removals --- .../importer/EndpointDescriptionFilter.java | 58 ++++++ .../importer/TopologyManagerImport.java | 184 +++++++++++------- 2 files changed, 174 insertions(+), 68 deletions(-) create mode 100644 topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointDescriptionFilter.java diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointDescriptionFilter.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointDescriptionFilter.java new file mode 100644 index 000000000..8f44f4189 --- /dev/null +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/EndpointDescriptionFilter.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.aries.rsa.topologymanager.importer; + +import java.util.Objects; + +import org.osgi.service.remoteserviceadmin.EndpointDescription; + +class EndpointDescriptionFilter { + private final String filter; + private final EndpointDescription endpointDescription; + + EndpointDescriptionFilter(String filter, EndpointDescription endpointDescription) { + this.filter = filter; + this.endpointDescription = endpointDescription; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + EndpointDescriptionFilter other = (EndpointDescriptionFilter) obj; + return filter.equals(other.filter) && endpointDescription.equals(other.endpointDescription); + } + + @Override + public int hashCode() { + return Objects.hash(filter, endpointDescription); + } + + String getFilter() { + return filter; + } + + EndpointDescription getEndpoint() { + return endpointDescription; + } +} diff --git a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java index b57d494bd..1b9ead7c3 100644 --- a/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java +++ b/topology-manager/src/main/java/org/apache/aries/rsa/topologymanager/importer/TopologyManagerImport.java @@ -19,11 +19,15 @@ package org.apache.aries.rsa.topologymanager.importer; import java.util.HashSet; -import java.util.LinkedHashSet; +import java.util.HashMap; +import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CopyOnWriteArraySet; -import java.util.concurrent.Executors; import java.util.concurrent.ExecutorService; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import org.osgi.framework.BundleContext; @@ -61,13 +65,25 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi /** * List of already imported Endpoints by their matched filter + * Provides an easier access to the ImportRegistration for a given EndpointDescription and filter, e.g. when an endpoint is removed or modified. */ - private final MultiMap importedServices = new MultiMap<>(); + private final ConcurrentMap importedServices = new ConcurrentHashMap<>(); + + private final Set inProgressUnimports = ConcurrentHashMap.newKeySet(); public TopologyManagerImport(BundleContext bc) { rsaSet = new CopyOnWriteArraySet<>(); bctx = bc; - execService = Executors.newCachedThreadPool(new NamedThreadFactory(getClass())); + + // max 20, default=CPU-1, but minimum 2 + int poolSize = Math.max(2, Math.min(20, Runtime.getRuntime().availableProcessors() - 1)); + execService = new ThreadPoolExecutor( + poolSize, + poolSize, + 10L, TimeUnit.SECONDS, + new LinkedBlockingQueue<>(), + new NamedThreadFactory(getClass()) + ); } public void start() { @@ -89,12 +105,13 @@ public void stop() { } // close all imports importPossibilities.clear(); - importedServices.allValues().forEach(this::unimportRegistration); + importedServices.values().forEach(this::unimportRegistration); + importedServices.clear(); } public void add(RemoteServiceAdmin rsa) { rsaSet.add(rsa); - importPossibilities.keySet().forEach(this::synchronizeImportsAsync); + execService.execute(this::synchronizeImports); } public void remove(RemoteServiceAdmin rsa) { @@ -105,90 +122,118 @@ public void remove(RemoteServiceAdmin rsa) { public void remoteAdminEvent(RemoteServiceAdminEvent event) { ImportReference ref = event.getImportReference(); if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION && ref != null) { - importedServices.allValues().stream() - .filter(reg -> ref.equals(reg.getImportReference())) - .forEach(this::unimportRegistration); + if (inProgressUnimports.contains(ref)) { + return;// no need to iterate over the imports + } + importedServices.values().stream() + .filter(ir -> ref.equals(ir.getImportReference())) + .forEach(this::unimportRegistration); } } - private void synchronizeImportsAsync(final String filter) { - LOG.debug("Import of a service for filter {} was queued", filter); - if (!rsaSet.isEmpty()) { - execService.execute(() -> synchronizeImports(filter)); + private void synchronizeImports() { + try { + // remove all invalid imports and temporarily collect all filters and endpoints while at it for the imports below + Map> validFiltersToImRegs = removeInvalidRegs(); + importAdded(validFiltersToImRegs); + } catch (Exception e) { + LOG.error(e.getMessage(), e); } } /** - * Synchronizes the actual imports with the possible imports for the given filter, - * i.e. un-imports previously imported endpoints that are no longer valid or possible, - * and imports new possible endpoints that are not already imported. - *

- * TODO but optional: if the service is already imported and the endpoint is still - * in the list of possible imports check if a "better" endpoint is now in the list. + * Imports all endpoints that are in the importPossibilities but not yet imported, + * and that are still valid (e.g. not removed while the imports were being removed). + * Used after a new RSA is added. * - * @param filter the filter whose endpoints are synchronized + * @param validFiltersToImRegs a map of filters to the endpoints that are still valid imports after the removals, + * used to avoid importing endpoints that were removed while the imports were being removed */ - private void synchronizeImports(final String filter) { - try { - // we have a set of all current imports, and a set of all possible imports (with overlap) - Set imported = importedServices.get(filter); - Set possible = importPossibilities.get(filter); - // first we iterate over all current imports, and split them into two groups: - // - still valid (no null references) and possible (in possible set) - // - invalid (contain null references) or no longer possible (not in possible set) - // note that this part should be concurrency-safe (get every reference only once and don't modify anything) - Set valid = new HashSet<>(); // imports that are still valid and possible - Set invalid = new LinkedHashSet<>(); // imports that are no longer valid/possible - for (ImportRegistration reg : imported) { - ImportReference ref = reg.getImportReference(); - EndpointDescription endpoint = ref == null ? null : ref.getImportedEndpoint(); - // check if the currently imported endpoint is still valid and possible - if (endpoint != null && possible.contains(endpoint)) { - valid.add(endpoint); // valid and possible - } else { - invalid.add(reg); // invalid (reg or ref or endpoint is null) or no longer possible - } + private void importAdded(Map> validFiltersToImRegs) { + // now import all new endpoints for each filter + for (String filter : importPossibilities.keySet()) { + Set validEndpoints = validFiltersToImRegs.get(filter); + for (EndpointDescription ed : importPossibilities.get(filter)) { + // if the endpoint is not already imported for the filter, import it + if (validEndpoints == null || !validEndpoints.contains(ed)) { + // this is a new endpoint for the filter, import it + synchronizeAddedImport(filter, ed); + } } - // now that we figured out what needs to be done, apply the changes - invalid.forEach(this::unimportRegistration); // remove invalid/non-possible imports - possible.forEach(endpoint -> { // import all possible endpoints that are not already imported - if (!valid.contains(endpoint)) { - importService(filter, endpoint); - } - }); - } catch (Exception e) { - LOG.error("error synchronizing imports", e); } - // Notify EndpointListeners? NO! } /** - * Tries to import the service with each RSA until one import is successful. + * Removes all imports that are no longer valid, + * e.g. because the endpoint is no longer in the list of possible imports for the filter, + * or because the import registration has no ImportReference (e.g. because the import failed). + * Used after a new RSA is added. * - * @param filter the filter that matched the endpoint - * @param endpoint endpoint to import + * @return a map of filters to the endpoints that are still valid imports, used for the imports after the removals */ - private void importService(String filter, EndpointDescription endpoint) { + private Map> removeInvalidRegs() { + Map> validFiltersToImRegs = new HashMap<>(); + importedServices.entrySet().stream() + .filter(entry -> { + // an import is invalid if the endpoint is no longer in the list of possible imports for the filter, + // or if the import registration has no ImportReference (e.g. because the import failed) + boolean invalid = !importPossibilities.get(entry.getKey().getFilter()).contains(entry.getKey().getEndpoint()) + || entry.getValue().getImportReference() == null; + if (!invalid) { + validFiltersToImRegs.compute(entry.getKey().getFilter(), (filter, eds) -> { + if (eds == null) { + eds = new HashSet<>(); + } + eds.add(entry.getKey().getEndpoint()); + return eds; + }); + } + return invalid; + }) + .forEach(entry -> synchronizeRemovedImport(entry.getKey().getFilter(), entry.getKey().getEndpoint())); + return validFiltersToImRegs; + } + + private void synchronizeAddedImport(String filter, EndpointDescription endpoint) { + if (!rsaSet.isEmpty()) { + execService.execute(() -> importedServices.computeIfAbsent(new EndpointDescriptionFilter(filter, endpoint), + edFilter -> importService(filter, endpoint))); + } + } + + private void synchronizeRemovedImport(String filter, EndpointDescription endpoint) { + if (!rsaSet.isEmpty()) { + execService.execute(() -> { + ImportRegistration importRegistration = importedServices.remove(new EndpointDescriptionFilter(filter, endpoint)); + if (importRegistration != null) { + unimportRegistration(importRegistration); + } + }); + } + } + + private ImportRegistration importService(String filter, EndpointDescription endpoint) { for (RemoteServiceAdmin rsa : rsaSet) { - ImportRegistration reg = rsa.importService(endpoint); - if (reg != null) { - if (reg.getException() == null) { - LOG.debug("Service import was successful {}", reg); - importedServices.put(filter, reg); - return; + ImportRegistration ir = rsa.importService(endpoint); + if (ir != null) { + if (ir.getException() == null) { + LOG.debug("Service import was successful for filter {}: {}", filter, ir); + return ir; } else { - LOG.warn("Error importing service {}", endpoint, reg.getException()); - reg.close(); + LOG.info("Error importing service for filter {}: {}", filter, endpoint, ir.getException()); } } } + return null; } - + private void unimportRegistration(ImportRegistration reg) { - importedServices.remove(reg); + // spares unnecessary iteration when the unimport event is received + inProgressUnimports.add(reg.getImportReference()); reg.close(); + inProgressUnimports.remove(reg.getImportReference()); } - + @Override public void endpointChanged(EndpointEvent event, String filter) { if (stopped) { @@ -199,17 +244,20 @@ public void endpointChanged(EndpointEvent event, String filter) { switch (event.getType()) { case EndpointEvent.ADDED: importPossibilities.put(filter, endpoint); + synchronizeAddedImport(filter, endpoint); break; case EndpointEvent.REMOVED: case EndpointEvent.MODIFIED_ENDMATCH: importPossibilities.remove(filter, endpoint); + synchronizeRemovedImport(filter, endpoint); break; case EndpointEvent.MODIFIED: - // new endpoint has same endpoint.id and equals old endpoint, but has updated properties - importPossibilities.replace(filter, endpoint, endpoint); + importPossibilities.remove(filter, endpoint); + synchronizeRemovedImport(filter, endpoint); + importPossibilities.put(filter, endpoint); + synchronizeAddedImport(filter, endpoint); break; } - synchronizeImportsAsync(filter); } }