diff --git a/.gitignore b/.gitignore index cb37fd0a..fddee521 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,9 @@ cmake_install.cmake install_manifest.txt compile_commands.json +# Logging +/mlog/ + # Ninja .ninja_deps .ninja_log diff --git a/CMakeLists.txt b/CMakeLists.txt index da91c152..887bfa84 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -92,6 +92,7 @@ add_library(moqx_core STATIC src/UpstreamProvider.cpp src/relay/TopNFilter.cpp src/relay/PropertyRanking.cpp + src/logging/MLogCleaner.cpp ) target_include_directories(moqx_core @@ -110,6 +111,9 @@ target_link_libraries(moqx_core PUBLIC # FizzAcceptorHandshakeHelper but omits this dep from its cmake config. Folly::folly_io_async_fdsock_async_fd_socket moxygen::moxygen_moqclient + moxygen::moxygen_mlog_file_mlogger + moxygen::moxygen_mlog_mlogger_factory + moxygen::moxygen_mlog_sampling_mlogger_factory ) target_compile_options(moqx_core PRIVATE -Wall -Wextra -Wpedantic) diff --git a/config.example.yaml b/config.example.yaml index 46479f3f..15f0993b 100644 --- a/config.example.yaml +++ b/config.example.yaml @@ -74,6 +74,15 @@ admin: # cert_file: /path/to/cert.pem # key_file: /path/to/key.pem +logging: # Logging configuration (optional) + mlog: # MoQ-level structured logging + dir: "./mlog" # Output directory for per-session mlog files + sample_rate: 1 # Fraction of sessions to log (0.0–1.0, default: 1.0) + # max_age_days: 7 # Delete .mlog files older than N days (omit = no age limit) + # max_dir_mb: 1024 # Trim directory to N MB by deleting oldest files first (omit = no size limit) + # cleanup_interval_secs: 600 # How often to run cleanup in seconds (default: 600 = 10 min) + + # relay_id: "my-relay-1" # Relay identity (optional; random hex string generated if absent) # listener_defaults: # Default settings inherited by all listeners diff --git a/docs/config.md b/docs/config.md index 81e5136b..502db653 100644 --- a/docs/config.md +++ b/docs/config.md @@ -246,6 +246,50 @@ upstream routing. 
--- +## Logging + +### `logging.mlog` + +Enables MoQ-level (mlog) structured logging of control messages and protocol +events, one file per session. + +```yaml +logging: + mlog: + dir: "/var/log/moqx/mlog" # required to enable; empty string disables + sample_rate: 0.01 # log 1% of sessions (default: 1.0 = all) + max_age_days: 7 # delete files older than 7 days + max_dir_mb: 1024 # trim to 1 GB, deleting oldest files first +``` + +| Field | Type | Default | Description | +|---|---|---|---| +| `dir` | `string` | — (disabled) | Output directory. Each logged session writes its own `.mlog` file into this directory. If empty, mlog is disabled. | +| `sample_rate` | `float` | `1.0` | Fraction of sessions to log, in `[0.0, 1.0]`. `1.0` logs all sessions; `0.01` logs ~1%. | +| `max_age_days` | `uint` | — (no limit) | Delete `.mlog` files whose last-write time is older than this many days. Omit to keep all files regardless of age. Must be ≥ 1 if set. | +| `max_dir_mb` | `uint` | — (no limit) | After age-based deletion, if the combined size of remaining `.mlog` files exceeds this value (in MB), the oldest files are deleted until the directory is under the limit. Omit for no size cap. Must be ≥ 1 if set. | + +#### Retention behaviour + +The two limits are applied together on each cleanup pass (at startup and then +periodically, every `cleanup_interval_secs` seconds; default: 600): + +1. **Age** — any `.mlog` file older than `max_age_days` is deleted first. +2. **Size** — if the remaining files still exceed `max_dir_mb`, the oldest + files (by last-write time) are deleted until the directory is under the + limit. + +Either limit can be set independently; both are optional. + +#### Lifecycle + +| Field | Lifecycle | +|---|---| +| `dir`, `sample_rate` | Static — requires restart | +| `max_age_days`, `max_dir_mb`, `cleanup_interval_secs` | Static — requires restart | + +--- + ## Admin Server The admin server exposes an HTTP management API. 
It is optional; omit the diff --git a/src/MoqxPicoRelayServer.h b/src/MoqxPicoRelayServer.h index 13fae237..f95088de 100644 --- a/src/MoqxPicoRelayServer.h +++ b/src/MoqxPicoRelayServer.h @@ -13,6 +13,7 @@ #include "stats/StatsRegistry.h" #include #include +#include #include #include @@ -35,6 +36,10 @@ class MoqxPicoRelayServer : public moxygen::MoQPicoQuicEventBaseServer { void setStatsRegistry(std::shared_ptr registry); + void setMLoggerFactory(std::shared_ptr factory) { + moxygen::MoQServerBase::setMLoggerFactory(std::move(factory)); + } + // Preferred entry point: binds the address from the stored ListenerConfig. void start(); diff --git a/src/MoqxRelayServer.h b/src/MoqxRelayServer.h index 09f5bf9d..d1d13e93 100644 --- a/src/MoqxRelayServer.h +++ b/src/MoqxRelayServer.h @@ -13,6 +13,7 @@ #include "stats/StatsRegistry.h" #include #include +#include namespace openmoq::moqx { @@ -28,6 +29,10 @@ class MoqxRelayServer : public moxygen::MoQServer { void setStatsRegistry(std::shared_ptr registry); + void setMLoggerFactory(std::shared_ptr factory) { + moxygen::MoQServerBase::setMLoggerFactory(std::move(factory)); + } + // Preferred entry point: binds the address from the stored ListenerConfig. void start(); diff --git a/src/MoqxServerFactory.h b/src/MoqxServerFactory.h index 4b01e181..717772a0 100644 --- a/src/MoqxServerFactory.h +++ b/src/MoqxServerFactory.h @@ -7,6 +7,7 @@ #pragma once #include +#include #include "MoqxPicoRelayServer.h" #include "MoqxRelayContext.h" @@ -15,15 +16,19 @@ #include "stats/StatsRegistry.h" #include #include +#include + namespace openmoq::moqx { // Creates the appropriate relay server for the given listener config and wires // stats. Both stack paths accept the same ioExecutor for a uniform call site. +// Optionally wires an mlog factory for per-session logging. 
inline std::shared_ptr makeRelayServer( const config::ListenerConfig& listenerCfg, std::shared_ptr context, std::shared_ptr ioExecutor, - std::shared_ptr statsRegistry + std::shared_ptr statsRegistry, + std::shared_ptr mlogFactory = nullptr ) { if (listenerCfg.quicStack == config::QuicStack::Picoquic) { auto server = std::make_shared( @@ -32,11 +37,17 @@ inline std::shared_ptr makeRelayServer( std::move(ioExecutor) ); server->setStatsRegistry(std::move(statsRegistry)); + if (mlogFactory) { + server->setMLoggerFactory(std::move(mlogFactory)); + } return server; } auto server = std::make_shared(listenerCfg, std::move(context), std::move(ioExecutor)); server->setStatsRegistry(std::move(statsRegistry)); + if (mlogFactory) { + server->setMLoggerFactory(std::move(mlogFactory)); + } return server; } diff --git a/src/config/Config.h b/src/config/Config.h index a05e674b..3e3ca1fd 100644 --- a/src/config/Config.h +++ b/src/config/Config.h @@ -111,12 +111,25 @@ struct AdminConfig { std::optional tls; }; +struct MLogConfig { + std::string dir; // output directory; empty = disabled + float sampleRate{1.0f}; // 1.0 = log all sessions, 0.0 = none + std::optional maxAgeDays; // delete files older than N days; nullopt = no limit + std::optional maxDirMb; // trim directory to this size in MB; nullopt = no limit + uint32_t cleanupIntervalSecs{600}; // how often to run cleanup (default 10 min) +}; + +struct LoggingConfig { + std::optional mlog; +}; + struct Config { std::vector listeners; folly::F14FastMap services; std::optional admin; std::string relayID; // always set: from config or randomly generated uint32_t threads{1}; + std::optional logging; }; } // namespace openmoq::moqx::config diff --git a/src/config/ConfigResolver.cpp b/src/config/ConfigResolver.cpp index 20c29da6..cceb0cb9 100644 --- a/src/config/ConfigResolver.cpp +++ b/src/config/ConfigResolver.cpp @@ -6,6 +6,7 @@ #include "config/loader/ConfigResolver.h" +#include #include #include #include @@ -685,6 +686,28 @@ 
folly::Expected resolveConfig(const ParsedConfig& c errors.push_back("threads > 1 is not yet supported"); } + // === Validate logging === + if (config.logging.value().has_value()) { + const auto& logging = *config.logging.value(); + if (logging.mlog.value().has_value()) { + const auto& mlog = *logging.mlog.value(); + if (mlog.sample_rate.value().has_value()) { + float rate = *mlog.sample_rate.value(); + if (rate < 0.0f || rate > 1.0f) { + errors.push_back( + "logging.mlog.sample_rate must be in [0.0, 1.0], got " + std::to_string(rate) + ); + } + } + if (mlog.max_age_days.value().has_value() && *mlog.max_age_days.value() == 0) { + errors.push_back("logging.mlog.max_age_days must be >= 1 if set"); + } + if (mlog.max_dir_mb.value().has_value() && *mlog.max_dir_mb.value() == 0) { + errors.push_back("logging.mlog.max_dir_mb must be >= 1 if set"); + } + } + } + if (!errors.empty()) { return folly::makeUnexpected("Config validation failed:\n - " + folly::join("\n - ", errors)); } @@ -715,6 +738,34 @@ folly::Expected resolveConfig(const ParsedConfig& c // Resolve relayID: use configured value or generate a random hex string std::string relayID = config.relay_id.value().value_or(generateRelayID()); + // Resolve logging config + std::optional loggingConfig; + if (config.logging.value().has_value()) { + const auto& parsedLogging = *config.logging.value(); + LoggingConfig resolved; + if (parsedLogging.mlog.value().has_value()) { + const auto& parsedMlog = *parsedLogging.mlog.value(); + MLogConfig mlogConfig; + mlogConfig.dir = parsedMlog.dir.value(); + mlogConfig.sampleRate = parsedMlog.sample_rate.value().value_or(1.0f); + mlogConfig.maxAgeDays = parsedMlog.max_age_days.value(); + mlogConfig.maxDirMb = parsedMlog.max_dir_mb.value(); + mlogConfig.cleanupIntervalSecs = + parsedMlog.cleanup_interval_secs.value().value_or(600u); + if (!mlogConfig.dir.empty()) { + std::error_code ec; + std::filesystem::create_directories(mlogConfig.dir, ec); + if (ec) { + return 
folly::makeUnexpected( + "Failed to create mlog directory '" + mlogConfig.dir + "': " + ec.message() + ); + } + } + resolved.mlog = std::move(mlogConfig); + } + loggingConfig = std::move(resolved); + } + return ResolvedConfig{ .config = Config{ @@ -732,6 +783,7 @@ folly::Expected resolveConfig(const ParsedConfig& c .admin = std::move(adminConfig), .relayID = std::move(relayID), .threads = threads, + .logging = std::move(loggingConfig), }, .warnings = std::move(warnings), }; diff --git a/src/config/loader/ParsedConfig.h b/src/config/loader/ParsedConfig.h index c4a68c57..110c1f83 100644 --- a/src/config/loader/ParsedConfig.h +++ b/src/config/loader/ParsedConfig.h @@ -215,6 +215,28 @@ struct ParsedServiceDefaultsConfig { rfl::Description<"Default cache settings for services", std::optional> cache; }; +struct ParsedMLogConfig { + rfl::Description<"Directory for per-session MoQ log files (empty = disabled)", std::string> dir; + rfl::Description<"Fraction of sessions to log (0.0-1.0, default 1.0)", std::optional> + sample_rate; + rfl::Description< + "Delete log files older than this many days (omit = no age limit)", + std::optional> + max_age_days; + rfl::Description< + "Trim mlog directory to this size in MB by deleting oldest files first (omit = no size limit)", + std::optional> + max_dir_mb; + rfl::Description< + "How often to run mlog cleanup, in seconds (default 600 = 10 minutes)", + std::optional> + cleanup_interval_secs; +}; + +struct ParsedLoggingConfig { + rfl::Description<"MoQ-level (mlog) per-session logging", std::optional> mlog; +}; + struct ParsedConfig { rfl::Description< "Listener definitions (currently exactly one supported)", @@ -235,6 +257,7 @@ struct ParsedConfig { std::optional> listener_defaults; rfl::Description<"Number of IO worker threads (default: 1)", std::optional> threads; + rfl::Description<"Logging configuration", std::optional> logging; }; } // namespace openmoq::moqx::config diff --git a/src/logging/MLogCleaner.cpp 
b/src/logging/MLogCleaner.cpp new file mode 100644 index 00000000..412ea1dc --- /dev/null +++ b/src/logging/MLogCleaner.cpp @@ -0,0 +1,130 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#include "logging/MLogCleaner.h" + +#include +#include +#include +#include + +#include + +namespace openmoq::moqx::logging { + +namespace { +constexpr uint64_t kBytesPerMb = 1024ULL * 1024ULL; +constexpr uint64_t kSecondsPerDay = 24ULL * 60ULL * 60ULL; +} // namespace + +MLogCleaner::MLogCleaner( + std::string dir, + std::optional maxAgeDays, + std::optional maxDirMb) + : dir_(std::move(dir)) { + if (maxAgeDays) { + maxAge_ = std::chrono::seconds(*maxAgeDays * kSecondsPerDay); + } + if (maxDirMb) { + maxDirBytes_ = *maxDirMb * kBytesPerMb; + } +} + +void MLogCleaner::cleanup() { + namespace fs = std::filesystem; + using namespace std::chrono; + + if (dir_.empty()) { + return; + } + if (!maxAge_ && !maxDirBytes_) { + return; + } + + std::error_code ec; + if (!fs::is_directory(dir_, ec)) { + XLOG(WARN) << "mlog cleanup: directory not found or inaccessible: " << dir_; + return; + } + + const auto now = system_clock::now(); + + struct FileEntry { + fs::path path; + fs::file_time_type mtime; + uintmax_t size; + }; + + std::vector remaining; + uint64_t totalSize = 0; + + for (const auto& entry : fs::directory_iterator(dir_, ec)) { + if (!entry.is_regular_file(ec) || ec) { + if (ec) { + XLOG(WARN) << "mlog cleanup: failed to check file type " << entry.path() << ": " + << ec.message(); + } + ec.clear(); + continue; + } + if (entry.path().extension() != ".mlog") { + continue; + } + + const uintmax_t size = entry.file_size(ec); + if (ec) { + XLOG(WARN) << "mlog cleanup: failed to stat " << entry.path() << ": " << ec.message(); + ec.clear(); + continue; + } + const auto mtime = entry.last_write_time(ec); + if (ec) { + XLOG(WARN) << "mlog cleanup: failed 
to get mtime " << entry.path() << ": " + << ec.message(); + ec.clear(); + continue; + } + + // Age-based deletion: remove immediately if beyond max age. + if (maxAge_) { + if (now - clock_cast(mtime) > *maxAge_) { + fs::remove(entry.path(), ec); + if (ec) { + XLOG(WARN) << "mlog cleanup: failed to remove aged file " + << entry.path() << ": " << ec.message(); + ec.clear(); + } + continue; + } + } + + remaining.push_back({entry.path(), mtime, size}); + totalSize += size; + } + + // Size-based deletion: delete oldest files first until under the limit. + if (maxDirBytes_ && totalSize > *maxDirBytes_) { + std::sort(remaining.begin(), remaining.end(), [](const FileEntry& a, const FileEntry& b) { + return a.mtime < b.mtime; // oldest first + }); + + for (const auto& f : remaining) { + if (totalSize <= *maxDirBytes_) { + break; + } + fs::remove(f.path, ec); + if (ec) { + XLOG(WARN) << "mlog cleanup: failed to remove oversized file " << f.path << ": " + << ec.message(); + ec.clear(); + continue; + } + totalSize -= f.size; + } + } +} + +} // namespace openmoq::moqx::logging diff --git a/src/logging/MLogCleaner.h b/src/logging/MLogCleaner.h new file mode 100644 index 00000000..dd0b4fc9 --- /dev/null +++ b/src/logging/MLogCleaner.h @@ -0,0 +1,55 @@ +/* + * Copyright (c) OpenMOQ contributors. + * This source code is licensed under the Apache 2.0 license found in the + * LICENSE file in the root directory of this source tree. + */ + +#pragma once + +#include +#include +#include +#include + +namespace openmoq::moqx::logging { + +/** + * MLogCleaner enforces retention limits on a directory of .mlog files. + * + * Two independent limits can be set, both applied on each cleanup() call: + * + * 1. Age limit (maxAgeDays): any .mlog file whose last-write time is older + * than the configured number of days is deleted unconditionally. + * + * 2. 
Size limit (maxDirMb): after age-based deletion, if the sum of + * remaining .mlog file sizes still exceeds the limit, files are deleted + * in oldest-first order until the directory is under the limit. + * + * cleanup() is designed to be called: + * - once at startup (to clean up files left by previous relay runs), and + * - periodically during the relay's lifetime (interval is configurable) to + * prevent unbounded growth. + * + * cleanup() is not thread-safe; callers must serialise calls externally (e.g. + * by always posting to the same single-threaded mlog executor). + */ +class MLogCleaner { + public: + MLogCleaner( + std::string dir, + std::optional maxAgeDays, + std::optional maxDirMb); + + /** + * Run one cleanup pass. Logs warnings on filesystem errors but never + * throws; partial progress (some files deleted) is acceptable. + */ + void cleanup(); + + private: + std::string dir_; + std::optional maxAge_; // derived from maxAgeDays + std::optional maxDirBytes_; // derived from maxDirMb +}; + +} // namespace openmoq::moqx::logging diff --git a/src/main.cpp b/src/main.cpp index 46873906..7bc08f22 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -13,6 +13,7 @@ #include "admin/StateHandler.h" #include "config/loader/ConfigInit.h" #include "stats/StatsRegistry.h" +#include "logging/MLogCleaner.h" #include @@ -24,6 +25,8 @@ #include #include #include +#include +#include #include #include @@ -88,8 +91,38 @@ int main(int argc, char* argv[]) { } // === 2. Set up logging/observability === - // TODO: logging framework, log levels, structured logging - // (currently handled implicitly by folly::Init) + std::shared_ptr mlogFactory; + std::shared_ptr mlogExecutor; + std::shared_ptr mlogCleaner; + // Shared_ptr to a recursive schedule function; kept alive until evb exits. 
+ std::shared_ptr> mlogCleanupSchedule; + + if (config.logging && config.logging->mlog) { + const auto& mcfg = *config.logging->mlog; + auto baseFactory = std::make_shared(moxygen::VantagePoint::SERVER); + // Set output directory on the underlying factory + if (!mcfg.dir.empty()) { + baseFactory->setDir(mcfg.dir); + } + // Use background executor for mlog writes + mlogExecutor = std::make_shared( + 1, + std::make_shared("moqx-mlog") + ); + baseFactory->setWriteExecutor(mlogExecutor); + if (mcfg.sampleRate < 1.0f) { + mlogFactory = std::make_shared(baseFactory, mcfg.sampleRate); + } else { + mlogFactory = std::move(baseFactory); + } + + // Set up directory cleanup if either retention limit is configured. + if (!mcfg.dir.empty() && (mcfg.maxAgeDays || mcfg.maxDirMb)) { + mlogCleaner = std::make_shared( + mcfg.dir, mcfg.maxAgeDays, mcfg.maxDirMb + ); + } + } // === 3. Set up signal handling === folly::EventBase evb; @@ -114,7 +147,9 @@ int main(int argc, char* argv[]) { std::vector> servers; for (const auto& listenerCfg : config.listeners) { - servers.emplace_back(makeRelayServer(listenerCfg, context, ioExecutor, statsRegistry)); + servers.emplace_back( + makeRelayServer(listenerCfg, context, ioExecutor, statsRegistry, mlogFactory) + ); } if (!servers.empty()) { @@ -145,6 +180,31 @@ int main(int argc, char* argv[]) { context->initUpstreams(ioExecutor->getAllEventBases()[0].get()); } + // Schedule mlog directory cleanup if configured. + // Runs at startup (to clean files from previous runs) and then periodically + // so the directory doesn't grow unbounded + if (mlogCleaner && mlogExecutor) { + const uint32_t intervalMs = config.logging->mlog->cleanupIntervalSecs * 1000; + // Startup pass — run immediately on the mlog executor. + mlogExecutor->add([c = mlogCleaner] { c->cleanup(); }); + // Periodic pass — schedule via the main event base, post work to executor. 
+ mlogCleanupSchedule = std::make_shared>(); + std::weak_ptr> weak = mlogCleanupSchedule; + *mlogCleanupSchedule = + [&evb, c = mlogCleaner, exec = mlogExecutor, weak, intervalMs]() { + evb.runAfterDelay( + [c, exec, weak, intervalMs]() { + exec->add([c] { c->cleanup(); }); + //self-reschedule for the next run, if still alive + if (auto fn = weak.lock()) { + (*fn)(); + } + }, + intervalMs); + }; + (*mlogCleanupSchedule)(); + } + evb.loopForever(); // Hard shutdown watchdog: if teardown hangs, force-exit after 10 seconds. @@ -172,7 +232,11 @@ int main(int argc, char* argv[]) { // TODO: TBD // === 12. Flush telemetry/logs === - // TODO: ensure observability data is sent + // Join mlog write executor after all sessions have closed so that any + // pending outputLogs() tasks complete before process exit. + if (mlogExecutor) { + mlogExecutor->join(); + } // === 13. Clean up resources === // Stop admin last — allows a final metrics scrape during drain.