Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ _CPack_Packages/

# Dependency scratch dir
/.scratch/
/.docker-deps/

# OS
.DS_Store

# IDE / tools
.vscode/
.claude/
.agents/
AGENTS.md
.cache/

# Docker secrets (never commit)
Expand Down
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ if(POLICY CMP0167)
endif()

find_package(moxygen REQUIRED)
find_package(OpenSSL REQUIRED)

# --- yaml-cpp (FetchContent) ---

Expand Down Expand Up @@ -117,6 +118,7 @@ target_compile_options(moqx_cache PRIVATE -Wall -Wextra -Wpedantic)
# --- Core library ---

add_library(moqx_core STATIC
src/auth/Auth.cpp
src/NamespaceTree.cpp
src/SubscriptionRegistry.cpp
src/MoqxRelay.cpp
Expand Down Expand Up @@ -159,6 +161,7 @@ target_link_libraries(moqx_core PUBLIC
# FizzAcceptorHandshakeHelper but omits this dep from its cmake config.
Folly::folly_io_async_fdsock_async_fd_socket
moxygen::moxygen_moqclient
OpenSSL::Crypto
)

target_compile_options(moqx_core PRIVATE -Wall -Wextra -Wpedantic)
Expand Down
9 changes: 9 additions & 0 deletions config.example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,15 @@ services:
# tls:
# insecure: false # true = skip cert verification (dev only)
# # ca_cert: /path/to/ca.pem # Custom CA cert (mutually exclusive with insecure: true)
# auth: # Optional: enable CAT-style authorization for this service
# enabled: true # Turn auth on for this service (default: false)
# token_type: 0 # MOQT AUTHORIZATION_TOKEN type; 0 = out-of-band/private type (draft-ietf-moq-transport, "Token Type")
# hmac_keys: # Keys that sign/verify CAT tokens (HMAC-SHA256)
# - id: "key-1" # Key ID carried in the token envelope; selects which secret verifies it
# secret: "replace-with-secret" # Shared secret (use a long, random value)
# require_setup_token: true # Require a token at CLIENT_SETUP, not just on individual requests
# allow_request_token_override: true # Allow a per-request token to override the session's setup grants
# strict_claims: false # Reject unsupported token claims instead of ignoring them

testing:
match:
Expand Down
2 changes: 1 addition & 1 deletion src/MoqxPicoRelayServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ void MoqxPicoRelayServer::onNewSession(std::shared_ptr<MoQSession> clientSession
}

void MoqxPicoRelayServer::terminateClientSession(std::shared_ptr<MoQSession> session) {
context_->onSessionEnd();
context_->onSessionEnd(session);
MoQPicoQuicEventBaseServer::terminateClientSession(std::move(session));
}

Expand Down
162 changes: 159 additions & 3 deletions src/MoqxRelay.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,86 @@ void MoqxRelay::onUpstreamDisconnect() {
upstreamSubNsHandle_.reset();
}

folly::Expected<folly::Unit, auth::AuthError> MoqxRelay::authenticateSession(
const ClientSetup& clientSetup,
std::shared_ptr<MoQSession> session
) {
if (!authVerifier_.enabled()) {
return folly::unit;
}
auto token = auth::findAuthToken(clientSetup.params, authVerifier_.tokenType());
if (!token) {
if (!authVerifier_.requireSetupToken()) {
return folly::unit;
}
return folly::makeUnexpected(auth::AuthError::Missing);
}
auto grants = authVerifier_.verify(*token);
if (grants.hasError()) {
return folly::makeUnexpected(grants.error());
}
if (!auth::allows(grants.value(), auth::Action::ClientSetup, TrackNamespace{})) {
return folly::makeUnexpected(auth::AuthError::Forbidden);
}
sessionAuth_.insert_or_assign(session.get(), std::move(grants.value()));
return folly::unit;
}

folly::Expected<folly::Unit, auth::AuthError> MoqxRelay::authorize(
auth::Action action,
const Parameters& params,
const TrackNamespace& ns,
const std::shared_ptr<MoQSession>& session,
std::optional<std::string_view> trackName
) {
if (!authVerifier_.enabled()) {
return folly::unit;
}

auto token = auth::findAuthToken(params, authVerifier_.tokenType());
if (token && !authVerifier_.allowRequestTokenOverride()) {
// Request carried a per-request token but override is disabled, so the
// grants established at session setup govern. This is expected behavior;
// log at DBG1 for visibility rather than failing the request.
XLOG(DBG1) << "authorize: ignoring request AUTHORIZATION_TOKEN for action="
<< static_cast<uint64_t>(action) << " (allow_request_token_override is disabled)";
token.reset();
}

// Resolve grants from exactly one source (request token or session), then run
// a single auth::allows() check below.
const auth::Grants* grants = nullptr;
auth::Grants verified;
if (token) {
auto res = authVerifier_.verify(*token);
if (res.hasError()) {
XLOG(DBG1) << "authorize: request token verification failed for action="
<< static_cast<uint64_t>(action) << ": " << auth::toString(res.error());
return folly::makeUnexpected(res.error());
}
verified = std::move(res.value());
grants = &verified;
} else {
auto it = sessionAuth_.find(session.get());
if (it == sessionAuth_.end()) {
XLOG(DBG1) << "authorize: no session grants for action=" << static_cast<uint64_t>(action)
<< " ns=" << ns;
return folly::makeUnexpected(auth::AuthError::Missing);
}
grants = &it->second;
}

const bool permitted =
trackName ? auth::allows(*grants, action, FullTrackName{ns, std::string(*trackName)})
: auth::allows(*grants, action, ns);
if (!permitted) {
XLOG(DBG1) << "authorize: action=" << static_cast<uint64_t>(action)
<< " not permitted for ns=" << ns;
return folly::makeUnexpected(auth::AuthError::Forbidden);
}
return folly::unit;
}

// Sends SUBSCRIBE_UPDATE to update forwarding state. Called from:
// - subscribeNamespace: forwarder was empty, new subscriber added
// (forward=true)
Expand Down Expand Up @@ -141,7 +221,6 @@ std::shared_ptr<Subscriber::PublishNamespaceHandle> MoqxRelay::doPublishNamespac
std::string peerID
) {
XLOG(DBG1) << __func__ << " ns=" << pubNs.trackNamespace;
// check auth
if (!pubNs.trackNamespace.startsWith(allowedNamespacePrefix_)) {
return nullptr;
}
Expand Down Expand Up @@ -191,6 +270,15 @@ folly::coro::Task<Subscriber::PublishNamespaceResult> MoqxRelay::publishNamespac
// TODO: store auth for forwarding on future SubscribeNamespace?
auto session = MoQSession::getRequestSession();
auto requestID = pubNs.requestID;
auto authRes =
authorize(auth::Action::PublishNamespace, pubNs.params, pubNs.trackNamespace, session);
if (authRes.hasError()) {
co_return folly::makeUnexpected(PublishNamespaceError{
requestID,
PublishNamespaceErrorCode::UNAUTHORIZED,
auth::toString(authRes.error())
});
}
auto result = doPublishNamespace(std::move(pubNs), session, std::move(callback));
if (!result) {
co_return folly::makeUnexpected(
Expand Down Expand Up @@ -274,6 +362,19 @@ Subscriber::PublishResult
MoqxRelay::publish(PublishRequest pub, std::shared_ptr<Publisher::SubscriptionHandle> handle) {
XLOG(DBG1) << __func__ << " ftn=" << pub.fullTrackName;
XCHECK(handle) << "Publish handle cannot be null";
auto session = MoQSession::getRequestSession();
auto authRes = authorize(
auth::Action::Publish,
pub.params,
pub.fullTrackName.trackNamespace,
session,
pub.fullTrackName.trackName
);
if (authRes.hasError()) {
return folly::makeUnexpected(
PublishError{pub.requestID, PublishErrorCode::UNAUTHORIZED, auth::toString(authRes.error())}
);
}
if (!pub.fullTrackName.trackNamespace.startsWith(allowedNamespacePrefix_)) {
return folly::makeUnexpected(
PublishError{pub.requestID, PublishErrorCode::UNINTERESTED, "bad namespace"}
Expand Down Expand Up @@ -555,6 +656,21 @@ folly::coro::Task<Publisher::SubscribeNamespaceResult> MoqxRelay::subscribeNames
// Fall through: register the peer as a normal subNs subscriber so it
// receives namespace announcements as publishers connect.
}
if (incomingPeerID.empty()) {
auto authRes = authorize(
auth::Action::SubscribeNamespace,
subNs.params,
subNs.trackNamespacePrefix,
session
);
if (authRes.hasError()) {
co_return folly::makeUnexpected(SubscribeNamespaceError{
subNs.requestID,
SubscribeNamespaceErrorCode::UNAUTHORIZED,
auth::toString(authRes.error())
});
}
}
auto maybeNegotiatedVersion = session->getNegotiatedVersion();
CHECK(maybeNegotiatedVersion.has_value());

Expand Down Expand Up @@ -698,6 +814,20 @@ MoqxRelay::PublishState MoqxRelay::findPublishState(const FullTrackName& ftn) {
folly::coro::Task<Publisher::SubscribeResult>
MoqxRelay::subscribe(SubscribeRequest subReq, std::shared_ptr<TrackConsumer> consumer) {
auto session = MoQSession::getRequestSession();
auto authRes = authorize(
auth::Action::Subscribe,
subReq.params,
subReq.fullTrackName.trackNamespace,
session,
subReq.fullTrackName.trackName
);
if (authRes.hasError()) {
co_return folly::makeUnexpected(SubscribeError{
subReq.requestID,
SubscribeErrorCode::UNAUTHORIZED,
auth::toString(authRes.error())
});
}
const auto& ftn = subReq.fullTrackName;

if (ftn.trackNamespace.empty()) {
Expand Down Expand Up @@ -820,8 +950,18 @@ folly::coro::Task<Publisher::FetchResult>
MoqxRelay::fetch(Fetch fetch, std::shared_ptr<FetchConsumer> consumer) {
auto session = MoQSession::getRequestSession();

// check auth
// get trackNamespace
auto authRes = authorize(
auth::Action::Fetch,
fetch.params,
fetch.fullTrackName.trackNamespace,
session,
fetch.fullTrackName.trackName
);
if (authRes.hasError()) {
co_return folly::makeUnexpected(
FetchError({fetch.requestID, FetchErrorCode::UNAUTHORIZED, auth::toString(authRes.error())})
);
}
if (fetch.fullTrackName.trackNamespace.empty()) {
co_return folly::makeUnexpected(
FetchError({fetch.requestID, FetchErrorCode::TRACK_NOT_EXIST, "namespace required"})
Expand Down Expand Up @@ -887,6 +1027,22 @@ MoqxRelay::fetch(Fetch fetch, std::shared_ptr<FetchConsumer> consumer) {
folly::coro::Task<Publisher::TrackStatusResult> MoqxRelay::trackStatus(TrackStatus trackStatus) {
XLOG(DBG1) << __func__ << " ftn=" << trackStatus.fullTrackName;

auto session = MoQSession::getRequestSession();
auto authRes = authorize(
auth::Action::TrackStatus,
trackStatus.params,
trackStatus.fullTrackName.trackNamespace,
session,
trackStatus.fullTrackName.trackName
);
if (authRes.hasError()) {
co_return folly::makeUnexpected(TrackStatusError{
trackStatus.requestID,
TrackStatusErrorCode::UNAUTHORIZED,
auth::toString(authRes.error())
});
}

if (trackStatus.fullTrackName.trackNamespace.empty()) {
co_return folly::makeUnexpected(TrackStatusError(
{trackStatus.requestID, TrackStatusErrorCode::TRACK_NOT_EXIST, "namespace required"}
Expand Down
22 changes: 20 additions & 2 deletions src/MoqxRelay.h
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "NamespaceTree.h"
#include "SubscriptionRegistry.h"
#include "UpstreamProvider.h"
#include "auth/Auth.h"
#include "config/Config.h"
#include "relay/PropertyRanking.h"
#include <moxygen/MoQSession.h>
Expand Down Expand Up @@ -100,12 +101,13 @@ class MoqxRelay : public moxygen::Publisher,
explicit MoqxRelay(
config::CacheConfig cache = {},
std::string relayID = {},
config::AuthConfig auth = {},
uint64_t maxDeselected = kDefaultMaxDeselected,
std::chrono::milliseconds idleTimeout = kDefaultIdleTimeout,
std::chrono::milliseconds activityThreshold = kDefaultActivityThreshold
)
: relayID_(std::move(relayID)), maxDeselected_(maxDeselected), idleTimeout_(idleTimeout),
activityThreshold_(activityThreshold) {
: relayID_(std::move(relayID)), authVerifier_(std::move(auth)), maxDeselected_(maxDeselected),
idleTimeout_(idleTimeout), activityThreshold_(activityThreshold) {
if (cache.maxCachedTracks > 0) {
cache_ = std::make_unique<MoqxCache>(cache.maxCachedTracks, cache.maxCachedGroupsPerTrack);
cache_->setMaxCachedBytes(static_cast<size_t>(cache.maxCachedMb) * 1024 * 1024);
Expand All @@ -119,6 +121,12 @@ class MoqxRelay : public moxygen::Publisher,
allowedNamespacePrefix_ = std::move(allowed);
}

folly::Expected<folly::Unit, auth::AuthError> authenticateSession(
const moxygen::ClientSetup& clientSetup,
std::shared_ptr<moxygen::MoQSession> session
);
void removeSessionAuth(moxygen::MoQSession* session) { sessionAuth_.erase(session); }

// Store the upstream provider. The provider must have been constructed with
// publishHandler=this and subscribeHandler=this so that the upstream relay's
// reciprocal subNs and namespace announcements route through MoqxRelay.
Expand Down Expand Up @@ -297,8 +305,18 @@ class MoqxRelay : public moxygen::Publisher,
void
onTrackEvicted(const moxygen::FullTrackName& ftn, std::shared_ptr<moxygen::MoQSession> session);

folly::Expected<folly::Unit, auth::AuthError> authorize(
auth::Action action,
const moxygen::Parameters& params,
const moxygen::TrackNamespace& ns,
const std::shared_ptr<moxygen::MoQSession>& session,
std::optional<std::string_view> trackName = std::nullopt
);

moxygen::TrackNamespace allowedNamespacePrefix_;
std::string relayID_;
auth::AuthTokenVerifier authVerifier_;
folly::F14FastMap<moxygen::MoQSession*, auth::Grants> sessionAuth_;
std::shared_ptr<UpstreamProvider> upstream_;

// Holds the peer subNs handle for the upstream (initiating) direction.
Expand Down
29 changes: 26 additions & 3 deletions src/MoqxRelayContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ MoqxRelayContext::MoqxRelayContext(
)
: serviceMatcher_(services), relayID_(relayID) {
for (const auto& [name, svc] : services) {
services_.emplace(name, ServiceEntry{svc, std::make_shared<MoqxRelay>(svc.cache, relayID)});
services_.emplace(
name,
ServiceEntry{svc, std::make_shared<MoqxRelay>(svc.cache, relayID, svc.auth)}
);
}
}

Expand Down Expand Up @@ -138,14 +141,17 @@ void MoqxRelayContext::onNewSession(std::shared_ptr<MoQSession> clientSession) {
}
}

void MoqxRelayContext::onSessionEnd() {
void MoqxRelayContext::onSessionEnd(std::shared_ptr<MoQSession> session) {
if (statsCollector_) {
statsCollector_->onSessionEnd();
}
for (auto& [name, entry] : services_) {
entry.relay->removeSessionAuth(session.get());
}
}

folly::Expected<folly::Unit, SessionCloseErrorCode> MoqxRelayContext::validateAuthority(
const ClientSetup& /*clientSetup*/,
const ClientSetup& clientSetup,
uint64_t /*negotiatedVersion*/,
std::shared_ptr<MoQSession> session
) {
Expand All @@ -161,6 +167,23 @@ folly::Expected<folly::Unit, SessionCloseErrorCode> MoqxRelayContext::validateAu
// Route: set per-service relay as handler
auto it = services_.find(*matchedName);
CHECK(it != services_.end()) << "Service '" << *matchedName << "' matched but no entry found";
auto authRes = it->second.relay->authenticateSession(clientSetup, session);
if (authRes.hasError()) {
XLOG(ERR) << "Authorization failed for authority=" << authority << " path=" << path
<< " reason=" << auth::toString(authRes.error());
switch (authRes.error()) {
case auth::AuthError::Expired:
return folly::makeUnexpected(SessionCloseErrorCode::EXPIRED_AUTH_TOKEN);
case auth::AuthError::Malformed:
return folly::makeUnexpected(SessionCloseErrorCode::MALFORMED_AUTH_TOKEN);
case auth::AuthError::BadSignature:
case auth::AuthError::Forbidden:
case auth::AuthError::Missing:
case auth::AuthError::WrongTokenType:
return folly::makeUnexpected(SessionCloseErrorCode::UNAUTHORIZED);
}
return folly::makeUnexpected(SessionCloseErrorCode::UNAUTHORIZED);
}
session->setPublishHandler(it->second.relay);
session->setSubscribeHandler(it->second.relay);
return folly::unit;
Expand Down
Loading
Loading