Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 61 additions & 9 deletions src/workerd/io/trace-stream.c++
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,17 @@ namespace {
V(CFJSON, "cfJson") \
V(CLOSE, "close") \
V(CODE, "code") \
V(COUNT, "count") \
V(CPUTIME, "cpuTime") \
V(CRON, "cron") \
V(CUSTOM, "custom") \
V(DAEMONDOWN, "daemonDown") \
V(DEBUG, "debug") \
V(DIAGNOSTICCHANNEL, "diagnosticChannel") \
V(DIAGNOSTIC, "diagnostic") \
V(DIAGNOSTICSTYPE, "diagnosticsType") \
V(DISPATCHNAMESPACE, "dispatchNamespace") \
V(DROPPEDEVENTS, "droppedEvents") \
V(EMAIL, "email") \
V(ENTRYPOINT, "entrypoint") \
V(ERROR, "error") \
Expand Down Expand Up @@ -76,6 +80,8 @@ namespace {
V(SPANOPEN, "spanOpen") \
V(STACK, "stack") \
V(STATUSCODE, "statusCode") \
V(STREAMDIAGEVENT, "streamDiagEvent") \
V(STREAMDIAGNOSTIC, "streamDiagnostic") \
V(TAG, "tag") \
V(TIMESTAMP, "timestamp") \
V(TRACEID, "traceId") \
Expand Down Expand Up @@ -485,6 +491,19 @@ jsg::JsValue ToJs(jsg::Lock& js, const Log& log, StringCache& cache) {
return obj;
}

// Converts a StreamDiagnosticsEvent into its JS representation:
// { type: "streamDiagnostic", diagnostic: { diagnosticsType: "droppedEvents", count: N } }
jsg::JsValue ToJs(jsg::Lock& js, const StreamDiagnosticsEvent& streamDiag, StringCache& cache) {
  // At present we only support the droppedEvents type, so the diagnostic payload
  // is built unconditionally.
  auto diagnostic = js.obj();
  diagnostic.set(js, DIAGNOSTICSTYPE_STR, cache.get(js, DROPPEDEVENTS_STR));
  diagnostic.set(js, COUNT_STR, js.num(streamDiag.droppedEventsCount));

  auto obj = js.obj();
  obj.set(js, TYPE_STR, cache.get(js, STREAMDIAGNOSTIC_STR));
  obj.set(js, DIAGNOSTIC_STR, kj::mv(diagnostic));
  return obj;
}

jsg::JsValue ToJs(jsg::Lock& js, const Return& ret, StringCache& cache) {
auto obj = js.obj();
obj.set(js, TYPE_STR, cache.get(js, RETURN_STR));
Expand Down Expand Up @@ -533,6 +552,9 @@ jsg::JsValue ToJs(jsg::Lock& js, const TailEvent& event, StringCache& cache) {
KJ_CASE_ONEOF(log, Log) {
obj.set(js, EVENT_STR, ToJs(js, log, cache));
}
KJ_CASE_ONEOF(diagEvent, StreamDiagnosticsEvent) {
obj.set(js, EVENT_STR, ToJs(js, diagEvent, cache));
}
KJ_CASE_ONEOF(ret, Return) {
obj.set(js, EVENT_STR, ToJs(js, ret, cache));
}
Expand Down Expand Up @@ -569,6 +591,9 @@ kj::Maybe<kj::StringPtr> getHandlerName(const TailEvent& event) {
KJ_CASE_ONEOF(_, Log) {
return LOG_STR;
}
KJ_CASE_ONEOF(_, StreamDiagnosticsEvent) {
return STREAMDIAGEVENT_STR;
}
KJ_CASE_ONEOF(_, Return) {
return RETURN_STR;
}
Expand Down Expand Up @@ -1004,7 +1029,7 @@ TailStreamWriter::TailStreamWriter(Pending pending, kj::TaskSet& waitUntilTasks)
: inner(kj::mv(pending)),
waitUntilTasks(waitUntilTasks) {}

bool TailStreamWriter::reportImpl(TailEvent&& event) {
bool TailStreamWriter::reportImpl(TailEvent&& event, size_t sizeHint) {
// In reportImpl, our inner state must be active.
auto& actives = KJ_ASSERT_NONNULL(inner.tryGet<kj::Vector<kj::Own<Active>>>());

Expand All @@ -1030,13 +1055,35 @@ bool TailStreamWriter::reportImpl(TailEvent&& event) {
bool isClosing = event.event.is<Outcome>();
// Deliver the event to the queue and make sure we are processing.
for (auto& active: actives) {
// Optimization: Elide copy for last tail worker, helpful for common case of only one STW
// being present.
if (&active == &actives.back()) {
active->queue.push(kj::mv(event));
// Only queue the event if we don't have an excessive queue size yet. Return and Outcome
// events are only provided once and thus won't be dropped.
if (active->queueSize < maxQueueSize || event.event.is<Outcome>() || event.event.is<Return>()) {
// When we get to the outcome, no more events will be dropped. Inject an internal diagnostics
// event indicating how many events were dropped if applicable.
if (event.event.is<Outcome>() && active->droppedEvents > 0) {
StreamDiagnosticsEvent diag(active->droppedEvents);
TailEvent diagTailEvent(event.spanContext.clone(), event.invocationId, event.timestamp,
event.sequence, kj::mv(diag));
active->queue.push(kj::mv(diagTailEvent));
// Increment the outcome sequence number to keep things consistent.
event.sequence++;
}

// Optimization: Elide copy for last tail worker, helpful for common case of only one STW
// being present.
if (&active == &actives.back()) {
active->queue.push(kj::mv(event));
} else {
active->queue.push(event.clone());
}
// Adjust estimated queue size based on size hint and an arbitrary amount for serialization
// overhead. As long as this estimate is reasonably accurate, we won't need to check the
// size again when serializing the message.
active->queueSize += tailSerializationOverhead + sizeHint;
} else {
active->queue.push(event.clone());
active->droppedEvents++;
Comment thread
fhanau marked this conversation as resolved.
}

if (!active->pumping) {
waitUntilTasks.add(pump(kj::addRef(*active)));
}
Expand Down Expand Up @@ -1088,6 +1135,9 @@ kj::Promise<void> TailStreamWriter::pump(kj::Own<Active> current) {
auto builder = KJ_ASSERT_NONNULL(current->capability).reportRequest();
auto eventsBuilder = builder.initEvents(current->queue.size());
size_t n = 0;

// We're synchronously draining the queue – reset its size.
current->queueSize = 0;
current->queue.drainTo([&](TailEvent&& event) { event.copyTo(eventsBuilder[n++]); });

auto result = co_await builder.send();
Expand Down Expand Up @@ -1128,8 +1178,10 @@ kj::Maybe<kj::Own<TailStreamWriter>> initializeTailStreamWriter(
return kj::heap<TailStreamWriter>(kj::mv(streamingTailWorkers), waitUntilTasks);
}

void TailStreamWriter::report(
const InvocationSpanContext& context, TailEvent::Event&& event, kj::Date timestamp) {
void TailStreamWriter::report(const InvocationSpanContext& context,
TailEvent::Event&& event,
kj::Date timestamp,
size_t sizeHint) {
// Becomes a no-op if a terminal event (close) has been reported, or if the stream closed due to
// not receiving a well-formed event handler. We need to disambiguate these cases as the former
// indicates an implementation error resulting in trailing events whereas the latter case is
Expand Down Expand Up @@ -1195,7 +1247,7 @@ void TailStreamWriter::report(

// The state is determined to be closing when it receives a terminal event (tracing::Outcome),
// or if there are no active tail workers left, we can close the internal state at that point.
if (reportImpl(kj::mv(tailEvent))) {
if (reportImpl(kj::mv(tailEvent), sizeHint)) {
inner = Closed{};
}
}
Expand Down
21 changes: 19 additions & 2 deletions src/workerd/io/trace-stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ class TailStreamCustomEvent final: public WorkerInterface::CustomEvent {
// A utility class that receives tracing events and generates/reports TailEvents.
class TailStreamWriter final {
public:
// The maximum size of the queue, in bytes.
const size_t maxQueueSize = 2 * 1024 * 1024;
// The estimated overhead of TailEvent wrapping per message. This does not need to be very
// accurate, but should be enough to avoid allocating too much memory/hitting capnp RPC message
// size limits when sending many tiny events.
const size_t tailSerializationOverhead = 64;

Comment thread
fhanau marked this conversation as resolved.
// The initial state of our tail worker writer is that it is pending the first onset event. During
// this time we will only have a collection of WorkerInterface instances. When our first event is
// reported (the onset) we will arrange to acquire tailStream capabilities from each then use
Expand All @@ -75,7 +82,10 @@ class TailStreamWriter final {
TailStreamWriter(Pending pending, kj::TaskSet& waitUntilTasks);
KJ_DISALLOW_COPY_AND_MOVE(TailStreamWriter);

void report(const InvocationSpanContext& context, TailEvent::Event&& event, kj::Date time);
void report(const InvocationSpanContext& context,
TailEvent::Event&& event,
kj::Date time,
size_t sizeHint);

private:
// Instances of Active are refcounted. The TailStreamWriter itself holds the initial ref. Whenever
Expand All @@ -87,6 +97,11 @@ class TailStreamWriter final {
kj::Maybe<rpc::TailStreamTarget::Client> capability;
bool pumping = false;
bool onsetSeen = false;
// Estimated byte size of the queue, used to drop events to avoid excessive memory usage.
size_t queueSize = 0;
// The number of tail events we had to drop. We'll send a warning indicating this at the end of
// the stream.
uint32_t droppedEvents = 0;
workerd::util::Queue<TailEvent> queue;

Active(rpc::TailStreamTarget::Client capability): capability(kj::mv(capability)) {}
Expand All @@ -98,7 +113,9 @@ class TailStreamWriter final {
kj::TaskSet& waitUntilTasks;

static kj::Promise<void> pump(kj::Own<Active> current);
bool reportImpl(TailEvent&& event);
// Report an event to the tail stream writer.
// sizeHint: The approximate size of the event, in bytes.
bool reportImpl(TailEvent&& event, size_t sizeHint);

uint32_t sequence = 0;
bool onsetSeen = false;
Expand Down
15 changes: 15 additions & 0 deletions src/workerd/io/trace-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,21 @@ KJ_TEST("Read/Write Exception works") {
KJ_ASSERT(info3.stack == kj::none);
}

KJ_TEST("Read/Write StreamDiagnosticsEvent works") {
  capnp::MallocMessageBuilder message;
  auto builder = message.initRoot<rpc::Trace::StreamDiagnosticsEvent>();

  // Round-trip the event through its capnp representation.
  StreamDiagnosticsEvent original(42);
  original.copyTo(builder);

  StreamDiagnosticsEvent roundTripped(builder.asReader());
  KJ_ASSERT(roundTripped.droppedEventsCount == 42);

  // clone() must preserve the dropped-events count as well.
  KJ_ASSERT(original.clone().droppedEventsCount == 42);
}

KJ_TEST("Read/Write Attribute works") {
capnp::MallocMessageBuilder builder;
auto infoBuilder = builder.initRoot<rpc::Trace::Attribute>();
Expand Down
40 changes: 40 additions & 0 deletions src/workerd/io/trace.c++
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,9 @@ kj::String KJ_STRINGIFY(const TailEvent::Event& event) {
KJ_CASE_ONEOF(log, Log) {
return kj::str("Log");
}
KJ_CASE_ONEOF(streamDiag, StreamDiagnosticsEvent) {
return kj::str("StreamDiagnosticsEvent(droppedEvents: ", streamDiag.droppedEventsCount, ")");
}
KJ_CASE_ONEOF(ret, Return) {
return kj::str("Return");
}
Expand Down Expand Up @@ -533,6 +536,34 @@ DiagnosticChannelEvent DiagnosticChannelEvent::clone() const {
return DiagnosticChannelEvent(timestamp, kj::str(channel), kj::heapArray<kj::byte>(message));
}

// Constructs a droppedEvents diagnostic carrying the number of tail events that
// were dropped from the stream.
StreamDiagnosticsEvent::StreamDiagnosticsEvent(uint32_t droppedEventsCount)
    : droppedEventsCount(droppedEventsCount) {}

// Deserializes a StreamDiagnosticsEvent from its capnp representation.
StreamDiagnosticsEvent::StreamDiagnosticsEvent(rpc::Trace::StreamDiagnosticsEvent::Reader reader) {
  auto diagnosticReader = reader.getDiagnostic();
  switch (diagnosticReader.which()) {
    case rpc::Trace::StreamDiagnosticsEvent::Diagnostic::UNDEFINED:
      // The sender must always populate the diagnostic union; an undefined
      // diagnostic is a protocol violation.
      KJ_FAIL_ASSERT("received invalid diagnostics event");
      break;
    case rpc::Trace::StreamDiagnosticsEvent::Diagnostic::DROPPED_EVENTS: {
      droppedEventsCount = diagnosticReader.getDroppedEvents().getCount();
      // We only emit this event when at least one event was actually dropped.
      KJ_DASSERT(droppedEventsCount > 0);
      return;
    }
  }
  // A union discriminant added by a newer schema version matches none of the cases
  // above and would otherwise leave droppedEventsCount uninitialized; fail loudly
  // instead of propagating indeterminate data.
  KJ_FAIL_ASSERT("received unknown stream diagnostics event type");
}

// Serializes this event into the given capnp builder.
void StreamDiagnosticsEvent::copyTo(rpc::Trace::StreamDiagnosticsEvent::Builder builder) const {
  // We never serialize a diagnostic reporting zero dropped events.
  KJ_DASSERT(droppedEventsCount > 0);
  builder.initDiagnostic().initDroppedEvents().setCount(droppedEventsCount);
}

// Deep copy; trivial since the event currently only carries a counter.
StreamDiagnosticsEvent StreamDiagnosticsEvent::clone() const {
  return StreamDiagnosticsEvent(droppedEventsCount);
}

HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo(Type type): type(type) {}

HibernatableWebSocketEventInfo::HibernatableWebSocketEventInfo(
Expand Down Expand Up @@ -1356,6 +1387,9 @@ TailEvent::Event readEventFromTailEvent(const rpc::Trace::TailEvent::Reader& rea
case rpc::Trace::TailEvent::Event::LOG: {
return Log(event.getLog());
}
case rpc::Trace::TailEvent::Event::STREAM_DIAGNOSTICS: {
return StreamDiagnosticsEvent(event.getStreamDiagnostics());
}
}
KJ_UNREACHABLE;
}
Expand Down Expand Up @@ -1396,6 +1430,9 @@ void TailEvent::copyTo(rpc::Trace::TailEvent::Builder builder) const {
KJ_CASE_ONEOF(log, Log) {
log.copyTo(eventBuilder.initLog());
}
KJ_CASE_ONEOF(streamDiag, StreamDiagnosticsEvent) {
streamDiag.copyTo(eventBuilder.initStreamDiagnostics());
}
KJ_CASE_ONEOF(ret, Return) {
ret.copyTo(eventBuilder.initReturn());
}
Expand Down Expand Up @@ -1433,6 +1470,9 @@ TailEvent TailEvent::clone() const {
KJ_CASE_ONEOF(log, Log) {
return log.clone();
}
KJ_CASE_ONEOF(streamDiag, StreamDiagnosticsEvent) {
return streamDiag.clone();
}
KJ_CASE_ONEOF(ret, Return) {
return ret.clone();
}
Expand Down
16 changes: 16 additions & 0 deletions src/workerd/io/trace.h
Original file line number Diff line number Diff line change
Expand Up @@ -512,6 +512,21 @@ struct DiagnosticChannelEvent final {
DiagnosticChannelEvent clone() const;
};

// Describes a stream diagnostics event. Currently only droppedEvents is supported.
struct StreamDiagnosticsEvent final {
  explicit StreamDiagnosticsEvent(uint32_t droppedEventsCount);
  StreamDiagnosticsEvent(rpc::Trace::StreamDiagnosticsEvent::Reader reader);
  StreamDiagnosticsEvent(StreamDiagnosticsEvent&&) = default;
  KJ_DISALLOW_COPY(StreamDiagnosticsEvent);

  // The count of dropped events for the "droppedEvents" diagnostic. When we support other event
  // types, this should be replaced with a kj::OneOf<> of all the different types.
  // Default-initialized so that no constructor path can leave the member holding
  // indeterminate garbage.
  uint32_t droppedEventsCount = 0;

  void copyTo(rpc::Trace::StreamDiagnosticsEvent::Builder builder) const;
  StreamDiagnosticsEvent clone() const;
};

// Describes a log event
struct Log final {
explicit Log(kj::Date timestamp, LogLevel logLevel, kj::String message);
Expand Down Expand Up @@ -765,6 +780,7 @@ struct TailEvent final {
DiagnosticChannelEvent,
Exception,
Log,
StreamDiagnosticsEvent,
Return,
CustomInfo>;

Expand Down
Loading