Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -88,4 +88,4 @@ _build
lib/generated_plugin_registrant.dart

# Test files - ignore any binary files in testfiles directory
testfiles/*.bin
testfiles/*.bin
295 changes: 267 additions & 28 deletions lib/src/core/pending_track_queue.dart
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

import 'dart:math' as math;

import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
import 'package:meta/meta.dart';

Expand All @@ -26,26 +28,97 @@ typedef TrackExceptionEmitter = void Function(TrackSubscriptionExceptionEvent ev
@internal
class PendingTrackQueue {
final int maxSize;
Duration ttl;
final int _maxPerParticipant;
Duration _queueTtl;
Duration _metadataTimeout;
final TrackExceptionEmitter emitException;

// keyed by participant sid
final Map<String, List<PendingTrack>> _pending = {};

PendingTrackQueue({
required this.ttl,
required Duration metadataTimeout,
required this.emitException,
this.maxSize = 100,
});
}) : _metadataTimeout = metadataTimeout,
_queueTtl = _deriveQueueTtl(metadataTimeout),
// Keep any single participant from starving others by capping their share
// to roughly 25% of the total queue capacity (at least one slot).
_maxPerParticipant = math.max(1, (maxSize / 4).floor());

Duration get ttl => _queueTtl;

Duration get metadataTimeout => _metadataTimeout;

@visibleForTesting
PendingTrackQueueStats get stats => _buildStats();

@visibleForTesting
List<PendingTrack> debugPendingFor(String participantSid) =>
List<PendingTrack>.unmodifiable(_pending[participantSid] ?? const []);

void updateTtl(Duration ttl) {
this.ttl = ttl;
bool get hasPending => _pending.values.any((entries) => entries.isNotEmpty);

void updateTimeouts(Duration metadataTimeout) {
_metadataTimeout = metadataTimeout;
_queueTtl = _deriveQueueTtl(metadataTimeout);
refreshAll(reason: 'timeouts updated');
}

void clear() {
void clear({String reason = 'manual clear'}) {
final currentStats = stats;
if (currentStats.totalEntries > 0) {
logger.finer('Clearing pending track queue reason:$reason total:${currentStats.totalEntries}');
}
_pending.clear();
}

void refreshParticipant(String participantSid, {String reason = 'metadata progress'}) {
final entries = _pending[participantSid];
if (entries == null || entries.isEmpty) return;
final newExpiry = _newExpiry();
for (final entry in entries) {
entry.expiresAt = newExpiry;
}
logger.finer('Refreshed ${entries.length} pending tracks for participantSid:$participantSid reason:$reason');
}

void refreshAll({String reason = 'manual refresh'}) {
if (!hasPending) return;
final newExpiry = _newExpiry();
var refreshed = 0;
_pending.forEach((sid, list) {
for (final entry in list) {
entry.expiresAt = newExpiry;
refreshed++;
}
});
if (refreshed > 0) {
logger.finer('Refreshed ${refreshed} pending tracks reason:$reason');
}
}

void removeParticipant(String participantSid, {String reason = 'participant removed'}) {
final removed = _pending.remove(participantSid);
if (removed == null || removed.isEmpty) return;
final now = DateTime.now();
final maxAge = removed
.map((entry) => now.difference(entry.enqueuedAt))
.fold<Duration>(Duration.zero, (acc, value) => value > acc ? value : acc);
logger.finer(
'Removed ${removed.length} pending tracks sid:$participantSid reason:$reason maxAgeMs:${maxAge.inMilliseconds}',
);
for (final item in removed) {
emitException(
TrackSubscriptionExceptionEvent(
participant: null,
sid: item.trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
),
);
}
}

void enqueue({
required rtc.MediaStreamTrack track,
required rtc.MediaStream stream,
Expand All @@ -68,68 +141,183 @@ class PendingTrackQueue {

_removeExpired();

final listForParticipant = _pending.putIfAbsent(participantSid, () => []);
if (listForParticipant.length >= _maxPerParticipant) {
_dropWithException(
participantSid: participantSid,
trackSid: trackSid,
reason: 'per-participant capacity reached ($_maxPerParticipant)',
);
return;
}

final totalPending = _pending.values.fold<int>(0, (sum, list) => sum + list.length);
if (totalPending >= maxSize) {
final event = TrackSubscriptionExceptionEvent(
participant: null,
sid: trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
_dropWithException(
participantSid: participantSid,
trackSid: trackSid,
reason: 'global capacity reached ($maxSize)',
);
logger.severe('Pending track queue full, dropping trackSid:$trackSid participantSid:$participantSid');
emitException(event);
return;
}

final expiresAt = DateTime.now().add(ttl);
logger.fine('Queueing pending trackSid:$trackSid participantSid:$participantSid until metadata is ready');
final now = DateTime.now();
final expiresAt = now.add(_queueTtl);
logger.fine(
'Queueing pending trackSid:$trackSid participantSid:$participantSid pending=${totalPending + 1} '
'participantPending=${listForParticipant.length + 1}',
);
final entry = PendingTrack(
track: track,
stream: stream,
receiver: receiver,
participantSid: participantSid,
trackSid: trackSid,
enqueuedAt: now,
expiresAt: expiresAt,
);
final list = _pending.putIfAbsent(participantSid, () => []);
list.add(entry);
listForParticipant.add(entry);
}

@internal
Future<void> flush({
Future<PendingTrackQueueFlushResult> flush({
required bool isConnected,
String? participantSid,
required PendingTrackSubscriber subscriber,
}) async {
_removeExpired();
if (!isConnected) return;
if (!isConnected) {
return PendingTrackQueueFlushResult(
attempted: 0,
succeeded: 0,
transientFailures: 0,
hasPending: hasPending,
skippedForDisconnect: true,
);
}

final Iterable<PendingTrack> source = participantSid != null
? List<PendingTrack>.from(_pending[participantSid] ?? const [])
: _pending.values.expand((e) => e).toList();

var attempted = 0;
var succeeded = 0;
var transientFailures = 0;

for (final item in source) {
final success = await subscriber(item);
if (!item.beginProcessing()) {
continue;
}

attempted++;
var success = false;
try {
success = await subscriber(item);
} catch (error, stack) {
logger.warning(
'Pending track subscriber threw trackSid:${item.trackSid} participantSid:${item.participantSid}',
error,
stack,
);
} finally {
item.endProcessing();
}

if (success) {
succeeded++;
_pending[item.participantSid]?.remove(item);
if ((_pending[item.participantSid]?.isEmpty ?? false)) {
_pending.remove(item.participantSid);
}
} else {
transientFailures++;
item.retryCount += 1;
}
}

return PendingTrackQueueFlushResult(
attempted: attempted,
succeeded: succeeded,
transientFailures: transientFailures,
hasPending: hasPending,
skippedForDisconnect: false,
);
}

void _removeExpired() {
final now = DateTime.now();
_pending.forEach((sid, list) {
final expired = list.where((p) => p.expiresAt.isBefore(now)).toList();
for (final item in expired) {
list.remove(item);
final event = TrackSubscriptionExceptionEvent(
final expiredEntries = <PendingTrack>[];
_pending.removeWhere((sid, list) {
list.removeWhere((item) {
if (item.expiresAt.isBefore(now)) {
expiredEntries.add(item);
return true;
}
return false;
});
return list.isEmpty;
});

for (final item in expiredEntries) {
final age = now.difference(item.enqueuedAt);
logger.warning(
'Pending track expired (ttl) trackSid:${item.trackSid} participantSid:${item.participantSid} '
'ageMs:${age.inMilliseconds}',
);
emitException(
TrackSubscriptionExceptionEvent(
participant: null,
sid: item.trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
);
logger.warning('Pending track expired waiting for participant metadata: $event');
emitException(event);
),
);
}
}

static Duration _deriveQueueTtl(Duration subscribeTimeout) {
final multiplied = Duration(milliseconds: subscribeTimeout.inMilliseconds * 3);
const minTtl = Duration(seconds: 30);
return multiplied >= minTtl ? multiplied : minTtl;
}

DateTime _newExpiry() => DateTime.now().add(_queueTtl);

PendingTrackQueueStats _buildStats() {
var total = 0;
Duration? oldest;
final perParticipant = <String, int>{};
final now = DateTime.now();
_pending.forEach((sid, list) {
total += list.length;
perParticipant[sid] = list.length;
for (final entry in list) {
final age = now.difference(entry.enqueuedAt);
if (oldest == null || age > oldest!) {
oldest = age;
}
}
});
return PendingTrackQueueStats(
totalEntries: total,
entriesPerParticipant: perParticipant,
oldestEntryAge: oldest,
);
}

void _dropWithException({
required String participantSid,
required String trackSid,
required String reason,
}) {
final event = TrackSubscriptionExceptionEvent(
participant: null,
sid: trackSid,
reason: TrackSubscribeFailReason.noParticipantFound,
);
logger.severe(
'Pending track queue drop trackSid:$trackSid participantSid:$participantSid reason:$reason',
);
emitException(event);
}
}

Expand All @@ -140,14 +328,65 @@ class PendingTrack {
final rtc.RTCRtpReceiver? receiver;
final String participantSid;
final String trackSid;
final DateTime expiresAt;
final DateTime enqueuedAt;
DateTime expiresAt;
bool _processing = false;
int retryCount = 0;

PendingTrack({
required this.track,
required this.stream,
required this.receiver,
required this.participantSid,
required this.trackSid,
required this.enqueuedAt,
required this.expiresAt,
});

bool beginProcessing() {
if (_processing) return false;
_processing = true;
return true;
}

void endProcessing() {
_processing = false;
}
}

@internal
class PendingTrackQueueStats {
final int totalEntries;
final Map<String, int> entriesPerParticipant;
final Duration? oldestEntryAge;

const PendingTrackQueueStats({
required this.totalEntries,
required this.entriesPerParticipant,
required this.oldestEntryAge,
});

bool get hasEntries => totalEntries > 0;

@override
String toString() =>
'PendingTrackQueueStats(total:$totalEntries, oldestMs:${oldestEntryAge?.inMilliseconds}, perParticipant:$entriesPerParticipant)';
}

class PendingTrackQueueFlushResult {
final int attempted;
final int succeeded;
final int transientFailures;
final bool hasPending;
final bool skippedForDisconnect;

const PendingTrackQueueFlushResult({
required this.attempted,
required this.succeeded,
required this.transientFailures,
required this.hasPending,
required this.skippedForDisconnect,
});

bool get needsRetry => hasPending && !skippedForDisconnect;
}
Loading