From cef1f5cc3965d11369276aeed9888d7db4f3a73b Mon Sep 17 00:00:00 2001 From: Gus Wynn Date: Fri, 25 Feb 2022 12:01:00 -0800 Subject: [PATCH] add tracing feature to timely, and optionally instrument it for the tokio-console --- timely/Cargo.toml | 1 + .../dataflow/operators/generic/builder_raw.rs | 26 +++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 0b550e53c..b498c97af 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -32,6 +32,7 @@ timely_communication = { path = "../communication", version = "0.12", default-fe timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" futures-util = "0.3" +tracing = { version = "0.1.31", optional = true } [dev-dependencies] # timely_sort="0.1.6" diff --git a/timely/src/dataflow/operators/generic/builder_raw.rs b/timely/src/dataflow/operators/generic/builder_raw.rs index 8e97492af..4f1b72727 100644 --- a/timely/src/dataflow/operators/generic/builder_raw.rs +++ b/timely/src/dataflow/operators/generic/builder_raw.rs @@ -161,6 +161,26 @@ impl OperatorBuilder { let inputs = self.shape.inputs; let outputs = self.shape.outputs; + // Generate a tracing span for the function. + #[cfg(feature = "tracing")] + let span = { + let location = std::panic::Location::caller(); + // timely.console.span is a marker target allowing us to + // turn these spans on and off specifically + // TODO(guswynn): check with tracing folks if there is a better + // way to do this + tracing::trace_span!( + target: "timely.console.span", + "runtime.spawn", + kind = %"timely-operator", + "fn" = %std::any::type_name::(), + task.name = self.shape.name.as_str(), + loc.file = location.file(), + loc.line = location.line(), + loc.col = location.column(), + ) + }; + let operator = OperatorCore { shape: self.shape, address: self.address, @@ -168,6 +188,8 @@ impl OperatorBuilder { logic, shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))), summary: self.summary, + #[cfg(feature = "tracing")] + span, }; self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global); @@ -190,6 +212,8 @@ where shared_progress: Rc>>, activations: Rc>, summary: Vec>>, + #[cfg(feature = "tracing")] + span: tracing::Span, } impl Schedule for OperatorCore @@ -201,6 +225,8 @@ where fn path(&self) -> &[usize] { &self.address[..] } fn schedule(&mut self) -> bool { let shared_progress = &mut *self.shared_progress.borrow_mut(); + #[cfg(feature = "tracing")] + let _s = self.span.enter(); (self.logic)(shared_progress) } }