Skip to content

Commit cef1f5c

Browse files
committed
add tracing feature to timely, and optionally instrument it for the tokio-console
1 parent 4db679a commit cef1f5c

2 files changed

Lines changed: 27 additions & 0 deletions

File tree

timely/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ timely_communication = { path = "../communication", version = "0.12", default-fe
3232
timely_container = { path = "../container", version = "0.12" }
3333
crossbeam-channel = "0.5.0"
3434
futures-util = "0.3"
35+
tracing = { version = "0.1.31", optional = true }
3536

3637
[dev-dependencies]
3738
# timely_sort="0.1.6"

timely/src/dataflow/operators/generic/builder_raw.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,13 +161,35 @@ impl<G: Scope> OperatorBuilder<G> {
161161
let inputs = self.shape.inputs;
162162
let outputs = self.shape.outputs;
163163

164+
// Generate a tracing span for the function.
165+
#[cfg(feature = "tracing")]
166+
let span = {
167+
let location = std::panic::Location::caller();
168+
// timely.console.span is a marker target allowing us to
169+
// turn these spans on and off specifically
170+
// TODO(guswynn): check with tracing folks if there is a better
171+
// way to do this
172+
tracing::trace_span!(
173+
target: "timely.console.span",
174+
"runtime.spawn",
175+
kind = %"timely-operator",
176+
"fn" = %std::any::type_name::<L>(),
177+
task.name = self.shape.name.as_str(),
178+
loc.file = location.file(),
179+
loc.line = location.line(),
180+
loc.col = location.column(),
181+
)
182+
};
183+
164184
let operator = OperatorCore {
165185
shape: self.shape,
166186
address: self.address,
167187
activations: self.scope.activations(),
168188
logic,
169189
shared_progress: Rc::new(RefCell::new(SharedProgress::new(inputs, outputs))),
170190
summary: self.summary,
191+
#[cfg(feature = "tracing")]
192+
span,
171193
};
172194

173195
self.scope.add_operator_with_indices(Box::new(operator), self.index, self.global);
@@ -190,6 +212,8 @@ where
190212
shared_progress: Rc<RefCell<SharedProgress<T>>>,
191213
activations: Rc<RefCell<Activations>>,
192214
summary: Vec<Vec<Antichain<T::Summary>>>,
215+
#[cfg(feature = "tracing")]
216+
span: tracing::Span,
193217
}
194218

195219
impl<T, L> Schedule for OperatorCore<T, L>
@@ -201,6 +225,8 @@ where
201225
fn path(&self) -> &[usize] { &self.address[..] }
202226
fn schedule(&mut self) -> bool {
203227
let shared_progress = &mut *self.shared_progress.borrow_mut();
228+
#[cfg(feature = "tracing")]
229+
let _s = self.span.enter();
204230
(self.logic)(shared_progress)
205231
}
206232
}

0 commit comments

Comments
 (0)