Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class APIProviders extends InheritedWidget {
} else if (provider == 'firebaseFunction') {
return FirebaseFunctionsAPIProvider();
} else if (provider == 'sse') {
return SSEAPIProvider();
return providers[provider] ?? SSEAPIProvider();
} else {
return providers[provider] ?? httpProvider;
}
Expand Down
24 changes: 16 additions & 8 deletions modules/ensemble/lib/framework/apiproviders/sse_api_provider.dart
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,9 @@ import 'package:crypto/crypto.dart';
/// Server-Sent Events (SSE) API Provider
/// Handles streaming HTTP connections
class SSEAPIProvider extends APIProvider with LiveAPIProvider {
static final Map<String, StreamSubscription> _subscriptions = {};
static final Map<String, http.Client> _activeClients = {};
static final Set<String> _manuallyDisconnected = {};
final Map<String, StreamSubscription> _subscriptions = {};
final Map<String, http.Client> _activeClients = {};
final Set<String> _manuallyDisconnected = {};

@override
Future<void> init(String appId, Map<String, dynamic> config) async {
Expand Down Expand Up @@ -530,22 +530,30 @@ class SSEAPIProvider extends APIProvider with LiveAPIProvider {

@override
dispose() {
_manuallyDisconnected.addAll(_subscriptions.keys);
for (final apiName in _subscriptions.keys.toList()) {
_manuallyDisconnected.add(apiName);
}

// Cancel all subscriptions
for (var subscription in _subscriptions.values) {
for (final subscription in _subscriptions.values) {
subscription.cancel();
}
_subscriptions.clear();

// Close all active clients
for (var client in _activeClients.values) {
for (final client in _activeClients.values) {
client.close();
}
_activeClients.clear();

_manuallyDisconnected.clear();
}

@visibleForTesting
int get subscriptionCountForTesting => _subscriptions.length;

@visibleForTesting
void trackSubscriptionForTesting(String apiName, StreamSubscription sub) {
_subscriptions[apiName] = sub;
}
}

/// Configuration options for SSE connections
Expand Down
52 changes: 52 additions & 0 deletions modules/ensemble/test/sse_provider_dispose_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import 'dart:async';

import 'package:ensemble/framework/apiproviders/api_provider.dart';
import 'package:ensemble/framework/apiproviders/http_api_provider.dart';
import 'package:ensemble/framework/apiproviders/sse_api_provider.dart';
import 'package:flutter/widgets.dart';
import 'package:flutter_test/flutter_test.dart';

void main() {
group('SSEAPIProvider.dispose', () {
test('cancels only this instance subscriptions', () async {
final providerA = SSEAPIProvider();
final providerB = SSEAPIProvider();

var aCanceled = false;
var bCanceled = false;
final controllerA = StreamController<int>(onCancel: () => aCanceled = true);
final controllerB = StreamController<int>(onCancel: () => bCanceled = true);

providerA.trackSubscriptionForTesting(
'liveFeed',
controllerA.stream.listen((_) {}),
);
providerB.trackSubscriptionForTesting(
'metrics',
controllerB.stream.listen((_) {}),
);

providerA.dispose();

expect(providerA.subscriptionCountForTesting, 0);
expect(providerB.subscriptionCountForTesting, 1);
expect(aCanceled, isTrue);
expect(bCanceled, isFalse);

await controllerA.close();
await controllerB.close();
});
});

group('APIProviders.getProvider', () {
test('returns cloned sse provider from screen map', () {
final sse = SSEAPIProvider();
final providers = APIProviders(
providers: {'sse': sse, 'http': HTTPAPIProvider()},
child: const SizedBox.shrink(),
);

expect(identical(providers.getProvider('sse'), sse), isTrue);
});
});
}
Loading