Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ add_library(moqx_core STATIC
src/relay/PropertyRanking.cpp
src/relay/CrossExecFilter.cpp
src/relay/CrossExecForwarderCallback.cpp
src/relay/WeakRelayForwarderCallback.cpp
src/relay/PublisherCrossExecFilter.cpp
src/relay/SubscriberCrossExecFilter.cpp
)
Expand Down
8 changes: 7 additions & 1 deletion scripts/perf-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# --delivery-timeout N Delivery timeout in ms (default: 500)
# --transport TYPE quic or webtransport (default: quic)
# --no-relay-thread Disable relay exec thread (use_relay_thread: false)
# --local-forwarders Enable per-subscriber-thread local forwarders
# (use_local_forwarders: true; requires relay thread)
# --io-threads N Number of relay IO threads (default: 1)
# --threads N Number of perf client threads (default: 2)
# --relay-log SPEC folly XLOG config passed as --logging=SPEC to relay
Expand Down Expand Up @@ -52,6 +54,7 @@ DURATION=30
DELIVERY_TIMEOUT=500
TRANSPORT="quic"
USE_RELAY_THREAD="true"
USE_LOCAL_FORWARDERS="false"
IO_THREADS=1
CLIENT_THREADS=2
RELAY_LOG_SPEC=""
Expand Down Expand Up @@ -80,6 +83,7 @@ while [[ $# -gt 0 ]]; do
--delivery-timeout) DELIVERY_TIMEOUT="$2"; shift 2 ;;
--transport) TRANSPORT="$2"; shift 2 ;;
--no-relay-thread) USE_RELAY_THREAD="false"; shift ;;
--local-forwarders) USE_LOCAL_FORWARDERS="true"; shift ;;
--io-threads) IO_THREADS="$2"; shift 2 ;;
--threads) CLIENT_THREADS="$2"; shift 2 ;;
--relay-log) RELAY_LOG_SPEC="$2"; shift 2 ;;
Expand Down Expand Up @@ -206,6 +210,7 @@ cat >"$RELAY_CFG" <<EOF
relay_id: "perf-test-relay"
threads: $IO_THREADS
use_relay_thread: $USE_RELAY_THREAD
use_local_forwarders: $USE_LOCAL_FORWARDERS
mvfst_bpf_steering: $BPF_STEERING
listeners:
- name: perf
Expand Down Expand Up @@ -254,6 +259,7 @@ cp "$RELAY_CFG" "$LOG_DIR/relay.yaml"
echo "transport: $TRANSPORT"
echo "io_threads: $IO_THREADS"
echo "use_relay_thread: $USE_RELAY_THREAD"
echo "local_forwarders: $USE_LOCAL_FORWARDERS"
echo "subscriber_max: $SUBSCRIBER_MAX"
echo "ramp: $RAMP"
echo "duration: $DURATION"
Expand All @@ -269,7 +275,7 @@ echo ""

# ── Start relay ───────────────────────────────────────────────────────────────
ulimit -n 65536 2>/dev/null || true
echo "Starting relay (use_relay_thread=$USE_RELAY_THREAD, io_threads=$IO_THREADS, transport=$TRANSPORT, mvfst_bpf_steering=$BPF_STEERING)..."
echo "Starting relay (use_relay_thread=$USE_RELAY_THREAD, local_forwarders=$USE_LOCAL_FORWARDERS, io_threads=$IO_THREADS, transport=$TRANSPORT, mvfst_bpf_steering=$BPF_STEERING)..."
RELAY_LOGGING_ARG=()
[[ -n "$RELAY_LOG_SPEC" ]] && RELAY_LOGGING_ARG=("--logging=$RELAY_LOG_SPEC")
if [[ -n "$JEMALLOC" ]]; then
Expand Down
5 changes: 5 additions & 0 deletions src/MoqxCache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
*/

#include "MoqxCache.h"
#include "relay/NullConsumers.h"
#include <folly/logging/xlog.h>
#include <moxygen/MoQTrackProperties.h>

Expand Down Expand Up @@ -1292,6 +1293,10 @@ class MoqxCache::FetchWriteback : public FetchConsumer {
}
};

std::shared_ptr<TrackConsumer> MoqxCache::makePassiveConsumer(const FullTrackName& ftn) {
return getSubscribeWriteback(ftn, std::make_shared<moxygen::NullTrackConsumer>());
}

std::shared_ptr<TrackConsumer> MoqxCache::getSubscribeWriteback(
const FullTrackName& ftn,
std::shared_ptr<TrackConsumer> consumer
Expand Down
5 changes: 5 additions & 0 deletions src/MoqxCache.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ class MoqxCache {
std::shared_ptr<moxygen::TrackConsumer> consumer
);

// Returns a TrackConsumer suitable for use as a passive forwarder subscriber.
// Objects delivered to it are written to the cache; nothing is forwarded
// downstream (the inner consumer is a NullTrackConsumer).
std::shared_ptr<moxygen::TrackConsumer> makePassiveConsumer(const moxygen::FullTrackName& ftn);

// Serves objects from the cache to the consumer. If objects in the range are
// not in cache, issue one-or-more FETCH'es upstream. Objects fetched from
// upstream are written back to the cache and passed to the consumer.
Expand Down
Loading
Loading