Skip to content

Commit 00f9689

Browse files
authored
Merge pull request #89 from cogwheel/subscription-logging
Improve logging for stage subscriptions
2 parents ae3cf17 + 82fb3dd commit 00f9689

1 file changed

Lines changed: 68 additions & 15 deletions

File tree

src/Broadcast.cpp

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -82,10 +82,15 @@ namespace caff {
8282
return true;
8383
}
8484

85-
bool Broadcast::transitionState(State oldState, State newState) {
85+
bool Broadcast::transitionState(State const expected, State newState) {
86+
State oldState = expected;
8687
bool result = state.compare_exchange_strong(oldState, newState);
8788
if (!result)
88-
LOG_ERROR("Transitioning to state %s expects state %s", stateString(newState), stateString(oldState));
89+
LOG_ERROR(
90+
"Transitioning to state %s expects state %s but was in state %s",
91+
stateString(newState),
92+
stateString(expected),
93+
stateString(oldState));
8994
return result;
9095
}
9196

@@ -106,12 +111,14 @@ namespace caff {
106111
return caff_ResultBroadcastFailed;
107112
}
108113

114+
LOG_DEBUG("Creating feed: %s", feedId.c_str());
109115
auto feed = currentFeedInput();
110116
feed.sdpOffer = offer;
111117

112118
auto payload = graphqlRequest<caffql::Mutation::AddFeedField>(
113119
sharedCredentials, clientId, caffql::ClientType::Capture, feed);
114120
if (!payload) {
121+
LOG_ERROR("Request failed adding feed");
115122
return caff_ResultBroadcastFailed;
116123
}
117124

@@ -124,6 +131,10 @@ namespace caff {
124131
}
125132
}
126133

134+
if (payload->stage.feeds.empty() || payload->stage.feeds[0].id != feedId) {
135+
LOG_ERROR("Expected feed in AddFeedPayload");
136+
}
137+
127138
if (!payload->stage.broadcastId) {
128139
LOG_ERROR("Expected broadcast id in AddFeedPayload");
129140
return caff_ResultBroadcastFailed;
@@ -138,6 +149,7 @@ namespace caff {
138149
}
139150

140151
streamUrl = stream->url;
152+
LOG_DEBUG("Feed added to stage");
141153

142154
return stream->sdpAnswer;
143155
}
@@ -149,10 +161,12 @@ namespace caff {
149161
broadcastThread = std::thread([=] {
150162
setupSubscription();
151163

164+
LOG_DEBUG("Creating video track");
152165
videoCapturer = new VideoCapturer;
153166
auto videoSource = factory->CreateVideoSource(videoCapturer);
154167
auto videoTrack = factory->CreateVideoTrack("external_video", videoSource);
155168

169+
LOG_DEBUG("Creating audio track");
156170
cricket::AudioOptions audioOptions;
157171
audioOptions.echo_cancellation = false;
158172
audioOptions.noise_suppression = false;
@@ -175,6 +189,7 @@ namespace caff {
175189
mediaStream->AddTrack(videoTrack);
176190
mediaStream->AddTrack(audioTrack);
177191

192+
LOG_DEBUG("Creating peer connection");
178193
webrtc::PeerConnectionInterface::RTCConfiguration config;
179194
auto observer = new PeerConnectionObserver(failedCallback);
180195
peerConnection = factory->CreatePeerConnection(config, webrtc::PeerConnectionDependencies(observer));
@@ -193,6 +208,7 @@ namespace caff {
193208

194209
std::future_status status;
195210

211+
LOG_DEBUG("Awaiting SDP offer");
196212
auto constexpr futureWait = 1s;
197213
auto creationFuture = creationObserver->getFuture();
198214
status = creationFuture.wait_for(futureWait);
@@ -222,6 +238,7 @@ namespace caff {
222238
return;
223239
}
224240

241+
LOG_DEBUG("Creating local session description");
225242
webrtc::SdpParseError offerError;
226243
auto localDesc = webrtc::CreateSessionDescription(webrtc::SdpType::kOffer, offerSdp, &offerError);
227244
if (!localDesc) {
@@ -249,6 +266,7 @@ namespace caff {
249266
return;
250267
}
251268

269+
LOG_DEBUG("Creating feed");
252270
auto result = createFeed(offerSdp);
253271
auto error = get_if<caff_Result>(&result);
254272
if (error) {
@@ -257,6 +275,7 @@ namespace caff {
257275
return;
258276
}
259277

278+
LOG_DEBUG("Creating remote session description");
260279
webrtc::SdpParseError answerError;
261280
auto remoteDesc =
262281
webrtc::CreateSessionDescription(webrtc::SdpType::kAnswer, get<std::string>(result), &answerError);
@@ -274,13 +293,15 @@ namespace caff {
274293
return;
275294
}
276295

296+
LOG_DEBUG("Trickling ICE candidates");
277297
auto & candidates = observerFuture.get();
278298
if (!trickleCandidates(candidates, streamUrl, sharedCredentials)) {
279299
LOG_ERROR("Failed to negotiate ICE");
280300
failedCallback(caff_ResultFailure);
281301
return;
282302
}
283303

304+
LOG_DEBUG("Setting remote session description");
284305
rtc::scoped_refptr<SetSessionDescriptionObserver> setRemoteObserver =
285306
new rtc::RefCountedObject<SetSessionDescriptionObserver>;
286307

@@ -301,6 +322,7 @@ namespace caff {
301322
return;
302323
}
303324

325+
LOG_DEBUG("Adding stats observer");
304326
statsObserver = new rtc::RefCountedObject<StatsObserver>(sharedCredentials);
305327

306328
if (!transitionState(State::Starting, State::Streaming)) {
@@ -364,6 +386,7 @@ namespace caff {
364386
return;
365387
}
366388

389+
LOG_DEBUG("Awaiting screenshot");
367390
std::future_status status;
368391
auto screenshotFuture = screenshotPromise.get_future();
369392
try {
@@ -376,6 +399,7 @@ namespace caff {
376399
return;
377400
}
378401

402+
LOG_DEBUG("Sending screenshot");
379403
auto screenshotData = screenshotFuture.get();
380404
if (!updateScreenshot(broadcastId.value(), screenshotData, sharedCredentials)) {
381405
LOG_ERROR("Failed to send screenshot");
@@ -388,6 +412,7 @@ namespace caff {
388412
return;
389413
}
390414

415+
LOG_DEBUG("Awaiting stage subscription");
391416
// Make sure the subscription has opened before going live
392417
{
393418
auto openedFuture = subscriptionOpened.get_future();
@@ -400,7 +425,7 @@ namespace caff {
400425
}
401426

402427
// Set stage live
403-
428+
LOG_DEBUG("Setting stage live");
404429
auto startPayload = graphqlRequest<caffql::Mutation::StartBroadcastField>(
405430
sharedCredentials, clientId, caffql::ClientType::Capture, fullTitle());
406431
if (!startPayload || startPayload->error || !startPayload->stage.live) {
@@ -423,6 +448,7 @@ namespace caff {
423448
int const max_failures = 5;
424449
int failures = 0;
425450

451+
LOG_DEBUG("Starting heartbeats");
426452
for (; state == State::Live; std::this_thread::sleep_for(checkInterval)) {
427453
// TODO: use wall time?
428454
interval += checkInterval;
@@ -431,18 +457,18 @@ namespace caff {
431457

432458
interval = 0ms;
433459

460+
LOG_DEBUG("Updating webrtc stats");
434461
peerConnection->GetStats(
435462
statsObserver,
436463
nullptr,
437464
webrtc::PeerConnectionInterface::StatsOutputLevel::kStatsOutputLevelStandard);
438465

439466
// Heartbeat broadcast
440-
441467
auto heartbeatResponse = heartbeatStream(streamUrl, sharedCredentials);
442468

443469
if (heartbeatResponse) {
470+
LOG_DEBUG("Heartbeat succeeded");
444471
failures = 0;
445-
446472
// Update the feed's connection quality if it has changed
447473
bool shouldMutateFeed = false;
448474
{
@@ -453,9 +479,14 @@ namespace caff {
453479
}
454480
}
455481

456-
if (shouldMutateFeed && !updateFeed()) {
457-
failedCallback(caff_ResultBroadcastFailed);
458-
return;
482+
if (shouldMutateFeed) {
483+
if (updateFeed()) {
484+
LOG_DEBUG("Updated feed connection quality");
485+
} else {
486+
LOG_DEBUG("Failed to update feed");
487+
failedCallback(caff_ResultBroadcastFailed);
488+
return;
489+
}
459490
}
460491
} else {
461492
LOG_ERROR("Heartbeat failed");
@@ -617,15 +648,18 @@ namespace caff {
617648
void Broadcast::setupSubscription(size_t tryNum) {
618649
std::weak_ptr<Broadcast> weakThis = shared_from_this();
619650

651+
LOG_DEBUG("Setting up GraphQL subscription. Attempt %zu", tryNum);
652+
620653
auto messageHandler = [weakThis](caffql::GraphqlResponse<caffql::StageSubscriptionPayload> update) mutable {
654+
LOG_DEBUG("Subscription message received");
621655
auto strongThis = weakThis.lock();
622656
if (!strongThis) {
657+
LOG_DEBUG("Broadcast no longer exists. Ignoring.");
623658
return;
624659
}
625660

626661
if (auto payload = get_if<caffql::StageSubscriptionPayload>(&update)) {
627-
// TODO: Check for error messages to display to the user
628-
662+
LOG_DEBUG("Stage update received");
629663
if (strongThis->subscriptionState == SubscriptionState::None) {
630664
strongThis->subscriptionState = SubscriptionState::Open;
631665
strongThis->subscriptionOpened.set_value(true);
@@ -638,10 +672,12 @@ namespace caff {
638672

639673
if (feedIsOnStage) {
640674
if (strongThis->subscriptionState == SubscriptionState::Open) {
675+
LOG_DEBUG("Feed has appeared on stage");
641676
strongThis->subscriptionState = SubscriptionState::FeedHasAppeared;
642677
}
643678

644679
if (payload->stage.live) {
680+
LOG_DEBUG("Stage has gone live");
645681
strongThis->subscriptionState = SubscriptionState::StageHasGoneLive;
646682
}
647683
} else if (
@@ -650,16 +686,25 @@ namespace caff {
650686
strongThis->isOnline()) {
651687
// If our feed has appeared in the subscription and is no longer there
652688
// while we think we're online, then we've failed.
689+
LOG_ERROR("Feed no longer exists on stage");
653690
strongThis->failedCallback(caff_ResultTakeover);
654691
return;
692+
} else {
693+
LOG_ERROR("Feed not present in stage");
655694
}
656695

657696
if (!payload->stage.live && strongThis->subscriptionState == SubscriptionState::StageHasGoneLive &&
658697
strongThis->state == State::Live) {
659698
// If the stage is no longer live after we've gone live with our feed, then we've failed.
699+
LOG_ERROR("Stage failed to go live");
660700
strongThis->failedCallback(caff_ResultTakeover);
661701
}
662702
} else if (auto errors = get_if<std::vector<caffql::GraphqlError>>(&update)) {
703+
// TODO: See if we can report more meaningful errors to user than "broadcast failed"
704+
LOG_ERROR("Error(s) creating GraphQL subscription:");
705+
for (auto & error : *errors) {
706+
LOG_ERROR(" %s", error.message.c_str());
707+
}
663708
strongThis->failedCallback(caff_ResultBroadcastFailed);
664709
}
665710
};
@@ -671,35 +716,43 @@ namespace caff {
671716
case State::Streaming:
672717
case State::Live:
673718
if (endType == WebsocketClient::ConnectionEndType::Closed) {
719+
LOG_WARNING("Stage websocket was closed.");
720+
// retry immediately
674721
strongThis->setupSubscription();
675722
return;
676723
}
724+
LOG_WARNING("Stage websocket failed.");
725+
// retry after delay below
677726
break;
678-
679727
case State::Offline:
680728
case State::Stopping:
729+
LOG_DEBUG("Broadcast is offline. Subscription ended.");
681730
return;
682731
}
683732
} else {
733+
LOG_DEBUG("Broadcast no longer exists. Subscription ended.");
684734
return;
685735
}
686736

687737
auto sleepFor = backoffDuration(tryNum);
688-
LOG_ERROR("Retrying broadcast subscription in %lld seconds", sleepFor.count());
738+
LOG_WARNING("Retrying broadcast subscription in %lld seconds.", sleepFor.count());
689739
std::this_thread::sleep_for(sleepFor);
690740

691741
if (auto strongThis = weakThis.lock()) {
692742
switch (strongThis->state) {
693743
case State::Starting:
694744
case State::Streaming:
695745
case State::Live:
696-
LOG_ERROR("Retrying broadcast subscription");
746+
LOG_WARNING("Retrying broadcast subscription.");
697747
strongThis->setupSubscription(tryNum + 1);
698-
break;
748+
return;
699749
case State::Offline:
700750
case State::Stopping:
701-
break;
751+
LOG_WARNING("Broadcast offline. Retry canceled.");
752+
return;
702753
}
754+
} else {
755+
LOG_DEBUG("Broadcast no longer exists. Retry canceled.");
703756
}
704757
};
705758

0 commit comments

Comments
 (0)