Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 6 additions & 8 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,31 +1,29 @@
FROM rust:1-bookworm AS chef
RUN rustup target add x86_64-unknown-linux-musl
RUN cargo install cargo-chef
RUN cargo install cargo-chef --locked
WORKDIR /app

FROM chef AS planner
COPY . .
RUN cargo chef prepare --recipe-path recipe.json

FROM chef AS builder
RUN apt-get update && apt-get install -y musl-tools pkg-config \
RUN apt-get update && apt-get install -y --no-install-recommends pkg-config \
&& rm -rf /var/lib/apt/lists/*
COPY --from=planner /app/recipe.json recipe.json
RUN cargo chef cook \
--release \
--target x86_64-unknown-linux-musl \
--recipe-path recipe.json
COPY . .
RUN cargo build \
--release \
--target x86_64-unknown-linux-musl \
--bin data-collector
--bin data-collector \
&& strip /app/target/release/data-collector

FROM gcr.io/distroless/static-debian12 AS runtime
FROM gcr.io/distroless/cc-debian12 AS runtime
WORKDIR /app
COPY regexes.yaml ./regexes.yaml
COPY --from=builder \
/app/target/x86_64-unknown-linux-musl/release/data-collector \
/app/target/release/data-collector \
/usr/local/bin/data-collector
USER nonroot:nonroot

Expand Down
7 changes: 7 additions & 0 deletions regexes.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,10 @@ user_agent_parsers:
# Vivaldi
- regex: '(Vivaldi)/(\d+)(?:\.(\d+)|)(?:\.(\d+)|)'

# Microsoft Teams embedded browser (needs to be before Edge and Chrome)
- regex: '.{0,200}(Teams)/(\d+)\.(\d+)\.(\d+)'
family_replacement: "Microsoft Teams"

# Edge/major_version.minor_version
# Edge with chromium Edg/major_version.minor_version.patch.minor_patch
- regex: '(Edge?)/(\d+)(?:\.(\d+)|)(?:\.(\d+)|)(?:\.(\d+)|)'
Expand Down Expand Up @@ -949,6 +953,9 @@ user_agent_parsers:
- regex: '(Valve(?: Steam|) Client).{1,200}Chrome/(\d+)\.(\d+)\.(\d+)'
family_replacement: "Steam Client"

# Ladybird
- regex: '(Ladybird)/(\d+)\.(\d+)(?:\.(\d+)|)'

# Chrome/Chromium/major_version.minor_version
- regex: '(Chromium|Chrome)/(\d+)\.(\d+)(?:\.(\d+)|)(?:\.(\d+)|)'

Expand Down
90 changes: 6 additions & 84 deletions src/batch_queue/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::error_tracking::sourcemaps::SourcemapResolver;
use crate::error_tracking::v3::ErrorLanguage;
use crate::polar::{PolarClient, UsageCounts};
use crate::tinybird::{
ErrorOccurrenceV3Row, ModsEventRow, ReplayRow, TinybirdClient, WebEventRow, WebVitalRow,
ErrorOccurrenceV3Row, ModsEventRow, TinybirdClient, WebEventRow, WebVitalRow,
};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
Expand Down Expand Up @@ -90,11 +90,6 @@ pub enum QueuedEvent {
#[serde(skip_serializing_if = "Option::is_none")]
tracking: Option<TrackingContext>,
},
Replay {
row: ReplayRow,
#[serde(skip_serializing_if = "Option::is_none")]
tracking: Option<TrackingContext>,
},
}

impl QueuedEvent {
Expand All @@ -104,7 +99,6 @@ impl QueuedEvent {
QueuedEvent::ModsEvent { .. } => "mods_events",
QueuedEvent::ErrorOccurrenceV3 { .. } => "error_tracking_v3",
QueuedEvent::WebVital { .. } => "web_vitals",
QueuedEvent::Replay { .. } => "session_replays",
}
}
}
Expand All @@ -117,7 +111,6 @@ struct InMemoryBatch {
mods_events: Vec<(ModsEventRow, Option<TrackingContext>)>,
error_occurrences_v3: Vec<(ErrorOccurrenceV3Row, ErrorLanguage, Option<TrackingContext>)>,
web_vitals: Vec<(WebVitalRow, Option<TrackingContext>)>,
replays: Vec<(ReplayRow, Option<TrackingContext>)>,
}

impl Default for InMemoryBatch {
Expand All @@ -127,7 +120,6 @@ impl Default for InMemoryBatch {
mods_events: Vec::with_capacity(INITIAL_BATCH_CAPACITY),
error_occurrences_v3: Vec::new(),
web_vitals: Vec::with_capacity(INITIAL_BATCH_CAPACITY / 4),
replays: Vec::new(),
}
}
}
Expand All @@ -138,15 +130,13 @@ impl InMemoryBatch {
&& self.mods_events.is_empty()
&& self.error_occurrences_v3.is_empty()
&& self.web_vitals.is_empty()
&& self.replays.is_empty()
}

fn total_count(&self) -> usize {
self.web_events.len()
+ self.mods_events.len()
+ self.error_occurrences_v3.len()
+ self.web_vitals.len()
+ self.replays.len()
}

fn push(&mut self, event: QueuedEvent) {
Expand All @@ -159,7 +149,6 @@ impl InMemoryBatch {
tracking,
} => self.error_occurrences_v3.push((*row, language, tracking)),
QueuedEvent::WebVital { row, tracking } => self.web_vitals.push((row, tracking)),
QueuedEvent::Replay { row, tracking } => self.replays.push((row, tracking)),
}
}

Expand Down Expand Up @@ -192,20 +181,14 @@ impl InMemoryBatch {
.into_iter()
.map(|(row, tracking)| QueuedEvent::WebVital { row, tracking }),
);
result.extend(
self.replays
.into_iter()
.map(|(row, tracking)| QueuedEvent::Replay { row, tracking }),
);
result
}

fn aggregate_usage(&self) -> AggregatedUsage {
let estimated_owners = (self.web_events.len()
+ self.mods_events.len()
+ self.error_occurrences_v3.len()
+ self.web_vitals.len()
+ self.replays.len())
+ self.web_vitals.len())
.min(100);

let mut usage: AggregatedUsage = HashMap::with_capacity(estimated_owners);
Expand Down Expand Up @@ -244,23 +227,6 @@ impl InMemoryBatch {
}
}
count_usage!(&self.web_vitals, web_vitals);
for (row, ctx) in &self.replays {
if let Some(ctx) = ctx {
let entry = usage
.entry(Arc::clone(&ctx.owner_id))
.or_insert_with(|| OwnerUsage {
counts: UsageCounts::default(),
token: Arc::clone(&ctx.token),
org: ctx.organization_id.as_ref().map(Arc::clone),
});
entry.counts.session_replays += 1;
entry
.counts
.session_replay_ids
.insert(row.session_id.clone());
}
}

usage
}
}
Expand All @@ -272,7 +238,6 @@ struct BatchSendResult {
failed_error_occurrences_v3:
Vec<(ErrorOccurrenceV3Row, ErrorLanguage, Option<TrackingContext>)>,
failed_web_vitals: Vec<(WebVitalRow, Option<TrackingContext>)>,
failed_replays: Vec<(ReplayRow, Option<TrackingContext>)>,
had_permanent_failure: bool,
errors: Vec<String>,
}
Expand All @@ -283,7 +248,6 @@ impl BatchSendResult {
|| !self.failed_mods_events.is_empty()
|| !self.failed_error_occurrences_v3.is_empty()
|| !self.failed_web_vitals.is_empty()
|| !self.failed_replays.is_empty()
}

fn into_in_memory_batch(self) -> InMemoryBatch {
Expand All @@ -292,7 +256,6 @@ impl BatchSendResult {
mods_events: self.failed_mods_events,
error_occurrences_v3: self.failed_error_occurrences_v3,
web_vitals: self.failed_web_vitals,
replays: self.failed_replays,
}
}

Expand All @@ -301,7 +264,6 @@ impl BatchSendResult {
+ self.failed_mods_events.len()
+ self.failed_error_occurrences_v3.len()
+ self.failed_web_vitals.len()
+ self.failed_replays.len()
}

fn error_summary(&self) -> String {
Expand Down Expand Up @@ -391,8 +353,8 @@ impl BatchQueue {
pub async fn queue_event(
&self,
event: QueuedEvent,
) -> Result<(), mpsc::error::SendError<QueuedEvent>> {
self.sender.send(event).await
) -> Result<(), mpsc::error::TrySendError<QueuedEvent>> {
self.sender.try_send(event)
}

pub fn track_replay_usage(&self, session_id: &str, tracking: TrackingContext) {
Expand Down Expand Up @@ -576,7 +538,6 @@ impl BatchQueue {
mods_events,
error_occurrences_v3,
web_vitals,
replays,
} = batch;

let web_event_rows: Vec<_> = web_events.iter().map(|(e, _)| e).collect();
Expand All @@ -585,15 +546,8 @@ impl BatchQueue {
let error_occurrence_v3_rows: Vec<_> =
error_occurrences_v3.iter().map(|(e, _, _)| e).collect();
let web_vital_rows: Vec<_> = web_vitals.iter().map(|(e, _)| e).collect();
let replay_rows: Vec<_> = replays.iter().map(|(e, _)| e).collect();

let (
web_events_res,
mods_events_res,
error_occurrences_v3_res,
web_vitals_res,
replays_res,
) = tokio::join!(

let (web_events_res, mods_events_res, error_occurrences_v3_res, web_vitals_res) = tokio::join!(
async {
if web_event_rows.is_empty() {
Ok(())
Expand Down Expand Up @@ -624,13 +578,6 @@ impl BatchQueue {
self.tinybird.insert_web_vitals(&web_vital_rows).await
}
},
async {
if replay_rows.is_empty() {
Ok(())
} else {
self.tinybird.insert_replays(&replay_rows).await
}
},
);

if let Err(e) = web_events_res {
Expand Down Expand Up @@ -670,14 +617,6 @@ impl BatchQueue {
result.failed_web_vitals = web_vitals;
}

if let Err(e) = replays_res {
record_batch_error(&mut result, "session_replays", replays.len(), &e);
if !e.is_transient() {
result.had_permanent_failure = true;
}
result.failed_replays = replays;
}

result
}

Expand Down Expand Up @@ -1130,7 +1069,6 @@ mod tests {
assert_eq!(batch.error_occurrences_v3.len(), 1);
assert_eq!(batch.web_vitals.len(), 1);
assert!(batch.web_events.is_empty());
assert!(batch.replays.is_empty());
}

#[test]
Expand Down Expand Up @@ -1206,22 +1144,6 @@ mod tests {
.datasource(),
"web_vitals"
);
assert_eq!(
QueuedEvent::Replay {
row: ReplayRow {
id: Uuid::new_v4(),
project_id: Uuid::new_v4(),
session_id: "session".to_string(),
identifier: Some(Uuid::new_v4().to_string()),
events: "[]".to_string(),
has_full_snapshot: 0,
created_at: Utc::now(),
},
tracking: None,
}
.datasource(),
"session_replays"
);
}

#[test]
Expand Down
4 changes: 0 additions & 4 deletions src/handler/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,6 @@ pub(crate) struct ErrorRequest {
build_id: Option<String>,
#[serde(default)]
context: Option<Value>,
// TODO: handle project_name once project-level routing is supported.
#[serde(default)]
project_name: Option<String>,
#[serde(default, alias = "sdk_name")]
sdk_name: Option<String>,
#[serde(default, alias = "sdk_version")]
Expand Down Expand Up @@ -81,7 +78,6 @@ pub async fn error(
};

let context = request_context(payload.context, empty_context);
let _project_name = payload.project_name;

for mut error in payload.errors {
if error.session_id.is_none() {
Expand Down
Loading