Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@

public class ConfigCenterAddressManager extends AbstractAddressManager {

public ConfigCenterAddressManager(String projectName, List<String> addresses, EventBus eventBus) {
super(projectName, addresses);
public ConfigCenterAddressManager(String projectName, List<String> addresses, String ownRegion,
String ownAvailableZone, EventBus eventBus) {
super(projectName, addresses, ownRegion, ownAvailableZone);
eventBus.register(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.servicecomb.http.client.common.HttpResponse;
import org.apache.servicecomb.http.client.common.HttpTransport;
import org.apache.servicecomb.http.client.common.HttpUtils;
import org.apache.servicecomb.http.client.event.OperationEvents.UnAuthorizedOperationEvent;
import org.apache.servicecomb.http.client.utils.ServiceCombServiceAvailableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -61,12 +62,15 @@ public class ConfigCenterClient implements ConfigCenterOperation {

private final Map<String, List<String>> dimensionConfigNames = new HashMap<>();

private EventBus eventBus;

public ConfigCenterClient(ConfigCenterAddressManager addressManager, HttpTransport httpTransport) {
this.addressManager = addressManager;
this.httpTransport = httpTransport;
}

public void setEventBus(EventBus eventBus) {
this.eventBus = eventBus;
addressManager.setEventBus(eventBus);
}

Expand All @@ -88,6 +92,7 @@ public QueryConfigurationsResponse queryConfigurations(QueryConfigurationsReques
HttpRequest.GET);

HttpResponse httpResponse = httpTransport.doRequest(httpRequest);
recordAndSendUnAuthorizedEvent(httpResponse, address);
if (httpResponse.getStatusCode() == HttpStatus.SC_OK) {
Map<String, Map<String, Object>> allConfigMap = HttpUtils.deserialize(
httpResponse.getContent(),
Expand Down Expand Up @@ -121,21 +126,17 @@ public QueryConfigurationsResponse queryConfigurations(QueryConfigurationsReques
}
queryConfigurationsResponse.setConfigurations(configurations);
queryConfigurationsResponse.setChanged(true);
addressManager.recordSuccessState(address);
return queryConfigurationsResponse;
} else if (httpResponse.getStatusCode() == HttpStatus.SC_NOT_MODIFIED) {
queryConfigurationsResponse.setChanged(false);
addressManager.recordSuccessState(address);
return queryConfigurationsResponse;
} else if (httpResponse.getStatusCode() == HttpStatus.SC_TOO_MANY_REQUESTS) {
LOGGER.warn("rate limited, keep the local dimension [{}] configs unchanged.", dimensionsInfo);
queryConfigurationsResponse.setChanged(false);
addressManager.recordSuccessState(address);
return queryConfigurationsResponse;
} else if (httpResponse.getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
throw new OperationException("Bad request for query configurations.");
} else {
addressManager.recordFailState(address);
throw new OperationException(
"read response failed. status:"
+ httpResponse.getStatusCode()
Expand All @@ -151,6 +152,16 @@ public QueryConfigurationsResponse queryConfigurations(QueryConfigurationsReques
}
}

private void recordAndSendUnAuthorizedEvent(HttpResponse response, String address) {
if (this.eventBus != null && response.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
LOGGER.warn("query configuration unauthorized from server [{}], message [{}]", address, response.getMessage());
addressManager.recordFailState(address);
this.eventBus.post(new UnAuthorizedOperationEvent(address));
} else {
addressManager.recordSuccessState(address);
}
}

/**
* Only the name of the new configuration item is printed.
* No log is printed when the configuration content is updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import org.apache.servicecomb.http.client.common.HttpResponse;
import org.apache.servicecomb.http.client.common.HttpTransport;
import org.apache.servicecomb.http.client.common.HttpUtils;
import org.apache.servicecomb.http.client.event.OperationEvents.UnAuthorizedOperationEvent;
import org.apache.servicecomb.http.client.utils.ServiceCombServiceAvailableUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -71,13 +72,16 @@ public class KieClient implements KieConfigOperation {

private final Map<String, List<String>> dimensionConfigNames = new HashMap<>();

private EventBus eventBus;

public KieClient(KieAddressManager addressManager, HttpTransport httpTransport, KieConfiguration kieConfiguration) {
this.httpTransport = httpTransport;
this.addressManager = addressManager;
this.kieConfiguration = kieConfiguration;
}

public void setEventBus(EventBus eventBus) {
this.eventBus = eventBus;
addressManager.setEventBus(eventBus);
}

Expand All @@ -91,6 +95,7 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request,

HttpRequest httpRequest = new HttpRequest(url, null, null, HttpRequest.GET);
HttpResponse httpResponse = httpTransport.doRequest(httpRequest);
recordAndSendUnAuthorizedEvent(httpResponse, address);
ConfigurationsResponse configurationsResponse = new ConfigurationsResponse();
if (httpResponse.getStatusCode() == HttpStatus.SC_OK) {
revision = httpResponse.getHeader("X-Kie-Revision");
Expand All @@ -100,24 +105,20 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request,
configurationsResponse.setConfigurations(configurations);
configurationsResponse.setChanged(true);
configurationsResponse.setRevision(revision);
addressManager.recordSuccessState(address);
return configurationsResponse;
}
if (httpResponse.getStatusCode() == HttpStatus.SC_BAD_REQUEST) {
throw new OperationException("Bad request for query configurations.");
}
if (httpResponse.getStatusCode() == HttpStatus.SC_NOT_MODIFIED) {
configurationsResponse.setChanged(false);
addressManager.recordSuccessState(address);
return configurationsResponse;
}
if (httpResponse.getStatusCode() == HttpStatus.SC_TOO_MANY_REQUESTS) {
LOGGER.warn("rate limited, keep the local dimension [{}] configs unchanged.", request.getLabelsQuery());
configurationsResponse.setChanged(false);
addressManager.recordSuccessState(address);
return configurationsResponse;
}
addressManager.recordFailState(address);
throw new OperationException(
"read response failed. status:" + httpResponse.getStatusCode() + "; message:" +
httpResponse.getMessage() + "; content:" + httpResponse.getContent());
Expand All @@ -128,6 +129,16 @@ public ConfigurationsResponse queryConfigurations(ConfigurationsRequest request,
}
}

private void recordAndSendUnAuthorizedEvent(HttpResponse response, String address) {
if (this.eventBus != null && response.getStatusCode() == HttpStatus.SC_UNAUTHORIZED) {
LOGGER.warn("query configuration unauthorized from server [{}], message [{}]", address, response.getMessage());
addressManager.recordFailState(address);
this.eventBus.post(new UnAuthorizedOperationEvent(address));
} else {
addressManager.recordSuccessState(address);
}
}

/**
* Only the name of the new configuration item is printed.
* No log is printed when the configuration content is updated.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@

public class KieAddressManager extends AbstractAddressManager {

public KieAddressManager(List<String> addresses, EventBus eventBus) {
super(addresses);
public KieAddressManager(List<String> addresses, String ownRegion, String ownAvailableZone, EventBus eventBus) {
super(addresses, ownRegion, ownAvailableZone);
eventBus.register(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ class KieAddressManagerTest {
public void kieAddressManagerTest() throws NoSuchFieldException, IllegalAccessException {
addresses.add("http://127.0.0.1:30103");
addresses.add("https://127.0.0.2:30103");
addressManager1 = new KieAddressManager(addresses, new EventBus());
addressManager1 = new KieAddressManager(addresses, "", "", new EventBus());
Field addressManagerField = addressManager1.getClass().getSuperclass().getDeclaredField("index");
addressManagerField.setAccessible(true);
addressManagerField.set(addressManager1, 0);
Expand All @@ -64,7 +64,7 @@ public void onRefreshEndpointEvent() {
Map<String, List<String>> zoneAndRegion = new HashMap<>();
zoneAndRegion.put("sameZone", addressAZ);
zoneAndRegion.put("sameRegion", addressRG);
addressManager1 = new KieAddressManager(addresses, new EventBus());
addressManager1 = new KieAddressManager(addresses, "", "", new EventBus());
RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "KIE");
addressManager1.refreshEndpoint(event, "KIE");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@

public class DashboardAddressManager extends AbstractAddressManager {

public DashboardAddressManager(List<String> addresses, EventBus eventBus) {
super(addresses);
public DashboardAddressManager(List<String> addresses, String ownRegion, String ownAvailableZone, EventBus eventBus) {
super(addresses, ownRegion, ownAvailableZone);
eventBus.register(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class AddressManagerTest {
public void kieAddressManagerTest() throws IllegalAccessException, NoSuchFieldException {
addresses.add("http://127.0.0.1:30103");
addresses.add("https://127.0.0.2:30103");
addressManager1 = new DashboardAddressManager(addresses, new EventBus());
addressManager1 = new DashboardAddressManager(addresses, "", "", new EventBus());
Field addressManagerField = addressManager1.getClass().getSuperclass().getDeclaredField("index");
addressManagerField.setAccessible(true);
addressManagerField.set(addressManager1, 0);
Expand All @@ -65,7 +65,7 @@ public void onRefreshEndpointEvent() {
Map<String, List<String>> zoneAndRegion = new HashMap<>();
zoneAndRegion.put("sameZone", addressAZ);
zoneAndRegion.put("sameRegion", addressRG);
addressManager1 = new DashboardAddressManager(addresses, new EventBus());
addressManager1 = new DashboardAddressManager(addresses, "", "", new EventBus());
RefreshEndpointEvent event = new RefreshEndpointEvent(zoneAndRegion, "CseMonitoring");
addressManager1.refreshEndpoint(event, "CseMonitoring");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,21 @@

package org.apache.servicecomb.http.client.common;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.http.client.event.EngineConnectChangedEvent;
import org.apache.servicecomb.http.client.event.RefreshEndpointEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.eventbus.EventBus;
Expand All @@ -42,6 +45,10 @@ public class AbstractAddressManager {

private static final String V3_PREFIX = "/v3/";

private static final String ZONE = "availableZone";

private static final String REGION = "region";

private static final int ISOLATION_THRESHOLD = 3;

private volatile List<String> addresses = new ArrayList<>();
Expand Down Expand Up @@ -74,17 +81,58 @@ public class AbstractAddressManager {

private EventBus eventBus;

public AbstractAddressManager(List<String> addresses) {
public AbstractAddressManager(List<String> addresses, String ownRegion, String ownAvailableZone) {
this.projectName = DEFAULT_PROJECT;
this.addresses.addAll(addresses);
this.defaultAddress.addAll(addresses);
parseAndInitAddresses(addresses, ownRegion, ownAvailableZone, false);
this.index = !addresses.isEmpty() ? getRandomIndex() : 0;
}

public AbstractAddressManager(String projectName, List<String> addresses) {
/**
* address support config with region/availableZone info, to enable engine affinity calls during startup
* address may be like:
* https://192.168.20.13:30110?region=region1&availableZone=az
* https://192.168.20.13:30100?region=region1&availableZone=az
* When address have no datacenter information, roundRobin using address
*
* @param addresses engine addresses
* @param ownRegion microservice region
* @param ownAvailableZone microservice zone
* @param isFormat is need format
*/
private void parseAndInitAddresses(List<String> addresses, String ownRegion, String ownAvailableZone,
boolean isFormat) {
if (CollectionUtils.isEmpty(addresses)) {
return;
}
List<String> tempList = new ArrayList<>();
addressAutoRefreshed = addresses.stream().anyMatch(addr -> addr.contains(ZONE) || addr.contains(REGION));
for (String address : addresses) {
// Compatible IpPortManager init address is 127.0.0.1:30100
if (!address.startsWith("http")) {
tempList.add(address);
continue;
}
URLEndPoint endpoint = new URLEndPoint(address);
tempList.add(endpoint.toString());
buildAffinityAddress(endpoint, ownRegion, ownAvailableZone);
}
this.addresses.addAll(isFormat ? this.transformAddress(tempList) : tempList);
this.defaultAddress.addAll(isFormat ? this.transformAddress(tempList) : tempList);
}

private void buildAffinityAddress(URLEndPoint endpoint, String ownRegion, String ownAvailableZone) {
if (addressAutoRefreshed) {
if (regionAndAZMatch(ownRegion, ownAvailableZone, endpoint.getFirst(REGION), endpoint.getFirst(ZONE))) {
availableZone.add(endpoint.toString());
} else {
availableRegion.add(endpoint.toString());
}
}
}

public AbstractAddressManager(String projectName, List<String> addresses, String ownRegion, String ownAvailableZone) {
this.projectName = StringUtils.isEmpty(projectName) ? DEFAULT_PROJECT : projectName;
this.addresses = this.transformAddress(addresses);
this.defaultAddress.addAll(addresses);
parseAndInitAddresses(addresses, ownRegion, ownAvailableZone, true);
this.index = !addresses.isEmpty() ? getRandomIndex() : 0;
}

Expand Down Expand Up @@ -170,8 +218,9 @@ private String getAvailableZoneAddress() {
return getCurrentAddress(zoneOrRegionAddress);
}
LOGGER.warn("all auto discovery addresses are isolation, please check server status.");

// when all available address are isolation, it will use config addresses for polling.
return getCurrentAddress(addresses);
return getDefaultAddress();
}

private String getCurrentAddress(List<String> addresses) {
Expand Down Expand Up @@ -221,6 +270,11 @@ public void resetFailureStatus(String address) {
addressFailureStatus.put(address, 0);
}

/**
* Only authentication failure, IO, and timeout exception record as failed.
*
* @param address request address
*/
public void recordFailState(String address) {
synchronized (lock) {
if (!addressFailureStatus.containsKey(address)) {
Expand Down Expand Up @@ -271,4 +325,41 @@ public List<String> getIsolationAddresses() {
isolationAddresses.addAll(isolationRegionAddress);
return isolationAddresses;
}

public String compareAndGetAddress(String host) {
for (String address : defaultAddress) {
if (isAddressHostSame(address, host)) {
return address;
}
}
return "";
}

private boolean isAddressHostSame(String address, String host) {
if (StringUtils.isEmpty(host)) {
return false;
}
try {
URI uri = new URI(address);
return host.equals(uri.getHost());
} catch (Exception e) {
LOGGER.warn("Exception occurred while constructing URI using the address [{}]", address);
}
return false;
}

private boolean regionAndAZMatch(String ownRegion, String ownAvailableZone, String engineRegion,
String engineAvailableZone) {
return ownRegion.equalsIgnoreCase(engineRegion) && ownAvailableZone.equals(engineAvailableZone);
}

public void refreshAffinityAddress(Set<String> sameZone, Set<String> sameRegion) {
addressAutoRefreshed = true;
if (!sameZone.isEmpty()) {
availableZone.addAll(sameZone);
}
if (!sameRegion.isEmpty()) {
availableRegion.addAll(sameRegion);
}
}
}
Loading
Loading