|
1 | 1 | import importlib |
2 | | -from socket import socket |
3 | | -from typing import Optional |
| 2 | +import re |
| 3 | +from collections import OrderedDict |
| 4 | +from dataclasses import dataclass |
| 5 | +import time |
4 | 6 |
|
5 | 7 | from ._internal_logging import ( |
6 | 8 | InternalLogger, |
@@ -72,51 +74,190 @@ def __next__(self): |
72 | 74 | return host |
73 | 75 |
|
74 | 76 |
|
| 77 | +# --- Simple LRU Cache Implementation --- |
| 78 | + |
| 79 | + |
@dataclass
class CacheEntry:
    """A cached HTTP response body plus the metadata needed to reuse or revalidate it."""

    # Raw response body bytes.
    data: bytes
    # Value of the response's ETag header; None when the server sent no ETag
    # (entries may still be stored on the strength of max-age alone).
    # Written as a string annotation so no extra typing import is required.
    etag: "str | None"
    # Absolute time (seconds, as returned by time.time()) after which the
    # entry is considered stale.
    expires_at: float
    # The full URL from the successful response.
    url: str
| 86 | + |
| 87 | + |
class LRUCache:
    """A small least-recently-used cache backed by an ``OrderedDict``.

    The ordering of ``self.cache`` runs from least recently used (first) to
    most recently used (last). Both ``get`` hits and ``set`` calls move the
    key to the most-recently-used end.
    """

    def __init__(self, max_size: int):
        """
        :param max_size: maximum number of entries kept; a value of 0 or less
            yields a cache that never stores anything.
        """
        self.max_size = max_size
        self.cache = OrderedDict()

    def get(self, key):
        """Return the value stored under ``key`` (marking it recently used), or None."""
        try:
            self.cache.move_to_end(key)  # Mark as recently used.
        except KeyError:
            return None
        return self.cache[key]

    def set(self, key, value):
        """Store ``value`` under ``key``, evicting the least recently used entry if full."""
        if self.max_size <= 0:
            # A zero-capacity cache stores nothing; without this guard the
            # eviction below would call popitem() on an empty dict and raise.
            return
        if key in self.cache:
            self.cache.move_to_end(key)
        elif len(self.cache) >= self.max_size:
            self.cache.popitem(last=False)  # Drop the least recently used entry.
        self.cache[key] = value

    def clear(self):
        """Remove every entry."""
        self.cache.clear()

    def __len__(self):
        return len(self.cache)
| 113 | + |
| 114 | + |
class ApiClient:
    """HTTP client that rotates across API hosts, retries failed attempts,
    and can optionally cache GET responses in a small in-memory LRU cache.
    """

    def __init__(self, options):
        """
        :param options: An object with attributes such as:
            - prefab_api_urls: list of API host URLs (e.g. ["https://a.example.com", "https://b.example.com"])
            - version: version string (falls back to 'development' when absent)
        """
        self.hosts = options.prefab_api_urls
        self.session = requests.Session()
        self.session.mount("https://", requests.adapters.HTTPAdapter())
        self.session.mount("http://", requests.adapters.HTTPAdapter())
        self.session.headers.update(
            {
                "X-PrefabCloud-Client-Version": f"prefab-cloud-python-{getattr(options, 'version', 'development')}"
            }
        )
        # Initialize a cache (here with a maximum of 2 entries).
        self.cache = LRUCache(max_size=2)

    def get_host(self, attempt_number, host_list):
        """Pick a host round-robin: ``attempt_number`` wraps around ``host_list``."""
        return host_list[attempt_number % len(host_list)]

    def _get_attempt_number(self) -> int:
        """
        Retrieve the current attempt number from tenacity's statistics if available,
        otherwise default to 1.
        """
        stats = getattr(self.resilient_request, "statistics", None)
        if stats is None:
            return 1
        return stats.get("attempt_number", 1)

    def _build_url(self, path, hosts: list[str] = None) -> str:
        """
        Build the full URL using host-selection logic.

        :param path: request path; leading slashes are stripped before joining.
        :param hosts: optional host list overriding ``self.hosts``.
        """
        attempt_number = self._get_attempt_number()
        # Subtract 1 so that an attempt number of 1 selects the first host.
        host = self.get_host(attempt_number - 1, hosts or self.hosts)
        return f"{host.rstrip('/')}/{path.lstrip('/')}"

    @staticmethod
    def _response_from_entry(entry) -> Response:
        """Build a synthetic 200 Response carrying a cache entry's body and ETag."""
        resp = Response()
        resp._content = entry.data
        resp.status_code = 200
        resp.headers = {"ETag": entry.etag, "X-Cache": "HIT"}
        resp.url = entry.url
        return resp

    def _get_cached_response(self, url: str) -> Response:
        """
        If a valid (unexpired) cache entry exists for the given URL, return a
        synthetic Response built from it; otherwise return None.
        """
        entry = self.cache.get(url)
        if entry is not None and entry.expires_at > time.time():
            return self._response_from_entry(entry)
        return None

    def _apply_cache_headers(self, url: str, kwargs: dict) -> dict:
        """
        If a cache entry with an ETag exists for ``url``, add that ETag as an
        'If-None-Match' header.

        The caller's headers mapping is copied, never mutated; ``kwargs`` itself
        is updated in place and returned.
        """
        entry = self.cache.get(url)
        # `or {}` also covers an explicit headers=None, which has no .copy().
        headers = dict(kwargs.get("headers") or {})
        if entry is not None and entry.etag:
            headers["If-None-Match"] = entry.etag
        kwargs["headers"] = headers
        return kwargs

    def _update_cache(self, url: str, response: Response) -> None:
        """
        Store a cacheable response.

        A response is stored only when its status is 200, Cache-Control does not
        include 'no-store', and it has either an ETag (so it can be revalidated)
        or a positive max-age (so it can be served while fresh). If Cache-Control
        includes 'no-cache', or there is no positive max-age, the entry is stored
        as already expired so that every subsequent request triggers revalidation.
        """
        # Only successful bodies may be cached: cached entries are always
        # replayed with status 200.
        if response.status_code != 200:
            return

        # Directive names are case-insensitive, so match on the lowered value.
        cache_control = response.headers.get("Cache-Control", "").lower()
        if "no-store" in cache_control:
            return

        etag = response.headers.get("ETag")
        max_age_match = re.search(r"max-age=(\d+)", cache_control)
        max_age = int(max_age_match.group(1)) if max_age_match else 0

        now = time.time()
        if "no-cache" in cache_control or max_age <= 0:
            expires_at = now  # Immediately expired: usable only for revalidation.
        else:
            expires_at = now + max_age

        if etag is None and expires_at <= now:
            # Neither fresh nor revalidatable: storing it would be useless.
            return

        self.cache.set(
            url,
            CacheEntry(
                data=response.content,
                etag=etag,
                expires_at=expires_at,
                url=response.url,
            ),
        )
        response.headers["X-Cache"] = "MISS"

    def _send_request(self, method: str, url: str, **kwargs) -> Response:
        """
        Hook method to perform the actual HTTP request.
        """
        return self.session.request(method, url, **kwargs)

    @retry(
        stop=stop_after_delay(8),
        wait=wait_exponential(multiplier=1, min=0.05, max=2),
        retry=retry_if_exception_type((RequestException, ConnectionError, OSError)),
    )
    def resilient_request(
        self,
        path,
        method="GET",
        allow_cache: bool = False,
        hosts: list[str] = None,
        **kwargs,
    ) -> Response:
        """
        Makes a resilient (retrying) request.

        If allow_cache is True and the request method is GET, caching logic is applied.
        This includes:
          - Checking the cache and returning a synthetic response if valid.
          - Adding an 'If-None-Match' header when a stale entry exists.
          - Handling a 304 (Not Modified) response by returning the cached entry.
          - Caching a 200 response if Cache-Control permits.

        :param path: request path, joined onto the selected host.
        :param method: HTTP method name.
        :param allow_cache: enable the caching behaviour described above (GET only).
        :param hosts: optional host list overriding the client's default hosts.
        :param kwargs: passed through to the underlying session request.
        """
        # NOTE(review): there is no raise_for_status() here, so HTTP error
        # statuses are returned as-is and do not trigger a retry -- confirm
        # that callers check the response status themselves.
        url = self._build_url(path, hosts)
        use_cache = allow_cache and method.upper() == "GET"

        if use_cache:
            cached_response = self._get_cached_response(url)
            if cached_response is not None:
                return cached_response
            kwargs = self._apply_cache_headers(url, kwargs)

        response = self._send_request(method, url, **kwargs)

        if use_cache:
            if response.status_code == 304:
                entry = self.cache.get(url)
                if entry is not None:
                    return self._response_from_entry(entry)
            self._update_cache(url, response)
        return response
0 commit comments