@@ -2,7 +2,7 @@ import 'dart:async';
22import 'dart:convert' ;
33import 'package:flutter/foundation.dart' ;
44import 'package:web_socket_channel/web_socket_channel.dart' ;
5- import 'package:web_socket_channel/status.dart' ;
5+ import 'package:web_socket_channel/status.dart' as status ;
66import 'exception.dart' ;
77import 'realtime_subscription.dart' ;
88import 'client.dart' ;
@@ -15,35 +15,46 @@ typedef GetFallbackCookie = String? Function();
1515
1616mixin RealtimeMixin {
1717 late Client client;
18- final Map <String , List < StreamController < RealtimeMessage >> > _channels = {};
18+ final Set <String > _channels = {};
1919 WebSocketChannel ? _websok;
2020 String ? _lastUrl;
2121 late WebSocketFactory getWebSocket;
2222 GetFallbackCookie ? getFallbackCookie;
2323 int ? get closeCode => _websok? .closeCode;
24+ int _subscriptionsCounter = 0 ;
25+ Map <int , RealtimeSubscription > _subscriptions = {};
26+ bool _notifyDone = true ;
27+ StreamSubscription ? _websocketSubscription;
28+ bool _creatingSocket = false ;
2429
2530 Future <dynamic > _closeConnection () async {
26- await _websok? .sink.close (normalClosure);
31+ await _websocketSubscription? .cancel ();
32+ await _websok? .sink.close (status.normalClosure, 'Ending session' );
2733 _lastUrl = null ;
2834 }
2935
3036 _createSocket () async {
37+ if (_creatingSocket || _channels.isEmpty) return ;
38+ _creatingSocket = true ;
3139 final uri = _prepareUri ();
3240 if (_websok == null ) {
3341 _websok = await getWebSocket (uri);
3442 _lastUrl = uri.toString ();
3543 } else {
3644 if (_lastUrl == uri.toString () && _websok? .closeCode == null ) {
45+ _creatingSocket = false ;
3746 return ;
3847 }
48+ _notifyDone = false ;
3949 await _closeConnection ();
4050 _lastUrl = uri.toString ();
4151 _websok = await getWebSocket (uri);
52+ _notifyDone = true ;
4253 }
4354 debugPrint ('subscription: $_lastUrl ' );
4455
4556 try {
46- _websok? .stream.listen ((response) {
57+ _websocketSubscription = _websok? .stream.listen ((response) {
4758 final data = RealtimeResponse .fromJson (response);
4859 switch (data.type) {
4960 case 'error' :
@@ -67,28 +78,25 @@ mixin RealtimeMixin {
6778 break ;
6879 case 'event' :
6980 final message = RealtimeMessage .fromMap (data.data);
70- for (var channel in message.channels ) {
71- if (_channels[ channel] != null ) {
72- for ( var stream in _channels[ channel] ! ) {
73- stream.sink .add (message);
81+ for (var subscription in _subscriptions.values ) {
82+ for ( var channel in message.channels ) {
83+ if (subscription.channels. contains ( channel) ) {
84+ subscription.controller .add (message);
7485 }
7586 }
7687 }
7788 break ;
7889 }
7990 }, onDone: () {
80- for (var list in _channels.values) {
81- for (var stream in list) {
82- stream.close ();
83- }
91+ if (! _notifyDone || _creatingSocket) return ;
92+ for (var subscription in _subscriptions.values) {
93+ subscription.close ();
8494 }
8595 _channels.clear ();
8696 _closeConnection ();
8797 }, onError: (err, stack) {
88- for (var list in _channels.values) {
89- for (var stream in list) {
90- stream.sink.addError (err, stack);
91- }
98+ for (var subscription in _subscriptions.values) {
99+ subscription.controller.addError (err, stack);
92100 }
93101 if (_websok? .closeCode != null && _websok? .closeCode != 1008 ) {
94102 debugPrint ("Reconnecting in one second." );
@@ -103,6 +111,8 @@ mixin RealtimeMixin {
103111 throw AppwriteException (e.message);
104112 }
105113 throw AppwriteException (e.toString ());
114+ } finally {
115+ _creatingSocket = false ;
106116 }
107117 }
108118
@@ -118,40 +128,46 @@ mixin RealtimeMixin {
118128 port: uri.port,
119129 queryParameters: {
120130 "project" : client.config['project' ],
121- "channels[]" : _channels.keys. toList (),
131+ "channels[]" : _channels.toList (),
122132 },
123133 path: uri.path + "/realtime" ,
124134 );
125135 }
126136
127137 RealtimeSubscription subscribeTo (List <String > channels) {
128138 StreamController <RealtimeMessage > controller = StreamController .broadcast ();
129- for (var channel in channels) {
130- if (! _channels.containsKey (channel)) {
131- _channels[channel] = [];
132- }
133- _channels[channel]! .add (controller);
134- }
139+ _channels.addAll (channels);
135140 Future .delayed (Duration .zero, () => _createSocket ());
141+ int id = DateTime .now ().microsecondsSinceEpoch;
136142 RealtimeSubscription subscription = RealtimeSubscription (
137- stream: controller.stream,
143+ controller: controller,
144+ channels: channels,
138145 close: () async {
146+ _subscriptions.remove (id);
147+ _subscriptionsCounter-- ;
139148 controller.close ();
140- for (var channel in channels) {
141- _channels[channel]! .remove (controller);
142- if (_channels[channel]! .isEmpty) {
143- _channels.remove (channel);
144- }
145- }
146- if (_channels.isNotEmpty) {
149+ _cleanup (channels);
150+
151+ if (_channels.isNotEmpty) {
147152 await Future .delayed (Duration .zero, () => _createSocket ());
148153 } else {
149154 await _closeConnection ();
150155 }
151156 });
157+ _subscriptions[id] = subscription;
152158 return subscription;
153159 }
154160
161+ void _cleanup (List <String > channels) {
162+ for (var channel in channels) {
163+ bool found = _subscriptions.values
164+ .any ((subscription) => subscription.channels.contains (channel));
165+ if (! found) {
166+ _channels.remove (channel);
167+ }
168+ }
169+ }
170+
155171 void handleError (RealtimeResponse response) {
156172 if (response.data['code' ] == 1008 ) {
157173 throw AppwriteException (response.data["message" ], response.data["code" ]);
0 commit comments