Skip to content

Commit 0e7dd4b

Browse files
committed
add tracing feature to timely, and optionally instrument it for the tokio-console
1 parent 4db679a commit 0e7dd4b

2 files changed

Lines changed: 25 additions & 0 deletions

File tree

timely/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ timely_communication = { path = "../communication", version = "0.12", default-fe
3232
timely_container = { path = "../container", version = "0.12" }
3333
crossbeam-channel = "0.5.0"
3434
futures-util = "0.3"
35+
tracing = { version = "0.1.31", optional = true }
3536

3637
[dev-dependencies]
3738
# timely_sort="0.1.6"

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,26 @@ impl<G: Scope> OperatorBuilder<G> {
161161
let inputs = self.shape.inputs;
162162
let outputs = self.shape.outputs;
163163

164+
// Generate a tracing span for the function.
165+
#[cfg(feature = "tracing")]
166+
let span = {
167+
let location = std::panic::Location::caller();
168+
// timely.console.span is a marker target allowing us to
169+
// turn these spans on and off specifically
170+
// TODO(guswynn): check with tracing folks if there is a better
171+
// way to do this
172+
tracing::trace_span!(
173+
target: "timely.console.span",
174+
"runtime.spawn",
175+
kind = %"timely-operator",
176+
"fn" = %std::any::type_name::<L>(),
177+
task.name = self.shape.name.as_str(),
178+
loc.file = location.file(),
179+
loc.line = location.line(),
180+
loc.col = location.column(),
181+
)
182+
};
183+
164184
let operator = OperatorCore {
165185
shape: self.shape,
166186
address: self.address,
@@ -190,6 +210,8 @@ where
190210
shared_progress: Rc<RefCell<SharedProgress<T>>>,
191211
activations: Rc<RefCell<Activations>>,
192212
summary: Vec<Vec<Antichain<T::Summary>>>,
213+
#[cfg(feature = "tracing")]
214+
span: tracing::Span,
193215
}
194216

195217
impl<T, L> Schedule for OperatorCore<T, L>
@@ -201,6 +223,8 @@ where
201223
fn path(&self) -> &[usize] { &self.address[..] }
202224
fn schedule(&mut self) -> bool {
203225
let shared_progress = &mut *self.shared_progress.borrow_mut();
226+
#[cfg(feature = "tracing")]
227+
let _s = self.span.enter();
204228
(self.logic)(shared_progress)
205229
}
206230
}

0 commit comments

Comments
 (0)