Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion xprof/convert/trace_viewer/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ cc_library(
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/container:flat_hash_map",
"@com_google_absl//absl/container:flat_hash_set",
"@com_google_absl//absl/functional:any_invocable",
"@com_google_absl//absl/functional:bind_front",
"@com_google_absl//absl/log",
"@com_google_absl//absl/log:check",
Expand All @@ -157,7 +158,7 @@ cc_library(
"@com_google_absl//absl/status:statusor",
"@com_google_absl//absl/strings",
"@com_google_absl//absl/time",
"@com_google_absl//absl/types:optional",
"@com_google_protobuf//:protobuf",
"@org_xprof//plugin/xprof/protobuf:task_proto_cc",
"@org_xprof//plugin/xprof/protobuf:trace_events_proto_cc",
"@org_xprof//plugin/xprof/protobuf:trace_events_raw_proto_cc",
Expand Down Expand Up @@ -255,3 +256,18 @@ cc_test(
"@xla//xla/tsl/platform:env",
],
)

cc_test(
name = "trace_events_test",
srcs = ["trace_events_test.cc"],
deps = [
":trace_events",
"//net/proto2/contrib/parse_proto:parse_text_proto",
"@com_google_absl//absl/status",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
"@com_google_protobuf//:protobuf",
"@org_xprof//plugin/xprof/protobuf:trace_events_proto_cc",
"@org_xprof//plugin/xprof/protobuf:trace_events_raw_proto_cc",
],
)
105 changes: 63 additions & 42 deletions xprof/convert/trace_viewer/trace_events.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ limitations under the License.
#include <iterator>
#include <limits>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <variant>
Expand All @@ -38,6 +37,8 @@ limitations under the License.
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "google/protobuf/arena.h"
#include "google/protobuf/io/coded_stream.h"
#include "xla/tsl/lib/io/iterator.h"
#include "xla/tsl/lib/io/table.h"
#include "xla/tsl/lib/io/table_builder.h"
Expand Down Expand Up @@ -94,6 +95,20 @@ inline void AppendEvents(TraceEventTrack&& src, TraceEventTrack* dst) {
}
}

template <typename Modifier>
absl::Status SerializeWithReusableEvent(const TraceEvent& event,
TraceEvent& reusable_event_copy,
std::string& output,
Modifier modifier) {
reusable_event_copy = event;
modifier(&reusable_event_copy);

output.clear();
return reusable_event_copy.AppendToString(&output)
? absl::OkStatus()
: absl::InternalError("Failed to serialize trace event");
}

} // namespace

TraceEvent::EventType GetTraceEventType(const TraceEvent& event) {
Expand Down Expand Up @@ -259,20 +274,21 @@ absl::Status DoStoreAsLevelDbTables(
const Trace& trace, std::unique_ptr<tsl::WritableFile>& trace_events_file,
std::unique_ptr<tsl::WritableFile>& trace_events_metadata_file,
std::unique_ptr<tsl::WritableFile>& trace_events_prefix_trie_file) {
auto executor = std::make_unique<XprofThreadPoolExecutor>(
"StoreAsLevelDbTables", /*num_threads=*/3);
std::unique_ptr<XprofThreadPoolExecutor> executor =
std::make_unique<XprofThreadPoolExecutor>("StoreAsLevelDbTables",
/*num_threads=*/3);
absl::Status trace_events_status, trace_events_metadata_status;
executor->Execute(
[&trace_events_file, &trace, &events_by_level, &trace_events_status]() {
trace_events_status = DoStoreAsLevelDbTable(
trace_events_file, trace, events_by_level,
GenerateTraceEventCopyForPersistingEventWithoutMetadata);
SerializeTraceEventForPersistingEventWithoutMetadata);
});
executor->Execute([&trace_events_metadata_file, &events_by_level, &trace,
&trace_events_metadata_status]() {
trace_events_metadata_status = DoStoreAsLevelDbTable(
trace_events_metadata_file, trace, events_by_level,
GenerateTraceEventCopyForPersistingOnlyMetadata);
SerializeTraceEventForPersistingOnlyMetadata);
});
absl::Status trace_events_prefix_trie_status;
executor->Execute([&trace_events_prefix_trie_file, &events_by_level,
Expand All @@ -286,43 +302,43 @@ absl::Status DoStoreAsLevelDbTables(
return trace_events_status;
}

std::optional<TraceEvent> GenerateTraceEventCopyForPersistingFullEvent(
const TraceEvent* event) {
TraceEvent event_copy = *event;
// To reduce file size, clear the timestamp from the value. It is
// redundant info because the timestamp is part of the key.
event_copy.clear_timestamp_ps();
return event_copy;
absl::Status SerializeTraceEventForPersistingFullEvent(
const TraceEvent& event, TraceEvent& reusable_event_copy,
std::string& output) {
return SerializeWithReusableEvent(
event, reusable_event_copy, output,
[](TraceEvent* copy) { copy->clear_timestamp_ps(); });
}

std::optional<TraceEvent>
GenerateTraceEventCopyForPersistingEventWithoutMetadata(
const TraceEvent* event) {
TraceEvent event_copy = *event;
// To reduce file size, clear the timestamp from the value. It is
// redundant info because the timestamp is part of the key.
event_copy.clear_timestamp_ps();
// To reduce file size, clear the raw data from the value. It is
// redundant info because the raw data is stored in the metadata file.
// However, we still need to keep the raw data for counter events as they
// are a special case and we need to return the args for the same during the
// initial read.
if (GetTraceEventType(*event) != TraceEvent::EVENT_TYPE_COUNTER) {
event_copy.clear_raw_data();
}
return event_copy;
absl::Status SerializeTraceEventForPersistingEventWithoutMetadata(
const TraceEvent& event, TraceEvent& reusable_event_copy,
std::string& output) {
return SerializeWithReusableEvent(
event, reusable_event_copy, output, [](TraceEvent* copy) {
copy->clear_timestamp_ps();
if (GetTraceEventType(*copy) != TraceEvent::EVENT_TYPE_COUNTER) {
copy->clear_raw_data();
}
});
}

std::optional<TraceEvent> GenerateTraceEventCopyForPersistingOnlyMetadata(
const TraceEvent* event) {
if (GetTraceEventType(*event) == TraceEvent::EVENT_TYPE_COUNTER) {
// Counter events are stored in the trace events file itself and do not
// require a metadata copy.
return std::nullopt;
absl::Status SerializeTraceEventForPersistingOnlyMetadata(
const TraceEvent& event, TraceEvent& reusable_event, std::string& output) {
// Counter events are stored in the trace events file itself and do not
// require a metadata copy.
if (GetTraceEventType(event) == TraceEvent::EVENT_TYPE_COUNTER) {
return absl::NotFoundError("No metadata found for counter event");
}
TraceEvent event_copy;
event_copy.set_raw_data(event->raw_data());
return event_copy;

// Reconstruct explicitly avoiding a deep copy inside
// SerializeWithReusableEvent to save substantial memory allocation
// operations.
reusable_event.Clear();
reusable_event.set_raw_data(event.raw_data());
output.clear();
return reusable_event.AppendToString(&output)
? absl::OkStatus()
: absl::InternalError("Failed to serialize trace event");
}

// Store the contents of this container in an sstable file. The format is as
Expand All @@ -340,8 +356,7 @@ std::optional<TraceEvent> GenerateTraceEventCopyForPersistingOnlyMetadata(
absl::Status DoStoreAsLevelDbTable(
std::unique_ptr<tsl::WritableFile>& file, const Trace& trace,
const std::vector<std::vector<const TraceEvent*>>& events_by_level,
std::function<std::optional<TraceEvent>(const TraceEvent*)>
generate_event_copy_fn) {
SerializeEventFn serialize_event_fn) {
absl::Time start_time = absl::Now();
tsl::table::Options options;
options.block_size = 20 * 1024 * 1024;
Expand All @@ -351,6 +366,9 @@ absl::Status DoStoreAsLevelDbTable(
builder.Add(kTraceMetadataKey, trace.SerializeAsString());

size_t num_of_events_dropped = 0; // Due to too many timestamp repetitions.
google::protobuf::Arena arena;
TraceEvent* reusable_event = google::protobuf::Arena::Create<TraceEvent>(&arena);
std::string buffer;
for (int zoom_level = 0; zoom_level < events_by_level.size(); ++zoom_level) {
// The key of level db table have to be monotonically increasing, therefore
// we make the timestamp repetition count as the last byte of key as tie
Expand All @@ -361,9 +379,12 @@ absl::Status DoStoreAsLevelDbTable(
uint64_t timestamp = event->timestamp_ps();
std::string key = LevelDbTableKey(zoom_level, timestamp, event->serial());
if (!key.empty()) {
auto event_copy = generate_event_copy_fn(event);
if (event_copy.has_value()) {
builder.Add(key, event_copy->SerializeAsString());
absl::Status status =
serialize_event_fn(*event, *reusable_event, buffer);
if (status.ok()) {
builder.Add(key, buffer);
} else if (!absl::IsNotFound(status)) {
return status;
}
} else {
++num_of_events_dropped;
Expand Down
31 changes: 16 additions & 15 deletions xprof/convert/trace_viewer/trace_events.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ limitations under the License.
#include "absl/base/optimization.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
#include "absl/functional/any_invocable.h"
#include "absl/functional/bind_front.h"
#include "absl/log/check.h"
#include "absl/log/log.h"
Expand All @@ -40,7 +41,6 @@ limitations under the License.
#include "absl/strings/string_view.h"
#include "absl/time/clock.h"
#include "absl/time/time.h"
#include "absl/types/optional.h"
#include "xla/tsl/lib/io/iterator.h"
#include "xla/tsl/lib/io/table.h"
#include "xla/tsl/lib/io/table_builder.h"
Expand Down Expand Up @@ -87,34 +87,35 @@ static constexpr int kSearchParallelizationThreshold = 100;
std::vector<TraceEvent*> MergeEventTracks(
const std::vector<const TraceEventTrack*>& event_tracks);

using SerializeEventFn = absl::AnyInvocable<absl::Status(
const TraceEvent&, TraceEvent&, std::string&)>;

absl::Status DoStoreAsLevelDbTable(
std::unique_ptr<tsl::WritableFile>& file, const Trace& trace,
const std::vector<std::vector<const TraceEvent*>>& events_by_level,
std::function<std::optional<TraceEvent>(const TraceEvent*)>
generate_event_copy_fn);
SerializeEventFn serialize_event_fn);

absl::Status DoStoreAsLevelDbTables(
const std::vector<std::vector<const TraceEvent*>>& events_by_level,
const Trace& trace, std::unique_ptr<tsl::WritableFile>& trace_events_file,
std::unique_ptr<tsl::WritableFile>& trace_events_metadata_file,
std::unique_ptr<tsl::WritableFile>& trace_events_prefix_trie_file);

// Generates a copy of the event to be persisted in the trace events file.
// Serializes a copy of the event to be persisted in the trace events file.
// This is the copy of the passed event without the timestamp_ps field.
std::optional<TraceEvent> GenerateTraceEventCopyForPersistingFullEvent(
const TraceEvent* event);
absl::Status SerializeTraceEventForPersistingFullEvent(
const TraceEvent& event, TraceEvent& reusable_event, std::string& output);

// Generates a copy of the event to be persisted in the trace events file.
// Serializes a copy of the event to be persisted in the trace events file.
// This is the copy of the passed event without the raw_data and timestamp_ps
// fields.
std::optional<TraceEvent>
GenerateTraceEventCopyForPersistingEventWithoutMetadata(
const TraceEvent* event);
absl::Status SerializeTraceEventForPersistingEventWithoutMetadata(
const TraceEvent& event, TraceEvent& reusable_event, std::string& output);

// It generates a copy of the event to be persisted in the trace events metadata
// file. This only has the raw_data field set.
std::optional<TraceEvent> GenerateTraceEventCopyForPersistingOnlyMetadata(
const TraceEvent* event);
// It serializes a copy of the event to be persisted in the trace events
// metadata file. This only has the raw_data field set.
absl::Status SerializeTraceEventForPersistingOnlyMetadata(
const TraceEvent& event, TraceEvent& reusable_event, std::string& output);

// Opens the level db table from the given filename. The table is owned by the
// caller.
Expand Down Expand Up @@ -776,7 +777,7 @@ class TraceEventsContainerBase {
trace.set_num_events(NumEvents());
auto events_by_level = EventsByLevel();
return DoStoreAsLevelDbTable(file, trace, events_by_level,
GenerateTraceEventCopyForPersistingFullEvent);
SerializeTraceEventForPersistingFullEvent);
}

// Stores the contents of this container in three level-db sstable files. The
Expand Down
Loading
Loading