From 2f34b53954854d71cb4f20d4b058aadede606471 Mon Sep 17 00:00:00 2001 From: Bryan English Date: Thu, 2 Jul 2026 16:57:44 -0400 Subject: [PATCH] fix(change-buffer): skip ops and chunk spans when the span is not live Previously, a single operation referencing a span that was no longer live -- already extracted (a late or duplicate op for an exported span), or whose Create was never applied -- made flush_change_buffer return SpanNotFound, aborting the entire batch. The caller then reset the change queue, discarding every still-pending operation including unrelated Creates; those spans were then orphaned and cascaded into further SpanNotFound errors at export time. flush_chunk shared the same fragility: one missing span id aborted extraction of the whole chunk. Now both paths skip the offending operation or span and continue. interpret_operation_cached still parses each op's payload so the read cursor stays aligned for subsequent ops, but applies mutations only when the target span is present. flush_chunk establishes its segment from the first live span and skips absent ids rather than returning an error. Skips are tallied in a new dropped_for_missing_span counter (exposed via a getter) and logged at debug level; non-zero values indicate benign late or duplicate operations rather than a fault. --- libdd-trace-utils/src/change_buffer/mod.rs | 353 ++++++++++++++++++--- 1 file changed, 315 insertions(+), 38 deletions(-) diff --git a/libdd-trace-utils/src/change_buffer/mod.rs b/libdd-trace-utils/src/change_buffer/mod.rs index 8fe4296a1a..21d5316023 100644 --- a/libdd-trace-utils/src/change_buffer/mod.rs +++ b/libdd-trace-utils/src/change_buffer/mod.rs @@ -156,6 +156,12 @@ pub struct ChangeBufferState { /// eliminates the alloc/dealloc churn that fragments the WASM linear memory allocator over /// time. span_pool: Vec>, + /// Count of operations (and chunk spans) skipped during a flush because their target span + /// was not live in [`Self::spans`] — either already extracted (a late/duplicate op) or a + /// Create that was never applied. Skipping keeps the flush resilient: one orphaned op no + /// longer aborts the whole batch (which previously discarded every still-pending op, + /// including unrelated Creates). Non-zero values are benign but worth surfacing. + dropped_for_missing_span: u64, } fn new_span_pooled( @@ -253,6 +259,7 @@ where str_agent_psr: T::Text::from_static_str("_dd.agent_psr"), str_internal: T::Text::from_static_str("internal"), span_pool: Vec::new(), + dropped_for_missing_span: 0, } } @@ -268,6 +275,14 @@ where self.span_pool.len() } + /// Number of operations/spans skipped during flushes because their target span was not + /// live (already extracted, or a Create that was never applied). Non-zero values are + /// benign — they indicate late or duplicate operations rather than a fault — but callers + /// may surface this as a metric to monitor pipeline health. + pub fn dropped_for_missing_span(&self) -> u64 { + self.dropped_for_missing_span + } + pub fn recycle_spans(&mut self, spans: Vec>) { let available = Self::SPANS_POOL_MAX_SIZE.saturating_sub(self.span_pool.len()); for span in spans.into_iter().take(available) { @@ -285,24 +300,31 @@ where let mut spans_vec = Vec::with_capacity(span_ids.len()); - // Fetch the trace_id corresponding to this chunk. It must be the same for all the spans in - // the chunk. - let Some(fst_id) = span_ids.first() else { + // The segment (and trace_id) is the same for all spans in the chunk. Establish it from + // the first span that is actually live: the nominal first id may be absent (never + // created, or already extracted), so fall back to the first present id rather than + // aborting the whole chunk. If none are live there is nothing to extract. + let Some(segment_id) = span_ids + .iter() + .find_map(|id| self.spans.get(id).map(|(_, segment_id)| *segment_id)) + else { return Ok(vec![]); }; - let Some((_span, segment_id)) = self.spans.get(fst_id) else { - return Err(ChangeBufferError::SpanNotFound(*fst_id)); - }; - - let segment_id = *segment_id; let segment = self.segments.get(&segment_id); + let mut skipped: u64 = 0; for span_id in span_ids { - let (mut span, _segment_id) = self - .spans - .remove(span_id) - .ok_or(ChangeBufferError::SpanNotFound(*span_id))?; + let Some((mut span, _segment_id)) = self.spans.remove(span_id) else { + // Span isn't live (already extracted, or its Create was never applied). Skip + // it instead of aborting the chunk so the remaining spans still export. + skipped += 1; + tracing::debug!( + span_id, + "change_buffer: skipping chunk span not present in the map" + ); + continue; + }; if is_local_root { self.copy_in_sampling_tags(segment, &mut span); @@ -319,6 +341,8 @@ where spans_vec.push(span); } + self.dropped_for_missing_span += skipped; + let segment = self.segments.get_mut(&segment_id); let should_remove = segment @@ -441,66 +465,102 @@ where cache_slot: &mut Option>, ) -> Result<()> { let buf = &self.change_buffer; - let cached = match cache_slot.as_mut() { - Some(cached) if op.span_id == cached.span_id => cached, - _ => { - let (span, segment_id) = self - .spans - .get_mut(&op.span_id) - .ok_or(ChangeBufferError::SpanNotFound(op.span_id))?; - - cache_slot.insert(SpanCache { + // Resolve the target span. It may be absent if it was already extracted (a late or + // duplicate op referencing an exported span) or if its Create was never applied. In + // that case we DO NOT abort: we still parse the op's payload below (advancing `index`) + // so the read cursor stays aligned for the remaining ops, but skip the mutation. This + // keeps a single orphaned op from discarding every still-pending op in the batch. + let cached: Option<&mut SpanCache> = match cache_slot.as_mut() { + Some(cached) if op.span_id == cached.span_id => Some(cached), + _ => match self.spans.get_mut(&op.span_id) { + Some((span, segment_id)) => Some(cache_slot.insert(SpanCache { span_id: op.span_id, // Safety: a mutable reference can't be null // TODO: use NonNull::from_mut once our MRSV is recent enough span_ptr: unsafe { NonNull::new_unchecked(span as *mut Span) }, segment_id: *segment_id, - }) - } + })), + None => { + // Don't cache a miss; drop any stale cache from a prior span. + *cache_slot = None; + self.dropped_for_missing_span += 1; + tracing::debug!( + span_id = op.span_id, + "change_buffer: skipping op for span not present in the map" + ); + None + } + }, }; + let segment_id = cached.as_ref().map_or(0, |c| c.segment_id); // Safety: span_ptr points into self.spans and is valid for write (safety pre-condition of // this function). // self.spans is never aliased/accessed otherwise for the lifetime of `span`. - let span = unsafe { cached.span_ptr.as_mut() }; + let span = cached.map(|c| unsafe { c.span_ptr.as_mut() }); match op.opcode { OpCode::SetMetaAttr => { let key = buf.read_string(&self.string_table, index)?; let val = buf.read_string(&self.string_table, index)?; - span.meta.insert(key, val); + if let Some(span) = span { + span.meta.insert(key, val); + } } OpCode::SetMetricAttr => { let key = buf.read_string(&self.string_table, index)?; let val: f64 = buf.read(index)?; - span.metrics.insert(key, val); + if let Some(span) = span { + span.metrics.insert(key, val); + } } OpCode::SetServiceName => { - span.service = buf.read_string(&self.string_table, index)?; + let service = buf.read_string(&self.string_table, index)?; + if let Some(span) = span { + span.service = service; + } } OpCode::SetResourceName => { - span.resource = buf.read_string(&self.string_table, index)?; + let resource = buf.read_string(&self.string_table, index)?; + if let Some(span) = span { + span.resource = resource; + } } OpCode::SetError => { - span.error = buf.read(index)?; + let error = buf.read(index)?; + if let Some(span) = span { + span.error = error; + } } OpCode::SetStart => { - span.start = buf.read(index)?; + let start = buf.read(index)?; + if let Some(span) = span { + span.start = start; + } } OpCode::SetDuration => { - span.duration = buf.read(index)?; + let duration = buf.read(index)?; + if let Some(span) = span { + span.duration = duration; + } } OpCode::SetType => { - span.r#type = buf.read_string(&self.string_table, index)?; + let r#type = buf.read_string(&self.string_table, index)?; + if let Some(span) = span { + span.r#type = r#type; + } } OpCode::SetName => { - span.name = buf.read_string(&self.string_table, index)?; + let name = buf.read_string(&self.string_table, index)?; + if let Some(span) = span { + span.name = name; + } } OpCode::SetTraceMetaAttr => { let name = buf.read_string(&self.string_table, index)?; let val = buf.read_string(&self.string_table, index)?; - if let Some(segment) = self.segments.get_mut(&cached.segment_id) { + if let Some(segment) = self.segments.get_mut(&segment_id) { segment.meta.insert(name, val); } } @@ -508,31 +568,37 @@ where let name = buf.read_string(&self.string_table, index)?; let val = buf.read(index)?; - if let Some(segment) = self.segments.get_mut(&cached.segment_id) { + if let Some(segment) = self.segments.get_mut(&segment_id) { segment.metrics.insert(name, val); } } OpCode::SetTraceOrigin => { let origin = buf.read_string(&self.string_table, index)?; - if let Some(segment) = self.segments.get_mut(&cached.segment_id) { + if let Some(segment) = self.segments.get_mut(&segment_id) { segment.origin = Some(origin); } } OpCode::BatchSetMeta => { let count: u32 = buf.read(index)?; + let mut span = span; for _ in 0..count { let key = buf.read_string(&self.string_table, index)?; let val = buf.read_string(&self.string_table, index)?; - span.meta.insert(key, val); + if let Some(span) = span.as_deref_mut() { + span.meta.insert(key, val); + } } } OpCode::BatchSetMetric => { let count: u32 = buf.read(index)?; + let mut span = span; for _ in 0..count { let key = buf.read_string(&self.string_table, index)?; let val: f64 = buf.read(index)?; - span.metrics.insert(key, val); + if let Some(span) = span.as_deref_mut() { + span.metrics.insert(key, val); + } } } OpCode::Create | OpCode::CreateSpan | OpCode::CreateSpanFull => { @@ -1043,4 +1109,215 @@ mod segment_isolation_tests { "segment 1 trace metric was polluted by segment 2's SetTraceMetricsAttr" ); } + + // ----------------------------------------------------------------------- + // Tolerance for operations/spans whose target span is not live. + // + // A span can be absent from the map because it was already extracted (a + // late or duplicate op for an exported span) or because its Create was + // never applied. Such ops/spans must be skipped — not treated as a hard + // error that aborts the whole flush/chunk — while keeping the buffer read + // cursor aligned so subsequent ops decode correctly. + // ----------------------------------------------------------------------- + + const OP_MISSING: u64 = 99; // a span id that is never created + + #[test] + fn op_for_missing_span_is_skipped_and_keeps_cursor_aligned() { + // Create span 1, then a SetServiceName for a missing span (skipped), + // then a SetServiceName for span 1. If the missing op's payload were + // not consumed, span 1's service would decode from the wrong bytes. + let mut w = BufWriter::new(); + w.op(OP_CREATE, 1); + w.u128(TRACE_ID); + w.u64(1); // segment_id + w.u64(0); // parent_id + w.op(OP_SET_SERVICE_NAME, OP_MISSING); + w.u32(0); // string_id → "svc-missing" + w.op(OP_SET_SERVICE_NAME, 1); + w.u32(1); // string_id → "svc-live" + + let mut buf_data = w.finish(); + let mut state = make_state(&mut buf_data); + state.string_table_insert_one(0, "svc-missing"); + state.string_table_insert_one(1, "svc-live"); + + // Must not error even though span 99 is absent. + state.flush_change_buffer().unwrap(); + + let spans = state.flush_chunk(&[1], true).unwrap(); + assert_eq!(spans.len(), 1); + assert_eq!( + spans[0].service, "svc-live", + "op after a skipped missing-span op decoded the wrong bytes" + ); + assert_eq!(state.dropped_for_missing_span(), 1); + } + + #[test] + fn missing_span_op_does_not_abort_pending_creates() { + // Regression for the cascade: an op for a missing span appears before a + // Create in the same batch. Aborting on the missing op (old behavior) + // dropped the still-pending Create, orphaning that span at export time. + let mut w = BufWriter::new(); + w.op(OP_SET_SERVICE_NAME, OP_MISSING); + w.u32(0); // string_id → "svc-missing" + w.op(OP_CREATE, 2); + w.u128(TRACE_ID); + w.u64(1); // segment_id + w.u64(0); // parent_id + + let mut buf_data = w.finish(); + let mut state = make_state(&mut buf_data); + state.string_table_insert_one(0, "svc-missing"); + + state.flush_change_buffer().unwrap(); + + // Span 2's Create must have landed despite the earlier missing-span op. + let spans = state.flush_chunk(&[2], true).unwrap(); + assert_eq!(spans.len(), 1, "Create was dropped by the missing-span op"); + assert_eq!(state.dropped_for_missing_span(), 1); + } + + #[test] + fn flush_chunk_skips_missing_spans() { + // A chunk listing a mix of live and absent span ids extracts only the + // live ones instead of aborting the whole chunk. + let mut w = BufWriter::new(); + w.op(OP_CREATE, 1); + w.u128(TRACE_ID); + w.u64(1); + w.u64(0); + w.op(OP_CREATE, 2); + w.u128(TRACE_ID); + w.u64(1); + w.u64(1); + + let mut buf_data = w.finish(); + let mut state = make_state(&mut buf_data); + state.flush_change_buffer().unwrap(); + + let spans = state.flush_chunk(&[1, OP_MISSING, 2], true).unwrap(); + assert_eq!(spans.len(), 2, "missing chunk span aborted the extract"); + assert_eq!(state.dropped_for_missing_span(), 1); + } + + #[test] + fn flush_chunk_all_missing_is_empty_not_error() { + // A chunk whose spans are all absent yields an empty result, not an error. + let w = BufWriter::new(); + let mut buf_data = w.finish(); + let mut state = make_state(&mut buf_data); + state.flush_change_buffer().unwrap(); + + let spans = state.flush_chunk(&[OP_MISSING], true).unwrap(); + assert!(spans.is_empty()); + } + + #[test] + fn batch_op_for_missing_span_consumes_payload() { + // A BatchSetMeta for a missing span must consume its full variable-length + // payload (count + pairs) so the following op decodes from the right offset. + let mut w = BufWriter::new(); + w.op(OP_CREATE, 1); + w.u128(TRACE_ID); + w.u64(1); + w.u64(0); + w.op(OP_BATCH_SET_META, OP_MISSING); + w.u32(2); // count + w.u32(0); // key_id + w.u32(0); // val_id + w.u32(0); // key_id + w.u32(0); // val_id + w.op(OP_SET_SERVICE_NAME, 1); + w.u32(1); // string_id → "svc-live" + + let mut buf_data = w.finish(); + let mut state = make_state(&mut buf_data); + state.string_table_insert_one(0, "k"); + state.string_table_insert_one(1, "svc-live"); + + state.flush_change_buffer().unwrap(); + + let spans = state.flush_chunk(&[1], true).unwrap(); + assert_eq!(spans.len(), 1); + assert_eq!( + spans[0].service, "svc-live", + "SetServiceName decoded wrong bytes: missing-span BatchSetMeta payload not consumed" + ); + assert_eq!(state.dropped_for_missing_span(), 1); + } + + #[test] + fn batch_metric_op_for_missing_span_consumes_payload() { + // Symmetric to the BatchSetMeta case: a BatchSetMetric for a missing span must + // consume its full count-prefixed payload so the following op stays aligned. + let mut w = BufWriter::new(); + w.op(OP_CREATE, 1); + w.u128(TRACE_ID); + w.u64(1); + w.u64(0); + w.op(OP_BATCH_SET_METRIC, OP_MISSING); + w.u32(2); // count + w.u32(0); // key_id + w.u64(1.5f64.to_bits()); // value + w.u32(0); // key_id + w.u64(2.5f64.to_bits()); // value + w.op(OP_SET_SERVICE_NAME, 1); + w.u32(1); // string_id → "svc-live" + + let mut buf_data = w.finish(); + let mut state = make_state(&mut buf_data); + state.string_table_insert_one(0, "k"); + state.string_table_insert_one(1, "svc-live"); + + state.flush_change_buffer().unwrap(); + + let spans = state.flush_chunk(&[1], true).unwrap(); + assert_eq!(spans.len(), 1); + assert_eq!( + spans[0].service, "svc-live", + "SetServiceName decoded wrong bytes: missing-span BatchSetMetric payload not consumed" + ); + assert_eq!(state.dropped_for_missing_span(), 1); + } + + #[test] + fn trace_level_op_for_missing_span_is_skipped_and_keeps_cursor_aligned() { + // A trace-level op (SetTraceMetaAttr) targeting a missing span must consume its + // payload (keeping the following op aligned) and must not pollute a live segment: + // the absent span resolves segment_id to the 0 sentinel, which matches no live + // segment. + let mut w = BufWriter::new(); + w.op(OP_CREATE, 1); + w.u128(TRACE_ID); + w.u64(1); // segment_id + w.u64(0); + w.op(OP_SET_TRACE_META_ATTR, OP_MISSING); + w.u32(0); // key → "env" + w.u32(1); // value → "staging" + w.op(OP_SET_SERVICE_NAME, 1); + w.u32(2); // string_id → "svc-live" + + let mut buf_data = w.finish(); + let mut state = make_state(&mut buf_data); + state.string_table_insert_one(0, "env"); + state.string_table_insert_one(1, "staging"); + state.string_table_insert_one(2, "svc-live"); + + state.flush_change_buffer().unwrap(); + + let spans = state.flush_chunk(&[1], true).unwrap(); + assert_eq!(spans.len(), 1); + assert_eq!( + spans[0].service, "svc-live", + "cursor misaligned after a skipped trace-level op" + ); + assert_eq!( + spans[0].meta.get("env"), + None, + "live segment polluted by a trace-level op for a missing span" + ); + assert_eq!(state.dropped_for_missing_span(), 1); + } }