-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathapi_client.py
More file actions
179 lines (149 loc) · 7.19 KB
/
api_client.py
File metadata and controls
179 lines (149 loc) · 7.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
"""
Модуль для работы с API получения цен активов с асинхронными запросами и кэшированием
"""
import asyncio
import logging
import time
from typing import Optional, Dict
from datetime import datetime, timedelta
import httpx
from config import COINGECKO_API_URL, COINGECKO_BTC_ID, COINGECKO_GOLD_ID
logger = logging.getLogger(__name__)
# Время кэширования в секундах (5 минут)
CACHE_TTL = 300
class PriceAPI:
"""Класс для получения цен через CoinGecko API с асинхронными запросами и кэшированием"""
def __init__(self):
"""Инициализация API клиента"""
self.cache: Dict[str, Dict] = {} # {asset: {'price': float, 'timestamp': float}}
self.client: Optional[httpx.AsyncClient] = None
self._lock = asyncio.Lock()
async def _get_client(self) -> httpx.AsyncClient:
"""Получить или создать асинхронный HTTP клиент"""
if self.client is None:
self.client = httpx.AsyncClient(
timeout=httpx.Timeout(10.0, connect=5.0),
limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
)
return self.client
async def close(self):
"""Закрыть HTTP клиент"""
if self.client:
await self.client.aclose()
self.client = None
def _is_cache_valid(self, asset: str) -> bool:
"""Проверить валидность кэша для актива"""
if asset not in self.cache:
return False
cache_entry = self.cache[asset]
cache_age = time.time() - cache_entry['timestamp']
return cache_age < CACHE_TTL
def _get_cached_price(self, asset: str) -> Optional[float]:
"""Получить цену из кэша если она валидна"""
if self._is_cache_valid(asset):
logger.debug(f"Цена {asset} получена из кэша")
return self.cache[asset]['price']
return None
def _set_cached_price(self, asset: str, price: float):
"""Сохранить цену в кэш"""
self.cache[asset] = {
'price': price,
'timestamp': time.time()
}
logger.debug(f"Цена {asset} сохранена в кэш")
async def get_btc_price(self) -> Optional[float]:
"""Получить текущую цену Bitcoin в USD (асинхронно)"""
asset = 'btc'
# Проверяем кэш
cached_price = self._get_cached_price(asset)
if cached_price is not None:
return cached_price
try:
client = await self._get_client()
url = f"{COINGECKO_API_URL}/simple/price"
params = {
'ids': COINGECKO_BTC_ID,
'vs_currencies': 'usd'
}
async with self._lock:
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
price = data.get(COINGECKO_BTC_ID, {}).get('usd')
if price:
self._set_cached_price(asset, price)
logger.info(f"Цена BTC получена из API: ${price:,.2f}")
return price
else:
logger.warning("Цена BTC не найдена в ответе API")
return None
except httpx.TimeoutException:
logger.error("Таймаут при получении цены BTC")
return None
except httpx.HTTPStatusError as e:
logger.error(f"HTTP ошибка при получении цены BTC: {e.response.status_code}")
return None
except Exception as e:
logger.error(f"Ошибка при получении цены BTC: {e}", exc_info=True)
return None
async def get_gold_price(self) -> Optional[float]:
"""Получить текущую цену золота в USD за унцию (асинхронно)"""
asset = 'gold'
# Проверяем кэш
cached_price = self._get_cached_price(asset)
if cached_price is not None:
return cached_price
try:
client = await self._get_client()
url = f"{COINGECKO_API_URL}/simple/price"
params = {
'ids': COINGECKO_GOLD_ID,
'vs_currencies': 'usd'
}
async with self._lock:
response = await client.get(url, params=params)
response.raise_for_status()
data = response.json()
price = data.get(COINGECKO_GOLD_ID, {}).get('usd')
if price:
self._set_cached_price(asset, price)
logger.info(f"Цена Gold получена из API: ${price:,.2f}")
return price
else:
logger.warning("Цена Gold не найдена в ответе API")
return None
except httpx.TimeoutException:
logger.error("Таймаут при получении цены Gold")
return None
except httpx.HTTPStatusError as e:
logger.error(f"HTTP ошибка при получении цены Gold: {e.response.status_code}")
return None
except Exception as e:
logger.error(f"Ошибка при получении цены Gold: {e}", exc_info=True)
return None
async def get_price(self, asset: str) -> Optional[float]:
"""Получить цену актива по его названию (асинхронно)"""
asset_lower = asset.lower()
if asset_lower == 'btc' or asset_lower == 'bitcoin':
return await self.get_btc_price()
elif asset_lower == 'gold' or asset_lower == 'золото':
return await self.get_gold_price()
else:
logger.warning(f"Неподдерживаемый актив: {asset}")
return None
async def get_prices_batch(self, assets: list) -> Dict[str, Optional[float]]:
"""Получить цены для нескольких активов одновременно (асинхронно)"""
tasks = [self.get_price(asset) for asset in assets]
prices = await asyncio.gather(*tasks, return_exceptions=True)
result = {}
for asset, price in zip(assets, prices):
if isinstance(price, Exception):
logger.error(f"Ошибка при получении цены {asset}: {price}")
result[asset] = None
else:
result[asset] = price
return result
def clear_cache(self):
"""Очистить кэш цен"""
self.cache.clear()
logger.info("Кэш цен очищен")