From 5bb253344a5a7a33c67cd9f5a31e6e885d462f5c Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 3 Jun 2026 12:38:21 +0200 Subject: [PATCH 1/3] refactor: remove old outdated things and add proper size limits --- src/batch_queue/mod.rs | 90 +++--------------------------------------- src/handler/error.rs | 4 -- src/handler/mod.rs | 67 +++++++++++++++++++------------ src/handler/replay.rs | 3 -- src/handler/vitals.rs | 8 +--- src/main.rs | 15 ++++++- src/replay_storage.rs | 20 +++++++++- src/tinybird.rs | 16 -------- 8 files changed, 82 insertions(+), 141 deletions(-) diff --git a/src/batch_queue/mod.rs b/src/batch_queue/mod.rs index 1972fa2..2cd5942 100644 --- a/src/batch_queue/mod.rs +++ b/src/batch_queue/mod.rs @@ -5,7 +5,7 @@ use crate::error_tracking::sourcemaps::SourcemapResolver; use crate::error_tracking::v3::ErrorLanguage; use crate::polar::{PolarClient, UsageCounts}; use crate::tinybird::{ - ErrorOccurrenceV3Row, ModsEventRow, ReplayRow, TinybirdClient, WebEventRow, WebVitalRow, + ErrorOccurrenceV3Row, ModsEventRow, TinybirdClient, WebEventRow, WebVitalRow, }; use serde::{Deserialize, Serialize}; use std::collections::HashMap; @@ -90,11 +90,6 @@ pub enum QueuedEvent { #[serde(skip_serializing_if = "Option::is_none")] tracking: Option, }, - Replay { - row: ReplayRow, - #[serde(skip_serializing_if = "Option::is_none")] - tracking: Option, - }, } impl QueuedEvent { @@ -104,7 +99,6 @@ impl QueuedEvent { QueuedEvent::ModsEvent { .. } => "mods_events", QueuedEvent::ErrorOccurrenceV3 { .. } => "error_tracking_v3", QueuedEvent::WebVital { .. } => "web_vitals", - QueuedEvent::Replay { .. 
} => "session_replays", } } } @@ -117,7 +111,6 @@ struct InMemoryBatch { mods_events: Vec<(ModsEventRow, Option)>, error_occurrences_v3: Vec<(ErrorOccurrenceV3Row, ErrorLanguage, Option)>, web_vitals: Vec<(WebVitalRow, Option)>, - replays: Vec<(ReplayRow, Option)>, } impl Default for InMemoryBatch { @@ -127,7 +120,6 @@ impl Default for InMemoryBatch { mods_events: Vec::with_capacity(INITIAL_BATCH_CAPACITY), error_occurrences_v3: Vec::new(), web_vitals: Vec::with_capacity(INITIAL_BATCH_CAPACITY / 4), - replays: Vec::new(), } } } @@ -138,7 +130,6 @@ impl InMemoryBatch { && self.mods_events.is_empty() && self.error_occurrences_v3.is_empty() && self.web_vitals.is_empty() - && self.replays.is_empty() } fn total_count(&self) -> usize { @@ -146,7 +137,6 @@ impl InMemoryBatch { + self.mods_events.len() + self.error_occurrences_v3.len() + self.web_vitals.len() - + self.replays.len() } fn push(&mut self, event: QueuedEvent) { @@ -159,7 +149,6 @@ impl InMemoryBatch { tracking, } => self.error_occurrences_v3.push((*row, language, tracking)), QueuedEvent::WebVital { row, tracking } => self.web_vitals.push((row, tracking)), - QueuedEvent::Replay { row, tracking } => self.replays.push((row, tracking)), } } @@ -192,11 +181,6 @@ impl InMemoryBatch { .into_iter() .map(|(row, tracking)| QueuedEvent::WebVital { row, tracking }), ); - result.extend( - self.replays - .into_iter() - .map(|(row, tracking)| QueuedEvent::Replay { row, tracking }), - ); result } @@ -204,8 +188,7 @@ impl InMemoryBatch { let estimated_owners = (self.web_events.len() + self.mods_events.len() + self.error_occurrences_v3.len() - + self.web_vitals.len() - + self.replays.len()) + + self.web_vitals.len()) .min(100); let mut usage: AggregatedUsage = HashMap::with_capacity(estimated_owners); @@ -244,23 +227,6 @@ impl InMemoryBatch { } } count_usage!(&self.web_vitals, web_vitals); - for (row, ctx) in &self.replays { - if let Some(ctx) = ctx { - let entry = usage - .entry(Arc::clone(&ctx.owner_id)) - .or_insert_with(|| 
OwnerUsage { - counts: UsageCounts::default(), - token: Arc::clone(&ctx.token), - org: ctx.organization_id.as_ref().map(Arc::clone), - }); - entry.counts.session_replays += 1; - entry - .counts - .session_replay_ids - .insert(row.session_id.clone()); - } - } - usage } } @@ -272,7 +238,6 @@ struct BatchSendResult { failed_error_occurrences_v3: Vec<(ErrorOccurrenceV3Row, ErrorLanguage, Option)>, failed_web_vitals: Vec<(WebVitalRow, Option)>, - failed_replays: Vec<(ReplayRow, Option)>, had_permanent_failure: bool, errors: Vec, } @@ -283,7 +248,6 @@ impl BatchSendResult { || !self.failed_mods_events.is_empty() || !self.failed_error_occurrences_v3.is_empty() || !self.failed_web_vitals.is_empty() - || !self.failed_replays.is_empty() } fn into_in_memory_batch(self) -> InMemoryBatch { @@ -292,7 +256,6 @@ impl BatchSendResult { mods_events: self.failed_mods_events, error_occurrences_v3: self.failed_error_occurrences_v3, web_vitals: self.failed_web_vitals, - replays: self.failed_replays, } } @@ -301,7 +264,6 @@ impl BatchSendResult { + self.failed_mods_events.len() + self.failed_error_occurrences_v3.len() + self.failed_web_vitals.len() - + self.failed_replays.len() } fn error_summary(&self) -> String { @@ -391,8 +353,8 @@ impl BatchQueue { pub async fn queue_event( &self, event: QueuedEvent, - ) -> Result<(), mpsc::error::SendError> { - self.sender.send(event).await + ) -> Result<(), mpsc::error::TrySendError> { + self.sender.try_send(event) } pub fn track_replay_usage(&self, session_id: &str, tracking: TrackingContext) { @@ -576,7 +538,6 @@ impl BatchQueue { mods_events, error_occurrences_v3, web_vitals, - replays, } = batch; let web_event_rows: Vec<_> = web_events.iter().map(|(e, _)| e).collect(); @@ -585,15 +546,8 @@ impl BatchQueue { let error_occurrence_v3_rows: Vec<_> = error_occurrences_v3.iter().map(|(e, _, _)| e).collect(); let web_vital_rows: Vec<_> = web_vitals.iter().map(|(e, _)| e).collect(); - let replay_rows: Vec<_> = replays.iter().map(|(e, _)| e).collect(); 
- - let ( - web_events_res, - mods_events_res, - error_occurrences_v3_res, - web_vitals_res, - replays_res, - ) = tokio::join!( + + let (web_events_res, mods_events_res, error_occurrences_v3_res, web_vitals_res) = tokio::join!( async { if web_event_rows.is_empty() { Ok(()) @@ -624,13 +578,6 @@ impl BatchQueue { self.tinybird.insert_web_vitals(&web_vital_rows).await } }, - async { - if replay_rows.is_empty() { - Ok(()) - } else { - self.tinybird.insert_replays(&replay_rows).await - } - }, ); if let Err(e) = web_events_res { @@ -670,14 +617,6 @@ impl BatchQueue { result.failed_web_vitals = web_vitals; } - if let Err(e) = replays_res { - record_batch_error(&mut result, "session_replays", replays.len(), &e); - if !e.is_transient() { - result.had_permanent_failure = true; - } - result.failed_replays = replays; - } - result } @@ -1130,7 +1069,6 @@ mod tests { assert_eq!(batch.error_occurrences_v3.len(), 1); assert_eq!(batch.web_vitals.len(), 1); assert!(batch.web_events.is_empty()); - assert!(batch.replays.is_empty()); } #[test] @@ -1206,22 +1144,6 @@ mod tests { .datasource(), "web_vitals" ); - assert_eq!( - QueuedEvent::Replay { - row: ReplayRow { - id: Uuid::new_v4(), - project_id: Uuid::new_v4(), - session_id: "session".to_string(), - identifier: Some(Uuid::new_v4().to_string()), - events: "[]".to_string(), - has_full_snapshot: 0, - created_at: Utc::now(), - }, - tracking: None, - } - .datasource(), - "session_replays" - ); } #[test] diff --git a/src/handler/error.rs b/src/handler/error.rs index f15b611..338aac8 100644 --- a/src/handler/error.rs +++ b/src/handler/error.rs @@ -29,9 +29,6 @@ pub(crate) struct ErrorRequest { build_id: Option, #[serde(default)] context: Option, - // TODO: handle project_name once project-level routing is supported. 
- #[serde(default)] - project_name: Option, #[serde(default, alias = "sdk_name")] sdk_name: Option, #[serde(default, alias = "sdk_version")] @@ -81,7 +78,6 @@ pub async fn error( }; let context = request_context(payload.context, empty_context); - let _project_name = payload.project_name; for mut error in payload.errors { if error.session_id.is_none() { diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 858ead3..51e9c3c 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -37,6 +37,7 @@ static PROJECT_CACHE: LazyLock>> = LazyLock::n }); pub type HandlerResponse = (StatusCode, Json); +pub const MAX_REQUEST_BODY_BYTES: usize = 16 * 1024 * 1024; #[derive(Debug, Deserialize, Default)] pub struct EncodingQuery { @@ -47,26 +48,25 @@ pub fn decompress_body<'a>( body: &'a [u8], encoding: Option<&str>, ) -> Result, String> { + if body.len() > MAX_REQUEST_BODY_BYTES { + return Err("Request body too large".to_string()); + } + match encoding { Some("gzip") => { let mut decoder = flate2::read::GzDecoder::new(body); - let mut decompressed = Vec::with_capacity(body.len()); - decoder - .read_to_end(&mut decompressed) - .map_err(|e| format!("Failed to decompress gzip: {}", e))?; + let decompressed = read_limited(&mut decoder, "gzip")?; Ok(Cow::Owned(decompressed)) } Some("zstd") => { - let decompressed = zstd::stream::decode_all(body) + let mut decoder = zstd::stream::read::Decoder::new(body) .map_err(|e| format!("Failed to decompress zstd: {}", e))?; + let decompressed = read_limited(&mut decoder, "zstd")?; Ok(Cow::Owned(decompressed)) } Some("deflate") => { let mut decoder = flate2::read::DeflateDecoder::new(body); - let mut decompressed = Vec::with_capacity(body.len()); - decoder - .read_to_end(&mut decompressed) - .map_err(|e| format!("Failed to decompress deflate: {}", e))?; + let decompressed = read_limited(&mut decoder, "deflate")?; Ok(Cow::Owned(decompressed)) } Some(enc) => Err(format!("Unsupported encoding: {}", enc)), @@ -74,6 +74,20 @@ pub fn 
decompress_body<'a>( } } +fn read_limited(reader: &mut impl Read, encoding: &str) -> Result, String> { + let mut limited = reader.take((MAX_REQUEST_BODY_BYTES + 1) as u64); + let mut decompressed = Vec::with_capacity(MAX_REQUEST_BODY_BYTES.min(1024 * 1024)); + limited + .read_to_end(&mut decompressed) + .map_err(|e| format!("Failed to decompress {}: {}", encoding, e))?; + + if decompressed.len() > MAX_REQUEST_BODY_BYTES { + return Err("Request body too large after decompression".to_string()); + } + + Ok(decompressed) +} + pub fn get_authorization(headers: &HeaderMap) -> Option { headers .get("Authorization") @@ -92,6 +106,22 @@ pub fn error_response(status: StatusCode, message: &str) -> HandlerResponse { (status, Json(serde_json::json!({ "error": message }))) } +pub fn queue_error_response( + error: tokio::sync::mpsc::error::TrySendError, + item: &str, +) -> HandlerResponse { + match error { + tokio::sync::mpsc::error::TrySendError::Full(_) => { + warn!("Ingestion queue full while queueing {}", item); + error_response(StatusCode::SERVICE_UNAVAILABLE, "Ingestion queue is full") + } + tokio::sync::mpsc::error::TrySendError::Closed(_) => { + error!("Ingestion queue closed while queueing {}", item); + error_response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to queue event") + } + } +} + pub fn success_response(warnings: HashMap) -> HandlerResponse { if warnings.is_empty() { ( @@ -469,10 +499,7 @@ pub async fn insert_web_event( tracking, }) .await - .map_err(|e| { - error!("Failed to queue event: {}", e); - error_response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to queue event") - })?; + .map_err(|e| queue_error_response(e, "web event"))?; Ok(event_id) } @@ -513,10 +540,7 @@ pub async fn insert_mods_event( batch_queue .queue_event(QueuedEvent::ModsEvent { row, tracking }) .await - .map_err(|e| { - error!("Failed to queue event: {}", e); - error_response(StatusCode::INTERNAL_SERVER_ERROR, "Failed to queue event") - })?; + .map_err(|e| queue_error_response(e, "mods 
event"))?; Ok(event_id) } @@ -533,13 +557,7 @@ pub async fn insert_error_occurrence_v3( tracking, }) .await - .map_err(|e| { - error!("Failed to queue error occurrence v3: {}", e); - error_response( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to queue error occurrence", - ) - })?; + .map_err(|e| queue_error_response(e, "error occurrence"))?; Ok(()) } @@ -916,7 +934,6 @@ async fn process_replay_request( is_final, batch_id, sequence, - timestamp: _, url, identifier, mut events, diff --git a/src/handler/replay.rs b/src/handler/replay.rs index 169d346..730bc8c 100644 --- a/src/handler/replay.rs +++ b/src/handler/replay.rs @@ -72,8 +72,6 @@ pub(crate) struct ReplayRequest { #[serde(default)] pub(crate) batch_id: Option, pub(crate) sequence: u32, - #[allow(dead_code)] - pub(crate) timestamp: u64, pub(crate) url: String, #[serde(default, alias = "anonymousId")] pub(crate) identifier: Option, @@ -111,7 +109,6 @@ pub async fn replay( is_final, batch_id, sequence, - timestamp: _, url, identifier, mut events, diff --git a/src/handler/vitals.rs b/src/handler/vitals.rs index 6a2f7ff..13a10f7 100644 --- a/src/handler/vitals.rs +++ b/src/handler/vitals.rs @@ -1,6 +1,6 @@ use super::{ EncodingQuery, check_ip_allowed, decompress_body, error_response, get_authorization, - get_client_ip, load_project_context, + get_client_ip, load_project_context, queue_error_response, }; use crate::batch_queue::{FailedRequest, QueuedEvent, RequestType, TrackingContext}; use crate::models::AppState; @@ -177,11 +177,7 @@ pub async fn vitals( }) .await { - error!("Failed to queue web vital: {}", e); - return error_response( - StatusCode::INTERNAL_SERVER_ERROR, - "Failed to queue web vital", - ); + return queue_error_response(e, "web vital"); } if let Some(session_id) = req.session_id.as_deref() diff --git a/src/main.rs b/src/main.rs index 2842e88..72e9672 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,6 @@ use axum::{ Router, - extract::{MatchedPath, Request}, + extract::{DefaultBodyLimit, 
MatchedPath, Request}, http::HeaderName, http::Method, http::StatusCode, @@ -53,6 +53,13 @@ async fn shutdown_signal() { } } +fn env_u32(name: &str, default: u32) -> u32 { + std::env::var(name) + .ok() + .and_then(|value| value.parse().ok()) + .unwrap_or(default) +} + #[tokio::main] async fn main() { #[cfg(debug_assertions)] @@ -77,9 +84,12 @@ async fn main() { let database_url = std::env::var("DATABASE_URL") .expect("DATABASE_URL must be set in .env file or environment variables"); + let database_max_connections = env_u32("DATABASE_MAX_CONNECTIONS", 10); + let database_acquire_timeout_secs = env_u32("DATABASE_ACQUIRE_TIMEOUT_SECS", 5); let pool = PgPoolOptions::new() - .max_connections(3) + .max_connections(database_max_connections) + .acquire_timeout(Duration::from_secs(database_acquire_timeout_secs.into())) .connect(&database_url) .await .expect("Failed to connect to database"); @@ -170,6 +180,7 @@ async fn main() { .route("/v1/error", post(handler::error)) .route("/v1/replay", post(handler::replay)) .layer(axum::middleware::from_fn(track_metrics)) + .layer(DefaultBodyLimit::max(handler::MAX_REQUEST_BODY_BYTES)) .layer(RequestDecompressionLayer::new()) .layer(cors) .with_state(state); diff --git a/src/replay_storage.rs b/src/replay_storage.rs index 1c986a0..45ac524 100644 --- a/src/replay_storage.rs +++ b/src/replay_storage.rs @@ -5,11 +5,13 @@ use serde::Serialize; use serde_json::Value; use std::collections::{HashMap, HashSet}; use std::io::Write; +use std::time::Duration; use tracing::warn; use uuid::Uuid; const REPLAY_CONTENT_ENCODING: &str = "zstd"; const ZSTD_COMPRESSION_LEVEL: i32 = 3; +const REPLAY_COMPRESSION_TIMEOUT: Duration = Duration::from_secs(10); #[derive(Clone)] pub struct ReplayStorage { @@ -47,6 +49,8 @@ pub struct ReplayFilterEventInput<'a> { pub enum ReplayStorageError { Serialization(serde_json::Error), Compression(std::io::Error), + CompressionTimeout, + CompressionTask(String), Upload(String), Database(sqlx::Error), } @@ -60,6 +64,12 @@ 
impl std::fmt::Display for ReplayStorageError { ReplayStorageError::Compression(error) => { write!(f, "Failed to compress replay chunk: {}", error) } + ReplayStorageError::CompressionTimeout => { + write!(f, "Timed out while compressing replay chunk") + } + ReplayStorageError::CompressionTask(error) => { + write!(f, "Replay compression task failed: {}", error) + } ReplayStorageError::Upload(error) => { write!(f, "Failed to upload replay chunk: {}", error) } @@ -162,7 +172,7 @@ impl ReplayStorage { } let snapshot_id = Uuid::new_v4(); - let (compressed, uncompressed_bytes) = zstd_json_value_array(&input.events)?; + let (compressed, uncompressed_bytes) = compress_replay_events(input.events.clone()).await?; let compressed_bytes = i64::try_from(compressed.len()).unwrap_or(i64::MAX); let first_event_timestamp_ms = replay_first_event_timestamp_ms(&input.events); let last_event_timestamp_ms = replay_last_event_timestamp_ms(&input.events); @@ -612,6 +622,14 @@ fn zstd_json_value_array(events: &[Value]) -> Result<(Vec, i64), ReplayStora Ok((writer.into_inner(), uncompressed_bytes)) } +async fn compress_replay_events(events: Vec) -> Result<(Vec, i64), ReplayStorageError> { + let task = tokio::task::spawn_blocking(move || zstd_json_value_array(&events)); + tokio::time::timeout(REPLAY_COMPRESSION_TIMEOUT, task) + .await + .map_err(|_| ReplayStorageError::CompressionTimeout)? + .map_err(|error| ReplayStorageError::CompressionTask(error.to_string()))? 
+} + fn replay_timestamp_ms(event: &Value) -> Option { let value = event.get("timestamp")?; if let Some(timestamp) = value.as_i64() { diff --git a/src/tinybird.rs b/src/tinybird.rs index a94fa5c..2e7ec9b 100644 --- a/src/tinybird.rs +++ b/src/tinybird.rs @@ -112,18 +112,6 @@ pub struct WebVitalRow { pub created_at: DateTime, } -#[derive(Debug, Clone, Serialize, Deserialize)] -pub struct ReplayRow { - pub id: Uuid, - pub project_id: Uuid, - pub session_id: String, - pub identifier: Option, - pub events: String, - pub has_full_snapshot: u8, - #[serde(with = "chrono::serde::ts_milliseconds")] - pub created_at: DateTime, -} - #[derive(Debug)] pub enum TinybirdError { Request(reqwest::Error), @@ -252,8 +240,4 @@ impl TinybirdClient { pub async fn insert_web_vitals(&self, rows: &[&WebVitalRow]) -> Result<(), TinybirdError> { self.send_batch("web_vitals", rows).await } - - pub async fn insert_replays(&self, rows: &[&ReplayRow]) -> Result<(), TinybirdError> { - self.send_batch("session_replays", rows).await - } } From bf242ddab5f53451d6ce2190d02471ae5977a395 Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 3 Jun 2026 12:42:48 +0200 Subject: [PATCH 2/3] chore: update regexes.yml from uap-core --- regexes.yaml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/regexes.yaml b/regexes.yaml index 727f395..fa94845 100644 --- a/regexes.yaml +++ b/regexes.yaml @@ -833,6 +833,10 @@ user_agent_parsers: # Vivaldi - regex: '(Vivaldi)/(\d+)(?:\.(\d+)|)(?:\.(\d+)|)' + # Microsoft Teams embedded browser (needs to be before Edge and Chrome) + - regex: '.{0,200}(Teams)/(\d+)\.(\d+)\.(\d+)' + family_replacement: "Microsoft Teams" + # Edge/major_version.minor_version # Edge with chromium Edg/major_version.minor_version.patch.minor_patch - regex: '(Edge?)/(\d+)(?:\.(\d+)|)(?:\.(\d+)|)(?:\.(\d+)|)' @@ -949,6 +953,9 @@ user_agent_parsers: - regex: '(Valve(?: Steam|) Client).{1,200}Chrome/(\d+)\.(\d+)\.(\d+)' family_replacement: "Steam Client" + # Ladybird + - regex: 
'(Ladybird)/(\d+)\.(\d+)(?:\.(\d+)|)' + # Chrome/Chromium/major_version.minor_version - regex: '(Chromium|Chrome)/(\d+)\.(\d+)(?:\.(\d+)|)(?:\.(\d+)|)' From 2c632b9d25d6c8965d6faeea5bb9306ec5650f02 Mon Sep 17 00:00:00 2001 From: Luca Date: Wed, 3 Jun 2026 12:47:06 +0200 Subject: [PATCH 3/3] refactor: switch to glibc --- Dockerfile | 14 ++++++-------- 1 file changed, 6 insertions(+), 8 deletions(-) diff --git a/Dockerfile b/Dockerfile index 2af6079..3448e88 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,5 @@ FROM rust:1-bookworm AS chef -RUN rustup target add x86_64-unknown-linux-musl -RUN cargo install cargo-chef +RUN cargo install cargo-chef --locked WORKDIR /app FROM chef AS planner @@ -8,24 +7,23 @@ COPY . . RUN cargo chef prepare --recipe-path recipe.json FROM chef AS builder -RUN apt-get update && apt-get install -y musl-tools pkg-config \ +RUN apt-get update && apt-get install -y --no-install-recommends pkg-config \ && rm -rf /var/lib/apt/lists/* COPY --from=planner /app/recipe.json recipe.json RUN cargo chef cook \ --release \ - --target x86_64-unknown-linux-musl \ --recipe-path recipe.json COPY . . RUN cargo build \ --release \ - --target x86_64-unknown-linux-musl \ - --bin data-collector + --bin data-collector \ + && strip /app/target/release/data-collector -FROM gcr.io/distroless/static-debian12 AS runtime +FROM gcr.io/distroless/cc-debian12 AS runtime WORKDIR /app COPY regexes.yaml ./regexes.yaml COPY --from=builder \ - /app/target/x86_64-unknown-linux-musl/release/data-collector \ + /app/target/release/data-collector \ /usr/local/bin/data-collector USER nonroot:nonroot