diff --git a/CMakeLists.txt b/CMakeLists.txt index cadc899..55044d8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -14,7 +14,7 @@ set(CMAKE_CXX_STANDARD_REQUIRED ON) include_directories(src/include) -set(EXTENSION_SOURCES src/quackscale_extension.cpp src/attach_ducklake.cpp src/tailscale_bridge.cpp src/tailscale_forwarder.cpp src/tailscale_log_capture.cpp) +set(EXTENSION_SOURCES src/quackscale_extension.cpp src/attach_ducklake.cpp src/tailscale_bridge.cpp src/tailscale_forwarder.cpp src/tailscale_log_capture.cpp src/tailscale_http.cpp) if(QUACKSCALE_WITH_TAILSCALE) include(${CMAKE_CURRENT_SOURCE_DIR}/cmake/Libtailscale.cmake) diff --git a/README.md b/README.md index d337f1e..7cb46eb 100644 --- a/README.md +++ b/README.md @@ -243,7 +243,9 @@ flowchart TB m -->|encrypted TCP| c2 ``` -**`tailscale_quack_forward`** is required on each client when using embedded tsnet: Quack speaks normal HTTP/TCP, which kernel routing does not send over the tailnet by itself. The forwarder listens on loopback and dials the server via `tailscale_dial`, then clients use **`ATTACH 'quack:…'`** for the remote catalog or **`attach_ducklake`** for server-owned DuckLake tables. +After `tailscale_up`, QuackScale wraps DuckDB's HTTP layer so requests to tailnet hosts (`100.64.0.0/10`, `*.ts.net`) are dialed over tsnet automatically — clients can **`ATTACH 'quack:100.x.x.x:9494'`** directly, no forwarder. Opt out with `http_route => false`. + +**`tailscale_quack_forward`** remains for cases the router does not cover: bare MagicDNS **short** names (e.g. `lake-server` without a `.ts.net` suffix), a pinned `127.0.0.1:` endpoint, or non-HTTP consumers. It listens on loopback and dials the server via `tailscale_dial`. Either path feeds **`ATTACH 'quack:…'`** or **`attach_ducklake`**. End-to-end recipes and DuckLake patterns: **[docs/GUIDE.md](docs/GUIDE.md)**. @@ -251,7 +253,7 @@ End-to-end recipes and DuckLake patterns: **[docs/GUIDE.md](docs/GUIDE.md)**. ## SQL API (`LOAD quackscale`) -Use **`CALL`** for table functions (same style as `CALL quack_serve`). Parameters for `tailscale_up` / `tailscale_login`: `hostname`, `authkey` (or `TS_AUTHKEY` env), `control_url`, `state_dir`, `ephemeral`, `loopback_proxy`. +Use **`CALL`** for table functions (same style as `CALL quack_serve`). Parameters for `tailscale_up` / `tailscale_login`: `hostname`, `authkey` (or `TS_AUTHKEY` env), `control_url`, `state_dir`, `ephemeral`, `loopback_proxy`, `http_route` (transparent HTTP routing to tailnet hosts — default `true`). ### Tailnet lifecycle @@ -269,7 +271,7 @@ Use **`CALL`** for table functions (same style as `CALL quack_serve`). Parameter |---------|---------| | [`CALL tailscale_serve_local(port => 9494)`](docs/GUIDE.md#use-case-1--remote-duckdb-hub-pattern-a) | Tailscale Serve: tailnet TCP **→** `127.0.0.1:9494`. Run after local `quack_serve`. | | [`CALL tailscale_ping(host => 'peer', port => 9494)`](docs/GUIDE.md#observability) | TCP dial to a peer over tsnet — readiness before Quack `ATTACH`. | -| [`CALL tailscale_quack_forward(host => 'peer', port => 9494)`](docs/GUIDE.md#standard-client-connection-recipe) | Listen on loopback; dial peer for each Quack HTTP connection. Returns `quack_uri`. **Preferred client path.** | +| [`CALL tailscale_quack_forward(host => 'peer', port => 9494)`](docs/GUIDE.md#standard-client-connection-recipe) | Listen on loopback; dial peer for each Quack HTTP connection. Returns `quack_uri`. For MagicDNS short names, pinned local ports, or non-HTTP clients — otherwise `ATTACH 'quack:100.x:9494'` works directly after `tailscale_up`. | | [`CALL tailscale_quack_proxy()`](docs/DEVELOPMENT.md) | Legacy SOCKS proxy + `ALL_PROXY` — deprecated; use `tailscale_quack_forward`. | | [`CALL tailscale_proxy_status()`](docs/DEVELOPMENT.md) | Legacy SOCKS status. | diff --git a/docs/GUIDE.md b/docs/GUIDE.md index a55cff6..2c6b946 100644 --- a/docs/GUIDE.md +++ b/docs/GUIDE.md @@ -30,7 +30,9 @@ Credentials: [AUTHENTICATION.md](AUTHENTICATION.md). Build and SQL reference: [R └─────────────────────────────────────────────────────────────────┘ ``` -**Why `tailscale_quack_forward`?** Quack uses normal HTTP/TCP. Embedded tsnet does not route kernel TCP to tailnet IPs. The forwarder listens on loopback and dials the peer via `tailscale_dial`. +**Transparent routing (default).** After `tailscale_up`, QuackScale wraps DuckDB's HTTP layer so requests to tailnet hosts (`100.64.0.0/10`, `*.ts.net`) are dialed over tsnet — `ATTACH 'quack:100.x.x.x:9494'` works with no forwarder. Disable with `tailscale_up(..., http_route => false)`. + +**Why `tailscale_quack_forward`?** It covers what the router does not: bare MagicDNS **short** names (no `.ts.net` suffix), a pinned `127.0.0.1:` endpoint, or non-HTTP clients. It listens on loopback and dials the peer via `tailscale_dial`. The compose demo uses it (stable hostnames) and also probes the direct-router path. **Why `tailscale_down`?** `tailscale_up` and the forwarder start background threads. One-shot DuckDB processes **hang after SQL finishes** unless tsnet is shut down. @@ -325,7 +327,7 @@ DuckLake metadata: file (`*.ducklake`), Postgres, or DuckDB — see [DuckLake at |-------|------------| | `remote.lake.table` does not exist | Use `attach_ducklake`, `quack_query`, or `ducklake:quack:` | | Client hangs after SQL completes | Emit done marker, then `CALL tailscale_down()` | -| Kernel TCP to `100.x:9494` fails from tsnet client | Use `tailscale_quack_forward` | +| Kernel TCP to `100.x:9494` fails from tsnet client | `ATTACH 'quack:100.x:9494'` after `tailscale_up` (transparent router), or `tailscale_quack_forward` for short names / non-HTTP | | `quack_query` + `ATTACH remote` stalls | Run lake queries **before** attach; separate statements | | `quack_query(…, quack_discover())` hangs | Discover locally or use known hostname | diff --git a/scripts/e2e/quacktail-compose-bootstrap.sh b/scripts/e2e/quacktail-compose-bootstrap.sh index 2051b18..bf9cb98 100644 --- a/scripts/e2e/quacktail-compose-bootstrap.sh +++ b/scripts/e2e/quacktail-compose-bootstrap.sh @@ -114,6 +114,33 @@ FROM quack_query( SQL } +# Direct ATTACH over the transparent HTTPUtil router — NO tailscale_quack_forward. tailscale_up +# auto-installs the router, so the server's 100.x tailnet IP is dialed over tsnet directly. +# A bare MagicDNS short name is not routable this way (it's neither a *.ts.net FQDN nor a 100.x +# IP, so IsTailnetHost rejects it), hence we attach the resolved IP. +compose_sql_router_probe() { + local server_ip="${1:?server tailnet ip required}" + local router_uri="quack:${server_ip}:${QUACK_PORT}" + cat <&2 + else + echo "warn: router probe skipped — server tailnet IP not resolvable yet (regenerated at client run)" >&2 + fi + else + echo "note: router probe skipped — this quackscale build has no http_route (transparent router)" >&2 + fi if duckdb_has_quackscale_function tailscale_ping; then ping_sql="CALL tailscale_ping(host => '${SERVER_HOST}', port => ${QUACK_PORT});" fi @@ -236,6 +279,8 @@ FROM quack_query( disable_ssl => true ); +${router_probe_sql} + ${lake_discover_sql} ${lake_attach_sql} ${lake_select} diff --git a/scripts/e2e/quacktail-entrypoint.sh b/scripts/e2e/quacktail-entrypoint.sh index 549bb06..5717faa 100755 --- a/scripts/e2e/quacktail-entrypoint.sh +++ b/scripts/e2e/quacktail-entrypoint.sh @@ -197,6 +197,11 @@ quacktail_client_session_succeeded() { if [[ "${QUACKTAIL_ENABLE_DUCKLAKE:-0}" == "1" ]]; then grep -q "LAKE_PASSED" "$out" 2>/dev/null || return 1 fi + # If the session includes the transparent-router probe (direct ATTACH, no forwarder), it + # must pass too. Gated on the generated SQL so older builds without the probe still pass. + if grep -q "ROUTER_PASSED" "${WORK}/client_session.sql" 2>/dev/null; then + grep -q "ROUTER_PASSED" "$out" 2>/dev/null || return 1 + fi return 0 } @@ -391,6 +396,14 @@ run_client() { exit 1 fi + # Make router coverage explicit: green output should never leave it ambiguous whether the + # transparent-router path (direct ATTACH, no forwarder) was actually exercised this run. + if grep -q "ROUTER_PASSED" "${WORK}/client_session.sql" 2>/dev/null; then + echo "✓ transparent router exercised (ROUTER_PASSED)" + else + echo "note: transparent-router probe not in this session — forwarder path only (see bootstrap log)" + fi + if [[ "$QUIET" == "1" ]]; then quacktail_show_client_demo_output "$out" echo "" diff --git a/scripts/lib/quacktail_ext.sh b/scripts/lib/quacktail_ext.sh index 898695f..1999a5c 100755 --- a/scripts/lib/quacktail_ext.sh +++ b/scripts/lib/quacktail_ext.sh @@ -31,6 +31,32 @@ quacktail_has_quackscale_function() { [[ "$count" == "1" ]] } +# True if this quackscale build supports transparent HTTP routing (the http_route parameter +# on tailscale_up). Gates the e2e router probe so older release binaries are not asserted. +# Fails *loudly* (logs an error) if quackscale can't even load, so a broken build is not +# silently mistaken for "old build without the router". +quacktail_quackscale_supports_router() { + local duckdb_bin="${DUCKDB_BIN:-/usr/local/bin/duckdb}" + local ext_dir="${DUCKDB_EXTENSION_DIRECTORY:-$(quacktail_ext_container_dir)}" + local set_ext="SET extension_directory='${ext_dir}';" + local out + [[ -x "$duckdb_bin" ]] || { echo "warn: router probe: duckdb not executable at $duckdb_bin" >&2; return 1; } + + # Baseline: prove quackscale actually loads and runs. If this fails the build is broken — say so + # loudly rather than silently concluding "router unsupported" (which would skip the assertion). + if ! "$duckdb_bin" :memory: -batch -c "${set_ext} LOAD quackscale; SELECT 1;" >/dev/null 2>&1; then + echo "error: router probe: 'LOAD quackscale; SELECT 1' failed — quackscale not loadable at ${ext_dir}" >&2 + return 1 + fi + + # An invalid named parameter makes tailscale_up list its valid ones (a bind-time error, so + # tailscale_up never runs). "http_route" in that list ⇒ this build has the router. + out="$("$duckdb_bin" :memory: -batch -c \ + "${set_ext} LOAD quackscale; CALL tailscale_up(quackscale_router_capability_probe => true);" \ + 2>&1)" || true + printf '%s\n' "$out" | grep -q 'http_route' +} + quacktail_list_quackscale_functions() { local duckdb_bin="${DUCKDB_BIN:-/usr/local/bin/duckdb}" local ext_dir="${DUCKDB_EXTENSION_DIRECTORY:-$(quacktail_ext_container_dir)}" diff --git a/src/include/tailscale_bridge.hpp b/src/include/tailscale_bridge.hpp index 367982c..6c90845 100644 --- a/src/include/tailscale_bridge.hpp +++ b/src/include/tailscale_bridge.hpp @@ -80,7 +80,15 @@ class TailscaleBridge { //! Dial host:port over the tailnet via tsnet (peer connectivity check). void PingTCP(const string &host, idx_t port, idx_t timeout_ms); + //! Dial a tailnet peer over tsnet and return the connected socket fd (caller owns/closes + //! it). Throws if the node is not up or the dial fails. Used by the HTTPUtil router. + int DialTCP(const string &host, idx_t port); + string PrimaryTailnetIP() const; + //! First IPv4 (colon-free) tailnet address — in practice the 100.64.0.0/10 CGNAT IP, since + //! `ips` only holds tailnet addresses. This is the one the transparent HTTPUtil router can + //! reach. Empty if the node only has an IPv6 tailnet address yet. + string RoutableTailnetIP() const; string FormatQuackURI(const string &host, idx_t port) const; string QuackListenURI(idx_t port = QUACKSCALE_DEFAULT_QUACK_PORT) const; vector QuackDiscoveryEndpoints(idx_t port = QUACKSCALE_DEFAULT_QUACK_PORT) const; diff --git a/src/include/tailscale_http.hpp b/src/include/tailscale_http.hpp new file mode 100644 index 0000000..0ec73d0 --- /dev/null +++ b/src/include/tailscale_http.hpp @@ -0,0 +1,110 @@ +#pragma once + +#include "duckdb/common/http_util.hpp" +#include "duckdb/common/unordered_map.hpp" +#include "duckdb/common/vector.hpp" + +#include + +namespace duckdb { + +class DatabaseInstance; + +//! True if `proto_host_port` (e.g. "http://100.95.32.19:9494") names a tailnet host: +//! an IPv4 in the CGNAT range 100.64.0.0/10, or a *.ts.net MagicDNS name. Scheme and +//! :port are stripped before the test. NOTE: bare MagicDNS short names ("lake-server") +//! are NOT matched — those still need tailscale_quack_forward. +bool IsTailnetHost(const string &proto_host_port); + +//! Install TailscaleHTTPUtil as the database's global HTTP util, wrapping whatever util is +//! currently registered (httpfs, after auto-load). Idempotent: a second call is a no-op. Called +//! from tailscale_up (after the node is up) and from tailscale_login (the wrap is inert until the +//! node comes up), unless http_route => false. +void RegisterTailscaleHTTPUtil(DatabaseInstance &db); + +//! HTTP/1.1 client that speaks plaintext over a tailscale_dial'd fd to a tailnet peer (the tailnet +//! is the encryption layer, so only http:// is routed here — https:// is left to httpfs). Holds +//! one keep-alive connection open across requests and frames responses by Content-Length, chunked +//! transfer-encoding, or read-to-EOF (which then closes the connection); a stale pooled connection +//! is transparently redialed once for idempotent requests. +class TailscaleHTTPClient : public HTTPClient { +public: + explicit TailscaleHTTPClient(const string &proto_host_port); + ~TailscaleHTTPClient() override; + + void Initialize(HTTPParams &http_params) override; + + unique_ptr Get(GetRequestInfo &info) override; + unique_ptr Post(PostRequestInfo &info) override; + unique_ptr Put(PutRequestInfo &info) override; + unique_ptr Head(HeadRequestInfo &info) override; + unique_ptr Delete(DeleteRequestInfo &info) override; + + //! Parsed HTTP response. Public so the file-local ToHTTPResponse() helper can move from it. + struct ParsedResponse { + int status = 0; + string reason; + HTTPHeaders headers; + string body; + //! The peer signalled (or the framing implies) that this connection must not be reused. + bool connection_close = false; + }; + +private: + //! Send one request and read the full response, redialing once if a *reused* keep-alive + //! connection turns out to be dead. `has_body` requests carry Content-Length framing. + ParsedResponse RoundTrip(const string &method, const string &path, const HTTPHeaders &headers, + const_data_ptr_t body, idx_t body_len, bool has_body); + + void CloseConn(); + bool ReadMore(); //!< append one chunk of socket data into rx; false on EOF/error + bool ReadResponse(const string &method, ParsedResponse &out); + bool ReadChunkedBody(string &body); + + string host; //!< parsed from base_url, scheme + port stripped + string port; //!< defaults to "80" if the URL omits it + int fd = -1; //!< persistent keep-alive connection, -1 == not connected + string rx; //!< bytes read from fd but not consumed; empty on a fresh dial, carried across reused requests + bool read_error = false; //!< last ReadMore hit a socket error (vs a clean EOF); reset per response + idx_t timeout_seconds = 30; +}; + +//! Global HTTP util that intercepts tailnet hosts and delegates everything else to the +//! previously-registered util (httpfs), preserving real TLS / proxies / secrets and its +//! keep-alive cache for non-tailnet traffic. Maintains a small idle pool of tailnet +//! clients so keep-alive survives across DuckDB file handles. +class TailscaleHTTPUtil : public HTTPUtil { +public: + explicit TailscaleHTTPUtil(HTTPUtil &prev) : prev(prev) { + } + + string GetName() const override { + return "Tailscale"; + } + + unique_ptr InitializeParameters(DatabaseInstance &db, const string &path) override { + return prev.InitializeParameters(db, path); + } + unique_ptr InitializeParameters(ClientContext &context, const string &path) override { + return prev.InitializeParameters(context, path); + } + unique_ptr InitializeParameters(optional_ptr opener, + optional_ptr info) override { + return prev.InitializeParameters(opener, info); + } + + unique_ptr InitializeClient(HTTPParams &http_params, const string &proto_host_port) override; + void CloseClient(unique_ptr &&client) override; + +private: + //! Reference to the previous global util. After SetHTTPUtil() the old util is retained + //! in DBConfig (old_http_utils) for the DB lifetime, so this ref stays valid. + HTTPUtil &prev; + + //! Idle keep-alive clients keyed by proto_host_port, capped at MAX_IDLE_PER_HOST each. + static constexpr idx_t MAX_IDLE_PER_HOST = 8; + std::mutex pool_mutex; + unordered_map>> idle_clients; +}; + +} // namespace duckdb diff --git a/src/quackscale_extension.cpp b/src/quackscale_extension.cpp index efde422..b882542 100644 --- a/src/quackscale_extension.cpp +++ b/src/quackscale_extension.cpp @@ -4,11 +4,13 @@ #include "quackscale_defaults.hpp" #include "attach_ducklake.hpp" #include "tailscale_bridge.hpp" +#include "tailscale_http.hpp" #include "duckdb.hpp" #include "duckdb/common/exception.hpp" #include "duckdb/function/scalar_function.hpp" #include "duckdb/function/table_function.hpp" +#include "duckdb/main/database.hpp" #include "duckdb/parser/parsed_data/create_scalar_function_info.hpp" #include @@ -57,10 +59,38 @@ static void RegisterAuthParameters(TableFunction &function) { function.named_parameters["state_dir"] = LogicalType::VARCHAR; function.named_parameters["ephemeral"] = LogicalType::BOOLEAN; function.named_parameters["loopback_proxy"] = LogicalType::BOOLEAN; + // Transparent HTTP routing: when true (default) tailscale_up/login installs the HTTPUtil + // so HTTP to tailnet hosts (100.64/10, *.ts.net) is dialed over tsnet — ATTACH 'quack:...' + // works without tailscale_quack_forward. Set false to use only the forwarder. + function.named_parameters["http_route"] = LogicalType::BOOLEAN; +} + +//! Whether transparent HTTP routing was requested (default true). Kept out of +//! TailscaleAuthConfig because installing the router needs the DatabaseInstance, which only +//! the table function has — the bridge is database-agnostic. +static bool ParseHttpRoute(TableFunctionBindInput &input) { + auto it = input.named_parameters.find("http_route"); + if (it != input.named_parameters.end() && !it->second.IsNull()) { + return it->second.GetValue(); + } + return true; +} + +//! Install the tailnet HTTPUtil router unless the user opted out. No-op without libtailscale. +static void MaybeInstallHttpRoute(ClientContext &context, bool http_route) { +#ifdef QUACKSCALE_WITH_TAILSCALE + if (http_route) { + RegisterTailscaleHTTPUtil(DatabaseInstance::GetDatabase(context)); + } +#else + (void)context; + (void)http_route; +#endif } struct QuackscaleUpBindData : public TableFunctionData { TailscaleAuthConfig config; + bool http_route = true; bool finished = false; }; @@ -68,6 +98,7 @@ static unique_ptr QuackscaleUpBind(ClientContext &context, TableFu vector &return_types, vector &names) { auto bind = make_uniq(); bind->config = ParseAuthConfig(input); + bind->http_route = ParseHttpRoute(input); return_types = {LogicalType::BOOLEAN, LogicalType::VARCHAR, LogicalType::LIST(LogicalType::VARCHAR)}; names = {"running", "hostname", "tailnet_ips"}; @@ -83,6 +114,9 @@ static void QuackscaleUpFunction(ClientContext &context, TableFunctionInput &dat auto &bridge = TailscaleBridge::Get(); bridge.Up(bind.config); auto status = bridge.Status(); + if (status.running) { + MaybeInstallHttpRoute(context, bind.http_route); + } output.SetCardinality(1); output.SetValue(0, 0, Value::BOOLEAN(status.running)); @@ -217,6 +251,7 @@ static void QuackscaleDiscoverFunction(ClientContext &context, TableFunctionInpu struct QuackscaleBeginLoginBindData : public TableFunctionData { TailscaleAuthConfig config; + bool http_route = true; bool finished = false; }; @@ -224,6 +259,7 @@ static unique_ptr QuackscaleBeginLoginBind(ClientContext &context, vector &return_types, vector &names) { auto bind = make_uniq(); bind->config = ParseAuthConfig(input); + bind->http_route = ParseHttpRoute(input); return_types = {LogicalType::VARCHAR, LogicalType::VARCHAR, LogicalType::VARCHAR}; names = {"status", "login_url", "message"}; return std::move(bind); @@ -234,6 +270,10 @@ static void QuackscaleBeginLoginFunction(ClientContext &context, TableFunctionIn if (bind.finished) { return; } + // Login is asynchronous, so install the router now: the wrap is inert until the node is + // up (dials for tailnet hosts simply fail until then), and this avoids threading the + // http_route flag through the login-status polling path. + MaybeInstallHttpRoute(context, bind.http_route); TailscaleBridge::Get().BeginInteractiveLogin(bind.config); auto login = TailscaleBridge::Get().LoginStatus(); output.SetCardinality(1); diff --git a/src/tailscale_bridge.cpp b/src/tailscale_bridge.cpp index 551dc8c..6b22bae 100644 --- a/src/tailscale_bridge.cpp +++ b/src/tailscale_bridge.cpp @@ -466,6 +466,43 @@ void TailscaleBridge::PingTCP(const string &host, idx_t port, idx_t timeout_ms) #endif } +int TailscaleBridge::DialTCP(const string &host, idx_t port) { +#ifdef QUACKSCALE_WITH_TAILSCALE + if (host.empty()) { + throw IOException("tailscale dial: host must not be empty"); + } + if (port == 0 || port > 65535) { + throw IOException("tailscale dial: port must be between 1 and 65535"); + } + int ts_handle; + { + std::lock_guard guard(g_tailscale_mutex); + if (!running) { + throw IOException("tailscale dial: node is not up; call tailscale_up() first"); + } + EnsureHandle(); + ts_handle = handle; + } + // Dial outside the lock: tailscale_dial blocks while the netstack establishes the + // connection, and the handle is stable once the node is up. tsnet's Dial is safe for + // concurrent use on one server, so this is sound — and necessary: holding g_tailscale_mutex + // across the dial would serialize every node operation behind one in-flight connect, which + // would defeat the parallel range reads httpfs issues over the router. (The forwarder's + // dial_fn holds the lock across its dial; that is over-conservative, not a different + // requirement.) + auto addr = StringUtil::Format("%s:%d", host, port); + tailscale_conn conn = -1; + if (tailscale_dial(ts_handle, "tcp", addr.c_str(), &conn) != 0) { + throw IOException("tailscale_dial(%s) failed: %s", addr, LastErrorMessage()); + } + return conn; +#else + (void)host; + (void)port; + throw NotImplementedException("QuackScale was built without libtailscale."); +#endif +} + string TailscaleBridge::PrimaryTailnetIP() const { if (ips.empty()) { throw InvalidInputException("Tailscale is not up or has no tailnet IPs yet. Call tailscale_up() first."); @@ -480,7 +517,24 @@ string TailscaleBridge::FormatQuackURI(const string &host, idx_t port) const { return StringUtil::Format("quack:%s:%d", host, port); } +string TailscaleBridge::RoutableTailnetIP() const { + // IsTailnetHost (the transparent HTTPUtil router) matches IPv4 in 100.64.0.0/10, not the + // IPv6 ULA; the first colon-free address is our tailnet IPv4. + for (auto &ip : ips) { + if (ip.find(':') == string::npos) { + return ip; + } + } + return string(); +} + string TailscaleBridge::QuackListenURI(idx_t port) const { + // Prefer the routable 100.x IPv4 so the returned URI works with the transparent router + // (ATTACH 'quack:100.x:port') without a forwarder. Fall back to MagicDNS, then any IP. + auto routable = RoutableTailnetIP(); + if (!routable.empty()) { + return FormatQuackURI(routable, port); + } if (!hostname.empty()) { return FormatQuackURI(hostname, port); } @@ -493,21 +547,26 @@ vector TailscaleBridge::QuackDiscoveryEndpoints(idx_t po return endpoints; } - if (!hostname.empty()) { + // Tailnet IPs first: the IPv4 100.x address is directly ATTACH-able through the transparent + // HTTPUtil router, so it leads. (IPv6 rows follow; IsTailnetHost does not match them yet.) + for (auto &ip : ips) { QuackDiscoveryEndpoint entry; - entry.host = hostname; + entry.host = ip; entry.port = port; - entry.via = "magicdns"; - entry.listen_uri = FormatQuackURI(hostname, port); + entry.via = "tailnet_ip"; + entry.listen_uri = FormatQuackURI(ip, port); endpoints.push_back(std::move(entry)); } - for (auto &ip : ips) { + // MagicDNS short name last: friendlier to type, but only reachable via + // tailscale_quack_forward — IsTailnetHost matches *.ts.net FQDNs and 100.x IPs, not bare + // short names. + if (!hostname.empty()) { QuackDiscoveryEndpoint entry; - entry.host = ip; + entry.host = hostname; entry.port = port; - entry.via = "tailnet_ip"; - entry.listen_uri = FormatQuackURI(ip, port); + entry.via = "magicdns"; + entry.listen_uri = FormatQuackURI(hostname, port); endpoints.push_back(std::move(entry)); } return endpoints; diff --git a/src/tailscale_http.cpp b/src/tailscale_http.cpp new file mode 100644 index 0000000..3048329 --- /dev/null +++ b/src/tailscale_http.cpp @@ -0,0 +1,549 @@ +#include "tailscale_http.hpp" + +#include "tailscale_bridge.hpp" + +#include "duckdb/common/exception.hpp" +#include "duckdb/common/helper.hpp" +#include "duckdb/common/string_util.hpp" +#include "duckdb/main/database.hpp" +#include "duckdb/main/extension_helper.hpp" + +#include +#include + +#ifndef _WIN32 +#include +#include +#include +#endif + +namespace duckdb { + +//===--------------------------------------------------------------------===// +// Host parsing / tailnet detection +//===--------------------------------------------------------------------===// + +// Strip scheme, path and (optionally) port from a proto_host_port string. +// IPv4 / hostname only — IPv6 literals are not handled yet. +static void ParseProtoHostPort(const string &proto_host_port, string &host_out, string &port_out) { + string h = proto_host_port; + auto scheme = h.find("://"); + if (scheme != string::npos) { + h = h.substr(scheme + 3); + } + auto slash = h.find('/'); + if (slash != string::npos) { + h = h.substr(0, slash); + } + auto colon = h.rfind(':'); + if (colon != string::npos) { + port_out = h.substr(colon + 1); + host_out = h.substr(0, colon); + } else { + host_out = h; + port_out = "80"; + } +} + +bool IsTailnetHost(const string &proto_host_port) { + // Only intercept plaintext HTTP (or scheme-less) URLs. We speak plaintext over WireGuard and + // never negotiate TLS, so https:// URLs — even to a tailnet host — are left to the delegate + // util (httpfs) which has real TLS, rather than being mis-sent as cleartext. + if (StringUtil::StartsWith(StringUtil::Lower(proto_host_port), "https://")) { + return false; + } + string host, port; + ParseProtoHostPort(proto_host_port, host, port); + if (host.empty()) { + return false; + } + // MagicDNS FQDNs. + if (StringUtil::EndsWith(StringUtil::Lower(host), ".ts.net")) { + return true; + } + // IPv4 in the CGNAT range 100.64.0.0/10 (100.64.x.x .. 100.127.x.x). + auto parts = StringUtil::Split(host, "."); + if (parts.size() != 4) { + return false; + } + for (auto &p : parts) { + if (p.empty()) { + return false; + } + for (char c : p) { + if (c < '0' || c > '9') { + return false; + } + } + } + auto octet0 = std::atoi(parts[0].c_str()); + auto octet1 = std::atoi(parts[1].c_str()); + return octet0 == 100 && octet1 >= 64 && octet1 <= 127; +} + +//===--------------------------------------------------------------------===// +// Plaintext HTTP/1.1 over a tsnet-dialed fd, with keep-alive +//===--------------------------------------------------------------------===// + +namespace { + +idx_t ParsePort(const string &port) { + auto value = std::atoi(port.c_str()); + if (value <= 0 || value > 65535) { + return 80; + } + return static_cast(value); +} + +// Strict decimal length: true only if `s` is non-empty, all ASCII digits, and in range. Rejects +// "123abc", "+5", "-1" (which would otherwise wrap huge via stoull and drive a runaway read). +bool ParseContentLength(const string &s, size_t &out) { + if (s.empty()) { + return false; + } + for (char c : s) { + if (c < '0' || c > '9') { + return false; + } + } + try { + out = static_cast(std::stoull(s)); + } catch (...) { + return false; + } + return true; +} + +// Strict hex chunk size: hex digits only, no "0x" prefix or sign (extensions are stripped before +// this by the caller). +bool ParseChunkSize(const string &s, size_t &out) { + if (s.empty()) { + return false; + } + for (char c : s) { + bool hex = (c >= '0' && c <= '9') || (c >= 'a' && c <= 'f') || (c >= 'A' && c <= 'F'); + if (!hex) { + return false; + } + } + try { + out = static_cast(std::stoull(s, nullptr, 16)); + } catch (...) { + return false; + } + return true; +} + +#ifndef _WIN32 + +bool WriteAll(int fd, const char *data, size_t len) { + size_t off = 0; + while (off < len) { + ssize_t w = write(fd, data + off, len - off); + if (w <= 0) { + return false; + } + off += static_cast(w); + } + return true; +} + +#endif + +// Build the request line + headers. Skips any caller-supplied +// Host/Content-Length/Connection/Transfer-Encoding so we own framing. We keep the +// connection alive (HTTP/1.1 default) and frame the response ourselves. +string BuildRequestHead(const string &method, const string &path, const string &host, const string &port, + const HTTPHeaders &headers, idx_t content_length, bool has_body) { + string req = method + " " + (path.empty() ? "/" : path) + " HTTP/1.1\r\n"; + req += "Host: " + host + ":" + port + "\r\n"; + for (auto &h : headers) { + auto key = StringUtil::Lower(h.first); + if (key == "host" || key == "content-length" || key == "connection" || key == "transfer-encoding") { + continue; + } + req += h.first + ": " + h.second + "\r\n"; + } + if (has_body) { + req += "Content-Length: " + std::to_string(content_length) + "\r\n"; + } + req += "Connection: keep-alive\r\n\r\n"; + return req; +} + +// Parse the status line + header block. Returns the HTTP minor version (0 or 1) so the +// caller can apply the right keep-alive default. +int ParseHead(const string &head, int &status_out, string &reason_out, HTTPHeaders &headers_out) { + int http_minor = 1; + auto lines = StringUtil::Split(head, "\r\n"); + if (lines.empty()) { + status_out = 0; + return http_minor; + } + // Status line: "HTTP/1.1 200 OK" + auto &status_line = lines[0]; + if (StringUtil::StartsWith(status_line, "HTTP/1.0")) { + http_minor = 0; + } + auto sp1 = status_line.find(' '); + if (sp1 != string::npos) { + auto sp2 = status_line.find(' ', sp1 + 1); + auto code = status_line.substr(sp1 + 1, sp2 == string::npos ? string::npos : sp2 - sp1 - 1); + status_out = std::atoi(code.c_str()); + if (sp2 != string::npos) { + reason_out = status_line.substr(sp2 + 1); + } + } + for (idx_t i = 1; i < lines.size(); i++) { + auto colon = lines[i].find(':'); + if (colon == string::npos) { + continue; + } + auto key = lines[i].substr(0, colon); + auto value = lines[i].substr(colon + 1); + StringUtil::Trim(key); + StringUtil::Trim(value); + headers_out.Insert(key, value); + } + return http_minor; +} + +// Lower-cased header value, or "" if the header is absent (never touches a missing key). +string HeaderValueLower(const HTTPHeaders &headers, const string &key) { + if (!headers.HasHeader(key)) { + return string(); + } + return StringUtil::Lower(headers.GetHeaderValue(key)); +} + +unique_ptr ToHTTPResponse(TailscaleHTTPClient::ParsedResponse &parsed) { + auto response = make_uniq(HTTPUtil::ToStatusCode(parsed.status)); + response->body = std::move(parsed.body); + response->reason = std::move(parsed.reason); + response->headers = std::move(parsed.headers); + // Reflect the status in success/request_error: HTTPResponse::success defaults true, so without + // this a 4xx/5xx — or an unparseable status line (status 0) — would look like a successful + // fetch to a caller using Success()/HasRequestError(). + response->success = parsed.status >= 200 && parsed.status < 400; + if (!response->success) { + response->request_error = StringUtil::Format("tailscale HTTP status %d %s", parsed.status, parsed.reason); + } + return response; +} + +} // namespace + +TailscaleHTTPClient::TailscaleHTTPClient(const string &proto_host_port) : HTTPClient(proto_host_port) { + ParseProtoHostPort(proto_host_port, host, port); +} + +TailscaleHTTPClient::~TailscaleHTTPClient() { + CloseConn(); +} + +void TailscaleHTTPClient::Initialize(HTTPParams &http_params) { + if (http_params.timeout > 0) { + timeout_seconds = http_params.timeout; + } +} + +void TailscaleHTTPClient::CloseConn() { +#ifndef _WIN32 + if (fd >= 0) { + close(fd); + } +#endif + fd = -1; + rx.clear(); +} + +bool TailscaleHTTPClient::ReadMore() { +#ifndef _WIN32 + char buf[16384]; + ssize_t n = read(fd, buf, sizeof(buf)); + if (n == 0) { + return false; // clean EOF (peer closed) + } + if (n < 0) { + read_error = true; // socket error / timeout — NOT a clean end of message + return false; + } + rx.append(buf, static_cast(n)); + return true; +#else + read_error = true; + return false; +#endif +} + +bool TailscaleHTTPClient::ReadChunkedBody(string &body) { + for (;;) { + // Chunk-size line. + size_t eol; + while ((eol = rx.find("\r\n")) == string::npos) { + if (!ReadMore()) { + return false; + } + } + auto size_line = rx.substr(0, eol); + rx.erase(0, eol + 2); + auto semi = size_line.find(';'); // strip chunk extensions + if (semi != string::npos) { + size_line = size_line.substr(0, semi); + } + StringUtil::Trim(size_line); + size_t chunk_size = 0; + if (!ParseChunkSize(size_line, chunk_size)) { + return false; // malformed chunk size — framing is unrecoverable + } + + if (chunk_size == 0) { + // Trailer headers (if any), then a blank line terminates the message. + for (;;) { + size_t tend; + while ((tend = rx.find("\r\n")) == string::npos) { + if (!ReadMore()) { + return false; + } + } + bool blank = (tend == 0); + rx.erase(0, tend + 2); + if (blank) { + return true; + } + } + } + + while (rx.size() < chunk_size + 2) { // +2 for the chunk's trailing CRLF + if (!ReadMore()) { + return false; + } + } + body.append(rx, 0, chunk_size); + rx.erase(0, chunk_size + 2); + } +} + +bool TailscaleHTTPClient::ReadResponse(const string &method, ParsedResponse &out) { + read_error = false; + // 1. Header block. + size_t header_end; + while ((header_end = rx.find("\r\n\r\n")) == string::npos) { + if (!ReadMore()) { + return false; + } + } + auto head = rx.substr(0, header_end); + rx.erase(0, header_end + 4); // rx now begins at the body + int http_minor = ParseHead(head, out.status, out.reason, out.headers); + + // Keep-alive default: HTTP/1.1 reuses unless told to close; HTTP/1.0 closes unless told to keep. + auto connection = HeaderValueLower(out.headers, "Connection"); + if (http_minor == 0) { + out.connection_close = (connection != "keep-alive"); + } else { + out.connection_close = (connection == "close"); + } + + // 2. Body framing. These statuses (and HEAD) never carry a body. + bool no_body = method == "HEAD" || out.status == 204 || out.status == 304 || + (out.status >= 100 && out.status < 200); + if (no_body) { + return true; + } + + if (HeaderValueLower(out.headers, "Transfer-Encoding").find("chunked") != string::npos) { + return ReadChunkedBody(out.body); + } + if (out.headers.HasHeader("Content-Length")) { + auto raw = out.headers.GetHeaderValue("Content-Length"); + StringUtil::Trim(raw); + size_t len = 0; + if (!ParseContentLength(raw, len)) { + out.connection_close = true; // malformed length — connection framing is now unknown + return false; + } + while (rx.size() < len) { + if (!ReadMore()) { + return false; // short read: surfaced as a request failure, never a truncated body + } + } + out.body = rx.substr(0, len); + rx.erase(0, len); + // On a kept-alive connection there must be nothing after the body until our next request; + // leftover bytes mean the stream is desynced, so don't reuse this connection. + if (!out.connection_close && !rx.empty()) { + out.connection_close = true; + } + return true; + } + + // No length signal: the body runs until EOF, so the connection cannot be reused. + out.connection_close = true; + while (ReadMore()) { + } + if (read_error) { + return false; // a socket error/timeout mid-body is not a clean EOF — don't accept a truncated body + } + out.body = std::move(rx); + rx.clear(); + return true; +} + +TailscaleHTTPClient::ParsedResponse TailscaleHTTPClient::RoundTrip(const string &method, const string &path, + const HTTPHeaders &headers, const_data_ptr_t body, + idx_t body_len, bool has_body) { +#ifndef _WIN32 + // Try at most twice: a connection reused from the keep-alive pool may have been closed by the + // peer while idle. A failure on a *fresh* connection is a real error. Only idempotent requests + // (no body — GET/HEAD/DELETE) are retried; resending a POST/PUT body could double-execute a + // write the server already processed before dropping the socket. + for (int attempt = 0; attempt < 2; attempt++) { + bool reused = (fd >= 0); + if (fd < 0) { + fd = TailscaleBridge::Get().DialTCP(host, ParsePort(port)); + rx.clear(); + // Best-effort read/write deadlines so a hung peer cannot block a query forever. + struct timeval tv; + tv.tv_sec = static_cast(timeout_seconds); + tv.tv_usec = 0; + setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)); + setsockopt(fd, SOL_SOCKET, SO_SNDTIMEO, &tv, sizeof(tv)); + } + + auto head = BuildRequestHead(method, path, host, port, headers, body_len, has_body); + bool ok = WriteAll(fd, head.data(), head.size()); + if (ok && has_body && body_len > 0) { + ok = WriteAll(fd, const_char_ptr_cast(body), body_len); + } + + ParsedResponse parsed; + if (ok) { + ok = ReadResponse(method, parsed); + } + if (!ok) { + CloseConn(); + if (reused && !has_body) { + continue; // stale idle keep-alive on an idempotent request: redial and retry once + } + throw IOException("tailscale HTTP: request to %s:%s failed", host, port); + } + + if (parsed.connection_close) { + CloseConn(); + } + return parsed; + } + throw IOException("tailscale HTTP: request to %s:%s failed after retry", host, port); +#else + (void)method; + (void)path; + (void)headers; + (void)body; + (void)body_len; + (void)has_body; + throw NotImplementedException("tailscale HTTP routing is not supported on Windows"); +#endif +} + +unique_ptr TailscaleHTTPClient::Get(GetRequestInfo &info) { + auto merged = BaseRequest::MergeHeaders(info.headers, info.params); + auto parsed = RoundTrip("GET", info.path, merged, nullptr, 0, false); + auto response = ToHTTPResponse(parsed); + + // Honor the streaming handlers httpfs uses for ranged/streamed reads. + bool keep_going = true; + if (info.response_handler) { + keep_going = info.response_handler(*response); + } + if (keep_going && info.content_handler && !response->body.empty()) { + info.content_handler(const_data_ptr_cast(response->body.data()), response->body.size()); + } + return response; +} + +unique_ptr TailscaleHTTPClient::Post(PostRequestInfo &info) { + auto merged = BaseRequest::MergeHeaders(info.headers, info.params); + auto parsed = RoundTrip("POST", info.path, merged, info.buffer_in, info.buffer_in_len, true); + info.buffer_out = parsed.body; + return ToHTTPResponse(parsed); +} + +unique_ptr TailscaleHTTPClient::Put(PutRequestInfo &info) { + auto merged = BaseRequest::MergeHeaders(info.headers, info.params); + // PutRequestInfo carries content_type out of band; surface it as a header unless the + // caller already supplied one. + if (!info.content_type.empty() && !merged.HasHeader("Content-Type")) { + merged.Insert("Content-Type", info.content_type); + } + auto parsed = RoundTrip("PUT", info.path, merged, info.buffer_in, info.buffer_in_len, true); + return ToHTTPResponse(parsed); +} + +unique_ptr TailscaleHTTPClient::Head(HeadRequestInfo &info) { + auto merged = BaseRequest::MergeHeaders(info.headers, info.params); + auto parsed = RoundTrip("HEAD", info.path, merged, nullptr, 0, false); + return ToHTTPResponse(parsed); +} + +unique_ptr TailscaleHTTPClient::Delete(DeleteRequestInfo &info) { + auto merged = BaseRequest::MergeHeaders(info.headers, info.params); + auto parsed = RoundTrip("DELETE", info.path, merged, nullptr, 0, false); + return ToHTTPResponse(parsed); +} + +//===--------------------------------------------------------------------===// +// TailscaleHTTPUtil — routing + keep-alive client pool +//===--------------------------------------------------------------------===// + +unique_ptr TailscaleHTTPUtil::InitializeClient(HTTPParams &http_params, const string &proto_host_port) { + if (!IsTailnetHost(proto_host_port)) { + return prev.InitializeClient(http_params, proto_host_port); + } + // Reuse an idle keep-alive client for this host if we have one. + { + std::lock_guard guard(pool_mutex); + auto entry = idle_clients.find(proto_host_port); + if (entry != idle_clients.end() && !entry->second.empty()) { + auto client = std::move(entry->second.back()); + entry->second.pop_back(); + client->Initialize(http_params); + return client; + } + } + auto client = make_uniq(proto_host_port); + client->Initialize(http_params); + return std::move(client); +} + +void TailscaleHTTPUtil::CloseClient(unique_ptr &&client) { + if (!client || !IsTailnetHost(client->GetBaseUrl())) { + // Non-tailnet: let httpfs keep caching its keep-alive clients. + prev.CloseClient(std::move(client)); + return; + } + // Tailnet client: return it to the idle pool (keeping its live fd) up to the cap; + // drop the overflow. IsTailnetHost(base_url) == true ⇒ we created a TailscaleHTTPClient. + std::lock_guard guard(pool_mutex); + auto &bucket = idle_clients[client->GetBaseUrl()]; + if (bucket.size() < MAX_IDLE_PER_HOST) { + bucket.push_back(std::move(client)); + } + // else: unique_ptr drops here, closing the connection in ~TailscaleHTTPClient. +} + +void RegisterTailscaleHTTPUtil(DatabaseInstance &db) { + // Make sure httpfs is the global util first, so non-tailnet traffic has a real + // implementation (TLS / proxies / secrets) to delegate to. Best-effort: if httpfs is + // unavailable we simply wrap whatever util is currently registered. + ExtensionHelper::TryAutoLoadExtension(db, "httpfs"); + + auto ¤t = db.config.GetHTTPUtil(); + if (current.GetName() == "Tailscale") { + return; // already wrapped + } + auto util = make_shared_ptr(current); + db.config.SetHTTPUtil(util); +} + +} // namespace duckdb diff --git a/test/sql/quackscale.test b/test/sql/quackscale.test index 5d1e775..25d5b5e 100644 --- a/test/sql/quackscale.test +++ b/test/sql/quackscale.test @@ -56,3 +56,17 @@ statement error CALL attach_ducklake('quack:127.0.0.1:19494'); ---- attach_ducklake requires LOAD quack + +# http_route (transparent HTTPUtil routing, default on) is a recognized parameter on +# tailscale_up/tailscale_login. An invalid parameter error lists the valid ones, so the +# presence of "http_route" in the message proves it is registered. Live routing behaviour +# (ATTACH 'quack:100.x:...' with no forwarder, keep-alive) is covered by the e2e suite. +statement error +CALL tailscale_up(not_a_real_param => true); +---- +http_route + +statement error +CALL tailscale_login(not_a_real_param => true); +---- +http_route