diff --git a/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart b/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart index 2c6dbf988..a1a905022 100644 --- a/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart +++ b/modules/ensemble/lib/framework/apiproviders/firestore/firestore_api_provider.dart @@ -12,10 +12,23 @@ import 'package:yaml/yaml.dart'; import 'package:firebase_core/firebase_core.dart'; import 'package:flutter/foundation.dart'; +/// Replaces an existing Firestore listener for [apiName], cancelling any prior +/// subscription so repeated `listenForChanges` invocations do not accumulate. +@visibleForTesting +void replaceFirestoreApiSubscription( + Map subscriptions, + String apiName, + StreamSubscription subscription, +) { + subscriptions[apiName]?.cancel(); + subscriptions[apiName] = subscription; +} + class FirestoreAPIProvider extends APIProvider with LiveAPIProvider { FirebaseApp? _app; FirebaseFirestore get firestore => FirebaseFirestore.instanceFor(app: _app!); - List _subscriptions = []; + Map _subscriptions = {}; + bool _disposed = false; late FirestoreApp firestoreApp; @override @@ -259,16 +272,26 @@ class FirestoreAPIProvider extends APIProvider with LiveAPIProvider { List pathSegments = path.split('/'); bool isDocumentPath = pathSegments.length % 2 == 0; + void deliverSnapshot(dynamic snapshot) { + if (_disposed) return; + listener(getOKResponse(apiName, snapshot)); + } + if (isDocumentPath) { DocumentReference docRef = firestore.doc(path); - _subscriptions.add(docRef.snapshots().listen((DocumentSnapshot snapshot) { - listener.call(getOKResponse(apiName, snapshot)); - })); + replaceFirestoreApiSubscription( + _subscriptions, + apiName, + docRef.snapshots().listen(deliverSnapshot), + ); } else { - Query query = firestoreApp.getQuery(api, isCollectionGroup: api['isCollectionGroup'] ?? false); - _subscriptions.add(query.snapshots().listen((QuerySnapshot snapshot) { - listener.call(getOKResponse(apiName, snapshot)); - })); + Query query = firestoreApp.getQuery( + api, isCollectionGroup: api['isCollectionGroup'] ?? false); + replaceFirestoreApiSubscription( + _subscriptions, + apiName, + query.snapshots().listen(deliverSnapshot), + ); } Map body = { 'message': 'Subscribed to API', @@ -284,13 +307,12 @@ class FirestoreAPIProvider extends APIProvider with LiveAPIProvider { @override dispose() { + _disposed = true; try { - if (_subscriptions.isNotEmpty) { - for (var subscription in _subscriptions) { - subscription.cancel(); - } + for (final subscription in _subscriptions.values) { + subscription.cancel(); } - _subscriptions = []; + _subscriptions.clear(); } catch (e) { print('Error disposing FirestoreAPIProvider: $e'); } diff --git a/modules/ensemble/lib/framework/scope.dart b/modules/ensemble/lib/framework/scope.dart index 2d51ef545..b6ef34db3 100644 --- a/modules/ensemble/lib/framework/scope.dart +++ b/modules/ensemble/lib/framework/scope.dart @@ -664,6 +664,8 @@ mixin PageBindingManager on IsScopeManager { void dispatch(ModelChangeEvent event) { //log("EventBus ${eventBus.hashCode} firing $event"); + // Page dispose closes the event bus before Screen cancels live API listeners. + if (eventBus.streamController.isClosed) return; eventBus.fire(event); } diff --git a/modules/ensemble/test/firestore_listen_dedup_test.dart b/modules/ensemble/test/firestore_listen_dedup_test.dart new file mode 100644 index 000000000..69c954c75 --- /dev/null +++ b/modules/ensemble/test/firestore_listen_dedup_test.dart @@ -0,0 +1,57 @@ +import 'dart:async'; + +import 'package:ensemble/framework/apiproviders/firestore/firestore_api_provider.dart'; +import 'package:event_bus/event_bus.dart'; +import 'package:flutter_test/flutter_test.dart'; + +void main() { + group('replaceFirestoreApiSubscription', () { + test('cancels prior subscription for the same apiName', () async { + final firstController = StreamController(); + final secondController = StreamController(); + var firstEventsAfterReplace = 0; + + final first = firstController.stream.listen((_) { + firstEventsAfterReplace++; + }); + final second = secondController.stream.listen((_) {}); + + final subscriptions = {'users': first}; + replaceFirestoreApiSubscription(subscriptions, 'users', second); + + firstController.add(1); + await Future.delayed(Duration.zero); + + expect(firstEventsAfterReplace, 0); + expect(subscriptions['users'], same(second)); + expect(first.isPaused, isFalse); + + await firstController.close(); + await secondController.close(); + }); + + test('keeps subscriptions for different api names', () async { + final usersController = StreamController(); + final ordersController = StreamController(); + final users = usersController.stream.listen((_) {}); + final orders = ordersController.stream.listen((_) {}); + + final subscriptions = {'users': users}; + replaceFirestoreApiSubscription(subscriptions, 'orders', orders); + + expect(subscriptions.keys, containsAll(['users', 'orders'])); + + await usersController.close(); + await ordersController.close(); + }); + }); + + group('event bus dispose race', () { + test('closed event bus rejects new events', () { + final bus = EventBus(); + bus.destroy(); + expect(bus.streamController.isClosed, isTrue); + expect(() => bus.fire('event'), throwsStateError); + }); + }); +}