Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "singlestoredb"
version = "1.16.10"
version = "1.16.11rc1+byasaini"
description = "Interface to the SingleStoreDB database and workspace management APIs"
readme = {file = "README.md", content-type = "text/markdown"}
license = {text = "Apache-2.0"}
Expand Down
295 changes: 295 additions & 0 deletions singlestoredb/ai/embeddings.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import contextvars
import logging
import os
import time
import uuid
from typing import Any
from typing import AsyncIterator
from typing import Callable
from typing import Optional
from typing import Union
Expand All @@ -9,6 +14,231 @@
from singlestoredb import manage_workspaces
from singlestoredb.management.inference_api import InferenceAPIInfo


# Per-task trace id propagated into the embeddings HTTP transport.
#
# Callers (e.g. an EMBED_TEXT UDF) can do ``http_trace_id.set("<id>")``
# right before invoking ``aembed_documents``. The :class:`TracingAsyncTransport`
# reads this var inside ``handle_async_request`` and stamps every log line
# with it, so each per-stage HTTP timing line can be correlated back to the
# UDF request that produced it.
#
# ContextVars are per-asyncio-Task: even with many concurrent EMBED_TEXT
# coroutines sharing a single dispatch loop, each call has its own private
# context, so a set() in one task does not leak into another.
# The default '-' is the value that appears in log lines when no caller has
# set a trace id for the current context.
http_trace_id: 'contextvars.ContextVar[str]' = contextvars.ContextVar(
    'singlestoredb_embeddings_http_trace_id', default='-',
)

# Dedicated logger for the per-request timing lines emitted by
# TracingAsyncTransport, so they can be routed or silenced independently of
# the httpx / httpcore loggers.
_http_log = logging.getLogger('singlestoredb.ai.embeddings.http')


def _fmt_addr(addr: Any) -> str:
"""Format a ``(host, port, ...)`` sockaddr tuple as ``host:port``."""
if not addr:
return '?'
try:
return f'{addr[0]}:{addr[1]}'
except Exception:
return str(addr)


# Hosts already logged as transport-pinned, so we only emit one line per host
# instead of one per request.
# NOTE(review): this set is module-level (shared by every transport instance
# in the process) and is mutated without a lock -- presumably it is only
# touched from event-loop code; confirm if transports are ever driven from
# several threads.
_pin_logged_hosts: 'set[str]' = set()


class HttpTraceIdFilter(logging.Filter):
    """
    Logging filter that copies the active :data:`http_trace_id` onto every
    record as ``record.trace_id``.

    It is meant for the ``httpx`` / ``httpcore`` logging output, so that
    their low-level per-stage lines (``connect_tcp.started``,
    ``start_tls.started``, ``send_request_body.complete`` etc.) carry the
    same trace id the caller set before invoking ``aembed_documents``.

    Prefer attaching it to handlers: a filter attached to a logger is only
    consulted for records logged directly on that logger, not for records
    that reach it from descendant loggers (e.g. ``httpcore.connection``).

    The lookup is correct per request because ``httpcore`` runs inside the
    same ``asyncio.Task`` as the caller (just deeper in the ``await``
    chain) and ContextVars are per-Task, so ``http_trace_id.get()`` returns
    the value the caller set for that specific request. Concurrent
    EMBED_TEXT calls each see their own value.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Annotate ``record`` with the trace id; never drops a record."""
        current_id = http_trace_id.get()
        record.trace_id = current_id
        return True


def enable_http_debug_logging(level: int = logging.DEBUG) -> None:
    """
    Set the log level of the httpx / httpcore loggers to ``level``.

    At DEBUG this produces very verbose per-stage output (one log line per
    TCP connect, TLS handshake, header send, body send, response header
    read, response body read, etc.), which is the fastest way to pinpoint
    which network phase a stuck embedding request is sitting in. Enable it
    on demand, e.g. by setting the ``SINGLESTOREDB_EMBEDDINGS_HTTP_DEBUG=1``
    env var or by calling this function directly.

    Only logger levels are changed; no handlers are attached here.

    Parameters
    ----------
    level : int, optional
        Logging level applied to each logger. Defaults to ``logging.DEBUG``.

    """
    logger_names = (
        'httpx',
        'httpcore',
        'httpcore.connection',
        'httpcore.http11',
        'httpcore.http2',
        'httpcore.proxy',
    )
    for logger_name in logger_names:
        target = logging.getLogger(logger_name)
        target.setLevel(level)


# Import-time opt-in: when the env var holds a truthy flag ('1', 'true' or
# 'yes', case-insensitive), switch on verbose httpx / httpcore logging as
# soon as this module is imported.
if os.getenv('SINGLESTOREDB_EMBEDDINGS_HTTP_DEBUG', '').lower() in (
    '1', 'true', 'yes',
):
    enable_http_debug_logging()


class _TimingStream(httpx.AsyncByteStream):
    """
    Pass-through response body stream that logs a ``<- body`` line when
    iteration over the body ends.

    Defined once at module level (instead of once per request) and handed
    the trace id, request id and request start time explicitly, so the log
    line stays correlated even though the body is consumed after
    ``handle_async_request`` has already returned.
    """

    def __init__(self, inner: Any, tid: str, rid: str, t0: float) -> None:
        self._inner = inner
        self._tid = tid
        self._rid = rid
        self._t0 = t0

    async def __aiter__(self) -> AsyncIterator[bytes]:
        total = 0
        t_body_start = time.perf_counter()
        try:
            async for chunk in self._inner:
                total += len(chunk)
                yield chunk
        finally:
            # Runs on normal completion, on error, and when the generator
            # is closed early, so a partial download is still reported.
            t_body_end = time.perf_counter()
            _http_log.info(
                '[%s/%s] <- body bytes=%d body_elapsed=%.3fs '
                'total=%.3fs',
                self._tid, self._rid, total,
                t_body_end - t_body_start, t_body_end - self._t0,
            )

    async def aclose(self) -> None:
        await self._inner.aclose()


class TracingAsyncTransport(httpx.AsyncBaseTransport):
    """
    Wraps another :class:`httpx.AsyncBaseTransport` and logs per-stage
    timings for every request, stamped with the current :data:`http_trace_id`
    context value.

    Emits three log lines per request:

    1. ``->`` when the request is handed to the inner transport, with the
       outgoing body size (``-1`` when the body is a stream that has not
       been read).
    2. ``<- headers`` when the response headers arrive (gives time-to-first-
       byte, i.e. the gap that captures DNS + TCP + TLS + upload + upstream
       processing).
    3. ``<- body`` when the response body is fully consumed (gives separate
       body-download elapsed and total elapsed).

    The 1->2 gap vs the 2->3 gap is what tells you whether a hang is on the
    request/upstream side or on the response/download side.

    If ``SINGLESTOREDB_EMBEDDINGS_PIN_IP`` is set, requests are additionally
    re-targeted at that fixed IP before being sent (optionally only for the
    host named in ``SINGLESTOREDB_EMBEDDINGS_PIN_HOST``).

    Parameters
    ----------
    inner : httpx.AsyncBaseTransport
        The transport that actually performs the request.

    """

    def __init__(self, inner: httpx.AsyncBaseTransport) -> None:
        self._inner = inner

    @staticmethod
    def _apply_ip_pin(request: httpx.Request, tid: str, rid: str) -> None:
        """
        Re-target ``request`` at the pinned IP, if pinning is enabled.

        Deterministic, resolver-independent IP pin. A DNS pin (monkeypatching
        socket.getaddrinfo) is bypassed by httpx/anyio's resolution path, so
        connections still spread across every NLB IP and hit the cross-AZ
        source-port collision. Here we instead dial a fixed IP at the
        transport while preserving TLS SNI + certificate validation + the
        Host header, so every embedding request lands on one endpoint
        regardless of how DNS is resolved.

        The env vars are read on every request so a pin set after this
        client was constructed (e.g. by the EMBED_TEXT notebook) still takes
        effect. The request is modified in place; failures are logged and
        never raised.
        """
        pin_ip = os.environ.get('SINGLESTOREDB_EMBEDDINGS_PIN_IP')
        if not pin_ip:
            return
        pin_host = os.environ.get('SINGLESTOREDB_EMBEDDINGS_PIN_HOST')
        host = request.url.host
        # Skip when there is no host, when the request already targets the
        # pinned IP, or when pinning is restricted to a different host.
        if not host or host == pin_ip or (pin_host and host != pin_host):
            return
        try:
            # Preserve the original Host header (httpx set it to the
            # hostname at build time); only ensure it is present.
            if 'host' not in request.headers:
                request.headers['Host'] = host
            request.extensions = {
                **request.extensions, 'sni_hostname': host,
            }
            request.url = request.url.copy_with(host=pin_ip)
            # Announce the pin once per host rather than once per request.
            if host not in _pin_logged_hosts:
                _pin_logged_hosts.add(host)
                _http_log.warning(
                    '[%s/%s] TRANSPORT IP PIN active: dialing %s via %s '
                    '(SNI/cert/Host preserved)', tid, rid, host, pin_ip,
                )
        except Exception as e:
            _http_log.warning(
                '[%s/%s] TRANSPORT IP PIN failed: %r', tid, rid, e,
            )

    @staticmethod
    def _socket_addrs(
        response: httpx.Response, tid: str, rid: str,
    ) -> 'tuple[Any, Any]':
        """
        Return ``(local_addr, remote_addr)`` of the socket behind ``response``.

        Surfaces the actual local/remote socket addresses for this request so
        the (src_ip:src_port -> dst_ip:dst_port) 4-tuple can be correlated
        with node-side tcpdump/conntrack captures. ``src`` is the pod's
        chosen ephemeral port (the one Cilium masquerade normally preserves),
        and ``dst`` is the resolved endpoint IP actually connected to.

        Best effort: either element is ``None`` when it cannot be
        determined, and lookup errors are logged at DEBUG instead of raised.
        """
        local_addr = remote_addr = None
        try:
            stream = response.extensions.get('network_stream')
            if stream is not None:
                local_addr = stream.get_extra_info('client_addr')
                remote_addr = stream.get_extra_info('server_addr')
                if local_addr is None or remote_addr is None:
                    # Fall back to asking the raw socket directly.
                    sock = stream.get_extra_info('socket')
                    if sock is not None:
                        local_addr = local_addr or sock.getsockname()
                        remote_addr = remote_addr or sock.getpeername()
        except Exception as e:
            # Address reporting is diagnostic only; keep the request going
            # but leave a trace of why the addresses are missing.
            _http_log.debug(
                '[%s/%s] socket address lookup failed: %r', tid, rid, e,
            )
        return local_addr, remote_addr

    async def handle_async_request(
        self, request: httpx.Request,
    ) -> httpx.Response:
        """
        Send ``request`` through the inner transport, logging each stage.

        Any exception raised by the inner transport is logged with the
        elapsed time and re-raised unchanged. The returned response has its
        body stream wrapped so the download time is logged once the body
        has been consumed.
        """
        tid = http_trace_id.get()
        # Short random id to tell apart requests sharing one trace id.
        rid = uuid.uuid4().hex[:6]

        self._apply_ip_pin(request, tid, rid)

        try:
            body_len = len(request.content or b'')
        except httpx.RequestNotRead:
            # Streaming request body: the size is not known up front.
            body_len = -1
        t0 = time.perf_counter()
        _http_log.info(
            '[%s/%s] -> %s %s body=%dB',
            tid, rid, request.method, request.url, body_len,
        )
        try:
            response = await self._inner.handle_async_request(request)
        except BaseException as e:
            # BaseException on purpose: task cancellation (e.g. a caller's
            # timeout) should show up in the trace too. Always re-raised.
            _http_log.error(
                '[%s/%s] xx EXC after %.3fs: %r',
                tid, rid, time.perf_counter() - t0, e,
            )
            raise

        t_headers = time.perf_counter()
        local_addr, remote_addr = self._socket_addrs(response, tid, rid)
        _http_log.info(
            '[%s/%s] <- headers status=%d ttfb=%.3fs src=%s dst=%s',
            tid, rid, response.status_code, t_headers - t0,
            _fmt_addr(local_addr), _fmt_addr(remote_addr),
        )

        # Wrap the body stream so we also time how long the body download
        # itself takes; body consumption happens later, after this method
        # has returned.
        response.stream = _TimingStream(response.stream, tid, rid, t0)
        return response

    async def aclose(self) -> None:
        """Close the wrapped transport."""
        await self._inner.aclose()

try:
from langchain_openai import OpenAIEmbeddings
except ImportError:
Expand All @@ -34,6 +264,7 @@ def SingleStoreEmbeddingsFactory(
model_name: str,
api_key: Optional[str] = None,
http_client: Optional[httpx.Client] = None,
http_async_client: Optional[httpx.AsyncClient] = None,
obo_token_getter: Optional[Callable[[], Optional[str]]] = None,
base_url: Optional[str] = None,
hosting_platform: Optional[str] = None,
Expand Down Expand Up @@ -152,6 +383,70 @@ def _inject_headers(request: Any, **_ignored: Any) -> None:
)
if http_client is not None:
openai_kwargs['http_client'] = http_client

if http_async_client is None:
# Explicit timeouts: without these, httpx falls back to its 5s
# default at the client level, but the OpenAI SDK overrides that
# with a per-request 600s read timeout, so a stalled response can
# sit on the socket for ~10 minutes before httpx notices. We use a
# tighter read timeout so a dead/half-open connection fails fast
# instead of waiting for the application-level defensive timeout
# (e.g. EMBED_TEXT's asyncio.wait_for) to fire.
client_timeout = httpx.Timeout(
connect=float(
os.environ.get(
'SINGLESTOREDB_EMBEDDINGS_CONNECT_TIMEOUT', '10',
),
),
read=float(
os.environ.get(
'SINGLESTOREDB_EMBEDDINGS_READ_TIMEOUT', '60',
),
),
write=float(
os.environ.get(
'SINGLESTOREDB_EMBEDDINGS_WRITE_TIMEOUT', '30',
),
),
pool=float(
os.environ.get(
'SINGLESTOREDB_EMBEDDINGS_POOL_TIMEOUT', '10',
),
),
)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sync client timeouts are ignored by the async client

Medium Severity

SingleStoreEmbeddingsFactory parses connect_timeout and read_timeout from the optional sync http_client for Bedrock, but the new default httpx.AsyncClient for OpenAI embeddings only uses env defaults (60s read). Callers who configure timeouts via http_client get different limits on async embedding HTTP than they expect.

Fix in Cursor Fix in Web

Reviewed by Cursor Bugbot for commit 018db7a. Configure here.

# Allow connection reuse. The previous configuration
# (max_keepalive_connections=0) forced a fresh TCP+TLS handshake
# for every request, which under heavy concurrency churns sockets
# and occasionally yields one connection that the upstream accepts
# but never finishes responding on.
client_limits = httpx.Limits(
max_connections=int(
os.environ.get(
'SINGLESTOREDB_EMBEDDINGS_MAX_CONNECTIONS', '64',
),
),
max_keepalive_connections=int(
os.environ.get(
'SINGLESTOREDB_EMBEDDINGS_MAX_KEEPALIVE', '16',
),
),
keepalive_expiry=float(
os.environ.get(
'SINGLESTOREDB_EMBEDDINGS_KEEPALIVE_EXPIRY', '30',
),
),
)
http_async_client = httpx.AsyncClient(
timeout=client_timeout,
limits=client_limits,
transport=TracingAsyncTransport(
httpx.AsyncHTTPTransport(
limits=client_limits,
),
),
)
openai_kwargs['http_async_client'] = http_async_client

return OpenAIEmbeddings(
**openai_kwargs,
**kwargs,
Expand Down
13 changes: 13 additions & 0 deletions singlestoredb/apps/_python_udfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,24 @@ async def run_udf_app(
f'You can only define a maximum of {MAX_UDFS_LIMIT} functions.',
)

# uvicorn's default keep-alive timeout (5s) makes the server the active
# closer of idle pooled connections from the upstream proxy (cilium-envoy).
# The server then holds the TCP TIME-WAIT for that 4-tuple for ~60s; when the
# proxy reuses the same source port within that window the new SYN collides
# with the lingering TIME-WAIT socket and is silently dropped, surfacing as
# 5s connect timeouts (503 "upstream connect error ... connection timeout").
# Keeping connections warm past the proxy's idle interval avoids that churn.
# keep_alive_timeout = int(
# os.environ.get('SINGLESTOREDB_APP_KEEPALIVE_TIMEOUT', '120'),
# )
keep_alive_timeout = 120

config = uvicorn.Config(
app,
host='0.0.0.0',
port=app_config.listen_port,
log_config=app.get_uvicorn_log_config(),
timeout_keep_alive=keep_alive_timeout,
)

# Register the functions only if the app is running interactively.
Expand Down
Loading
Loading