Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ jobs:
- name: Checkout
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1

- uses: CVector-Energy/python-test@main
- uses: CVector-Energy/python-test@uv
with:
python-version: ${{ matrix.python-version }}
src-dirs: .
17 changes: 17 additions & 0 deletions .github/workflows/license-check-python.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: Python License Check

on:
push:

jobs:
license-check:
runs-on: ubuntu-24.04

steps:
- uses: actions/checkout@1af3b93b6815bc44a9784bd300feb67ff0d1eeb3 # v6.0.0

- name: Check Python licenses
uses: CVector-Energy/pyproject-license-check@main
with:
app-id: ${{ vars.APP_ID }}
app-private-key: ${{ secrets.APP_PRIVATE_KEY }}
3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
[project]
name = "cvec"
version = "1.4.3"
version = "1.5.0"
description = "SDK for CVector Energy"
authors = [{ name = "CVector", email = "support@cvector.energy" }]
readme = "README.md"
requires-python = ">=3.10"
dependencies = [
"pydantic>=2.12.0",
"pyarrow>=22.0.0",
"brotli>=1.2.0,<2",
]
license = "MIT"
license-files = ['LICENSE']
Expand Down
90 changes: 78 additions & 12 deletions src/cvec/cvec.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,19 @@
import gzip
import json
import logging
import os
import time
import zlib

import brotli # type: ignore[import-untyped]
from datetime import datetime
from typing import Any, Dict, List, Optional
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode, urljoin
from urllib.request import Request, urlopen

from cvec.http_cache import MAX_CACHE_ENTRIES, CacheEntry, parse_max_age

from cvec.models.agent_post import AgentPost, AgentPostRecommendation, AgentPostTag
from cvec.models.eav_column import EAVColumn
from cvec.models.eav_filter import EAVFilter
Expand Down Expand Up @@ -53,6 +60,9 @@ def __init__(
self._publishable_key = None
self._api_key = api_key or os.environ.get("CVEC_API_KEY")

# HTTP cache for GET requests
self._cache: Dict[str, CacheEntry] = {}

if not self.host:
raise ValueError(
"CVEC_HOST must be set either as an argument or environment variable"
Expand Down Expand Up @@ -105,8 +115,58 @@ def _get_headers(self) -> Dict[str, str]:
"Authorization": f"Bearer {self._access_token}",
"Content-Type": "application/json",
"Accept": "application/json",
"Accept-Encoding": "br, gzip, deflate",
}

@staticmethod
def _read_response(response: Any) -> tuple[bytes, str]:
"""Read and decompress response body.

Returns:
Tuple of (decompressed data, content type)
"""
raw = response.read()
encoding = response.headers.get("Content-Encoding", "")
if encoding == "br":
raw = brotli.decompress(raw)
elif encoding == "gzip":
raw = gzip.decompress(raw)
elif encoding == "deflate":
raw = zlib.decompress(raw)
content_type: str = response.headers.get("content-type", "")
return raw, content_type

@staticmethod
def _parse_response_body(response_data: bytes, content_type: str) -> Any:
"""Parse response body based on content type."""
if content_type == "application/vnd.apache.arrow.stream":
return response_data
return json.loads(response_data.decode("utf-8"))

def _process_response(self, response: Any, url: str, method: str) -> Any:
    """Decode a response and remember it in the cache when allowed.

    The body is read, decompressed and parsed. For GET requests whose
    ``Cache-Control`` header yields a max-age, the parsed result is stored
    in ``self._cache`` under ``url``.

    Args:
        response: Open HTTP response object.
        url: Full request URL, used as the cache key.
        method: HTTP method of the request.

    Returns:
        The parsed response body.
    """
    body, content_type = self._read_response(response)
    result = self._parse_response_body(body, content_type)

    # Only GET responses are candidates for caching.
    if method != "GET":
        return result

    ttl = parse_max_age(response.headers.get("Cache-Control", ""))
    if ttl is None:
        return result

    cache = self._cache
    # A new key in a full cache evicts the entry that expires soonest.
    if len(cache) >= MAX_CACHE_ENTRIES and url not in cache:
        soonest = min(cache, key=lambda key: cache[key].expires_at)
        del cache[soonest]

    cache[url] = CacheEntry(
        data=result,
        # An empty or missing ETag is stored as None.
        etag=response.headers.get("ETag", "") or None,
        max_age=ttl,
        stored_at=time.monotonic(),
    )
    return result

def _make_request(
self,
method: str,
Expand All @@ -124,6 +184,17 @@ def _make_request(
if filtered_params:
url = f"{url}?{urlencode(filtered_params)}"

# Check cache for GET requests
if method == "GET" and url in self._cache:
entry = self._cache[url]
if time.monotonic() - entry.stored_at < entry.max_age:
return entry.data
# Stale entry with ETag: use conditional request
if entry.etag:
if headers is None:
headers = {}
headers["If-None-Match"] = entry.etag

request_headers = self._get_headers()
if headers:
request_headers.update(headers)
Expand All @@ -140,16 +211,16 @@ def make_http_request() -> Any:
url, data=request_body, headers=request_headers, method=method
)
with urlopen(req) as response:
response_data = response.read()
content_type = response.headers.get("content-type", "")

if content_type == "application/vnd.apache.arrow.stream":
return response_data
return json.loads(response_data.decode("utf-8"))
return self._process_response(response, url, method)

try:
return make_http_request()
except HTTPError as e:
# Handle 304 Not Modified
if e.code == 304 and method == "GET" and url in self._cache:
entry = self._cache[url]
entry.stored_at = time.monotonic()
return entry.data
# Handle 401 Unauthorized with token refresh
if e.code == 401 and self._access_token and self._refresh_token:
try:
Expand All @@ -164,12 +235,7 @@ def make_http_request() -> Any:
url, data=request_body, headers=request_headers, method=method
)
with urlopen(req) as response:
response_data = response.read()
content_type = response.headers.get("content-type", "")

if content_type == "application/vnd.apache.arrow.stream":
return response_data
return json.loads(response_data.decode("utf-8"))
return self._process_response(response, url, method)
except (HTTPError, URLError, ValueError, KeyError) as refresh_error:
logger.warning(
"Token refresh failed, continuing with original request: %s",
Expand Down
40 changes: 40 additions & 0 deletions src/cvec/http_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
"""In-memory HTTP cache for GET requests with Cache-Control and ETag support."""

from dataclasses import dataclass
from typing import Any, Optional


MAX_CACHE_ENTRIES = 100


@dataclass
class CacheEntry:
    """One stored HTTP response together with its freshness metadata."""

    # Payload of the cached response.
    data: Any
    # Validator for conditional requests, or None when there is none.
    etag: Optional[str]
    # Freshness lifetime in seconds.
    max_age: int
    # Clock reading taken when the entry was stored; same clock as expires_at.
    stored_at: float

    @property
    def expires_at(self) -> float:
        """Return the clock reading at which this entry stops being fresh."""
        lifetime = self.max_age
        return self.stored_at + lifetime


def parse_max_age(header: Optional[str]) -> Optional[int]:
    """Parse the max-age value from a Cache-Control header.

    Directive names are matched case-insensitively, surrounding whitespace
    is ignored, and both the token form (``max-age=60``) and the
    quoted-string form (``max-age="60"``) are accepted.

    Args:
        header: Raw Cache-Control header value, or None when absent.

    Returns:
        The max-age value in seconds, or None if the directive is not
        present or its value is not an integer.
    """
    if header is None:
        return None
    for directive in header.split(","):
        name, separator, value = directive.partition("=")
        # Require an exact name match so "s-maxage" and friends are skipped.
        if not separator or name.strip().lower() != "max-age":
            continue
        try:
            return int(value.strip().strip('"'))
        except ValueError:
            # A malformed max-age is treated as if no lifetime was given.
            return None
    return None
Loading
Loading