Skip to content

Commit fb0cf11

Browse files
thomasywang authored and meta-codesync[bot] committed
Add spans to points along the message send/recv path (#1833)
Summary: Pull Request resolved: #1833. Our observability only shows us when a message is sent out and when it is delivered to its final destination. We want to add spans to some of the important stops along the way. Reviewed By: vidhyav. Differential Revision: D86802704. fbshipit-source-id: bf0708ef5021e38f74658071a36a08960e8658eb
1 parent 3acd912 commit fb0cf11

File tree

8 files changed

+339
-222
lines changed

8 files changed

+339
-222
lines changed

hyperactor/src/channel.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,11 +119,13 @@ pub trait Tx<M: RemoteMessage>: std::fmt::Debug {
119119
/// message is either delivered, or we eventually discover that
120120
/// the channel has failed and it will be sent back on `return_channel`.
121121
#[allow(clippy::result_large_err)] // TODO: Consider reducing the size of `SendError`.
122+
#[hyperactor::instrument_infallible]
122123
fn try_post(&self, message: M, return_channel: oneshot::Sender<SendError<M>>) {
123124
self.do_post(message, Some(return_channel));
124125
}
125126

126127
/// Enqueue a message to be sent on the channel.
128+
#[hyperactor::instrument_infallible]
127129
fn post(&self, message: M) {
128130
self.do_post(message, None);
129131
}
@@ -803,6 +805,7 @@ enum ChannelRxKind<M: RemoteMessage> {
803805

804806
#[async_trait]
805807
impl<M: RemoteMessage> Rx<M> for ChannelRx<M> {
808+
#[hyperactor::instrument]
806809
async fn recv(&mut self) -> Result<M, ChannelError> {
807810
match &mut self.inner {
808811
ChannelRxKind::Local(rx) => rx.recv().await,

hyperactor/src/channel/net/client.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::ops::DerefMut;
1717
use backoff::ExponentialBackoffBuilder;
1818
use backoff::backoff::Backoff;
1919
use enum_as_inner::EnumAsInner;
20+
use hyperactor_telemetry::skip_record;
2021
use tokio::io::AsyncWriteExt;
2122
use tokio::io::ReadHalf;
2223
use tokio::io::WriteHalf;
@@ -789,8 +790,6 @@ where
789790
.as_ref()
790791
.map(|acked_seq| AckedSeqValue(acked_seq.clone()));
791792

792-
use hyperactor_telemetry::skip_record;
793-
794793
tracing::span!(
795794
Level::ERROR,
796795
"net i/o loop",
@@ -926,7 +925,7 @@ where
926925
tokio::select! {
927926
biased;
928927

929-
ack_result = reader.next() => {
928+
ack_result = reader.next().instrument(tracing::span!(Level::ERROR, "read ack", skip_record)) => {
930929
match ack_result {
931930
Ok(Some(buffer)) => {
932931
match deserialize_response(buffer) {
@@ -1018,7 +1017,7 @@ where
10181017

10191018
// We have to be careful to manage outgoing write states, so that we never write
10201019
// partial frames in the presence of cancellation.
1021-
send_result = write_state.send() => {
1020+
send_result = write_state.send().instrument(tracing::span!(Level::ERROR, "write bytes", skip_record)) => {
10221021
match send_result {
10231022
Ok(()) => {
10241023
let mut message = outbox.pop_front().expect("outbox should not be empty");

0 commit comments

Comments
 (0)