-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathretry_utils.py
More file actions
318 lines (256 loc) · 10.8 KB
/
retry_utils.py
File metadata and controls
318 lines (256 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
import time
import random
import asyncio
import logging
from functools import wraps
from typing import Callable, Type, Tuple, Optional, Any
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class RetryConfig:
    """Parameters that control how the retry decorators back off and give up.

    Instances are read by ``retry_with_backoff``, ``async_retry_with_backoff``
    and ``_calculate_delay``.
    """

    # Total number of calls made before giving up (the first call counts).
    max_attempts: int = 3

    # Delay, in seconds, used before the first retry.
    base_delay: float = 1.0

    # Upper bound, in seconds, applied to the computed delay before jitter.
    max_delay: float = 60.0

    # Multiplier applied to the delay for every additional failed attempt.
    exponential_base: float = 2.0

    # When true, the delay is scaled by a random factor in [0.5, 1.0).
    jitter: bool = True

    # Exception types that trigger a retry; any other exception propagates.
    retryable_exceptions: Tuple[Type[Exception], ...] = (Exception,)
class RetryError(Exception):
    """Raised when a retried call has used up all of its attempts.

    Attributes:
        attempts: Number of attempts reported by the retry wrapper.
        last_exception: The exception raised by the final attempt, or ``None``
            when the wrapper raised without running any attempt.
    """

    def __init__(self, message: str, attempts: int, last_exception: Optional[Exception]):
        super().__init__(message)
        self.attempts = attempts
        self.last_exception = last_exception

    def __reduce__(self):
        """Support pickling and copying despite the extra constructor arguments."""
        # The default exception reduction re-creates the object from
        # ``self.args`` alone (only the message), which fails because
        # ``__init__`` requires three positional arguments.
        message = self.args[0] if self.args else ""
        return (
            type(self),
            (message, self.attempts, self.last_exception),
            self.__dict__,
        )
def retry_with_backoff(
    config: Optional[RetryConfig] = None,
    on_retry: Optional[Callable[[int, Exception], None]] = None
):
    """Build a decorator that retries a synchronous function with backoff.

    Args:
        config: Retry parameters. A default ``RetryConfig()`` is used when
            omitted.
        on_retry: Optional callback invoked as ``on_retry(attempt, exc)``
            after a failed attempt and before sleeping. It is not called
            after the final failed attempt.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result from the first attempt that does not raise.

    Raises:
        RetryError: Raised by the decorated function when every attempt
            raised one of ``config.retryable_exceptions``; the last exception
            is attached as ``last_exception`` and as ``__cause__``.
        Exceptions not listed in ``retryable_exceptions`` propagate unchanged.
    """
    # Resolve the default once instead of rebinding the closure variable
    # from inside the wrapper.
    settings = config if config is not None else RetryConfig()

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            last_exception = None

            for attempt in range(1, settings.max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except settings.retryable_exceptions as e:
                    last_exception = e
                    # Fall back to the class name for exceptions with an
                    # empty message.
                    error_msg = str(e) or type(e).__name__

                    if attempt == settings.max_attempts:
                        logger.error(f"函数 {func.__name__} 在 {attempt} 次尝试后失败: {error_msg}")
                        # Chain explicitly so the original failure is
                        # reported as the direct cause.
                        raise RetryError(
                            f"函数 {func.__name__} 在 {settings.max_attempts} 次尝试后失败",
                            settings.max_attempts,
                            e
                        ) from e

                    delay = _calculate_delay(attempt, settings)

                    # Log at debug level to reduce noise; an error is only
                    # logged on the final failure.
                    logger.debug(
                        f"函数 {func.__name__} 第 {attempt} 次尝试失败: {error_msg}. "
                        f"{delay:.2f}秒后重试..."
                    )

                    if on_retry:
                        on_retry(attempt, e)

                    time.sleep(delay)

            # Only reached when the loop body never ran (max_attempts < 1),
            # in which case no exception was captured.
            raise RetryError(
                f"函数 {func.__name__} 在 {settings.max_attempts} 次尝试后失败",
                settings.max_attempts,
                last_exception
            )

        return wrapper

    return decorator
def _calculate_delay(attempt: int, config: RetryConfig) -> float:
delay = config.base_delay * (config.exponential_base ** (attempt - 1))
delay = min(delay, config.max_delay)
if config.jitter:
delay = delay * (0.5 + random.random() * 0.5)
return delay
class CircuitBreaker:
    """Synchronous circuit breaker used as a context manager.

    The breaker starts in the ``'closed'`` state. Each guarded block that
    raises ``expected_exception`` increments ``failure_count``; once the count
    reaches ``failure_threshold`` the state becomes ``'open'`` and entering
    the context raises ``CircuitBreakerError``. After ``recovery_timeout``
    seconds since the last failure, the next entry switches to
    ``'half-open'`` and lets the call through; a success then closes the
    breaker again.

    Args:
        failure_threshold: Failure count at which the breaker opens.
        recovery_timeout: Seconds to wait after the last failure before a
            trial call is allowed.
        expected_exception: Exception type counted as a failure.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: Type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        # ``time.time()`` timestamp of the most recent failure, or None.
        self.last_failure_time = None
        self.state = 'closed'

    def __enter__(self):
        """Admit the call, or raise ``CircuitBreakerError`` while open."""
        if self.state == 'open':
            if self._should_attempt_reset():
                self.state = 'half-open'
                logger.info("熔断器进入半开状态,尝试恢复")
            else:
                raise CircuitBreakerError("熔断器处于开启状态,拒绝请求")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Record the outcome of the guarded block; never suppresses errors."""
        if exc_type is not None and issubclass(exc_type, self.expected_exception):
            self._on_failure()
        elif exc_type is None or issubclass(exc_type, Exception):
            # Clean exit, or an ordinary exception that is not counted as a
            # failure: treated as a success.
            self._on_success()
        # Otherwise the block was interrupted by a non-``Exception``
        # ``BaseException`` (KeyboardInterrupt, SystemExit, GeneratorExit).
        # The call did not complete, so it must not reset the failure count
        # or close a half-open breaker; the state is left unchanged.
        return False

    def _should_attempt_reset(self) -> bool:
        """Return True when the recovery timeout has elapsed."""
        if self.last_failure_time is None:
            return True
        return time.time() - self.last_failure_time >= self.recovery_timeout

    def _on_failure(self):
        """Count a failure and open the breaker at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = 'open'
            logger.warning(
                f"熔断器开启:失败次数 {self.failure_count} "
                f"达到阈值 {self.failure_threshold}"
            )

    def _on_success(self):
        """Reset the failure tracking and close a half-open breaker."""
        self.failure_count = 0
        self.last_failure_time = None

        if self.state == 'half-open':
            self.state = 'closed'
            logger.info("熔断器已恢复到关闭状态")

    def get_state(self) -> str:
        """Return the current state string."""
        return self.state

    def get_failure_count(self) -> int:
        """Return the current failure count."""
        return self.failure_count
class CircuitBreakerError(Exception):
    """Raised when a circuit breaker is open and rejects the call."""
def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 60.0,
    expected_exception: Type[Exception] = Exception
):
    """Build a decorator that guards a synchronous function with a breaker.

    One ``CircuitBreaker`` is created per decorated function and exposed as
    the ``circuit_breaker`` attribute of the returned wrapper.

    Args:
        failure_threshold: Failure count at which the breaker opens.
        recovery_timeout: Seconds to wait before a trial call is allowed.
        expected_exception: Exception type counted as a failure.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result.

    Raises:
        CircuitBreakerError: Raised by the decorated function while the
            breaker is open. Exceptions from the wrapped function propagate
            unchanged.
    """
    def decorator(func: Callable) -> Callable:
        cb = CircuitBreaker(failure_threshold, recovery_timeout, expected_exception)

        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            try:
                with cb:
                    return func(*args, **kwargs)
            except CircuitBreakerError:
                logger.error(f"函数 {func.__name__} 被熔断器拒绝")
                raise

        # Expose the breaker so callers can inspect its state.
        wrapper.circuit_breaker = cb
        return wrapper

    return decorator
# Asynchronous version of the retry mechanism.
def async_retry_with_backoff(
    config: Optional[RetryConfig] = None,
    on_retry: Optional[Callable[[int, Exception], None]] = None
):
    """Build a decorator that retries a coroutine function with backoff.

    Args:
        config: Retry parameters. A default ``RetryConfig()`` is used when
            omitted.
        on_retry: Optional callback invoked as ``on_retry(attempt, exc)``
            after a failed attempt and before sleeping. It is not called
            after the final failed attempt. If the callback returns a
            coroutine, it is awaited.

    Returns:
        A decorator. The decorated coroutine function returns the awaited
        result from the first attempt that does not raise.

    Raises:
        RetryError: Raised by the decorated function when every attempt
            raised one of ``config.retryable_exceptions``; the last exception
            is attached as ``last_exception`` and as ``__cause__``.
        Exceptions not listed in ``retryable_exceptions`` propagate unchanged.
    """
    # Resolve the default once instead of rebinding the closure variable
    # from inside the wrapper.
    settings = config if config is not None else RetryConfig()

    def decorator(func: Callable) -> Callable:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            last_exception = None

            for attempt in range(1, settings.max_attempts + 1):
                try:
                    return await func(*args, **kwargs)
                except settings.retryable_exceptions as e:
                    last_exception = e
                    # Fall back to the class name for exceptions with an
                    # empty message.
                    error_msg = str(e) or type(e).__name__

                    if attempt == settings.max_attempts:
                        logger.error(f"异步函数 {func.__name__} 在 {attempt} 次尝试后失败: {error_msg}")
                        # Chain explicitly so the original failure is
                        # reported as the direct cause.
                        raise RetryError(
                            f"异步函数 {func.__name__} 在 {settings.max_attempts} 次尝试后失败",
                            settings.max_attempts,
                            e
                        ) from e

                    delay = _calculate_delay(attempt, settings)

                    # Log at debug level to reduce noise; an error is only
                    # logged on the final failure.
                    logger.debug(
                        f"异步函数 {func.__name__} 第 {attempt} 次尝试失败: {error_msg}. "
                        f"{delay:.2f}秒后重试..."
                    )

                    if on_retry:
                        outcome = on_retry(attempt, e)
                        # An async callback returns a coroutine that would
                        # otherwise never run.
                        if asyncio.iscoroutine(outcome):
                            await outcome

                    await asyncio.sleep(delay)

            # Only reached when the loop body never ran (max_attempts < 1),
            # in which case no exception was captured.
            raise RetryError(
                f"异步函数 {func.__name__} 在 {settings.max_attempts} 次尝试后失败",
                settings.max_attempts,
                last_exception
            )

        return wrapper

    return decorator
# Asynchronous version of the circuit breaker.
class AsyncCircuitBreaker:
    """Circuit breaker used as an asynchronous context manager.

    The breaker starts in the ``'closed'`` state. Each guarded block that
    raises ``expected_exception`` increments ``failure_count``; once the count
    reaches ``failure_threshold`` the state becomes ``'open'`` and entering
    the context raises ``CircuitBreakerError``. After ``recovery_timeout``
    seconds since the last failure, the next entry switches to
    ``'half-open'`` and lets the call through; a success then closes the
    breaker again. State checks and updates run while holding an
    ``asyncio.Lock``.

    Args:
        failure_threshold: Failure count at which the breaker opens.
        recovery_timeout: Seconds to wait after the last failure before a
            trial call is allowed.
        expected_exception: Exception type counted as a failure.
    """

    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 60.0,
        expected_exception: Type[Exception] = Exception
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.expected_exception = expected_exception

        self.failure_count = 0
        # ``time.time()`` timestamp of the most recent failure, or None.
        self.last_failure_time = None
        self.state = 'closed'
        self._lock = asyncio.Lock()

    async def __aenter__(self):
        """Admit the call, or raise ``CircuitBreakerError`` while open."""
        async with self._lock:
            if self.state == 'open':
                if self._should_attempt_reset():
                    self.state = 'half-open'
                    logger.info("异步熔断器进入半开状态,尝试恢复")
                else:
                    raise CircuitBreakerError("异步熔断器处于开启状态,拒绝请求")
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Record the outcome of the guarded block; never suppresses errors."""
        async with self._lock:
            if exc_type is not None and issubclass(exc_type, self.expected_exception):
                await self._on_failure()
            elif exc_type is None or issubclass(exc_type, Exception):
                # Clean exit, or an ordinary exception that is not counted
                # as a failure: treated as a success.
                await self._on_success()
            # Otherwise the block ended with a non-``Exception``
            # ``BaseException`` such as ``asyncio.CancelledError`` (task
            # cancellation, e.g. from a timeout) or KeyboardInterrupt. The
            # call did not complete, so it must not reset the failure count
            # or close a half-open breaker; the state is left unchanged.
        return False

    def _should_attempt_reset(self) -> bool:
        """Return True when the recovery timeout has elapsed."""
        if self.last_failure_time is None:
            return True
        return time.time() - self.last_failure_time >= self.recovery_timeout

    async def _on_failure(self):
        """Count a failure and open the breaker at the threshold."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = 'open'
            logger.warning(
                f"异步熔断器开启:失败次数 {self.failure_count} "
                f"达到阈值 {self.failure_threshold}"
            )

    async def _on_success(self):
        """Reset the failure tracking and close a half-open breaker."""
        self.failure_count = 0
        self.last_failure_time = None

        if self.state == 'half-open':
            self.state = 'closed'
            logger.info("异步熔断器已恢复到关闭状态")

    def get_state(self) -> str:
        """Return the current state string."""
        return self.state

    def get_failure_count(self) -> int:
        """Return the current failure count."""
        return self.failure_count
def async_circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 60.0,
    expected_exception: Type[Exception] = Exception
):
    """Build a decorator that guards a coroutine function with a breaker.

    One ``AsyncCircuitBreaker`` is created per decorated function and exposed
    as the ``circuit_breaker`` attribute of the returned wrapper.

    Args:
        failure_threshold: Failure count at which the breaker opens.
        recovery_timeout: Seconds to wait before a trial call is allowed.
        expected_exception: Exception type counted as a failure.

    Returns:
        A decorator. The decorated coroutine function returns the awaited
        result of the wrapped function.

    Raises:
        CircuitBreakerError: Raised by the decorated function while the
            breaker is open. Exceptions from the wrapped function propagate
            unchanged.
    """
    def decorator(func: Callable) -> Callable:
        cb = AsyncCircuitBreaker(failure_threshold, recovery_timeout, expected_exception)

        @wraps(func)
        async def wrapper(*args, **kwargs) -> Any:
            try:
                async with cb:
                    return await func(*args, **kwargs)
            except CircuitBreakerError:
                logger.error(f"异步函数 {func.__name__} 被熔断器拒绝")
                raise

        # Expose the breaker so callers can inspect its state.
        wrapper.circuit_breaker = cb
        return wrapper

    return decorator