From 2b43f89be98ce80dd893b4c742adba72039fc1c2 Mon Sep 17 00:00:00 2001 From: betzm Date: Fri, 12 Aug 2022 12:23:21 +0200 Subject: [PATCH 1/7] [KARAF-7517] Avoid local removal of remote service if more consumers remain after uninstall of a consumer [KARAF-7518] Remove locally registered remote service if no longer available [KARAF-7515] Return CancellationException instead of null if service not reachable [KARAF-7433] Added cluster wide propagation of service properties Additional: -Added plenty of logging -Some code cleanup and renaming -Added and renamed constants in Constants.java -Shell listing of services now sorted and extended with the endpoint Id -ImportServiceListener internal thead no longer dying on collection modification --- .../apache/karaf/cellar/dosgi/Constants.java | 4 +- .../cellar/dosgi/EndpointDescription.java | 50 ++-- .../cellar/dosgi/ExportServiceListener.java | 69 +++-- .../cellar/dosgi/ImportServiceListener.java | 267 ++++++++++++------ .../dosgi/RemoteServiceCallHandler.java | 2 +- .../cellar/dosgi/RemoteServiceFactory.java | 18 +- .../dosgi/RemoteServiceInvocationHandler.java | 23 +- .../dosgi/RemoteServiceProxyClassLoader.java | 2 +- .../cellar/dosgi/internal/osgi/Activator.java | 5 +- .../osgi/RemovedNodeServiceTracker.java | 6 +- .../cellar/dosgi/management/ServiceMBean.java | 2 +- .../management/internal/ServiceMBeanImpl.java | 6 +- .../shell/ListDistributedServicesCommand.java | 25 +- .../cellar/dosgi/EndpointDescriptionTest.java | 11 +- 14 files changed, 313 insertions(+), 177 deletions(-) diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/Constants.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/Constants.java index c26b686ec..88c8dd539 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/Constants.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/Constants.java @@ -18,9 +18,11 @@ */ public abstract class Constants { + public static final String DOT = "."; public static final String SEPARATOR = "/"; public static final String ALL_INTERFACES = "*"; - public static final String INTERFACE_SEPARATOR = ","; + public static final String COMMA_SEPARATOR = ","; + public static final String SERVICE_DOT = "service."; public static final String INTERFACE_PREFIX = "org.apache.karaf.cellar.dosgi"; public static final String REQUEST_PREFIX = "org.apache.karaf.cellar.dosgi.request"; public static final String RESULT_PREFIX = "org.apache.karaf.cellar.dosgi.result"; diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/EndpointDescription.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/EndpointDescription.java index 193b18264..71e56a9a9 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/EndpointDescription.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/EndpointDescription.java @@ -36,15 +36,16 @@ public class EndpointDescription implements MultiNode { private final Map properties = new HashMap(); /** - * Constructor + * Constructor with service properties * * @param id * @param node + * @param properties */ - public EndpointDescription(String id, Node node) { + public EndpointDescription(String id, Node node, Map properties) { this.id = id; this.nodes.add(node); - properties.put(org.osgi.framework.Constants.OBJECTCLASS,getServiceClass()); + this.properties.putAll(properties); } @@ -54,8 +55,8 @@ public EndpointDescription(String id, Node node) { * * @param filter The filter to test. * @return true If the properties of this - * EndpointDescription match the filter, - * false otherwise. + * EndpointDescription match the filter, + * false otherwise. * @throws IllegalArgumentException If filter contains an * invalid filter string that cannot be parsed. */ @@ -64,8 +65,7 @@ public boolean matches(String filter) { try { f = FrameworkUtil.createFilter(filter); } catch (InvalidSyntaxException e) { - IllegalArgumentException iae = new IllegalArgumentException(e.getMessage()); - iae.initCause(e); + IllegalArgumentException iae = new IllegalArgumentException(e.getMessage(), e); throw iae; } @@ -76,9 +76,9 @@ public boolean matches(String filter) { dictionary.put(key, value); } /* - * we can use matchCase here since properties already supports case - * insensitive key lookup. - */ + * we can use matchCase here since properties already supports case + * insensitive key lookup. + */ return f.matchCase(dictionary); } @@ -90,28 +90,20 @@ public Set getNodes() { return nodes; } - public void setNodes(Set nodes) { - if(nodes != null) { - for(Node node:nodes) { - this.nodes.add(node); - } - } - } + public void setNodes(Set nodes) { + if (nodes != null) { + for (Node node : nodes) { + this.nodes.add(node); + } + } + } public Map getProperties() { return properties; } public final String getServiceClass() { - String result = null; - - if(id != null) { - String[] parts = id.split(Constants.SEPARATOR); - if(parts != null && parts.length > 0) { - result = parts[0]; - } - } - return result; + return (String) properties.get(org.osgi.framework.Constants.OBJECTCLASS); } @Override @@ -125,11 +117,7 @@ public boolean equals(Object o) { EndpointDescription endpointDescription = (EndpointDescription) o; - if (id != null ? !id.equals(endpointDescription.id) : endpointDescription.id != null) { - return false; - } - - return true; + return id != null ? id.equals(endpointDescription.id) : endpointDescription.id == null; } @Override diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java index fff299b73..46e98cc34 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ExportServiceListener.java @@ -30,22 +30,20 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; +import java.util.TreeMap; /** * Listener called when a new service is exported. */ public class ExportServiceListener implements ServiceListener { - private static final transient Logger LOGGER = LoggerFactory.getLogger(ExportServiceListener.class); - + private static final Logger LOGGER = LoggerFactory.getLogger(ExportServiceListener.class); + private final Map consumers = new HashMap<>(); private ClusterManager clusterManager; private EventTransportFactory eventTransportFactory; private BundleContext bundleContext; private Map remoteEndpoints; - - private final Map consumers = new HashMap(); - private Node node; public void init() { @@ -54,7 +52,7 @@ public void init() { bundleContext.addServiceListener(this); // lookup for already exported services - ServiceReference[] references = null; + ServiceReference[] references; try { String filter = "(" + Constants.EXPORTED_INTERFACES + "=" + Constants.ALL_INTERFACES + ")"; references = bundleContext.getServiceReferences((String) null, filter); @@ -113,24 +111,33 @@ public void exportService(ServiceReference serviceReference) { String exportedServices = (String) serviceReference.getProperty(Constants.EXPORTED_INTERFACES); if (exportedServices != null && exportedServices.length() > 0) { - LOGGER.debug("CELLAR DOSGI: registering services {} in the cluster", exportedServices); - String[] interfaces = exportedServices.split(Constants.INTERFACE_SEPARATOR); + + Map exportedParametersMap = getExportedParameters(serviceReference); + + String exportedParameters = concatenateExportedParameters(exportedParametersMap); + + LOGGER.debug("CELLAR DOSGI: registering services {} in the cluster with parameters {}", exportedServices, exportedParametersMap); + + String[] interfaces = exportedServices.split(Constants.COMMA_SEPARATOR); Object service = bundleContext.getService(serviceReference); Set exportedInterfaces = getServiceInterfaces(service, interfaces); - for (String iface : exportedInterfaces) { - // add endpoint description to the set. - Version version = serviceReference.getBundle().getVersion(); - String endpointId = iface + Constants.SEPARATOR + version.toString(); + for (String exportedInterface : exportedInterfaces) { + // add endpoint description to the set, the endpoint ID must contain all significant service identifiers + String version = serviceReference.getBundle().getVersion().toString(); + String endpointId = exportedInterface + exportedParameters + version; EndpointDescription endpoint; if (remoteEndpoints.containsKey(endpointId)) { + LOGGER.debug("CELLAR DOSGI: adding endpoint ID to existing node {}", endpointId); endpoint = remoteEndpoints.get(endpointId); endpoint.getNodes().add(node); } else { - endpoint = new EndpointDescription(endpointId, node); + LOGGER.debug("CELLAR DOSGI: creating new endpoint ID {}", endpointId); + exportedParametersMap.put(org.osgi.framework.Constants.OBJECTCLASS, exportedInterface); + endpoint = new EndpointDescription(endpointId, node, exportedParametersMap); } remoteEndpoints.put(endpointId, endpoint); @@ -161,16 +168,22 @@ public void unExportService(ServiceReference serviceReference) { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); String exportedServices = (String) serviceReference.getProperty(Constants.EXPORTED_INTERFACES); if (exportedServices != null && exportedServices.length() > 0) { - LOGGER.debug("CELLAR DOSGI: un-register service {} from the cluster", exportedServices); - String[] interfaces = exportedServices.split(Constants.INTERFACE_SEPARATOR); + + Map exportedParametersMap = getExportedParameters(serviceReference); + + String exportedParameters = concatenateExportedParameters(exportedParametersMap); + + LOGGER.debug("CELLAR DOSGI: un-register service {} from the cluster with parameters {}", exportedServices, exportedParametersMap); + + String[] interfaces = exportedServices.split(Constants.COMMA_SEPARATOR); Object service = bundleContext.getService(serviceReference); Set exportedInterfaces = getServiceInterfaces(service, interfaces); - for (String iface : exportedInterfaces) { + for (String exportedInterface : exportedInterfaces) { // add endpoint description to the set. Version version = serviceReference.getBundle().getVersion(); - String endpointId = iface + Constants.SEPARATOR + version.toString(); + String endpointId = exportedInterface + exportedParameters + version.toString(); EndpointDescription endpointDescription = remoteEndpoints.remove(endpointId); endpointDescription.getNodes().remove(node); @@ -227,6 +240,28 @@ public Set getServiceInterfaces(Object service, String[] services) { return interfaceList; } + private Map getExportedParameters(ServiceReference serviceReference) { + // sorted map for reproducible endpointId results + TreeMap exportedParameters = new TreeMap(); + for (String key : serviceReference.getPropertyKeys()) { + // skip service private and instance properties + if (!key.startsWith(Constants.DOT) && !key.contains(Constants.SERVICE_DOT)) { + exportedParameters.put(key, serviceReference.getProperty(key)); + } + } + exportedParameters.remove(org.osgi.framework.Constants.OBJECTCLASS); + return exportedParameters; + } + + private String concatenateExportedParameters(Map exportedParameters) { + String sortedExportedParameters = Constants.SEPARATOR; + if (exportedParameters.isEmpty()) return sortedExportedParameters; + for (Map.Entry entry : exportedParameters.entrySet()) { + sortedExportedParameters = sortedExportedParameters + entry.getKey() + "=" + entry.getValue() + Constants.SEPARATOR; + } + return sortedExportedParameters; + } + public ClusterManager getClusterManager() { return clusterManager; } diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java index c37da8900..a5dda62f4 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java @@ -27,8 +27,12 @@ import org.slf4j.LoggerFactory; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.Hashtable; +import java.util.Iterator; +import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; @@ -39,48 +43,43 @@ /** * Listener for the service import. */ -public class ImportServiceListener implements ListenerHook, Runnable { +public class ImportServiceListener implements ListenerHook { - private static final transient Logger LOGGER = LoggerFactory.getLogger(ImportServiceListener.class); + private static final Logger LOGGER = LoggerFactory.getLogger(ImportServiceListener.class); - private BundleContext bundleContext; + private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + private final Map listenerRegistrations = Collections.synchronizedMap(new LinkedHashMap()); + private final Set pendingListeners = Collections.synchronizedSet(new LinkedHashSet()); + private final Map serviceRegistrations = new HashMap(); + private final Map producers = new HashMap(); + private final Map consumers = new HashMap(); + private Map remoteEndpoints; + private EventTransportFactory eventTransportFactory; private ClusterManager clusterManager; + private BundleContext bundleContext; private CommandStore commandStore; - private EventTransportFactory eventTransportFactory; - private Map remoteEndpoints; - private Set pendingListeners = new LinkedHashSet(); - - private final Map registrations = new HashMap(); - - private final Map producers = new HashMap(); - private final Map consumers = new HashMap(); - - private final ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor(); public void init() { remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS); - service.scheduleAtFixedRate(this, 0, 5, TimeUnit.SECONDS); + scheduledExecutorService.scheduleWithFixedDelay(new RemoteServiceTracker(this), 5, 5, TimeUnit.SECONDS); } public void destroy() { - service.shutdown(); - for (Map.Entry entry : registrations.entrySet()) { - ServiceRegistration registration = entry.getValue(); - registration.unregister(); - } - for (Map.Entry consumerEntry : consumers.entrySet()) { - EventConsumer consumer = consumerEntry.getValue(); - consumer.stop(); - } - consumers.clear(); - producers.clear(); - } - - @Override - public void run() { - for (ListenerInfo listener : pendingListeners) { - checkListener(listener); + scheduledExecutorService.shutdown(); + synchronized (pendingListeners) { + for (ServiceRegistration serviceRegistration : serviceRegistrations.values()) { + LOGGER.trace("CELLAR DOSGI: DESTROY removing registration {}", serviceRegistration.getReference().getPropertyKeys()); + serviceRegistration.unregister(); + } + for (EventConsumer eventConsumer : consumers.values()) { + eventConsumer.stop(); + } + listenerRegistrations.clear(); + serviceRegistrations.clear(); + pendingListeners.clear(); + consumers.clear(); + producers.clear(); } } @@ -89,15 +88,23 @@ public void added(Collection listeners) { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final Set endpointDescriptions = new HashSet(remoteEndpoints.values()); for (ListenerInfo listenerInfo : (Collection) listeners) { - - if (listenerInfo.getBundleContext() == bundleContext || listenerInfo.getFilter() == null) { - continue; + if (listenerInfo.getFilter() == null) { + LOGGER.trace("CELLAR DOSGI: skip adding listener with no filter for bundle {}", listenerInfo.getBundleContext().getBundle().getBundleId()); + } else if (listenerInfo.getBundleContext() == bundleContext) { + LOGGER.trace("CELLAR DOSGI: skip adding listener {} with same bundle context for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + } else if (listenerInfo.isRemoved()) { + LOGGER.trace("CELLAR DOSGI: skip adding already removed listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + } else { + // make sure we only import remote services + if (!checkListener(listenerInfo, endpointDescriptions)) { + LOGGER.trace("CELLAR DOSGI: adding pending listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + if (!pendingListeners.add(listenerInfo)) { + LOGGER.warn("CELLAR DOSGI: pending listener {} for bundle {} already added!", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + } + } } - - pendingListeners.add(listenerInfo); - // make sure we only import remote services - checkListener(listenerInfo); } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); @@ -109,27 +116,51 @@ public void removed(Collection listeners) { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + final Set endpointDescriptions = new HashSet(remoteEndpoints.values()); for (ListenerInfo listenerInfo : (Collection) listeners) { - if (listenerInfo.getBundleContext() == bundleContext || listenerInfo.getFilter() == null) { - continue; - } - - // make sure we only import remote services - String filter = "(&" + listenerInfo.getFilter() + "(!(" + Constants.ENDPOINT_FRAMEWORK_UUID + "=" + clusterManager.getNode().getId() + ")))"; - // iterate through known services and import them if needed - Set matches = new LinkedHashSet(); - for (Map.Entry entry : remoteEndpoints.entrySet()) { - EndpointDescription endpointDescription = entry.getValue(); - if (endpointDescription.matches(filter)) { - matches.add(endpointDescription); + if (listenerInfo.getFilter() == null) { + LOGGER.trace("CELLAR DOSGI: skip removing listener with no filter for bundle {}", listenerInfo.getBundleContext().getBundle().getBundleId()); + LOGGER.trace("CELLAR DOSGI: removing pending listener for bundle {}", listenerInfo.getBundleContext().getBundle().getBundleId()); + if (!pendingListeners.remove(listenerInfo)) { + LOGGER.warn("CELLAR DOSGI: missing pending listener for bundle {} for removal!", listenerInfo.getBundleContext().getBundle().getBundleId()); + } + } else { + if (listenerInfo.getBundleContext() == bundleContext) { + LOGGER.trace("CELLAR DOSGI: skip removing listener {} with same bundle context for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + } else if (!listenerRegistrations.containsKey(listenerInfo)) { + LOGGER.trace("CELLAR DOSGI: skip removing unregistered listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + } else { + // make sure we only match remote services for this listener + String filter = "(&" + listenerInfo.getFilter() + "(!(" + Constants.ENDPOINT_FRAMEWORK_UUID + "=" + clusterManager.getNode().getId() + ")))"; + // iterate through known services and un-import them if needed + Set filteredEndpointEndpointDescriptions = new LinkedHashSet(); + for (EndpointDescription endpointDescription : endpointDescriptions) { + if (endpointDescription.matches(filter)) { + filteredEndpointEndpointDescriptions.add(endpointDescription); + } + } + synchronized (listenerRegistrations) { + for (EndpointDescription filteredEndpointEndpointDescription : filteredEndpointEndpointDescriptions) { + Iterator> iterator = listenerRegistrations.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getKey().equals(listenerInfo) && entry.getValue().equals(filteredEndpointEndpointDescription.getId())) { + LOGGER.trace("CELLAR DOSGI: removing registered listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + iterator.remove(); + } + } + // un-import service from registry if last listener for this filter is removed + if (!listenerRegistrations.containsValue(filteredEndpointEndpointDescription.getId())) { + unImportService(filteredEndpointEndpointDescription.getId()); + } + } + } + LOGGER.trace("CELLAR DOSGI: removing pending listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + if (!pendingListeners.remove(listenerInfo)) { + LOGGER.warn("CELLAR DOSGI: missing pending listener {} for bundle {} for removal!", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + } } } - - for (EndpointDescription endpoint : matches) { - unImportService(endpoint); - } - - pendingListeners.remove(listenerInfo); } } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); @@ -137,78 +168,82 @@ public void removed(Collection listeners) { } /** - * Check if there is a match for the current {@link ListenerInfo}. + * Check if there is a match for the current {@link ListenerInfo} for importing. * * @param listenerInfo the listener info. */ - private void checkListener(ListenerInfo listenerInfo) { + private boolean checkListener(ListenerInfo listenerInfo, Set endpointDescriptions) { + // could be removed by bundles restarting ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); // iterate through known services and import them if needed - Set matches = new LinkedHashSet(); - for (Map.Entry entry : remoteEndpoints.entrySet()) { - EndpointDescription endpointDescription = entry.getValue(); + Set matchedEndpointDescriptions = new LinkedHashSet(); + for (EndpointDescription endpointDescription : endpointDescriptions) { + // match endpoint on OSGi filter and if not this node is already registered if (endpointDescription.matches(listenerInfo.getFilter()) && !endpointDescription.getNodes().contains(clusterManager.getNode().getId())) { - matches.add(endpointDescription); + matchedEndpointDescriptions.add(endpointDescription); + LOGGER.trace("CELLAR DOSGI: remote endpoint {} available for listener {}", endpointDescription.getId(), listenerInfo.getFilter()); } } - - for (EndpointDescription endpoint : matches) { - importService(endpoint, listenerInfo); + for (EndpointDescription matchedRemoteEndpointDescription : matchedEndpointDescriptions) { + importService(matchedRemoteEndpointDescription, listenerInfo); } + return matchedEndpointDescriptions.size() > 0; + } catch (Exception e) { + LOGGER.error("CELLAR DOSGI: listener {} check failed", listenerInfo.getFilter()); } finally { Thread.currentThread().setContextClassLoader(originalClassLoader); } + return false; } /** * Import a remote service to the service registry. * - * @param endpoint the endpoint to import. - * @param listenerInfo the associated listener info. + * @param endpointDescription the endpoint to import. + * @param listenerInfo the associated listener info. */ - private void importService(EndpointDescription endpoint, ListenerInfo listenerInfo) { - LOGGER.debug("CELLAR DOSGI: importing remote service"); + private void importService(EndpointDescription endpointDescription, ListenerInfo listenerInfo) { + LOGGER.debug("CELLAR DOSGI: importing remote endpoint {} with filter {}", endpointDescription.getId(), listenerInfo.getFilter()); - EventProducer requestProducer = producers.get(endpoint.getId()); + EventProducer requestProducer = producers.get(endpointDescription.getId()); if (requestProducer == null) { - requestProducer = eventTransportFactory.getEventProducer(Constants.INTERFACE_PREFIX + Constants.SEPARATOR + endpoint.getId(), Boolean.FALSE); - producers.put(endpoint.getId(), requestProducer); + LOGGER.trace("CELLAR DOSGI: creating new request producer"); + requestProducer = eventTransportFactory.getEventProducer(Constants.INTERFACE_PREFIX + Constants.SEPARATOR + endpointDescription.getId(), Boolean.FALSE); + producers.put(endpointDescription.getId(), requestProducer); } - EventConsumer resultConsumer = consumers.get(endpoint.getId()); + EventConsumer resultConsumer = consumers.get(endpointDescription.getId()); if (resultConsumer == null) { - resultConsumer = eventTransportFactory.getEventConsumer(Constants.RESULT_PREFIX + Constants.SEPARATOR + clusterManager.getNode().getId() + endpoint.getId(), Boolean.FALSE); - consumers.put(endpoint.getId(), resultConsumer); + LOGGER.trace("CELLAR DOSGI: creating new result consumer"); + resultConsumer = eventTransportFactory.getEventConsumer(Constants.RESULT_PREFIX + Constants.SEPARATOR + clusterManager.getNode().getId() + endpointDescription.getId(), Boolean.FALSE); + consumers.put(endpointDescription.getId(), resultConsumer); } else if (!resultConsumer.isConsuming()) { resultConsumer.start(); } - producers.put(endpoint.getId(), requestProducer); - consumers.put(endpoint.getId(), resultConsumer); - + LOGGER.trace("CELLAR DOSGI: adding service registration for {}", endpointDescription.getServiceClass()); ExecutionContext executionContext = new ClusteredExecutionContext(requestProducer, commandStore); - - RemoteServiceFactory remoteServiceFactory = new RemoteServiceFactory(endpoint, clusterManager, executionContext); - ServiceRegistration registration = listenerInfo.getBundleContext().registerService(endpoint.getServiceClass(), - remoteServiceFactory, - new Hashtable(endpoint.getProperties())); - registrations.put(endpoint, registration); - pendingListeners.remove(listenerInfo); + RemoteServiceFactory remoteServiceFactory = new RemoteServiceFactory(endpointDescription, clusterManager, executionContext); + ServiceRegistration serviceRegistration = listenerInfo.getBundleContext().registerService(endpointDescription.getServiceClass(), remoteServiceFactory, new Hashtable(endpointDescription.getProperties())); + serviceRegistrations.put(endpointDescription.getId(), serviceRegistration); + listenerRegistrations.put(listenerInfo, endpointDescription.getId()); } /** * Un-register an imported service. * - * @param endpoint the endpoint to un-register. + * @param endpointId the endpoint to un-register. */ - private void unImportService(EndpointDescription endpoint) { - ServiceRegistration registration = registrations.get(endpoint); - registration.unregister(); + private void unImportService(String endpointId) { + LOGGER.debug("CELLAR DOSGI: un-importing remote service with endpoint {}", endpointId); + + ServiceRegistration registration = serviceRegistrations.remove(endpointId); + if (registration != null) registration.unregister(); - producers.remove(endpoint.getId()); - EventConsumer consumer = consumers.remove(endpoint.getId()); + producers.remove(endpointId); + EventConsumer consumer = consumers.remove(endpointId); if (consumer != null) { consumer.stop(); } @@ -246,4 +281,52 @@ public void setEventTransportFactory(EventTransportFactory eventTransportFactory this.eventTransportFactory = eventTransportFactory; } -} + private static class RemoteServiceTracker implements Runnable { + + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteServiceTracker.class); + + private final ImportServiceListener importServiceListener; + + public RemoteServiceTracker(ImportServiceListener importServiceListener) { + this.importServiceListener = importServiceListener; + } + + @Override + public void run() { + try { + final Set> localRemoteEndpointEntries = new HashSet>(importServiceListener.remoteEndpoints.entrySet()); + final Set localRemoteEndpointValues = new HashSet(localRemoteEndpointEntries.size()); + final Set localRemoteEndpointKeys = new HashSet(localRemoteEndpointEntries.size()); + for (Map.Entry localRemoteEndpointEntry : localRemoteEndpointEntries) { + localRemoteEndpointValues.add(localRemoteEndpointEntry.getValue()); + localRemoteEndpointKeys.add(localRemoteEndpointEntry.getKey()); + } + LOGGER.trace("CELLAR DOSGI: running the remote service tracker task having {} endpoints, {} pending listeners, {} registered listeners and {} service registrations", + localRemoteEndpointEntries.size(), importServiceListener.pendingListeners.size(), importServiceListener.listenerRegistrations.size(), importServiceListener.serviceRegistrations.size()); + synchronized (importServiceListener.pendingListeners) { + Iterator iterator = importServiceListener.pendingListeners.iterator(); + while (iterator.hasNext()) { + ListenerInfo listenerInfo = iterator.next(); + if (importServiceListener.checkListener(listenerInfo, localRemoteEndpointValues)) { + iterator.remove(); + } + } + } + synchronized (importServiceListener.listenerRegistrations) { + Iterator listenerRegistrationIterator = importServiceListener.listenerRegistrations.values().iterator(); + while (listenerRegistrationIterator.hasNext()) { + String registeredEndpointId = listenerRegistrationIterator.next(); + if (!localRemoteEndpointKeys.contains(registeredEndpointId)) { + LOGGER.trace("CELLAR DOSGI: registered remote endpoint {} unavailable", registeredEndpointId); + importServiceListener.unImportService(registeredEndpointId); + listenerRegistrationIterator.remove(); + } + } + } + } catch (Exception e) { + LOGGER.error("CELLAR DOSGI: " + e.getClass().getSimpleName() + " / " + e.getMessage()); + } + } + } + +} \ No newline at end of file diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceCallHandler.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceCallHandler.java index 810d4f4e7..e7068fb7c 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceCallHandler.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceCallHandler.java @@ -41,7 +41,7 @@ public class RemoteServiceCallHandler extends CellarSupport implements EventHand public static final String SWITCH_ID = "org.apache.karaf.cellar.dosgi.switch"; - private static final transient Logger LOGGER = LoggerFactory.getLogger(RemoteServiceCallHandler.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RemoteServiceCallHandler.class); private final Switch dosgiSwitch = new BasicSwitch(SWITCH_ID); diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceFactory.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceFactory.java index 11d132149..b112b4306 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceFactory.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceFactory.java @@ -28,9 +28,9 @@ */ public class RemoteServiceFactory implements ServiceFactory { - private EndpointDescription description; - private ClusterManager clusterManager; - private ExecutionContext executionContext; + private final EndpointDescription description; + private final ClusterManager clusterManager; + private final ExecutionContext executionContext; public RemoteServiceFactory(EndpointDescription description, ClusterManager clusterManager, ExecutionContext executionContext) { this.description = description; @@ -43,12 +43,12 @@ public Object getService(Bundle bundle, ServiceRegistration registration) { ClassLoader classLoader = new RemoteServiceProxyClassLoader(bundle); List interfaces = new ArrayList(); String interfaceName = description.getServiceClass(); - try { - interfaces.add(classLoader.loadClass(interfaceName)); - } catch (ClassNotFoundException e) { - // Ignore - } - RemoteServiceInvocationHandler handler = new RemoteServiceInvocationHandler(description.getId(), interfaceName,clusterManager,executionContext); + try { + interfaces.add(classLoader.loadClass(interfaceName)); + } catch (ClassNotFoundException e) { + // Ignore + } + RemoteServiceInvocationHandler handler = new RemoteServiceInvocationHandler(description.getId(), interfaceName, clusterManager, executionContext); return Proxy.newProxyInstance(classLoader, interfaces.toArray(new Class[interfaces.size()]), handler); } diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceInvocationHandler.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceInvocationHandler.java index 79099f837..2f3ff7055 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceInvocationHandler.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceInvocationHandler.java @@ -23,18 +23,19 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.concurrent.CancellationException; /** * Handler for cluster remote service invocation event. */ public class RemoteServiceInvocationHandler implements InvocationHandler { - private String endpointId; - private String serviceClass; - private ClusterManager clusterManager; - private ExecutionContext executionContext; + private final String endpointId; + private final String serviceClass; + private final ClusterManager clusterManager; + private final ExecutionContext executionContext; - public RemoteServiceInvocationHandler(String endpointId,String serviceClass, ClusterManager clusterManager, ExecutionContext executionContext) { + public RemoteServiceInvocationHandler(String endpointId, String serviceClass, ClusterManager clusterManager, ExecutionContext executionContext) { this.endpointId = endpointId; this.serviceClass = serviceClass; this.clusterManager = clusterManager; @@ -49,17 +50,17 @@ public Object invoke(Object o, Method method, Object[] arguments) throws Throwab remoteServiceCall.setServiceClass(serviceClass); List argumentList = new LinkedList(); - if(arguments != null && arguments.length > 0) { - for(Object arg:arguments) { + if (arguments != null && arguments.length > 0) { + for (Object arg : arguments) { argumentList.add(arg); } } remoteServiceCall.setArguments(argumentList); - Map results = executionContext.execute(remoteServiceCall); + Map results = executionContext.execute(remoteServiceCall); - if(results != null) { - for(Map.Entry entry:results.entrySet()) { + if (results != null) { + for (Map.Entry entry : results.entrySet()) { RemoteServiceResult result = entry.getValue(); // an exception being thrown by the remote service call must be raised locally @@ -75,7 +76,7 @@ public Object invoke(Object o, Method method, Object[] arguments) throws Throwab return result.getResult(); } } - return null; + throw new CancellationException(String.format("No remote service execution results for %s", serviceClass)); } } diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceProxyClassLoader.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceProxyClassLoader.java index 4c5e430d6..fa8565de0 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceProxyClassLoader.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/RemoteServiceProxyClassLoader.java @@ -20,7 +20,7 @@ */ public class RemoteServiceProxyClassLoader extends ClassLoader { - private Bundle bundle; + private final Bundle bundle; public RemoteServiceProxyClassLoader(Bundle bundle) { this.bundle = bundle; diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java index ae0e8d1f4..36944594a 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/Activator.java @@ -17,7 +17,10 @@ import org.apache.karaf.cellar.core.command.CommandStore; import org.apache.karaf.cellar.core.event.EventHandler; import org.apache.karaf.cellar.core.event.EventTransportFactory; -import org.apache.karaf.cellar.dosgi.*; +import org.apache.karaf.cellar.dosgi.ExportServiceListener; +import org.apache.karaf.cellar.dosgi.ImportServiceListener; +import org.apache.karaf.cellar.dosgi.RemoteServiceCallHandler; +import org.apache.karaf.cellar.dosgi.RemoteServiceResultHandler; import org.apache.karaf.cellar.dosgi.management.ServiceMBean; import org.apache.karaf.cellar.dosgi.management.internal.ServiceMBeanImpl; import org.apache.karaf.util.tracker.BaseActivator; diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java index 1bad57afc..6f1bce16f 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/internal/osgi/RemovedNodeServiceTracker.java @@ -32,13 +32,13 @@ */ public class RemovedNodeServiceTracker implements Runnable { - private static final transient Logger LOGGER = LoggerFactory.getLogger(RemovedNodeServiceTracker.class); + private static final Logger LOGGER = LoggerFactory.getLogger(RemovedNodeServiceTracker.class); private ClusterManager clusterManager; private Map remoteEndpoints; - private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public void init() { remoteEndpoints = clusterManager.getMap(Constants.REMOTE_ENDPOINTS); @@ -60,7 +60,7 @@ public void setClusterManager(ClusterManager clusterManager) { @Override public void run() { - LOGGER.trace("CELLAR DOSGI: running the service tracker task"); + LOGGER.trace("CELLAR DOSGI: running the remote node tracker task"); ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { if (!remoteEndpoints.isEmpty()) { diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/ServiceMBean.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/ServiceMBean.java index eb4bc5f12..07540788e 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/ServiceMBean.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/ServiceMBean.java @@ -21,6 +21,6 @@ */ public interface ServiceMBean { - public Map> getServices(); + Map> getServices(); } diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/internal/ServiceMBeanImpl.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/internal/ServiceMBeanImpl.java index a5272f448..707d8ab5a 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/internal/ServiceMBeanImpl.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/management/internal/ServiceMBeanImpl.java @@ -21,7 +21,11 @@ import javax.management.NotCompliantMBeanException; import javax.management.StandardMBean; -import java.util.*; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; public class ServiceMBeanImpl extends StandardMBean implements ServiceMBean { diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/shell/ListDistributedServicesCommand.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/shell/ListDistributedServicesCommand.java index 67fb074ad..86830a54b 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/shell/ListDistributedServicesCommand.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/shell/ListDistributedServicesCommand.java @@ -21,8 +21,9 @@ import org.apache.karaf.shell.api.action.lifecycle.Service; import org.apache.karaf.shell.support.table.ShellTable; +import java.util.Arrays; +import java.util.Comparator; import java.util.Map; -import java.util.Set; @Command(scope = "cluster", name = "service-list", description = "List the services available on the cluster") @Service @@ -38,17 +39,31 @@ protected Object doExecute() throws Exception { ShellTable table = new ShellTable(); table.column("Service Class"); table.column("Provider Node"); - for (Map.Entry entry : remoteEndpoints.entrySet()) { + table.column("Endpoint ID"); + Map.Entry[] entrySet = remoteEndpoints.entrySet().toArray(new Map.Entry[0]); + Arrays.sort(entrySet, new Comparator>() { + @Override + public int compare(Map.Entry a, Map.Entry b) { + return a.getKey().compareTo(b.getKey()); + } + }); + for (Map.Entry entry : entrySet) { EndpointDescription endpointDescription = entry.getValue(); String serviceClass = endpointDescription.getServiceClass(); - Set nodes = endpointDescription.getNodes(); + String endpointId = endpointDescription.getId(); + Node[] nodes = endpointDescription.getNodes().toArray(new Node[0]); + Arrays.sort(nodes, new Comparator() { + @Override + public int compare(Node a, Node b) { + return (a.getHost() + a.getPort()).compareTo(b.getHost() + b.getPort()); + } + }); for (Node node : nodes) { String nodeName = node.getAlias(); if (nodeName == null) { nodeName = node.getId(); } - table.addRow().addContent(serviceClass, nodeName); - serviceClass = ""; + table.addRow().addContent(serviceClass, nodeName, endpointId); } } table.print(System.out); diff --git a/dosgi/src/test/java/org/apache/karaf/cellar/dosgi/EndpointDescriptionTest.java b/dosgi/src/test/java/org/apache/karaf/cellar/dosgi/EndpointDescriptionTest.java index c13f52dca..968880574 100644 --- a/dosgi/src/test/java/org/apache/karaf/cellar/dosgi/EndpointDescriptionTest.java +++ b/dosgi/src/test/java/org/apache/karaf/cellar/dosgi/EndpointDescriptionTest.java @@ -16,6 +16,8 @@ import org.junit.Assert; import org.junit.Test; +import java.util.HashMap; + public class EndpointDescriptionTest { String objectClass = "org.apache.karaf.cellar.dosgi.Test"; @@ -24,11 +26,14 @@ public class EndpointDescriptionTest { @Test public void testMatches() throws Exception { // this is a dummy test for testing the behaviour of matches method + String version = "1.0.0"; String testEndpointFilter = String.format(filterPattern, objectClass); - String endpointId = objectClass + Constants.SEPARATOR + "1.0.0"; + String endpointId = objectClass + Constants.SEPARATOR + version; + HashMap exportedProperties = new HashMap(); + exportedProperties.put(org.osgi.framework.Constants.OBJECTCLASS, objectClass); - EndpointDescription endpointDescription1 = new EndpointDescription(endpointId, null); - EndpointDescription endpointDescription2 = new EndpointDescription(endpointId, null); + EndpointDescription endpointDescription1 = new EndpointDescription(endpointId, null, exportedProperties); + EndpointDescription endpointDescription2 = new EndpointDescription(endpointId, null, exportedProperties); Assert.assertTrue(endpointDescription1.matches(testEndpointFilter)); Assert.assertTrue(endpointDescription2.matches(testEndpointFilter)); } From 653fc4d5da3894dc015555b678a138f41eb68a17 Mon Sep 17 00:00:00 2001 From: betzm Date: Fri, 12 Aug 2022 16:18:20 +0200 Subject: [PATCH 2/7] Polishing --- .../karaf/cellar/dosgi/ImportServiceListener.java | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java index a5dda62f4..06b058569 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java @@ -88,6 +88,7 @@ public void added(Collection listeners) { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + // create a clone of remote endpoint descriptions to avoid concurrency concerns while iterating it final Set endpointDescriptions = new HashSet(remoteEndpoints.values()); for (ListenerInfo listenerInfo : (Collection) listeners) { if (listenerInfo.getFilter() == null) { @@ -116,6 +117,7 @@ public void removed(Collection listeners) { ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader(); try { Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + // create a clone of remote endpoint descriptions to avoid concurrency concerns while iterating it final Set endpointDescriptions = new HashSet(remoteEndpoints.values()); for (ListenerInfo listenerInfo : (Collection) listeners) { if (listenerInfo.getFilter() == null) { @@ -170,7 +172,8 @@ public void removed(Collection listeners) { /** * Check if there is a match for the current {@link ListenerInfo} for importing. * - * @param listenerInfo the listener info. + * @param listenerInfo the listener info. + * @param endpointDescriptions local copy of remote endpoint descriptions. */ private boolean checkListener(ListenerInfo listenerInfo, Set endpointDescriptions) { // could be removed by bundles restarting @@ -294,7 +297,8 @@ public RemoteServiceTracker(ImportServiceListener importServiceListener) { @Override public void run() { try { - final Set> localRemoteEndpointEntries = new HashSet>(importServiceListener.remoteEndpoints.entrySet()); + // create a clone of remote endpoints to avoid concurrency concerns while iterating it + final Set> localRemoteEndpointEntries = new HashSet(importServiceListener.remoteEndpoints.entrySet()); final Set localRemoteEndpointValues = new HashSet(localRemoteEndpointEntries.size()); final Set localRemoteEndpointKeys = new HashSet(localRemoteEndpointEntries.size()); for (Map.Entry localRemoteEndpointEntry : localRemoteEndpointEntries) { @@ -324,7 +328,7 @@ public void run() { } } } catch (Exception e) { - LOGGER.error("CELLAR DOSGI: " + e.getClass().getSimpleName() + " / " + e.getMessage()); + LOGGER.error("CELLAR DOSGI: {} / {}", e.getClass().getSimpleName(), e.getMessage()); } } } From 5020522263e0c388f6de997d9e491d45be12d9e0 Mon Sep 17 00:00:00 2001 From: betzm Date: Fri, 12 Aug 2022 16:43:40 +0200 Subject: [PATCH 3/7] Removed unnecessary logging --- .../org/apache/karaf/cellar/dosgi/ImportServiceListener.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java index 06b058569..8ab9a563b 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java @@ -158,9 +158,7 @@ public void removed(Collection listeners) { } } LOGGER.trace("CELLAR DOSGI: removing pending listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); - if (!pendingListeners.remove(listenerInfo)) { - LOGGER.warn("CELLAR DOSGI: missing pending listener {} for bundle {} for removal!", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); - } + pendingListeners.remove(listenerInfo); } } } From c33bde5de47a817b1789947cc1684d8ed3c4c5e6 Mon Sep 17 00:00:00 2001 From: betzm Date: Fri, 12 Aug 2022 22:39:30 +0200 Subject: [PATCH 4/7] Removed unnecessary logging --- .../org/apache/karaf/cellar/dosgi/ImportServiceListener.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java index 8ab9a563b..d2a072b71 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java @@ -123,9 +123,7 @@ public void removed(Collection listeners) { if (listenerInfo.getFilter() == null) { LOGGER.trace("CELLAR DOSGI: skip removing listener with no filter for bundle {}", listenerInfo.getBundleContext().getBundle().getBundleId()); LOGGER.trace("CELLAR DOSGI: removing pending listener for bundle {}", listenerInfo.getBundleContext().getBundle().getBundleId()); - if (!pendingListeners.remove(listenerInfo)) { - LOGGER.warn("CELLAR DOSGI: missing pending listener for bundle {} for removal!", listenerInfo.getBundleContext().getBundle().getBundleId()); - } + pendingListeners.remove(listenerInfo); } else { if (listenerInfo.getBundleContext() == bundleContext) { LOGGER.trace("CELLAR DOSGI: skip removing listener {} with same bundle context for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); From ccabf4f19a19a03f878e54d4a81667f0e3a34272 Mon Sep 17 00:00:00 2001 From: betzm Date: Mon, 15 Aug 2022 09:05:55 +0200 Subject: [PATCH 5/7] Removed faulty pending listener remove --- .../cellar/dosgi/ImportServiceListener.java | 56 +++++++++---------- 1 file changed, 26 insertions(+), 30 deletions(-) diff --git a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java index d2a072b71..89bdd5ff9 100644 --- a/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java +++ b/dosgi/src/main/java/org/apache/karaf/cellar/dosgi/ImportServiceListener.java @@ -122,42 +122,38 @@ public void removed(Collection listeners) { for (ListenerInfo listenerInfo : (Collection) listeners) { if (listenerInfo.getFilter() == null) { LOGGER.trace("CELLAR DOSGI: skip removing listener with no filter for bundle {}", listenerInfo.getBundleContext().getBundle().getBundleId()); - LOGGER.trace("CELLAR DOSGI: removing pending listener for bundle {}", listenerInfo.getBundleContext().getBundle().getBundleId()); - pendingListeners.remove(listenerInfo); + } else if (listenerInfo.getBundleContext() == bundleContext) { + LOGGER.trace("CELLAR DOSGI: skip removing listener {} with same bundle context for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + } else if (!listenerRegistrations.containsKey(listenerInfo)) { + LOGGER.trace("CELLAR DOSGI: skip removing unregistered listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); } else { - if (listenerInfo.getBundleContext() == bundleContext) { - LOGGER.trace("CELLAR DOSGI: skip removing listener {} with same bundle context for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); - } else if (!listenerRegistrations.containsKey(listenerInfo)) { - LOGGER.trace("CELLAR DOSGI: skip removing unregistered listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); - } else { - // make sure we only match remote services for this listener - String filter = "(&" + listenerInfo.getFilter() + "(!(" + Constants.ENDPOINT_FRAMEWORK_UUID + "=" + clusterManager.getNode().getId() + ")))"; - // iterate through known services and un-import them if needed - Set filteredEndpointEndpointDescriptions = new LinkedHashSet(); - for (EndpointDescription endpointDescription : endpointDescriptions) { - if (endpointDescription.matches(filter)) { - filteredEndpointEndpointDescriptions.add(endpointDescription); - } + // make sure we only match remote services for this listener + String filter = "(&" + listenerInfo.getFilter() + "(!(" + Constants.ENDPOINT_FRAMEWORK_UUID + "=" + clusterManager.getNode().getId() + ")))"; + // iterate through known services and un-import them if needed + Set filteredEndpointEndpointDescriptions = new LinkedHashSet(); + for (EndpointDescription endpointDescription : endpointDescriptions) { + if (endpointDescription.matches(filter)) { + filteredEndpointEndpointDescriptions.add(endpointDescription); } - synchronized (listenerRegistrations) { - for (EndpointDescription filteredEndpointEndpointDescription : filteredEndpointEndpointDescriptions) { - Iterator> iterator = listenerRegistrations.entrySet().iterator(); - while (iterator.hasNext()) { - Map.Entry entry = iterator.next(); - if (entry.getKey().equals(listenerInfo) && entry.getValue().equals(filteredEndpointEndpointDescription.getId())) { - LOGGER.trace("CELLAR DOSGI: removing registered listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); - iterator.remove(); - } - } - // un-import service from registry if last listener for this filter is removed - if (!listenerRegistrations.containsValue(filteredEndpointEndpointDescription.getId())) { - unImportService(filteredEndpointEndpointDescription.getId()); + } + synchronized (listenerRegistrations) { + for (EndpointDescription filteredEndpointEndpointDescription : filteredEndpointEndpointDescriptions) { + Iterator> iterator = listenerRegistrations.entrySet().iterator(); + while (iterator.hasNext()) { + Map.Entry entry = iterator.next(); + if (entry.getKey().equals(listenerInfo) && entry.getValue().equals(filteredEndpointEndpointDescription.getId())) { + LOGGER.trace("CELLAR DOSGI: removing registered listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + iterator.remove(); } } + // un-import service from registry if last listener for this filter is removed + if (!listenerRegistrations.containsValue(filteredEndpointEndpointDescription.getId())) { + unImportService(filteredEndpointEndpointDescription.getId()); + } } - LOGGER.trace("CELLAR DOSGI: removing pending listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); - pendingListeners.remove(listenerInfo); } + LOGGER.trace("CELLAR DOSGI: removing pending listener {} for bundle {}", listenerInfo.getFilter(), listenerInfo.getBundleContext().getBundle().getBundleId()); + pendingListeners.remove(listenerInfo); } } } finally { From 38ba3a221aad2a73145db41c126c80cf14a28773 Mon Sep 17 00:00:00 2001 From: betzm Date: Wed, 31 Aug 2022 09:43:46 +0200 Subject: [PATCH 6/7] -Extended shell service listing -Avoid registering remote endpoint on service registry if service is locally available -Added filter propagation for remote service call -Added remote service call preference for local services -Cache execution contexts containing already cached event consumers / producers avoiding stale references in assigned proxies --- dosgi/pom.xml | 2 +- .../apache/karaf/cellar/dosgi/Constants.java | 2 +- .../cellar/dosgi/EndpointDescription.java | 75 +++++++++-- .../cellar/dosgi/ExportServiceListener.java | 123 +++++++++--------- .../cellar/dosgi/ImportServiceListener.java | 83 ++++++------ .../karaf/cellar/dosgi/RemoteServiceCall.java | 27 ++-- .../dosgi/RemoteServiceCallHandler.java | 52 +++++--- .../cellar/dosgi/RemoteServiceFactory.java | 21 ++- .../dosgi/RemoteServiceInvocationHandler.java | 9 +- .../osgi/RemovedNodeServiceTracker.java | 10 +- .../shell/ListDistributedServicesCommand.java | 82 ++++++++++-- .../cellar/dosgi/EndpointDescriptionTest.java | 27 +++- .../dosgi/ExportServiceListenerTest.java | 12 +- 13 files changed, 350 insertions(+), 175 deletions(-) diff --git a/dosgi/pom.xml b/dosgi/pom.xml index b575be3bb..ecabe6f86 100644 --- a/dosgi/pom.xml +++ b/dosgi/pom.xml @@ -1,5 +1,5 @@ - +