Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
.wrangler
node_modules
docs/plans
.worktrees
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ path = "tests/routing.rs"
name = "pagination"
path = "tests/pagination.rs"

[[test]]
name = "redirect"
path = "tests/redirect.rs"

[dependencies]
# Multistore
multistore = { version = "0.3.0", features = ["azure"] }
Expand Down
168 changes: 168 additions & 0 deletions src/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
//! Each public function caches one API call type with its own TTL.
//! Adjust the `*_CACHE_SECS` constants to tune per-datatype expiry.

use crate::redirect::RedirectInfo;
use crate::registry::{DataConnection, SourceProduct, SourceProductList};
use multistore::error::ProxyError;

Expand All @@ -18,6 +19,12 @@ const DATA_CONNECTIONS_CACHE_SECS: u32 = 1800; // 30 minutes
/// Product list for an account (`/api/v1/products/{account}`).
const PRODUCT_LIST_CACHE_SECS: u32 = 60; // 1 minute

/// Result from the Source API: either the expected data or a redirect.
pub enum ApiResponse<T> {
Ok(T),
Redirect(RedirectInfo),
}

// ── Public cache functions ─────────────────────────────────────────

/// Fetch a single product's metadata, cached for `PRODUCT_CACHE_SECS`.
Expand Down Expand Up @@ -57,6 +64,7 @@ pub async fn get_or_fetch_data_connections(
}

/// Fetch an account's product list, cached for `PRODUCT_LIST_CACHE_SECS`.
#[allow(dead_code)]
pub async fn get_or_fetch_product_list(
api_base_url: &str,
account: &str,
Expand All @@ -74,6 +82,43 @@ pub async fn get_or_fetch_product_list(
.await
}

/// Fetch a single product's metadata, returning redirect info if the account was renamed.
pub async fn get_or_fetch_product_or_redirect(
api_base_url: &str,
account: &str,
product: &str,
api_secret: Option<&str>,
request_id: &str,
) -> Result<ApiResponse<SourceProduct>, ProxyError> {
let api_url = format!("{}/api/v1/products/{}/{}", api_base_url, account, product);
cached_fetch_with_redirect(
&api_url,
&api_url,
PRODUCT_CACHE_SECS,
api_secret,
request_id,
)
.await
}

/// Fetch an account's product list, returning redirect info if the account was renamed.
pub async fn get_or_fetch_product_list_or_redirect(
api_base_url: &str,
account: &str,
api_secret: Option<&str>,
request_id: &str,
) -> Result<ApiResponse<SourceProductList>, ProxyError> {
let api_url = format!("{}/api/v1/products/{}", api_base_url, account);
cached_fetch_with_redirect(
&api_url,
&api_url,
PRODUCT_LIST_CACHE_SECS,
api_secret,
request_id,
)
.await
}

// ── Internal helper ────────────────────────────────────────────────

/// Generic cache-or-fetch: check the Cache API, return cached JSON on hit,
Expand Down Expand Up @@ -168,3 +213,126 @@ async fn cached_fetch<T: serde::de::DeserializeOwned>(

Ok(result)
}

/// Like `cached_fetch`, but returns `ApiResponse::Redirect` on HTTP 301
/// instead of treating it as an error.
async fn cached_fetch_with_redirect<T: serde::de::DeserializeOwned>(
cache_key: &str,
api_url: &str,
ttl_secs: u32,
api_secret: Option<&str>,
request_id: &str,
) -> Result<ApiResponse<T>, ProxyError> {
let span = tracing::info_span!(
"cached_fetch",
cache_key = %cache_key,
cache_hit = tracing::field::Empty,
api_status = tracing::field::Empty,
);
let _guard = span.enter();

let cache = worker::Cache::default();

// ── Cache hit ──────────────────────────────────────────────
if let Some(mut cached_resp) = cache
.get(cache_key, false)
.await
.map_err(|e| ProxyError::Internal(format!("cache get failed: {}", e)))?
{
span.record("cache_hit", true);
let text = cached_resp
.text()
.await
.map_err(|e| ProxyError::Internal(format!("cache body read failed: {}", e)))?;

// Try to parse as redirect first, then as the expected type.
if let Ok(redirect) = serde_json::from_str::<RedirectInfo>(&text) {
if !redirect.redirect_to.is_empty() {
return Ok(ApiResponse::Redirect(redirect));
}
}
let result: T = serde_json::from_str(&text)
.map_err(|e| ProxyError::Internal(format!("cache JSON parse failed: {}", e)))?;
return Ok(ApiResponse::Ok(result));
}

// ── Cache miss — fetch from API ────────────────────────────
span.record("cache_hit", false);
let init = web_sys::RequestInit::new();
init.set_method("GET");
let req_headers = web_sys::Headers::new()
.map_err(|e| ProxyError::Internal(format!("headers build failed: {:?}", e)))?;
if let Some(secret) = api_secret {
req_headers
.set("Authorization", secret)
.map_err(|e| ProxyError::Internal(format!("header set failed: {:?}", e)))?;
}
if !request_id.is_empty() {
let _ = req_headers.set("x-request-id", request_id);
}
init.set_headers(&req_headers);
let req = web_sys::Request::new_with_str_and_init(api_url, &init)
.map_err(|e| ProxyError::Internal(format!("request build failed: {:?}", e)))?;
let worker_req: worker::Request = req.into();
let mut resp = worker::Fetch::Request(worker_req)
.send()
.await
.map_err(|e| ProxyError::Internal(format!("api fetch failed: {}", e)))?;

let status = resp.status_code();
span.record("api_status", status);

// ── Handle redirect ────────────────────────────────────────
if status == 301 {
let text = resp
.text()
.await
.map_err(|e| ProxyError::Internal(format!("body read failed: {}", e)))?;
let redirect: RedirectInfo = serde_json::from_str(&text)
.map_err(|e| ProxyError::Internal(format!("redirect JSON parse failed: {}", e)))?;

// Cache the redirect response
let headers = worker::Headers::new();
let _ = headers.set("content-type", "application/json");
let _ = headers.set("cache-control", &format!("max-age={}", ttl_secs));
if let Ok(cache_resp) = worker::Response::ok(&text) {
let cache_resp = cache_resp.with_headers(headers);
if let Err(e) = cache.put(cache_key, cache_resp).await {
tracing::warn!("cache put failed: {}", e);
}
}

return Ok(ApiResponse::Redirect(redirect));
}

if status == 404 {
return Err(ProxyError::BucketNotFound("not found".into()));
}
if status != 200 {
return Err(ProxyError::Internal(format!(
"API returned {} for {}",
status, api_url
)));
}

let text = resp
.text()
.await
.map_err(|e| ProxyError::Internal(format!("body read failed: {}", e)))?;

let result: T = serde_json::from_str(&text)
.map_err(|e| ProxyError::Internal(format!("JSON parse failed: {} for {}", e, api_url)))?;

// ── Store in cache ─────────────────────────────────────────
let headers = worker::Headers::new();
let _ = headers.set("content-type", "application/json");
let _ = headers.set("cache-control", &format!("max-age={}", ttl_secs));
if let Ok(cache_resp) = worker::Response::ok(&text) {
let cache_resp = cache_resp.with_headers(headers);
if let Err(e) = cache.put(cache_key, cache_resp).await {
tracing::warn!("cache put failed: {}", e);
}
}

Ok(ApiResponse::Ok(result))
}
26 changes: 22 additions & 4 deletions src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
//! custom endpoints are checked before the S3 proxy pipeline runs.

use multistore::api::list::parse_list_query_params;
use multistore::api::response::{ListBucketResult, ListCommonPrefix};
use multistore::api::response::{ErrorResponse, ListBucketResult, ListCommonPrefix};
use multistore::route_handler::{ProxyResult, RequestInfo, RouteHandler, RouteHandlerFuture};

use crate::cache::ApiResponse;
use crate::pagination::paginate_prefixes;
use crate::redirect::{build_redirect_path, permanent_redirect_xml};
use crate::registry::SourceCoopRegistry;

const VERSION: &str = env!("CARGO_PKG_VERSION");
Expand Down Expand Up @@ -77,8 +79,24 @@ impl RouteHandler for AccountListHandler {

// List products for this account
let account = bucket;
match self.registry.list_products(account).await {
Ok(products) => {
match self.registry.list_products_or_redirect(account).await {
Ok(ApiResponse::Redirect(info)) => {
let original_path = format!("/{}", account);
let location =
build_redirect_path(&original_path, req.query, &info.redirect_to);
let xml = permanent_redirect_xml(&info.redirect_to, &self.registry.request_id);
let mut result = ProxyResult::xml(301, xml);
if let Ok(value) = location.parse() {
result.headers.insert(http::header::LOCATION, value);
}
Some(result)
}
Ok(ApiResponse::Ok(product_list)) => {
let products: Vec<String> = product_list
.products
.into_iter()
.map(|p| p.product_id)
.collect();
let params = parse_list_query_params(req.query);
let all_prefixes: Vec<String> =
products.into_iter().map(|p| format!("{p}/")).collect();
Expand Down Expand Up @@ -110,7 +128,7 @@ impl RouteHandler for AccountListHandler {
}
Err(e) => {
tracing::error!("AccountList({}) error: {:?}", account, e);
let err = multistore::api::response::ErrorResponse {
let err = ErrorResponse {
code: "BadGateway".to_string(),
message: "Failed to list products from upstream API".to_string(),
resource: String::new(),
Expand Down
32 changes: 31 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ mod analytics;
mod cache;
mod handlers;
mod pagination;
mod redirect;
mod registry;

use analytics::{extract_path_segments, log_request, RequestEvent};
Expand All @@ -15,6 +16,7 @@ use multistore_cf_workers::{
JsBody, NoopCredentialRegistry, WorkerBackend, WorkerSubscriber,
};
use multistore_path_mapping::{MappedRegistry, PathMapping};
use redirect::{build_redirect_path, extract_redirect_segments, permanent_redirect_xml};
use registry::SourceCoopRegistry;
use worker::{event, Context, Env, Result};

Expand Down Expand Up @@ -73,13 +75,41 @@ async fn fetch(req: web_sys::Request, env: Env, ctx: Context) -> Result<web_sys:
};
let (rewritten_path, rewritten_query) = mapping.rewrite_request(&path, query.as_deref());

// ── Build gateway with route handlers ──────────────────────────
// ── Build registry (shared by redirect check + gateway) ───────
let registry = SourceCoopRegistry::new(
config.api_base_url.clone(),
config.api_secret.clone(),
request_id.clone(),
);

// ── Short-circuit: account rename redirect ──────────────────
// Only check product-level requests here. Account-only requests
// (e.g. /{account}?list-type=2) are handled by AccountListHandler
// to avoid a redundant API call.
{
let (account, product) = extract_redirect_segments(&path);
if let (Some(account), Some(product)) = (account, product) {
if let Ok(cache::ApiResponse::Redirect(info)) =
registry.get_product_or_redirect(account, product).await
{
tracing::info!(
old_account = account,
new_account = %info.redirect_to,
"account redirect"
);
let location = build_redirect_path(&path, query.as_deref(), &info.redirect_to);
let xml = permanent_redirect_xml(&info.redirect_to, &request_id);
let resp = xml_response(301, &xml);
let _ = resp.headers().set("location", &location);

let (acct, prod, key) = extract_path_segments(&path);
log_analytics(&env, &headers, &resp, &method, acct, prod, key);

return Ok(add_cors(resp));
}
}
}

let gateway = ProxyGateway::new(
WorkerBackend,
MappedRegistry::new(registry.clone(), mapping.clone()),
Expand Down
59 changes: 59 additions & 0 deletions src/redirect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
//! S3-compliant redirect support for renamed accounts.

use serde::{Deserialize, Serialize};

/// Redirect info returned by the Source API when an account has been renamed.
#[derive(Debug, Clone, Deserialize, Serialize)]
#[serde(deny_unknown_fields)]
pub struct RedirectInfo {
pub redirect_to: String,
}

/// Build a redirect path by replacing the first path segment (account) with `new_account`.
///
/// Preserves the product, key, and query string.
/// Input `path` is the original decoded path (e.g. `/old-account/product/key`).
pub fn build_redirect_path(path: &str, query: Option<&str>, new_account: &str) -> String {
let trimmed = path.strip_prefix('/').unwrap_or(path);
let rest = match trimmed.find('/') {
Some(idx) => &trimmed[idx..],
None => "",
};

let new_path = format!("/{}{}", new_account, rest);

match query {
Some(q) if !q.is_empty() => format!("{}?{}", new_path, q),
_ => new_path,
}
}

/// Generate an S3 PermanentRedirect XML error body.
pub fn permanent_redirect_xml(new_account: &str, request_id: &str) -> String {
format!(
r#"<?xml version="1.0" encoding="UTF-8"?>
<Error>
<Code>PermanentRedirect</Code>
<Message>The account you are attempting to access must be addressed using the specified endpoint.</Message>
<Endpoint>data.source.coop</Endpoint>
<Bucket>{}</Bucket>
<RequestId>{}</RequestId>
</Error>"#,
new_account, request_id
)
}

/// Extract account and optionally product from the raw request path.
///
/// Returns `(Some(account), Some(product))` for `/{account}/{product}[/...]`
/// and `(Some(account), None)` for `/{account}[/]`.
pub fn extract_redirect_segments(path: &str) -> (Option<&str>, Option<&str>) {
let trimmed = path.trim_start_matches('/');
if trimmed.is_empty() {
return (None, None);
}
let mut parts = trimmed.splitn(3, '/');
let account = parts.next().filter(|s| !s.is_empty());
let product = parts.next().filter(|s| !s.is_empty());
(account, product)
}
Loading
Loading