forked from heptaFinger/adata_test
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_rate_limit_fix.py
More file actions
148 lines (116 loc) · 4.54 KB
/
test_rate_limit_fix.py
File metadata and controls
148 lines (116 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
# -*- coding: utf-8 -*-
"""
测试频率限制器修复效果
"""
import time
import threading
from adata.common import set_rate_limit, set_default_rate_limit
from adata.common.utils import requests
def test_timeout_mechanism():
"""测试超时机制"""
print("=== 测试超时机制 ===")
# 设置非常严格的限制(1次/分钟)
test_domain = "httpbin.org"
set_rate_limit(test_domain, 1)
print(f"设置 {test_domain} 每分钟限制1次请求")
url = "https://httpbin.org/get"
# 第一次请求应该成功
print("\n1. 第一次请求...")
start = time.time()
try:
res = requests.request('get', url, timeout=5)
print(f"✓ 请求成功,状态码: {res.status_code}")
except Exception as e:
print(f"✗ 请求失败: {e}")
print(f"耗时: {time.time() - start:.2f}s")
# 第二次请求应该超时(因为限制了1次/分钟)
print("\n2. 第二次请求(应该超时)...")
start = time.time()
try:
res = requests.request('get', url, timeout=5, max_wait_time=2)
print(f"✗ 预期应该超时,但请求成功了,状态码: {res.status_code}")
except Exception as e:
print(f"✓ 预期的超时异常: {type(e).__name__}")
print(f"耗时: {time.time() - start:.2f}s")
print()
def test_concurrent_requests():
"""测试并发请求(验证死锁修复)"""
print("=== 测试并发请求 ===")
test_domain = "httpbin.org"
set_rate_limit(test_domain, 5) # 每分钟5次
print(f"设置 {test_domain} 每分钟限制5次请求")
url = "https://httpbin.org/get"
results = []
def worker(i):
try:
start = time.time()
res = requests.request('get', url, timeout=10, max_wait_time=10)
results.append((i, True, res.status_code, time.time() - start))
except Exception as e:
results.append((i, False, str(e), time.time() - start))
# 启动10个并发请求
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
# 统计结果
success_count = sum(1 for _, success, _, _ in results if success)
print(f"\n10个并发请求结果:")
print(f"成功: {success_count}, 失败: {10 - success_count}")
for i, success, result, duration in results:
if success:
print(f"请求 {i}: 成功,状态码={result},耗时={duration:.2f}s")
else:
print(f"请求 {i}: 失败,原因={result},耗时={duration:.2f}s")
print()
def test_backoff_strategy():
"""测试指数退避策略"""
print("=== 测试指数退避策略 ===")
test_domain = "httpbin.org"
set_rate_limit(test_domain, 2) # 每分钟2次
print(f"设置 {test_domain} 每分钟限制2次请求")
url = "https://httpbin.org/get"
# 连续发送3次请求
for i in range(3):
start = time.time()
try:
res = requests.request('get', url, timeout=10, max_wait_time=5)
print(f"请求 {i+1}: 成功,状态码={res.status_code},耗时={time.time() - start:.2f}s")
except Exception as e:
print(f"请求 {i+1}: 失败,原因={e},耗时={time.time() - start:.2f}s")
time.sleep(0.1) # 小延迟,避免并发问题
print()
def test_default_settings():
"""测试默认设置"""
print("=== 测试默认设置 ===")
# 恢复默认设置
set_default_rate_limit(30)
print("恢复默认频率限制:每分钟30次")
url = "https://httpbin.org/get"
# 快速发送5次请求
start = time.time()
for i in range(5):
try:
res = requests.request('get', url, timeout=5)
print(f"请求 {i+1}: 成功,状态码={res.status_code}")
except Exception as e:
print(f"请求 {i+1}: 失败,原因={e}")
print(f"5次请求总耗时: {time.time() - start:.2f}s")
print()
if __name__ == "__main__":
print("频率限制器修复测试\n")
# 测试1: 超时机制
test_timeout_mechanism()
# 测试2: 并发请求(验证死锁修复)
print("注意:接下来的测试会模拟并发请求,验证死锁修复...")
time.sleep(1)
test_concurrent_requests()
# 测试3: 指数退避策略
test_backoff_strategy()
# 测试4: 默认设置
test_default_settings()
print("=== 所有测试完成 ===")