Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.UrlUtils;

import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

Expand All @@ -32,26 +33,34 @@ public class ListenerRegistryWrapper implements Registry {
LoggerFactory.getErrorTypeAwareLogger(ListenerRegistryWrapper.class);

private final Registry registry;
private final URL url;
private final List<RegistryServiceListener> listeners;

public ListenerRegistryWrapper(Registry registry, List<RegistryServiceListener> listeners) {
this(registry, registry == null ? null : registry.getUrl(), listeners);
}

public ListenerRegistryWrapper(Registry registry, URL url, List<RegistryServiceListener> listeners) {
this.registry = registry;
this.url = url;
this.listeners = listeners;
}

@Override
public URL getUrl() {
return registry.getUrl();
return registry == null ? url : registry.getUrl();
}

@Override
public boolean isAvailable() {
return registry.isAvailable();
return registry != null && registry.isAvailable();
}

@Override
public void destroy() {
registry.destroy();
if (registry != null) {
registry.destroy();
}
}

@Override
Expand Down Expand Up @@ -94,20 +103,22 @@ public void subscribe(URL url, NotifyListener listener) {
@Override
public void unsubscribe(URL url, NotifyListener listener) {
try {
registry.unsubscribe(url, listener);
if (registry != null) {
registry.unsubscribe(url, listener);
}
} finally {
listenerEvent(serviceListener -> serviceListener.onUnsubscribe(url, registry));
}
}

@Override
public boolean isServiceDiscovery() {
return registry.isServiceDiscovery();
return registry != null && registry.isServiceDiscovery();
}

@Override
public List<URL> lookup(URL url) {
return registry.lookup(url);
return registry == null ? Collections.emptyList() : registry.lookup(url);
}

public Registry getRegistry() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ public RegistryFactoryWrapper(RegistryFactory registryFactory) {
public Registry getRegistry(URL url) {
return new ListenerRegistryWrapper(
registryFactory.getRegistry(url),
url,
Collections.unmodifiableList(url.getOrDefaultApplicationModel()
.getExtensionLoader(RegistryServiceListener.class)
.getActivateExtension(url, "registry.listeners")));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.common.url.component.ServiceConfigURL;
import org.apache.dubbo.registry.integration.DemoService;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

Expand All @@ -31,6 +32,7 @@
import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -73,4 +75,89 @@ void testSubscribe() {
registryWrapper.subscribe(subscribeUrl, notifyListener);
verify(listener, times(1)).onSubscribe(subscribeUrl, registry);
}

@Test
void testNullRegistryIsSafe() {
URL registryUrl = URL.valueOf("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService");
URL serviceUrl = URL.valueOf("dubbo://127.0.0.1:20881/" + DemoService.class.getName());
NotifyListener notifyListener = mock(NotifyListener.class);
RegistryServiceListener listener = mock(RegistryServiceListener.class);
ListenerRegistryWrapper wrapper =
new ListenerRegistryWrapper(null, registryUrl, Collections.singletonList(listener));

Assertions.assertEquals(registryUrl, wrapper.getUrl());
Assertions.assertFalse(wrapper.isAvailable());
Assertions.assertFalse(wrapper.isServiceDiscovery());
Assertions.assertTrue(wrapper.lookup(serviceUrl).isEmpty());
Assertions.assertDoesNotThrow(wrapper::destroy);
Assertions.assertDoesNotThrow(() -> wrapper.register(serviceUrl));
Assertions.assertDoesNotThrow(() -> wrapper.unregister(serviceUrl));
Assertions.assertDoesNotThrow(() -> wrapper.subscribe(serviceUrl, notifyListener));
Assertions.assertDoesNotThrow(() -> wrapper.unsubscribe(serviceUrl, notifyListener));

verify(listener, times(1)).onRegister(serviceUrl, null);
verify(listener, times(1)).onUnregister(serviceUrl, null);
verify(listener, times(1)).onSubscribe(serviceUrl, null);
verify(listener, times(1)).onUnsubscribe(serviceUrl, null);
}

@Test
void testDelegatesToRegistryWhenPresent() {
URL registryUrl = URL.valueOf("registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService");
URL serviceUrl = URL.valueOf("dubbo://127.0.0.1:20881/" + DemoService.class.getName());
Registry registry = mock(Registry.class);
NotifyListener notifyListener = mock(NotifyListener.class);
RegistryServiceListener listener = mock(RegistryServiceListener.class);

when(registry.getUrl()).thenReturn(registryUrl);
when(registry.isAvailable()).thenReturn(true);
when(registry.isServiceDiscovery()).thenReturn(true);
when(registry.lookup(serviceUrl)).thenReturn(Collections.singletonList(serviceUrl));

ListenerRegistryWrapper wrapper = new ListenerRegistryWrapper(registry, Collections.singletonList(listener));

Assertions.assertEquals(registryUrl, wrapper.getUrl());
Assertions.assertTrue(wrapper.isAvailable());
Assertions.assertTrue(wrapper.isServiceDiscovery());
Assertions.assertEquals(Collections.singletonList(serviceUrl), wrapper.lookup(serviceUrl));

wrapper.unsubscribe(serviceUrl, notifyListener);
wrapper.destroy();

verify(registry, times(2)).getUrl();
verify(registry).isAvailable();
verify(registry).isServiceDiscovery();
verify(registry).lookup(serviceUrl);
verify(registry).unsubscribe(serviceUrl, notifyListener);
verify(registry).destroy();
verify(listener, times(1)).onUnsubscribe(serviceUrl, registry);
verify(listener, never()).onSubscribe(serviceUrl, registry);
}

@Test
void testLegacyConstructorWithNullRegistry() {
URL serviceUrl = URL.valueOf("dubbo://127.0.0.1:20881/" + DemoService.class.getName());
ListenerRegistryWrapper wrapper = new ListenerRegistryWrapper(null, Collections.emptyList());

Assertions.assertNull(wrapper.getUrl());
Assertions.assertFalse(wrapper.isAvailable());
Assertions.assertFalse(wrapper.isServiceDiscovery());
Assertions.assertTrue(wrapper.lookup(serviceUrl).isEmpty());
Assertions.assertDoesNotThrow(wrapper::destroy);
}

@Test
void testRegistryPresentButUnavailable() {
URL serviceUrl = URL.valueOf("dubbo://127.0.0.1:20881/" + DemoService.class.getName());
Registry registry = mock(Registry.class);
when(registry.isAvailable()).thenReturn(false);
when(registry.isServiceDiscovery()).thenReturn(false);

ListenerRegistryWrapper wrapper = new ListenerRegistryWrapper(registry, Collections.emptyList());

Assertions.assertFalse(wrapper.isAvailable());
Assertions.assertFalse(wrapper.isServiceDiscovery());
verify(registry).isAvailable();
verify(registry).isServiceDiscovery();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.ExtensionLoader;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -53,4 +54,17 @@ void test() throws Exception {
Mockito.verify(listener1, Mockito.times(1)).onUnsubscribe(url, SimpleRegistryFactory.registry);
Mockito.verify(listener2, Mockito.times(1)).onUnsubscribe(url, SimpleRegistryFactory.registry);
}

@Test
void testNullRegistryUsesOriginalUrl() {
RegistryFactory nullRegistryFactory = Mockito.mock(RegistryFactory.class);
URL url = URL.valueOf("simple://localhost:8080/registry-service");
Mockito.when(nullRegistryFactory.getRegistry(url)).thenReturn(null);

Registry registry = new RegistryFactoryWrapper(nullRegistryFactory).getRegistry(url);

Assertions.assertTrue(registry instanceof ListenerRegistryWrapper);
Assertions.assertEquals(url, registry.getUrl());
Assertions.assertDoesNotThrow(() -> registry.subscribe(url, Mockito.mock(NotifyListener.class)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -138,4 +138,20 @@ public boolean isAvailable() {
Assertions.assertFalse(registries.contains(registry1));
Assertions.assertFalse(registries.contains(registry2));
}

@Test
void testGetRegistryReturnsNullWhenCheckIsFalseAndCreateFails() {
AbstractRegistryFactory throwingRegistryFactory = new AbstractRegistryFactory() {
@Override
protected Registry createRegistry(URL url) {
throw new IllegalStateException("connect failed");
}
};
throwingRegistryFactory.setApplicationModel(ApplicationModel.defaultModel());

Registry registry = throwingRegistryFactory.getRegistry(
URL.valueOf("dubbo://" + NetUtils.getLocalHost() + ":2233?check=false"));

Assertions.assertNull(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceDiscoveryFactory;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
Expand All @@ -38,6 +39,7 @@

import com.google.common.collect.Sets;
import org.junit.jupiter.api.Test;
import org.mockito.MockedStatic;
import org.mockito.Mockito;

import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
Expand All @@ -48,29 +50,30 @@ public class MultipleServiceDiscoveryTest {
private static String mockZkAddress = "zookeeper://mock-zk:2181?check=false";

@Test
public void testOnEvent() {
try {
String metadata_111 = "{\"app\":\"app1\",\"revision\":\"111\",\"services\":{"
+ "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\":\"5000\",\"application\":\"app1\",\"dynamic\":\"true\",\"REGISTRY_CLUSTER\":\"registry1\",\"anyhost\":\"true\",\"timestamp\":\"1625800233446\"}}"
+ "}}";
MetadataInfo metadataInfo = JsonUtils.toJavaObject(metadata_111, MetadataInfo.class);
ApplicationModel applicationModel = ApplicationModel.defaultModel();
applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig("app2"));
String multipleUrl = String.format(
"multiple://mock-registry:2181?reference-registry=%s&child.a1=%s&check=false",
mockZkAddress, mockZkAddress);
URL url = URL.valueOf(multipleUrl);
url.setScopeModel(applicationModel);
MultipleServiceDiscovery multipleServiceDiscovery = new MultipleServiceDiscovery(url);
Class<MultipleServiceDiscovery> msdClass = MultipleServiceDiscovery.class;
Field serviceDiscoveriesField = msdClass.getDeclaredField("serviceDiscoveries");
serviceDiscoveriesField.setAccessible(true);
ServiceDiscovery mockServiceDiscovery = Mockito.mock(ServiceDiscovery.class);
public void testOnEvent() throws Exception {
String metadata_111 = "{\"app\":\"app1\",\"revision\":\"111\",\"services\":{"
+ "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\":\"5000\",\"application\":\"app1\",\"dynamic\":\"true\",\"REGISTRY_CLUSTER\":\"registry1\",\"anyhost\":\"true\",\"timestamp\":\"1625800233446\"}}"
+ "}}";
MetadataInfo metadataInfo = JsonUtils.toJavaObject(metadata_111, MetadataInfo.class);
ApplicationModel applicationModel = ApplicationModel.defaultModel();
applicationModel.getApplicationConfigManager().setApplication(new ApplicationConfig("app2"));
String multipleUrl = String.format(
"multiple://mock-registry:2181?reference-registry=%s&child.a1=%s&check=false",
mockZkAddress, mockZkAddress);
URL url = URL.valueOf(multipleUrl);
url.setScopeModel(applicationModel);
ServiceDiscovery mockServiceDiscovery = Mockito.mock(ServiceDiscovery.class);
ServiceDiscoveryFactory mockFactory = Mockito.mock(ServiceDiscoveryFactory.class);
Mockito.when(mockFactory.getServiceDiscovery(Mockito.any(URL.class))).thenReturn(mockServiceDiscovery);

try (MockedStatic<ServiceDiscoveryFactory> serviceDiscoveryFactoryMockedStatic =
Mockito.mockStatic(ServiceDiscoveryFactory.class)) {
serviceDiscoveryFactoryMockedStatic
.when(() -> ServiceDiscoveryFactory.getExtension(Mockito.any(URL.class)))
.thenReturn(mockFactory);
Mockito.when(mockServiceDiscovery.getRemoteMetadata(Mockito.anyString(), Mockito.anyList()))
.thenReturn(metadataInfo);
Map<String, ServiceDiscovery> mockServiceDiscoveries = new HashMap<>();
mockServiceDiscoveries.put("child.a1", mockServiceDiscovery);
serviceDiscoveriesField.set(multipleServiceDiscovery, mockServiceDiscoveries);
MultipleServiceDiscovery multipleServiceDiscovery = new MultipleServiceDiscovery(url);
MultipleServiceDiscovery.MultiServiceInstancesChangedListener listener =
(MultipleServiceDiscovery.MultiServiceInstancesChangedListener)
multipleServiceDiscovery.createListener(Sets.newHashSet("app1"));
Expand All @@ -93,10 +96,6 @@ public void testOnEvent() {
(Map<String, List<ServiceInstancesChangedListener.ProtocolServiceKeyWithUrls>>)
serviceUrlsField.get(listener);
Assert.assertTrue(!CollectionUtils.isEmptyMap(map), "url can not be empty");
} catch (NoSuchFieldException e) {
throw new RuntimeException(e);
} catch (IllegalAccessException e) {
throw new RuntimeException(e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.zookeeper.ZookeeperInstance;
Expand Down Expand Up @@ -118,8 +117,8 @@ public List<ACL> getAclForPath(String path) {
throw new IllegalStateException("zookeeper client initialization failed");
}

boolean check = UrlUtils.isCheck(connectionURL);
if (check && !curatorFramework.getZookeeperClient().isConnected()) {
if (!curatorFramework.getZookeeperClient().isConnected()) {
curatorFramework.close();
throw new IllegalStateException("failed to connect to zookeeper server");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -220,9 +220,8 @@ void testRegistryNotCheckConnect() {
ApplicationModel applicationModel = ApplicationModel.defaultModel();
registryUrl.setScopeModel(applicationModel);

Assertions.assertDoesNotThrow(() -> {
new ZookeeperServiceDiscovery(applicationModel, registryUrl);
});
Assertions.assertThrowsExactly(
IllegalStateException.class, () -> new ZookeeperServiceDiscovery(applicationModel, registryUrl));
}

@AfterAll
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.mockStatic;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -98,6 +100,7 @@ void testBuildCuratorFramework() throws Exception {
CuratorFramework curatorFramework = CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null);
Assertions.assertNotNull(curatorFramework);
Assertions.assertTrue(curatorFramework.getZookeeperClient().isConnected());
verify(mockCuratorFramework, never()).close();
curatorFramework.getZookeeperClient().close();
}

Expand All @@ -113,20 +116,18 @@ void testBuildServiceDiscovery() throws Exception {
@Test
void testBuildCuratorFrameworkCheckConnectDefault() {
when(mockCuratorZookeeperClient.isConnected()).thenReturn(false);
Assertions.assertThrowsExactly(IllegalStateException.class, () -> {
CuratorFramework curatorFramework = CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null);
curatorFramework.getZookeeperClient().close();
});
Assertions.assertThrowsExactly(
IllegalStateException.class, () -> CuratorFrameworkUtils.buildCuratorFramework(registryUrl, null));
verify(mockCuratorFramework).close();
}

@Test
void testBuildCuratorFrameworkNotCheckConnect() {
when(mockCuratorZookeeperClient.isConnected()).thenReturn(false);
URL url = registryUrl.addParameter(CHECK_KEY, false);
Assertions.assertDoesNotThrow(() -> {
CuratorFramework curatorFramework = CuratorFrameworkUtils.buildCuratorFramework(url, null);
curatorFramework.getZookeeperClient().close();
});
Assertions.assertThrowsExactly(
IllegalStateException.class, () -> CuratorFrameworkUtils.buildCuratorFramework(url, null));
verify(mockCuratorFramework).close();
}

@Test
Expand Down
Loading
Loading