From 381ddbb078d41b407f87bf3224e3254e21d4ff59 Mon Sep 17 00:00:00 2001 From: Meredith Heller Date: Mon, 15 Jun 2026 17:53:19 -0700 Subject: [PATCH 1/3] ref(outcomes): add quantity64 in processor --- rust_snuba/src/processors/outcomes.rs | 49 +++++++++++++++++++++++++-- 1 file changed, 47 insertions(+), 2 deletions(-) diff --git a/rust_snuba/src/processors/outcomes.rs b/rust_snuba/src/processors/outcomes.rs index e885f64f44..7d3c128eec 100644 --- a/rust_snuba/src/processors/outcomes.rs +++ b/rust_snuba/src/processors/outcomes.rs @@ -82,9 +82,25 @@ pub fn process_message( } } + msg.quantity64 = msg.quantity; + InsertBatch::from_rows([msg], None) } +// The `quantity` column is a u32, but incoming messages may carry a value that +// only fits in a u64, so we parse as u64 and narrow it for the u32 column while +// preserving the full value in `quantity64`. Values larger than u32::MAX are +// saturated to u32::MAX so the u32 column never wraps around to a bogus value. +fn serialize_quantity_as_u32(value: &Option, serializer: S) -> Result +where + S: serde::Serializer, +{ + match value { + Some(v) => serializer.serialize_some(&u32::try_from(*v).unwrap_or(u32::MAX)), + None => serializer.serialize_none(), + } +} + #[derive(Debug, Deserialize, Serialize)] struct Outcome { #[serde(default)] @@ -96,7 +112,11 @@ struct Outcome { timestamp: StringToIntDatetime, outcome: u8, category: Option, - quantity: Option, + #[serde(serialize_with = "serialize_quantity_as_u32")] + quantity: Option, + // Only derived from `quantity` in `process_message` + #[serde(skip_deserializing)] + quantity64: Option, reason: Option, event_id: Option, } @@ -127,7 +147,32 @@ mod tests { let result = process_message(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); - let expected = b"{\"org_id\":1,\"project_id\":1,\"key_id\":null,\"timestamp\":1680029444,\"outcome\":4,\"category\":1,\"quantity\":3,\"reason\":null,\"event_id\":null}\n"; + let expected = b"{\"org_id\":1,\"project_id\":1,\"key_id\":null,\"timestamp\":1680029444,\"outcome\":4,\"category\":1,\"quantity\":3,\"quantity64\":3,\"reason\":null,\"event_id\":null}\n"; + + assert_eq!(result.rows.into_encoded_rows(), expected); + } + + #[test] + fn test_outcome_quantity_overflow() { + // A quantity larger than u32::MAX should saturate in the u32 `quantity` + // column while `quantity64` keeps the full value. + let data = r#"{ + "org_id": 1, + "outcome": 4, + "project_id": 1, + "quantity": 5000000000, + "timestamp": "2023-03-28T18:50:44.000011Z" + }"#; + let payload = KafkaPayload::new(None, None, Some(data.as_bytes().to_vec())); + let meta = KafkaMessageMetadata { + partition: 0, + offset: 1, + timestamp: DateTime::from(SystemTime::now()), + }; + let result = process_message(payload, meta, &ProcessorConfig::default()) + .expect("The message should be processed"); + + let expected = b"{\"org_id\":1,\"project_id\":1,\"key_id\":null,\"timestamp\":1680029444,\"outcome\":4,\"category\":1,\"quantity\":4294967295,\"quantity64\":5000000000,\"reason\":null,\"event_id\":null}\n"; assert_eq!(result.rows.into_encoded_rows(), expected); } From 3a7cb880c11d51875768b40c079abf2a9dc431d7 Mon Sep 17 00:00:00 2001 From: Meredith Heller Date: Mon, 15 Jun 2026 18:40:07 -0700 Subject: [PATCH 2/3] rename columns when serializing --- rust_snuba/src/processors/outcomes.rs | 34 ++++++++++----------------- 1 file changed, 12 insertions(+), 22 deletions(-) diff --git a/rust_snuba/src/processors/outcomes.rs b/rust_snuba/src/processors/outcomes.rs index 7d3c128eec..627bc815c8 100644 --- a/rust_snuba/src/processors/outcomes.rs +++ b/rust_snuba/src/processors/outcomes.rs @@ -82,25 +82,15 @@ pub fn process_message( } } - msg.quantity64 = msg.quantity; + msg.quantity32 = Some( + msg.quantity + .and_then(|quantity| u32::try_from(quantity).ok()) + .unwrap_or(0), + ); InsertBatch::from_rows([msg], None) } -// The `quantity` column is a u32, but incoming messages may carry a value that -// only fits in a u64, so we parse as u64 and narrow it for the u32 column while -// preserving the full value in `quantity64`. Values larger than u32::MAX are -// saturated to u32::MAX so the u32 column never wraps around to a bogus value. -fn serialize_quantity_as_u32(value: &Option, serializer: S) -> Result -where - S: serde::Serializer, -{ - match value { - Some(v) => serializer.serialize_some(&u32::try_from(*v).unwrap_or(u32::MAX)), - None => serializer.serialize_none(), - } -} - #[derive(Debug, Deserialize, Serialize)] struct Outcome { #[serde(default)] @@ -112,11 +102,11 @@ struct Outcome { timestamp: StringToIntDatetime, outcome: u8, category: Option, - #[serde(serialize_with = "serialize_quantity_as_u32")] + #[serde(rename(serialize = "quantity64"))] quantity: Option, // Only derived from `quantity` in `process_message` - #[serde(skip_deserializing)] - quantity64: Option, + #[serde(skip_deserializing, rename(serialize = "quantity"))] + quantity32: Option, reason: Option, event_id: Option, } @@ -147,15 +137,15 @@ mod tests { let result = process_message(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); - let expected = b"{\"org_id\":1,\"project_id\":1,\"key_id\":null,\"timestamp\":1680029444,\"outcome\":4,\"category\":1,\"quantity\":3,\"quantity64\":3,\"reason\":null,\"event_id\":null}\n"; + let expected = b"{\"org_id\":1,\"project_id\":1,\"key_id\":null,\"timestamp\":1680029444,\"outcome\":4,\"category\":1,\"quantity64\":3,\"quantity\":3,\"reason\":null,\"event_id\":null}\n"; assert_eq!(result.rows.into_encoded_rows(), expected); } #[test] fn test_outcome_quantity_overflow() { - // A quantity larger than u32::MAX should saturate in the u32 `quantity` - // column while `quantity64` keeps the full value. + // A quantity larger than u32::MAX cannot fit in the u32 `quantity` + // column; default to 0 while `quantity64` keeps the full value. let data = r#"{ "org_id": 1, "outcome": 4, @@ -172,7 +162,7 @@ mod tests { let result = process_message(payload, meta, &ProcessorConfig::default()) .expect("The message should be processed"); - let expected = b"{\"org_id\":1,\"project_id\":1,\"key_id\":null,\"timestamp\":1680029444,\"outcome\":4,\"category\":1,\"quantity\":4294967295,\"quantity64\":5000000000,\"reason\":null,\"event_id\":null}\n"; + let expected = b"{\"org_id\":1,\"project_id\":1,\"key_id\":null,\"timestamp\":1680029444,\"outcome\":4,\"category\":1,\"quantity64\":5000000000,\"quantity\":0,\"reason\":null,\"event_id\":null}\n"; assert_eq!(result.rows.into_encoded_rows(), expected); } From 674506baa671b095a48934590fd0bb45e58f3129 Mon Sep 17 00:00:00 2001 From: Pierre Massat Date: Mon, 15 Jun 2026 20:57:51 -0700 Subject: [PATCH 3/3] test(outcomes): update snapshots for quantity64 column The processor now serializes both `quantity64` (full u64 value) and the derived u32 `quantity`. Regenerate the schema snapshots so they include the new `quantity64` field. Co-Authored-By: Claude Opus 4.8 (1M context) --- ...comesProcessor-outcomes__1__outcomes-discarded-hash.json.snap | 1 + ...outcomes-OutcomesProcessor-outcomes__1__outcomes-lb.json.snap | 1 + ...OutcomesProcessor-outcomes__1__outcomes-null-values.json.snap | 1 + ...omes-OutcomesProcessor-outcomes__1__outcomes-pop-us.json.snap | 1 + ...comesProcessor-outcomes__1__outcomes-relay-internal.json.snap | 1 + ...omesProcessor-outcomes__1__outcomes2-missing-key-id.json.snap | 1 + 6 files changed, 6 insertions(+) diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-discarded-hash.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-discarded-hash.json.snap index 86873791d8..6fbbbcff7c 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-discarded-hash.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-discarded-hash.json.snap @@ -12,6 +12,7 @@ expression: snapshot_payload "outcome": 1, "project_id": 1, "quantity": 1, + "quantity64": 1, "reason": "discarded-hash", "timestamp": 1680029444 } diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-lb.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-lb.json.snap index fbcc495038..9e1f6ce88a 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-lb.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-lb.json.snap @@ -12,6 +12,7 @@ expression: snapshot_payload "outcome": 4, "project_id": 1, "quantity": 1, + "quantity64": 1, "reason": null, "timestamp": 1680029439 } diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-null-values.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-null-values.json.snap index 6b9743130b..de8023319a 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-null-values.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-null-values.json.snap @@ -12,6 +12,7 @@ expression: snapshot_payload "outcome": 0, "project_id": 1, "quantity": 1, + "quantity64": 1, "reason": null, "timestamp": 1679686100 } diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-pop-us.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-pop-us.json.snap index 23ff5acb8e..e4d9bec88c 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-pop-us.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-pop-us.json.snap @@ -12,6 +12,7 @@ expression: snapshot_payload "outcome": 3, "project_id": 1, "quantity": 1, + "quantity64": 1, "reason": "project_id", "timestamp": 1680043860 } diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-relay-internal.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-relay-internal.json.snap index 6de5551085..4c325ca614 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-relay-internal.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes-relay-internal.json.snap @@ -12,6 +12,7 @@ expression: snapshot_payload "outcome": 3, "project_id": 1, "quantity": 1, + "quantity64": 1, "reason": "project_id", "timestamp": 1680043980 } diff --git a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes2-missing-key-id.json.snap b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes2-missing-key-id.json.snap index 4849025158..64225fc46e 100644 --- a/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes2-missing-key-id.json.snap +++ b/rust_snuba/src/processors/snapshots/rust_snuba__processors__tests__schemas@outcomes-OutcomesProcessor-outcomes__1__outcomes2-missing-key-id.json.snap @@ -12,6 +12,7 @@ expression: snapshot_payload "outcome": 4, "project_id": 1, "quantity": 3, + "quantity64": 3, "reason": null, "timestamp": 1680029449 }