Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.aries.rsa.topologymanager.importer;

import java.util.Objects;

import org.osgi.service.remoteserviceadmin.EndpointDescription;

class EndpointDescriptionFilter {
private final String filter;
private final EndpointDescription endpointDescription;

EndpointDescriptionFilter(String filter, EndpointDescription endpointDescription) {
this.filter = filter;
this.endpointDescription = endpointDescription;
}

@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
}
if (obj == null || getClass() != obj.getClass()) {
return false;
}
EndpointDescriptionFilter other = (EndpointDescriptionFilter) obj;
return filter.equals(other.filter) && endpointDescription.equals(other.endpointDescription);
}

@Override
public int hashCode() {
return Objects.hash(filter, endpointDescription);
}

String getFilter() {
return filter;
}

EndpointDescription getEndpoint() {
return endpointDescription;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,15 @@
package org.apache.aries.rsa.topologymanager.importer;

import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.osgi.framework.BundleContext;
Expand Down Expand Up @@ -61,13 +65,25 @@ public class TopologyManagerImport implements EndpointEventListener, RemoteServi

/**
* List of already imported Endpoints by their matched filter
* Provides an easier access to the ImportRegistration for a given EndpointDescription and filter, e.g. when an endpoint is removed or modified.
*/
private final MultiMap<String, ImportRegistration> importedServices = new MultiMap<>();
private final ConcurrentMap<EndpointDescriptionFilter, ImportRegistration> importedServices = new ConcurrentHashMap<>();

private final Set<ImportReference> inProgressUnimports = ConcurrentHashMap.newKeySet();

public TopologyManagerImport(BundleContext bc) {
rsaSet = new CopyOnWriteArraySet<>();
bctx = bc;
execService = Executors.newCachedThreadPool(new NamedThreadFactory(getClass()));

// max 20, default=CPU-1, but minimum 2
int poolSize = Math.max(2, Math.min(20, Runtime.getRuntime().availableProcessors() - 1));
execService = new ThreadPoolExecutor(
poolSize,
poolSize,
10L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new NamedThreadFactory(getClass())
);
}

public void start() {
Expand All @@ -89,12 +105,13 @@ public void stop() {
}
// close all imports
importPossibilities.clear();
importedServices.allValues().forEach(this::unimportRegistration);
importedServices.values().forEach(this::unimportRegistration);
importedServices.clear();
}

public void add(RemoteServiceAdmin rsa) {
rsaSet.add(rsa);
importPossibilities.keySet().forEach(this::synchronizeImportsAsync);
execService.execute(this::synchronizeImports);
}

public void remove(RemoteServiceAdmin rsa) {
Expand All @@ -105,90 +122,118 @@ public void remove(RemoteServiceAdmin rsa) {
public void remoteAdminEvent(RemoteServiceAdminEvent event) {
ImportReference ref = event.getImportReference();
if (event.getType() == RemoteServiceAdminEvent.IMPORT_UNREGISTRATION && ref != null) {
importedServices.allValues().stream()
.filter(reg -> ref.equals(reg.getImportReference()))
.forEach(this::unimportRegistration);
if (inProgressUnimports.contains(ref)) {
return;// no need to iterate over the imports
}
importedServices.values().stream()
.filter(ir -> ref.equals(ir.getImportReference()))
.forEach(this::unimportRegistration);
}
}

private void synchronizeImportsAsync(final String filter) {
LOG.debug("Import of a service for filter {} was queued", filter);
if (!rsaSet.isEmpty()) {
execService.execute(() -> synchronizeImports(filter));
private void synchronizeImports() {
try {
// remove all invalid imports and temporarily collect all filters and endpoints while at it for the imports below
Map<String, Set<EndpointDescription>> validFiltersToImRegs = removeInvalidRegs();
importAdded(validFiltersToImRegs);
} catch (Exception e) {
LOG.error(e.getMessage(), e);
}
}

/**
* Synchronizes the actual imports with the possible imports for the given filter,
* i.e. un-imports previously imported endpoints that are no longer valid or possible,
* and imports new possible endpoints that are not already imported.
* <p>
* TODO but optional: if the service is already imported and the endpoint is still
* in the list of possible imports check if a "better" endpoint is now in the list.
* Imports all endpoints that are in the importPossibilities but not yet imported,
* and that are still valid (e.g. not removed while the imports were being removed).
* Used after a new RSA is added.
*
* @param filter the filter whose endpoints are synchronized
* @param validFiltersToImRegs a map of filters to the endpoints that are still valid imports after the removals,
* used to avoid importing endpoints that were removed while the imports were being removed
*/
private void synchronizeImports(final String filter) {
try {
// we have a set of all current imports, and a set of all possible imports (with overlap)
Set<ImportRegistration> imported = importedServices.get(filter);
Set<EndpointDescription> possible = importPossibilities.get(filter);
// first we iterate over all current imports, and split them into two groups:
// - still valid (no null references) and possible (in possible set)
// - invalid (contain null references) or no longer possible (not in possible set)
// note that this part should be concurrency-safe (get every reference only once and don't modify anything)
Set<EndpointDescription> valid = new HashSet<>(); // imports that are still valid and possible
Set<ImportRegistration> invalid = new LinkedHashSet<>(); // imports that are no longer valid/possible
for (ImportRegistration reg : imported) {
ImportReference ref = reg.getImportReference();
EndpointDescription endpoint = ref == null ? null : ref.getImportedEndpoint();
// check if the currently imported endpoint is still valid and possible
if (endpoint != null && possible.contains(endpoint)) {
valid.add(endpoint); // valid and possible
} else {
invalid.add(reg); // invalid (reg or ref or endpoint is null) or no longer possible
}
private void importAdded(Map<String, Set<EndpointDescription>> validFiltersToImRegs) {
// now import all new endpoints for each filter
for (String filter : importPossibilities.keySet()) {
Set<EndpointDescription> validEndpoints = validFiltersToImRegs.get(filter);
for (EndpointDescription ed : importPossibilities.get(filter)) {
// if the endpoint is not already imported for the filter, import it
if (validEndpoints == null || !validEndpoints.contains(ed)) {
// this is a new endpoint for the filter, import it
synchronizeAddedImport(filter, ed);
}
}
// now that we figured out what needs to be done, apply the changes
invalid.forEach(this::unimportRegistration); // remove invalid/non-possible imports
possible.forEach(endpoint -> { // import all possible endpoints that are not already imported
if (!valid.contains(endpoint)) {
importService(filter, endpoint);
}
});
} catch (Exception e) {
LOG.error("error synchronizing imports", e);
}
// Notify EndpointListeners? NO!
}

/**
* Tries to import the service with each RSA until one import is successful.
* Removes all imports that are no longer valid,
* e.g. because the endpoint is no longer in the list of possible imports for the filter,
* or because the import registration has no ImportReference (e.g. because the import failed).
* Used after a new RSA is added.
*
* @param filter the filter that matched the endpoint
* @param endpoint endpoint to import
* @return a map of filters to the endpoints that are still valid imports, used for the imports after the removals
*/
private void importService(String filter, EndpointDescription endpoint) {
private Map<String, Set<EndpointDescription>> removeInvalidRegs() {
Map<String, Set<EndpointDescription>> validFiltersToImRegs = new HashMap<>();
importedServices.entrySet().stream()
.filter(entry -> {
// an import is invalid if the endpoint is no longer in the list of possible imports for the filter,
// or if the import registration has no ImportReference (e.g. because the import failed)
boolean invalid = !importPossibilities.get(entry.getKey().getFilter()).contains(entry.getKey().getEndpoint())
|| entry.getValue().getImportReference() == null;
if (!invalid) {
validFiltersToImRegs.compute(entry.getKey().getFilter(), (filter, eds) -> {
if (eds == null) {
eds = new HashSet<>();
}
eds.add(entry.getKey().getEndpoint());
return eds;
});
}
return invalid;
})
.forEach(entry -> synchronizeRemovedImport(entry.getKey().getFilter(), entry.getKey().getEndpoint()));
return validFiltersToImRegs;
}

private void synchronizeAddedImport(String filter, EndpointDescription endpoint) {
if (!rsaSet.isEmpty()) {
execService.execute(() -> importedServices.computeIfAbsent(new EndpointDescriptionFilter(filter, endpoint),
edFilter -> importService(filter, endpoint)));
}
}

private void synchronizeRemovedImport(String filter, EndpointDescription endpoint) {
if (!rsaSet.isEmpty()) {
execService.execute(() -> {
ImportRegistration importRegistration = importedServices.remove(new EndpointDescriptionFilter(filter, endpoint));
if (importRegistration != null) {
unimportRegistration(importRegistration);
}
});
}
}

private ImportRegistration importService(String filter, EndpointDescription endpoint) {
for (RemoteServiceAdmin rsa : rsaSet) {
ImportRegistration reg = rsa.importService(endpoint);
if (reg != null) {
if (reg.getException() == null) {
LOG.debug("Service import was successful {}", reg);
importedServices.put(filter, reg);
return;
ImportRegistration ir = rsa.importService(endpoint);
if (ir != null) {
if (ir.getException() == null) {
LOG.debug("Service import was successful for filter {}: {}", filter, ir);
return ir;
} else {
LOG.warn("Error importing service {}", endpoint, reg.getException());
reg.close();
LOG.info("Error importing service for filter {}: {}", filter, endpoint, ir.getException());
}
}
}
return null;
}

private void unimportRegistration(ImportRegistration reg) {
importedServices.remove(reg);
// spares unnecessary iteration when the unimport event is received
inProgressUnimports.add(reg.getImportReference());
reg.close();
inProgressUnimports.remove(reg.getImportReference());
}

@Override
public void endpointChanged(EndpointEvent event, String filter) {
if (stopped) {
Expand All @@ -199,17 +244,20 @@ public void endpointChanged(EndpointEvent event, String filter) {
switch (event.getType()) {
case EndpointEvent.ADDED:
importPossibilities.put(filter, endpoint);
synchronizeAddedImport(filter, endpoint);
break;
case EndpointEvent.REMOVED:
case EndpointEvent.MODIFIED_ENDMATCH:
importPossibilities.remove(filter, endpoint);
synchronizeRemovedImport(filter, endpoint);
break;
case EndpointEvent.MODIFIED:
// new endpoint has same endpoint.id and equals old endpoint, but has updated properties
importPossibilities.replace(filter, endpoint, endpoint);
importPossibilities.remove(filter, endpoint);
synchronizeRemovedImport(filter, endpoint);
importPossibilities.put(filter, endpoint);
synchronizeAddedImport(filter, endpoint);
break;
}
synchronizeImportsAsync(filter);
}

}