diff --git a/.gitignore b/.gitignore index 6825ac5..3b7350c 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ .wrangler node_modules docs/plans +.worktrees diff --git a/Cargo.toml b/Cargo.toml index 7ba50a5..434e9cb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,6 +18,10 @@ path = "tests/routing.rs" name = "pagination" path = "tests/pagination.rs" +[[test]] +name = "redirect" +path = "tests/redirect.rs" + [dependencies] # Multistore multistore = { version = "0.3.0", features = ["azure"] } diff --git a/src/cache.rs b/src/cache.rs index 7e05e6c..74afc33 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -3,6 +3,7 @@ //! Each public function caches one API call type with its own TTL. //! Adjust the `*_CACHE_SECS` constants to tune per-datatype expiry. +use crate::redirect::RedirectInfo; use crate::registry::{DataConnection, SourceProduct, SourceProductList}; use multistore::error::ProxyError; @@ -18,6 +19,12 @@ const DATA_CONNECTIONS_CACHE_SECS: u32 = 1800; // 30 minutes /// Product list for an account (`/api/v1/products/{account}`). const PRODUCT_LIST_CACHE_SECS: u32 = 60; // 1 minute +/// Result from the Source API: either the expected data or a redirect. +pub enum ApiResponse { + Ok(T), + Redirect(RedirectInfo), +} + // ── Public cache functions ───────────────────────────────────────── /// Fetch a single product's metadata, cached for `PRODUCT_CACHE_SECS`. @@ -57,6 +64,7 @@ pub async fn get_or_fetch_data_connections( } /// Fetch an account's product list, cached for `PRODUCT_LIST_CACHE_SECS`. +#[allow(dead_code)] pub async fn get_or_fetch_product_list( api_base_url: &str, account: &str, @@ -74,6 +82,43 @@ pub async fn get_or_fetch_product_list( .await } +/// Fetch a single product's metadata, returning redirect info if the account was renamed. +pub async fn get_or_fetch_product_or_redirect( + api_base_url: &str, + account: &str, + product: &str, + api_secret: Option<&str>, + request_id: &str, +) -> Result, ProxyError> { + let api_url = format!("{}/api/v1/products/{}/{}", api_base_url, account, product); + cached_fetch_with_redirect( + &api_url, + &api_url, + PRODUCT_CACHE_SECS, + api_secret, + request_id, + ) + .await +} + +/// Fetch an account's product list, returning redirect info if the account was renamed. +pub async fn get_or_fetch_product_list_or_redirect( + api_base_url: &str, + account: &str, + api_secret: Option<&str>, + request_id: &str, +) -> Result, ProxyError> { + let api_url = format!("{}/api/v1/products/{}", api_base_url, account); + cached_fetch_with_redirect( + &api_url, + &api_url, + PRODUCT_LIST_CACHE_SECS, + api_secret, + request_id, + ) + .await +} + // ── Internal helper ──────────────────────────────────────────────── /// Generic cache-or-fetch: check the Cache API, return cached JSON on hit, @@ -168,3 +213,126 @@ async fn cached_fetch( Ok(result) } + +/// Like `cached_fetch`, but returns `ApiResponse::Redirect` on HTTP 301 +/// instead of treating it as an error. +async fn cached_fetch_with_redirect( + cache_key: &str, + api_url: &str, + ttl_secs: u32, + api_secret: Option<&str>, + request_id: &str, +) -> Result, ProxyError> { + let span = tracing::info_span!( + "cached_fetch", + cache_key = %cache_key, + cache_hit = tracing::field::Empty, + api_status = tracing::field::Empty, + ); + let _guard = span.enter(); + + let cache = worker::Cache::default(); + + // ── Cache hit ────────────────────────────────────────────── + if let Some(mut cached_resp) = cache + .get(cache_key, false) + .await + .map_err(|e| ProxyError::Internal(format!("cache get failed: {}", e)))? + { + span.record("cache_hit", true); + let text = cached_resp + .text() + .await + .map_err(|e| ProxyError::Internal(format!("cache body read failed: {}", e)))?; + + // Try to parse as redirect first, then as the expected type. + if let Ok(redirect) = serde_json::from_str::(&text) { + if !redirect.redirect_to.is_empty() { + return Ok(ApiResponse::Redirect(redirect)); + } + } + let result: T = serde_json::from_str(&text) + .map_err(|e| ProxyError::Internal(format!("cache JSON parse failed: {}", e)))?; + return Ok(ApiResponse::Ok(result)); + } + + // ── Cache miss — fetch from API ──────────────────────────── + span.record("cache_hit", false); + let init = web_sys::RequestInit::new(); + init.set_method("GET"); + let req_headers = web_sys::Headers::new() + .map_err(|e| ProxyError::Internal(format!("headers build failed: {:?}", e)))?; + if let Some(secret) = api_secret { + req_headers + .set("Authorization", secret) + .map_err(|e| ProxyError::Internal(format!("header set failed: {:?}", e)))?; + } + if !request_id.is_empty() { + let _ = req_headers.set("x-request-id", request_id); + } + init.set_headers(&req_headers); + let req = web_sys::Request::new_with_str_and_init(api_url, &init) + .map_err(|e| ProxyError::Internal(format!("request build failed: {:?}", e)))?; + let worker_req: worker::Request = req.into(); + let mut resp = worker::Fetch::Request(worker_req) + .send() + .await + .map_err(|e| ProxyError::Internal(format!("api fetch failed: {}", e)))?; + + let status = resp.status_code(); + span.record("api_status", status); + + // ── Handle redirect ──────────────────────────────────────── + if status == 301 { + let text = resp + .text() + .await + .map_err(|e| ProxyError::Internal(format!("body read failed: {}", e)))?; + let redirect: RedirectInfo = serde_json::from_str(&text) + .map_err(|e| ProxyError::Internal(format!("redirect JSON parse failed: {}", e)))?; + + // Cache the redirect response + let headers = worker::Headers::new(); + let _ = headers.set("content-type", "application/json"); + let _ = headers.set("cache-control", &format!("max-age={}", ttl_secs)); + if let Ok(cache_resp) = worker::Response::ok(&text) { + let cache_resp = cache_resp.with_headers(headers); + if let Err(e) = cache.put(cache_key, cache_resp).await { + tracing::warn!("cache put failed: {}", e); + } + } + + return Ok(ApiResponse::Redirect(redirect)); + } + + if status == 404 { + return Err(ProxyError::BucketNotFound("not found".into())); + } + if status != 200 { + return Err(ProxyError::Internal(format!( + "API returned {} for {}", + status, api_url + ))); + } + + let text = resp + .text() + .await + .map_err(|e| ProxyError::Internal(format!("body read failed: {}", e)))?; + + let result: T = serde_json::from_str(&text) + .map_err(|e| ProxyError::Internal(format!("JSON parse failed: {} for {}", e, api_url)))?; + + // ── Store in cache ───────────────────────────────────────── + let headers = worker::Headers::new(); + let _ = headers.set("content-type", "application/json"); + let _ = headers.set("cache-control", &format!("max-age={}", ttl_secs)); + if let Ok(cache_resp) = worker::Response::ok(&text) { + let cache_resp = cache_resp.with_headers(headers); + if let Err(e) = cache.put(cache_key, cache_resp).await { + tracing::warn!("cache put failed: {}", e); + } + } + + Ok(ApiResponse::Ok(result)) +} diff --git a/src/handlers.rs b/src/handlers.rs index b5ba931..d075dba 100644 --- a/src/handlers.rs +++ b/src/handlers.rs @@ -4,10 +4,12 @@ //! custom endpoints are checked before the S3 proxy pipeline runs. use multistore::api::list::parse_list_query_params; -use multistore::api::response::{ListBucketResult, ListCommonPrefix}; +use multistore::api::response::{ErrorResponse, ListBucketResult, ListCommonPrefix}; use multistore::route_handler::{ProxyResult, RequestInfo, RouteHandler, RouteHandlerFuture}; +use crate::cache::ApiResponse; use crate::pagination::paginate_prefixes; +use crate::redirect::{build_redirect_path, permanent_redirect_xml}; use crate::registry::SourceCoopRegistry; const VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -77,8 +79,24 @@ impl RouteHandler for AccountListHandler { // List products for this account let account = bucket; - match self.registry.list_products(account).await { - Ok(products) => { + match self.registry.list_products_or_redirect(account).await { + Ok(ApiResponse::Redirect(info)) => { + let original_path = format!("/{}", account); + let location = + build_redirect_path(&original_path, req.query, &info.redirect_to); + let xml = permanent_redirect_xml(&info.redirect_to, &self.registry.request_id); + let mut result = ProxyResult::xml(301, xml); + if let Ok(value) = location.parse() { + result.headers.insert(http::header::LOCATION, value); + } + Some(result) + } + Ok(ApiResponse::Ok(product_list)) => { + let products: Vec = product_list + .products + .into_iter() + .map(|p| p.product_id) + .collect(); let params = parse_list_query_params(req.query); let all_prefixes: Vec = products.into_iter().map(|p| format!("{p}/")).collect(); @@ -110,7 +128,7 @@ impl RouteHandler for AccountListHandler { } Err(e) => { tracing::error!("AccountList({}) error: {:?}", account, e); - let err = multistore::api::response::ErrorResponse { + let err = ErrorResponse { code: "BadGateway".to_string(), message: "Failed to list products from upstream API".to_string(), resource: String::new(), diff --git a/src/lib.rs b/src/lib.rs index d025812..7c24e08 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,6 +2,7 @@ mod analytics; mod cache; mod handlers; mod pagination; +mod redirect; mod registry; use analytics::{extract_path_segments, log_request, RequestEvent}; @@ -15,6 +16,7 @@ use multistore_cf_workers::{ JsBody, NoopCredentialRegistry, WorkerBackend, WorkerSubscriber, }; use multistore_path_mapping::{MappedRegistry, PathMapping}; +use redirect::{build_redirect_path, extract_redirect_segments, permanent_redirect_xml}; use registry::SourceCoopRegistry; use worker::{event, Context, Env, Result}; @@ -73,13 +75,41 @@ async fn fetch(req: web_sys::Request, env: Env, ctx: Context) -> Result, new_account: &str) -> String { + let trimmed = path.strip_prefix('/').unwrap_or(path); + let rest = match trimmed.find('/') { + Some(idx) => &trimmed[idx..], + None => "", + }; + + let new_path = format!("/{}{}", new_account, rest); + + match query { + Some(q) if !q.is_empty() => format!("{}?{}", new_path, q), + _ => new_path, + } +} + +/// Generate an S3 PermanentRedirect XML error body. +pub fn permanent_redirect_xml(new_account: &str, request_id: &str) -> String { + format!( + r#" + +PermanentRedirect +The account you are attempting to access must be addressed using the specified endpoint. +data.source.coop +{} +{} +"#, + new_account, request_id + ) +} + +/// Extract account and optionally product from the raw request path. +/// +/// Returns `(Some(account), Some(product))` for `/{account}/{product}[/...]` +/// and `(Some(account), None)` for `/{account}[/]`. +pub fn extract_redirect_segments(path: &str) -> (Option<&str>, Option<&str>) { + let trimmed = path.trim_start_matches('/'); + if trimmed.is_empty() { + return (None, None); + } + let mut parts = trimmed.splitn(3, '/'); + let account = parts.next().filter(|s| !s.is_empty()); + let product = parts.next().filter(|s| !s.is_empty()); + (account, product) +} diff --git a/src/registry.rs b/src/registry.rs index e99cb2c..aebdf2d 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -31,6 +31,7 @@ impl SourceCoopRegistry { } /// List products for an account via the Source API. + #[allow(dead_code)] pub async fn list_products(&self, account: &str) -> Result, ProxyError> { let product_list = crate::cache::get_or_fetch_product_list( &self.api_base_url, @@ -45,6 +46,36 @@ impl SourceCoopRegistry { .map(|p| p.product_id) .collect()) } + + /// Fetch product metadata, returning redirect info if the account was renamed. + pub async fn get_product_or_redirect( + &self, + account: &str, + product: &str, + ) -> Result, ProxyError> { + crate::cache::get_or_fetch_product_or_redirect( + &self.api_base_url, + account, + product, + self.api_secret.as_deref(), + &self.request_id, + ) + .await + } + + /// List products for an account, returning redirect info if the account was renamed. + pub async fn list_products_or_redirect( + &self, + account: &str, + ) -> Result, ProxyError> { + crate::cache::get_or_fetch_product_list_or_redirect( + &self.api_base_url, + account, + self.api_secret.as_deref(), + &self.request_id, + ) + .await + } } impl BucketRegistry for SourceCoopRegistry { diff --git a/tests/redirect.rs b/tests/redirect.rs new file mode 100644 index 0000000..3a1f1d0 --- /dev/null +++ b/tests/redirect.rs @@ -0,0 +1,131 @@ +#[path = "../src/redirect.rs"] +mod redirect; + +use redirect::{build_redirect_path, extract_redirect_segments, permanent_redirect_xml}; + +// ── build_redirect_path ────────────────────────────────────────── + +#[test] +fn redirect_object_request() { + let result = build_redirect_path( + "/old-account/some-product/file.parquet", + None, + "new-account", + ); + assert_eq!(result, "/new-account/some-product/file.parquet"); +} + +#[test] +fn redirect_object_request_with_query() { + let result = build_redirect_path( + "/old-account/some-product/file.parquet", + Some("versionId=123"), + "new-account", + ); + assert_eq!( + result, + "/new-account/some-product/file.parquet?versionId=123" + ); +} + +#[test] +fn redirect_product_list() { + let result = build_redirect_path( + "/old-account/some-product", + Some("list-type=2&prefix=subdir/"), + "new-account", + ); + assert_eq!( + result, + "/new-account/some-product?list-type=2&prefix=subdir/" + ); +} + +#[test] +fn redirect_account_list() { + let result = build_redirect_path("/old-account", Some("list-type=2"), "new-account"); + assert_eq!(result, "/new-account?list-type=2"); +} + +#[test] +fn redirect_account_list_no_query() { + let result = build_redirect_path("/old-account", None, "new-account"); + assert_eq!(result, "/new-account"); +} + +#[test] +fn redirect_nested_key() { + let result = build_redirect_path("/old-account/product/dir/sub/file.txt", None, "new-account"); + assert_eq!(result, "/new-account/product/dir/sub/file.txt"); +} + +#[test] +fn redirect_preserves_trailing_slash() { + let result = build_redirect_path("/old-account/", Some("list-type=2"), "new-account"); + assert_eq!(result, "/new-account/?list-type=2"); +} + +// ── permanent_redirect_xml ─────────────────────────────────────── + +#[test] +fn xml_contains_permanent_redirect_code() { + let xml = permanent_redirect_xml("new-account", "req-123"); + assert!(xml.contains("PermanentRedirect")); +} + +#[test] +fn xml_contains_bucket_name() { + let xml = permanent_redirect_xml("new-account", "req-123"); + assert!(xml.contains("new-account")); +} + +#[test] +fn xml_contains_request_id() { + let xml = permanent_redirect_xml("new-account", "req-123"); + assert!(xml.contains("req-123")); +} + +#[test] +fn xml_contains_endpoint() { + let xml = permanent_redirect_xml("new-account", "req-123"); + assert!(xml.contains("data.source.coop")); +} + +// ── extract_redirect_segments ──────────────────────────────────── + +#[test] +fn segments_empty_path() { + assert_eq!(extract_redirect_segments("/"), (None, None)); +} + +#[test] +fn segments_account_only() { + assert_eq!( + extract_redirect_segments("/cholmes"), + (Some("cholmes"), None) + ); +} + +#[test] +fn segments_account_and_product() { + assert_eq!( + extract_redirect_segments("/cholmes/admin-boundaries"), + (Some("cholmes"), Some("admin-boundaries")) + ); +} + +#[test] +fn segments_account_product_and_key() { + assert_eq!( + extract_redirect_segments("/cholmes/admin-boundaries/file.parquet"), + (Some("cholmes"), Some("admin-boundaries")) + ); +} + +#[test] +fn segments_account_trailing_slash() { + assert_eq!( + extract_redirect_segments("/cholmes/"), + (Some("cholmes"), None) + ); +}