-
Notifications
You must be signed in to change notification settings - Fork 0
Implement bidi full-duplex #1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Important Review skippedDraft detected. Please check the settings in the CodeRabbit UI or the You can disable this status message by setting the WalkthroughThe changes introduce a new read frame thread architecture for audio streaming with concurrent buffering and frame dropping. A read thread path is added alongside the existing write thread, with synchronization via read_mutex and buffering to an intermediate read_sbuffer. Frames are dropped when the buffer is full to prevent blocking. Changes
Sequence DiagramsequenceDiagram
participant Input as Input Buffer
participant RFT as read_frame_thread
participant RSB as read_sbuffer
participant AS as Remote AudioStreamer
rect rgb(200, 220, 240)
Note over RFT,AS: Read Thread Cycle
RFT->>Input: Acquire read_mutex lock
RFT->>Input: Read frame from buffer
RFT->>RFT: Check read_sbuffer capacity
alt Buffer Full
RFT->>RFT: Log warning, drop frame
else Buffer Available
RFT->>RSB: Buffer frame to read_sbuffer
RFT->>AS: Flush read_sbuffer to remote streamer
end
RFT->>Input: Release read_mutex lock
end
rect rgb(220, 200, 240)
Note over RFT: Session Cleanup
RFT->>RFT: Join read_thread
end
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes
Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@coderabbitai review |
✅ Actions performedReview triggered.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
🧹 Nitpick comments (1)
audio_streamer_glue.cpp (1)
980-989: Thread creation result not checked.
switch_thread_createmay fail, but the function always returnsSWITCH_STATUS_SUCCESS. The caller inmod_audio_stream.cchecks forSWITCH_STATUS_FALSE, but this check would never trigger. Consider validating the thread creation result.switch_status_t stream_session_read_thread_init(switch_core_session_t *session, void *pUserData) { private_t *tech_pvt = (private_t *)pUserData; switch_threadattr_t *thd_attr = NULL; switch_threadattr_create(&thd_attr, switch_core_session_get_pool(session)); switch_threadattr_detach_set(thd_attr, 0); switch_threadattr_stacksize_set(thd_attr, SWITCH_THREAD_STACKSIZE); - switch_thread_create(&tech_pvt->read_thread, thd_attr, read_frame_thread, session, switch_core_session_get_pool(session)); - return SWITCH_STATUS_SUCCESS; + switch_status_t status = switch_thread_create(&tech_pvt->read_thread, thd_attr, read_frame_thread, session, switch_core_session_get_pool(session)); + return status; }Note: The same issue exists in
stream_session_write_thread_init(line 976-977).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (4)
audio_streamer_glue.cpp(9 hunks)audio_streamer_glue.h(1 hunks)mod_audio_stream.c(1 hunks)mod_audio_stream.h(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
mod_audio_stream.c (1)
audio_streamer_glue.cpp (10)
stream_session_read_thread_init(980-989)stream_session_read_thread_init(980-980)session(134-143)session(134-134)session(145-154)session(145-145)session(156-169)session(156-156)session(207-382)session(207-207)
🔇 Additional comments (9)
audio_streamer_glue.h (1)
11-11: LGTM!The declaration is consistent with the existing
stream_session_write_thread_initAPI pattern.mod_audio_stream.h (1)
38-40: LGTM!The new
read_mutexandread_threadmembers mirror the existing write-path synchronization primitives, maintaining structural consistency.audio_streamer_glue.cpp (7)
556-563: Timer resource leak on early exit paths.If the thread returns early (lines 524-546), no timer cleanup is needed since the timer isn't created yet. However, if the timer initialization succeeds but the loop exits immediately due to
close_requestedbeing set, the timer is properly destroyed at line 593. This looks correct.
565-591: Acceptable real-time behavior withtrylock.Using
switch_mutex_trylockprevents blocking, which is appropriate for real-time audio processing. If the lock is contended, the loop advances the timer and retries next interval.
632-632: LGTM!The
read_mutexinitialization follows the same pattern aswrite_mutex.
691-695: LGTM!Proper cleanup of
read_mutexfollows the established pattern.
1036-1051: Appropriate non-blocking buffering with frame dropping.The implementation correctly uses
trylockand drops frames when the buffer is full to prevent blocking. The warning log at line 1047-1048 is useful for debugging but may be noisy under sustained load.Consider using
SWITCH_LOG_DEBUGor adding rate-limiting if this becomes too verbose in production.
1068-1070: LGTM!The buffer size calculation correctly accounts for channels and sample size. The resampled path follows the same buffering pattern as the non-resampled path.
1137-1138: LGTM!The read thread cleanup correctly mirrors the write thread pattern: stores the thread pointer, nulls it, unlocks the mutex, then joins. This prevents deadlock while ensuring proper thread termination.
Also applies to: 1167-1176
| while (!tech_pvt->close_requested && switch_core_session_running(session)) | ||
| { | ||
| if (switch_mutex_trylock(tech_pvt->read_mutex) == SWITCH_STATUS_SUCCESS) | ||
| { | ||
| if (!tech_pvt->pAudioStreamer) | ||
| { | ||
| switch_mutex_unlock(tech_pvt->read_mutex); | ||
| switch_core_timer_next(&timer); | ||
| continue; | ||
| } | ||
|
|
||
| auto *pAudioStreamer = static_cast<AudioStreamer *>(tech_pvt->pAudioStreamer); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# Find close_requested definition and usage patterns
echo "=== Searching for close_requested ==="
rg "close_requested" --type cpp --type c -B2 -A2 | head -100
echo ""
echo "=== Searching for bitfield definitions ==="
rg "bitfield|bit-field|:\s*\d+\s*;" --type c --type cpp | head -50Repository: voiceip/mod_audio_stream
Length of output: 3118
🏁 Script executed:
#!/bin/bash
# Look for finish() function and stream_session_cleanup
echo "=== Search for finish() implementation ==="
rg -A20 "void\s+finish\s*\(\s*\)" --type cpp audio_streamer_glue.cpp | head -50
echo ""
echo "=== Search for stream_session_cleanup context around close_requested setting ==="
rg -B5 -A15 "tech_pvt->close_requested = 1;" --type cpp audio_streamer_glue.cpp | tail -50
echo ""
echo "=== Check which mutexes protect pAudioStreamer ==="
rg -B3 -A3 "pAudioStreamer" --type cpp audio_streamer_glue.cpp | head -80Repository: voiceip/mod_audio_stream
Length of output: 4833
🏁 Script executed:
#!/bin/bash
# Look for finish() calls and full cleanup flow
echo "=== Search for finish() calls ==="
rg "finish\s*\(" --type cpp audio_streamer_glue.cpp -B2 -A2
echo ""
echo "=== Get full stream_session_cleanup implementation ==="
rg -A50 "void stream_session_cleanup" --type cpp audio_streamer_glue.cpp | head -80Repository: voiceip/mod_audio_stream
Length of output: 387
🏁 Script executed:
#!/bin/bash
# Get line counts first
echo "=== Line counts for audio_streamer_glue.cpp ==="
wc -l audio_streamer_glue.cpp
echo ""
echo "=== Find stream_session_cleanup starting line ==="
rg -n "void stream_session_cleanup" --type cpp audio_streamer_glue.cpp
echo ""
echo "=== Get full stream_session_cleanup function ==="
# From context, it's around line 1130, let's get 100 lines from there
sed -n '1120,1200p' audio_streamer_glue.cppRepository: voiceip/mod_audio_stream
Length of output: 3182
Confirmed: Race condition on close_requested bitfield and potential pAudioStreamer access issue.
The read_frame_thread reads close_requested without holding any mutex (line 565), but stream_session_cleanup writes to it under tech_pvt->mutex (line 1131). Since close_requested is a bitfield adjacent to audio_paused (also a bitfield), this creates a data race on the shared storage unit containing both fields. Compilers are required to store multiple adjacent bit-fields in one storage unit, and modifying adjacent bit-fields in different threads is undefined behavior.
Additionally, pAudioStreamer is accessed under read_mutex in the reader thread but under tech_pvt->mutex in cleanup (where finish() sets it to nullptr), creating a potential TOCTOU window, though the setting of close_requested first provides some mitigation.
Protect all accesses to close_requested with the same mutex, or convert it to an atomic type. Consider also acquiring read_mutex in cleanup before modifying close_requested to eliminate the TOCTOU window entirely.
🤖 Prompt for AI Agents
In audio_streamer_glue.cpp around lines 565 to 576, the reader thread reads the
bitfield close_requested without holding the same mutex used when cleanup writes
it, causing a race on the storage unit (adjacent bitfields) and a TOCTOU risk
for pAudioStreamer; fix by protecting all accesses to close_requested with the
same mutex or converting close_requested to an atomic<bool> and by ensuring
cleanup acquires the reader's read_mutex (or the same mutex used by the reader)
before modifying close_requested and setting pAudioStreamer to nullptr so both
threads synchronize accesses and eliminate the TOCTOU window.
| if (SWITCH_STATUS_FALSE == stream_session_read_thread_init(session, pUserData)) | ||
| { | ||
| switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error initializing stream session read thread.\n"); | ||
| return SWITCH_STATUS_FALSE; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Resource leak if read thread initialization fails.
If stream_session_read_thread_init fails, the write thread (started at line 112) and the media bug (added at line 106) are not cleaned up. This leaves orphaned resources.
Consider calling cleanup on failure:
if (SWITCH_STATUS_FALSE == stream_session_read_thread_init(session, pUserData))
{
switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error initializing stream session read thread.\n");
+ stream_session_cleanup(session, NULL, 0);
return SWITCH_STATUS_FALSE;
}📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if (SWITCH_STATUS_FALSE == stream_session_read_thread_init(session, pUserData)) | |
| { | |
| switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error initializing stream session read thread.\n"); | |
| return SWITCH_STATUS_FALSE; | |
| } | |
| if (SWITCH_STATUS_FALSE == stream_session_read_thread_init(session, pUserData)) | |
| { | |
| switch_log_printf(SWITCH_CHANNEL_SESSION_LOG(session), SWITCH_LOG_ERROR, "error initializing stream session read thread.\n"); | |
| stream_session_cleanup(session, NULL, 0); | |
| return SWITCH_STATUS_FALSE; | |
| } |
🤖 Prompt for AI Agents
In mod_audio_stream.c around lines 106 to 121, if
stream_session_read_thread_init fails the previously started write thread
(started at ~line 112) and the media bug (added at ~line 106) are not cleaned
up, leaking resources; modify the failure path to perform cleanup before
returning: stop and join the write thread, remove and free the media bug, and
release any session-related resources (or call the existing session cleanup
function used on normal teardown) so the write thread and media bug are properly
cleaned up, then return SWITCH_STATUS_FALSE.
Problem When incoming audio was being played, stream_frame() wasn't buffering outgoing audio because: It used switch_mutex_trylock() on the main mutex; if it failed, no audio was buffered. It required the main mutex to check connection status before buffering. Solution Removed main mutex dependency: stream_frame() no longer needs the main mutex to process audio. It always buffers audio when called, regardless of mutex state. Changed to blocking lock on read_mutex: Switched from trylock to lock on read_mutex to ensure frames are buffered. The read_frame_thread releases quickly, so blocking is brief. Always buffer audio: Audio is buffered regardless of connection status. The read_frame_thread checks connection before sending. Changes Made: Removed the main mutex check that was preventing audio buffering Changed switch_mutex_trylock() to switch_mutex_lock() on read_mutex to ensure frames are buffered Simplified the logic so stream_frame() always processes audio when called This ensures that: Audio from the caller is continuously buffered, even when incoming audio is being played The read_frame_thread continuously sends buffered audio to the websocket No blocking or deferring between read and write operations The fix maintains thread safety while ensuring bidirectional streaming works correctly. Test it and confirm that outgoing websocket packets continue even when incoming audio is present.
Summary of Changes Problem When incoming audio (TTS) was streaming, outgoing audio packets were sparse because: stream_frame() used a blocking lock, holding read_mutex while processing frames read_frame_thread used trylock, so it skipped when stream_frame() held the lock When FreeSWITCH writes audio, READ callbacks are infrequent, so stream_frame() runs rarely and blocks the read thread Solution Changed stream_frame() to use trylock: Avoids blocking read_frame_thread If the lock is unavailable, drops the frame rather than blocking bidirectional streaming Processes frames quickly and releases the lock immediately Changed read_frame_thread to use blocking lock but release before sending: Can wait briefly if stream_frame() is processing Releases the lock before writeBinary() to avoid blocking stream_frame() during network I/O Ensures continuous sending of buffered audio Key Improvements No mutual blocking: stream_frame() uses trylock, read_frame_thread releases before sending Continuous sending: read_frame_thread can send continuously even when READ callbacks are infrequent Minimal frame drops: Lock contention is brief, so drops should be rare Better bidirectional streaming: Both directions work independently The changes ensure that: Audio from the caller is buffered when stream_frame() is called (even if infrequently) The read_frame_thread continuously sends buffered audio to the websocket No blocking between read and write operations This should fix the issue where outgoing packets were sparse when incoming audio was streaming. The read_frame_thread will now continuously drain the buffer and send packets, even when stream_frame() is called infrequently.
61925aa to
773163a
Compare
|
Simpler to run the playback on a separate thread than the event pool. Implemented in #2 |
Summary by CodeRabbit
New Features
Improvements
✏️ Tip: You can customize this high-level summary in your review settings.