Skip to content

Commit b204ecd

Browse files
committed
feat:support rate-limit window expiration.
Signed-off-by: Haotian Zhang <928016560@qq.com>
1 parent b04d53b commit b204ecd

15 files changed

Lines changed: 1107 additions & 44 deletions

File tree

polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/flow/BaseFlow.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.tencent.polaris.api.rpc.Criteria;
4343
import com.tencent.polaris.api.rpc.RequestBaseEntity;
4444
import com.tencent.polaris.api.utils.CollectionUtils;
45+
import com.tencent.polaris.api.utils.StringUtils;
4546
import com.tencent.polaris.client.util.Utils;
4647
import com.tencent.polaris.logging.LoggerFactory;
4748
import org.slf4j.Logger;
@@ -96,6 +97,9 @@ public static Instance commonGetOneInstance(Extensions extensions, ServiceKey se
9697
ServiceConfig serviceConfig = extensions.getConfiguration().getProvider().getService();
9798
RouteInfo routeInfo = new RouteInfo(
9899
null, null, dstSvcInfo, null, "", serviceConfig);
100+
if (StringUtils.isNotBlank(protocol)) {
101+
routeInfo.putRouterMetadata("metadataRoute", metadata);
102+
}
99103
ResourcesResponse resourcesResponse = BaseFlow
100104
.syncGetResources(extensions, false, provider, flowControlParam);
101105
LOG.debug("[ConnectionManager]success to discover service {}", svcEventKey);

polaris-common/polaris-client/src/main/java/com/tencent/polaris/client/remote/ServiceAddressRepository.java

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,7 @@
2424
import com.tencent.polaris.api.plugin.common.PluginTypes;
2525
import com.tencent.polaris.api.plugin.compose.Extensions;
2626
import com.tencent.polaris.api.plugin.loadbalance.LoadBalancer;
27-
import com.tencent.polaris.api.pojo.DefaultInstance;
28-
import com.tencent.polaris.api.pojo.DefaultServiceInstances;
29-
import com.tencent.polaris.api.pojo.Instance;
30-
import com.tencent.polaris.api.pojo.ServiceInstances;
31-
import com.tencent.polaris.api.pojo.ServiceKey;
27+
import com.tencent.polaris.api.pojo.*;
3228
import com.tencent.polaris.api.rpc.Criteria;
3329
import com.tencent.polaris.api.utils.CollectionUtils;
3430
import com.tencent.polaris.api.utils.IPAddressUtils;
@@ -190,11 +186,8 @@ public int nodeListSize() {
190186
}
191187

192188
private Instance getDiscoverInstance() throws PolarisException {
193-
Instance instance = BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol,
189+
return BaseFlow.commonGetOneInstance(extensions, remoteCluster, routers, lbPolicy, protocol,
194190
clientId);
195-
LOG.info("success to get instance for service {}, instance is {}:{}", remoteCluster, instance.getHost(),
196-
instance.getPort());
197-
return instance;
198191
}
199192

200193
@JustForTest

polaris-plugins/polaris-plugin-api/src/main/java/com/tencent/polaris/api/plugin/route/RouteInfo.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,15 @@ public Map<String, String> getRouterMetadata(String routerType) {
256256
return Collections.unmodifiableMap(metadata);
257257
}
258258

259+
public void putRouterMetadata(String routerType, Map<String, String> metadata) {
260+
Map<String, String> tempMetadata = routerMetadata.get(routerType);
261+
if (tempMetadata == null || tempMetadata.isEmpty()) {
262+
tempMetadata = new HashMap<>();
263+
routerMetadata.put(routerType, tempMetadata);
264+
}
265+
tempMetadata.putAll(metadata);
266+
}
267+
259268
public void setRouterArguments(Map<String, Set<RouteArgument>> routerArguments) {
260269
Map<String, Map<String, String>> routerMetadata = this.routerMetadata;
261270

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/AsyncRateLimitConnector.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,13 @@ public StreamCounterSet getStreamCounterSet(Extensions extensions, ServiceKey re
8484
}
8585
if (null != streamCounterSet) {
8686
//切换了节点,去掉初始化记录
87-
streamCounterSet.deleteInitRecord(serviceIdentifier);
87+
InitializeRecord removedRecord = streamCounterSet.deleteInitRecord(serviceIdentifier);
88+
if (removedRecord != null) {
89+
RateLimitWindow rateLimitWindow = removedRecord.getRateLimitWindow();
90+
uniqueKey = rateLimitWindow != null ? rateLimitWindow.getUniqueKey() : null;
91+
LOG.info("[getStreamCounterSet] host switched, and initRecord removed serviceIdentifier: {}, window "
92+
+ "{} {}", serviceIdentifier, rateLimitWindow, uniqueKey);
93+
}
8894
//切换了节点,老的不再使用
8995
if (streamCounterSet.decreaseReference()) {
9096
nodeToStream.remove(node);

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/QuotaFlow.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,15 @@ public void init(Extensions extensions) throws PolarisException {
9292
FlowCache flowCache = extensions.getFlowCache();
9393
return flowCache.loadPluginCacheObject(API_ID, key, path -> TrieUtil.buildSimpleApiTrieNode((String) path));
9494
};
95+
rateLimitExtension.submitExpireJob(() -> {
96+
try {
97+
for (Map.Entry<ServiceKey, RateLimitWindowSet> entry : svcToWindowSet.entrySet()) {
98+
entry.getValue().cleanupContainers();
99+
}
100+
} catch (Throwable e) {
101+
LOG.error("Failed to cleanup expired rate limit window", e);
102+
}
103+
});
95104

96105
// init tsf rate limit master utils if need
97106
Map<String, String> metadata = rateLimitConfig.getMetadata();

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitExtension.java

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,22 @@
2525
import com.tencent.polaris.api.utils.StringUtils;
2626
import com.tencent.polaris.api.utils.ThreadPoolUtils;
2727
import com.tencent.polaris.client.util.NamedThreadFactory;
28+
import com.tencent.polaris.logging.LoggerFactory;
2829
import com.tencent.polaris.ratelimit.client.sync.RemoteSyncTask;
29-
import com.tencent.polaris.ratelimit.client.utils.RateLimitConstants;
3030
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto;
31+
import org.slf4j.Logger;
3132

3233
import java.util.Collection;
3334
import java.util.HashMap;
3435
import java.util.Map;
35-
import java.util.Random;
3636
import java.util.concurrent.*;
3737

3838
import static com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter.*;
3939

4040
public class RateLimitExtension extends Destroyable {
4141

42+
private static final Logger LOG = LoggerFactory.getLogger(RateLimitExtension.class);
43+
4244
private final Extensions extensions;
4345

4446
private final Map<String, ServiceRateLimiter> rateLimiters = new HashMap<>();
@@ -113,9 +115,17 @@ private String getRateLimiterName(RateLimitProto.Rule.Resource resource, String
113115
* @param task 任务
114116
*/
115117
public void submitSyncTask(RemoteSyncTask task, long initialDelay, long delay) {
118+
if (scheduledTasks.containsKey(task.getWindow().getUniqueKey())) {
119+
LOG.warn("task has exist, ignore, task {}, window {}, uniqueKey {} ", task, task.getWindow(),
120+
task.getWindow().getUniqueKey());
121+
task.getWindow().setStatus(RateLimitWindow.WindowStatus.CREATED.ordinal());
122+
return;
123+
}
116124
ScheduledFuture<?> scheduledFuture = syncExecutor
117-
.scheduleWithFixedDelay(task, 0, delay, TimeUnit.MILLISECONDS);
125+
.scheduleWithFixedDelay(task, initialDelay, delay, TimeUnit.MILLISECONDS);
118126
scheduledTasks.put(task.getWindow().getUniqueKey(), scheduledFuture);
127+
LOG.info("submit sync task success, task {}, future {}, window {}, uniqueKey {} ", task, scheduledFuture,
128+
task.getWindow(), task.getWindow().getUniqueKey());
119129
}
120130

121131
private static final int EXPIRE_INTERVAL_SECOND = 5;
@@ -130,8 +140,33 @@ public void submitExpireJob(Runnable task) {
130140
.scheduleWithFixedDelay(task, EXPIRE_INTERVAL_SECOND, EXPIRE_INTERVAL_SECOND, TimeUnit.SECONDS);
131141
}
132142

133-
public void stopSyncTask(String uniqueKey) {
143+
/**
144+
* 停止同步任务
145+
*
146+
* @param uniqueKey 窗口唯一标识
147+
* @param window 限流窗口
148+
*/
149+
public void stopSyncTask(String uniqueKey, RateLimitWindow window) {
150+
// 从connector初始化列表清理
151+
Runnable cleanTask = () -> {
152+
try {
153+
AsyncRateLimitConnector connector = window.getWindowSet().getAsyncRateLimitConnector();
154+
ServiceIdentifier identifier = new ServiceIdentifier(window.getSvcKey().getService(),
155+
window.getSvcKey().getNamespace(), window.getLabels());
156+
StreamCounterSet streamCounterSet = connector.getStreamCounterSet(
157+
window.getWindowSet().getRateLimitExtension().getExtensions(),
158+
window.getRemoteCluster(), window.getServiceAddressRepository(), window.getUniqueKey(), identifier);
159+
if (streamCounterSet != null) {
160+
streamCounterSet.deleteInitRecord(identifier, window);
161+
}
162+
LOG.info("clean task run success, window {}", window);
163+
} catch (Throwable e) {
164+
LOG.error("clean task run failed, window {}", window.getUniqueKey(), e);
165+
}
166+
};
167+
syncExecutor.schedule(cleanTask, 10, TimeUnit.MILLISECONDS);
134168
ScheduledFuture<?> future = scheduledTasks.remove(uniqueKey);
169+
LOG.info("scheduledTasks remove uniqueKey {}, future {}", uniqueKey, future);
135170
if (null != future) {
136171
future.cancel(true);
137172
}

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindow.java

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818
package com.tencent.polaris.ratelimit.client.flow;
1919

2020
import com.tencent.polaris.api.config.consumer.LoadBalanceConfig;
21+
import com.tencent.polaris.api.config.consumer.ServiceRouterConfig;
2122
import com.tencent.polaris.api.config.provider.RateLimitConfig;
2223
import com.tencent.polaris.api.plugin.compose.Extensions;
2324
import com.tencent.polaris.api.plugin.ratelimiter.InitCriteria;
2425
import com.tencent.polaris.api.plugin.ratelimiter.QuotaBucket;
2526
import com.tencent.polaris.api.plugin.ratelimiter.QuotaResult;
2627
import com.tencent.polaris.api.plugin.ratelimiter.ServiceRateLimiter;
27-
import com.tencent.polaris.api.pojo.*;
28+
import com.tencent.polaris.api.pojo.ServiceKey;
2829
import com.tencent.polaris.api.utils.StringUtils;
2930
import com.tencent.polaris.client.flow.FlowControlParam;
3031
import com.tencent.polaris.client.remote.ServiceAddressRepository;
@@ -39,11 +40,12 @@
3940
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Amount;
4041
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.RateLimitCluster;
4142
import com.tencent.polaris.specification.api.v1.traffic.manage.RateLimitProto.Rule;
42-
import java.util.Random;
4343
import org.slf4j.Logger;
4444

45+
import java.util.ArrayList;
4546
import java.util.List;
4647
import java.util.Objects;
48+
import java.util.Random;
4749
import java.util.concurrent.atomic.AtomicInteger;
4850
import java.util.concurrent.atomic.AtomicLong;
4951
import java.util.concurrent.atomic.AtomicReference;
@@ -97,6 +99,8 @@ public enum WindowStatus {
9799

98100
private final AtomicLong lastInitTimeMs = new AtomicLong();
99101

102+
private final AtomicLong lastSyncTimeMs = new AtomicLong();
103+
100104
// 执行正式分配的令牌桶
101105
private final QuotaBucket allocatingBucket;
102106

@@ -144,16 +148,18 @@ public RateLimitWindow(RateLimitWindowSet windowSet, CommonQuotaRequest quotaReq
144148
this.syncParam = quotaRequest.getFlowControlParam();
145149
remoteCluster = getLimiterClusterService(rule.getCluster(), rateLimitConfig);
146150
serviceAddressRepository = buildServiceAddressRepository(rateLimitConfig.getLimiterAddresses(),
147-
uniqueKey, windowSet.getExtensions(), remoteCluster, null, LoadBalanceConfig.LOAD_BALANCE_RING_HASH, "grpc");
151+
uniqueKey, windowSet.getExtensions(), remoteCluster);
148152
allocatingBucket = getQuotaBucket(initCriteria, windowSet.getRateLimitExtension());
149153
lastAccessTimeMs.set(System.currentTimeMillis());
150154
this.rateLimitConfig = rateLimitConfig;
151155
buildRemoteConfigMode();
152156
}
153157

154158
private ServiceAddressRepository buildServiceAddressRepository(List<String> addresses, String hash, Extensions extensions,
155-
ServiceKey remoteCluster, List<String> routers, String lbPolicy, String protocol) {
156-
return new ServiceAddressRepository(addresses, hash, extensions, remoteCluster, routers, lbPolicy, protocol);
159+
ServiceKey remoteCluster) {
160+
List<String> routers = new ArrayList<>();
161+
routers.add(ServiceRouterConfig.DEFAULT_ROUTER_METADATA);
162+
return new ServiceAddressRepository(addresses, hash, extensions, remoteCluster, routers, LoadBalanceConfig.LOAD_BALANCE_RING_HASH, "grpc");
157163
}
158164

159165

@@ -249,10 +255,12 @@ public void init() {
249255
}
250256
if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE && !isTsfCluster) {
251257
//本地限流,则直接可用
258+
LOG.info("[RateLimitWindow] local window {} initiated", this);
252259
status.set(WindowStatus.INITIALIZED.ordinal());
253260
return;
254261
}
255262
//加入轮询队列,走异步调度
263+
LOG.info("[RateLimitWindow] remote window {} first init", this);
256264
if (rule.getMetadataMap().containsKey("limiter")
257265
&& StringUtils.equalsIgnoreCase("tsf", rule.getMetadataMap().get("limiter"))) {
258266
windowSet.getRateLimitExtension().submitSyncTask(new TsfRemoteSyncTask(this), 0L, 1000L);
@@ -270,8 +278,13 @@ public void unInit() {
270278
return;
271279
}
272280
status.set(WindowStatus.DELETED.ordinal());
281+
LOG.info("[RateLimitWindow] window {} {} is set to DELETED", uniqueKey, this);
273282
//从轮询队列中剔除
274-
windowSet.getRateLimitExtension().stopSyncTask(uniqueKey);
283+
if (configMode == RateLimitConstants.CONFIG_QUOTA_LOCAL_MODE) {
284+
return;
285+
}
286+
LOG.info("[RateLimitWindow] stopSyncTask( uniqueKey {}, window {} ) ", uniqueKey, this);
287+
windowSet.getRateLimitExtension().stopSyncTask(uniqueKey, this);
275288
}
276289
}
277290

@@ -301,16 +314,21 @@ public void returnQuota(CommonQuotaRequest request) {
301314

302315
/**
303316
* 窗口已经过期
317+
* TSF 设置为不过期
304318
*
305319
* @return boolean
306320
*/
307321
public boolean isExpired() {
308-
long curTimeMs = System.currentTimeMillis();
309-
boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs;
310-
if (expired) {
311-
LOG.info("[RateLimit]window has expired, expireDurationMs {}, uniqueKey {}", expireDurationMs, uniqueKey);
322+
if (!isTsfCluster) {
323+
long curTimeMs = System.currentTimeMillis();
324+
boolean expired = curTimeMs - lastAccessTimeMs.get() > expireDurationMs;
325+
if (expired) {
326+
LOG.info("[RateLimit] window has expired, expireDurationMs {}, uniqueKey {}, window {}", expireDurationMs,
327+
uniqueKey, this);
328+
}
329+
return expired;
312330
}
313-
return expired;
331+
return false;
314332
}
315333

316334
public long getLastInitTimeMs() {
@@ -321,6 +339,14 @@ public void setLastInitTimeMs(long lastInitTimeMs) {
321339
this.lastInitTimeMs.set(lastInitTimeMs);
322340
}
323341

342+
public long getLastSyncTimeMs() {
343+
return lastSyncTimeMs.get();
344+
}
345+
346+
public void setLastSyncTimeMs(long lastSyncTimeMs) {
347+
this.lastSyncTimeMs.set(lastSyncTimeMs);
348+
}
349+
324350
/**
325351
* 获取当前窗口的状态
326352
*

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/RateLimitWindowSet.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.Map;
3030
import java.util.Set;
3131
import java.util.concurrent.ConcurrentHashMap;
32+
import java.util.concurrent.atomic.AtomicInteger;
3233
import java.util.function.Function;
3334

3435
public class RateLimitWindowSet {
@@ -133,6 +134,26 @@ public void deleteRules(Set<String> rules) {
133134
}
134135
}
135136

137+
/**
138+
* 过期清理单个rule下所有WindowContainer
139+
*/
140+
public void cleanupContainers() {
141+
AtomicInteger rulesExpired = new AtomicInteger(0);
142+
windowByRule.entrySet().removeIf(entry -> {
143+
boolean expired = entry.getValue().checkAndCleanExpiredWindows();
144+
if (expired) {
145+
rulesExpired.incrementAndGet();
146+
LOG.info("[RateLimitWindowSet] rule {} for service {} has been expired, window container {}",
147+
entry.getKey(), serviceKey, entry.getValue());
148+
}
149+
return expired;
150+
});
151+
if (rulesExpired.get() > 0) {
152+
LOG.info("[RateLimitWindowSet] {} rules have been cleaned up due to expiration, service {}",
153+
rulesExpired, serviceKey);
154+
}
155+
}
156+
136157
public AsyncRateLimitConnector getAsyncRateLimitConnector() {
137158
return asyncRateLimitConnector;
138159
}

polaris-ratelimit/polaris-ratelimit-client/src/main/java/com/tencent/polaris/ratelimit/client/flow/StreamCounterSet.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,11 @@
1919

2020
import com.tencent.polaris.client.pojo.Node;
2121
import com.tencent.polaris.logging.LoggerFactory;
22+
import org.slf4j.Logger;
2223

2324
import java.util.concurrent.atomic.AtomicInteger;
2425
import java.util.concurrent.atomic.AtomicReference;
2526

26-
import org.slf4j.Logger;
27-
2827
/**
2928
* 计数器对象
3029
*/
@@ -97,11 +96,20 @@ public boolean decreaseReference() {
9796
return false;
9897
}
9998

100-
public void deleteInitRecord(ServiceIdentifier serviceIdentifier) {
99+
public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier) {
100+
StreamResource streamResource = currentStreamResource.get();
101+
if (null != streamResource) {
102+
return streamResource.deleteInitRecord(serviceIdentifier);
103+
}
104+
return null;
105+
}
106+
107+
public InitializeRecord deleteInitRecord(ServiceIdentifier serviceIdentifier, RateLimitWindow window) {
101108
StreamResource streamResource = currentStreamResource.get();
102109
if (null != streamResource) {
103-
streamResource.deleteInitRecord(serviceIdentifier);
110+
return streamResource.deleteInitRecord(serviceIdentifier, window);
104111
}
112+
return null;
105113
}
106114

107115

0 commit comments

Comments
 (0)