Skip to content

Commit 382ffe4

Browse files
subscription_thread_dispatcher for handling creating threads which execute user set callbacks
1 parent 4dc36ef commit 382ffe4

File tree

7 files changed

+1046
-627
lines changed

7 files changed

+1046
-627
lines changed

CMakeLists.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -335,6 +335,7 @@ add_library(livekit SHARED
335335
src/room.cpp
336336
src/room_proto_converter.cpp
337337
src/room_proto_converter.h
338+
src/subscription_thread_dispatcher.cpp
338339
src/local_participant.cpp
339340
src/remote_participant.cpp
340341
src/stats.cpp
@@ -723,4 +724,4 @@ add_custom_target(clean_all
723724
COMMAND ${CMAKE_COMMAND} -E chdir "${CMAKE_SOURCE_DIR}" ${CMAKE_COMMAND} -E rm -rf "${CMAKE_BINARY_DIR}"
724725
COMMENT "Full clean: CMake outputs + Rust target + generated protos + delete build/"
725726
VERBATIM
726-
)
727+
)

include/livekit/room.h

Lines changed: 2 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,24 @@
1717
#ifndef LIVEKIT_ROOM_H
1818
#define LIVEKIT_ROOM_H
1919

20-
#include "livekit/audio_stream.h"
2120
#include "livekit/data_stream.h"
2221
#include "livekit/e2ee.h"
2322
#include "livekit/ffi_handle.h"
2423
#include "livekit/room_event_types.h"
25-
#include "livekit/video_stream.h"
24+
#include "livekit/subscription_thread_dispatcher.h"
2625

2726
#include <cstdint>
28-
#include <functional>
2927
#include <memory>
3028
#include <mutex>
31-
#include <thread>
3229

3330
namespace livekit {
3431

35-
class AudioFrame;
36-
class VideoFrame;
3732
class RoomDelegate;
3833
struct RoomInfoData;
3934
namespace proto {
4035
class FfiEvent;
4136
}
4237

43-
/// Callback type for incoming audio frames.
44-
/// Invoked on a dedicated reader thread per (participant, source) pair.
45-
using AudioFrameCallback = std::function<void(const AudioFrame &)>;
46-
47-
/// Callback type for incoming video frames.
48-
/// Invoked on a dedicated reader thread per (participant, source) pair.
49-
using VideoFrameCallback =
50-
std::function<void(const VideoFrame &frame, std::int64_t timestamp_us)>;
51-
5238
struct E2EEOptions;
5339
class E2EEManager;
5440
class LocalParticipant;
@@ -311,8 +297,6 @@ class Room {
311297
TrackSource source);
312298

313299
private:
314-
friend class RoomCallbackTest;
315-
316300
mutable std::mutex lock_;
317301
ConnectionState connection_state_ = ConnectionState::Disconnected;
318302
RoomDelegate *delegate_ = nullptr; // Not owned
@@ -330,75 +314,12 @@ class Room {
330314
byte_stream_readers_;
331315
// E2EE
332316
std::unique_ptr<E2EEManager> e2ee_manager_;
317+
std::shared_ptr<SubscriptionThreadDispatcher> subscription_thread_dispatcher_;
333318

334319
// FfiClient listener ID (0 means no listener registered)
335320
int listener_id_{0};
336321

337322
void OnEvent(const proto::FfiEvent &event);
338-
339-
// -------------------------------------------------------------------
340-
// Frame callback internals
341-
// -------------------------------------------------------------------
342-
343-
struct CallbackKey {
344-
std::string participant_identity;
345-
TrackSource source;
346-
bool operator==(const CallbackKey &o) const {
347-
return participant_identity == o.participant_identity &&
348-
source == o.source;
349-
}
350-
};
351-
352-
struct CallbackKeyHash {
353-
std::size_t operator()(const CallbackKey &k) const {
354-
auto h1 = std::hash<std::string>{}(k.participant_identity);
355-
auto h2 = std::hash<int>{}(static_cast<int>(k.source));
356-
return h1 ^ (h2 << 1);
357-
}
358-
};
359-
360-
struct ActiveReader {
361-
std::shared_ptr<AudioStream> audio_stream;
362-
std::shared_ptr<VideoStream> video_stream;
363-
std::thread thread;
364-
};
365-
366-
struct RegisteredAudioCallback {
367-
AudioFrameCallback callback;
368-
AudioStream::Options options;
369-
};
370-
371-
struct RegisteredVideoCallback {
372-
VideoFrameCallback callback;
373-
VideoStream::Options options;
374-
};
375-
376-
std::unordered_map<CallbackKey, RegisteredAudioCallback, CallbackKeyHash>
377-
audio_callbacks_;
378-
std::unordered_map<CallbackKey, RegisteredVideoCallback, CallbackKeyHash>
379-
video_callbacks_;
380-
std::unordered_map<CallbackKey, ActiveReader, CallbackKeyHash>
381-
active_readers_;
382-
383-
static constexpr int kMaxActiveReaders = 20;
384-
385-
// Must be called with lock_ held. Closes the stream for the given key and
386-
// returns the old reader thread (which the caller must join outside the
387-
// lock).
388-
std::thread extractReaderThread(const CallbackKey &key);
389-
390-
// Must be called with lock_ held. Returns old reader thread if one existed.
391-
std::thread startAudioReader(const CallbackKey &key,
392-
const std::shared_ptr<Track> &track,
393-
AudioFrameCallback cb,
394-
const AudioStream::Options &opts);
395-
std::thread startVideoReader(const CallbackKey &key,
396-
const std::shared_ptr<Track> &track,
397-
VideoFrameCallback cb,
398-
const VideoStream::Options &opts);
399-
400-
// Stops all readers (closes streams, joins threads). Must NOT hold lock_.
401-
void stopAllReaders();
402323
};
403324
} // namespace livekit
404325

0 commit comments

Comments
 (0)