@@ -2,7 +2,7 @@ import 'dart:async';
22import 'dart:convert' ;
33import 'package:flutter/foundation.dart' ;
44import 'package:web_socket_channel/web_socket_channel.dart' ;
5- import 'package:web_socket_channel/status.dart' as status ;
5+ import 'package:web_socket_channel/status.dart' ;
66import 'exception.dart' ;
77import 'realtime_subscription.dart' ;
88import 'client.dart' ;
@@ -15,46 +15,35 @@ typedef GetFallbackCookie = String? Function();
1515
1616mixin RealtimeMixin {
1717 late Client client;
18- final Set <String > _channels = {};
18+ final Map <String , List < StreamController < RealtimeMessage >> > _channels = {};
1919 WebSocketChannel ? _websok;
2020 String ? _lastUrl;
2121 late WebSocketFactory getWebSocket;
2222 GetFallbackCookie ? getFallbackCookie;
2323 int ? get closeCode => _websok? .closeCode;
24- int _subscriptionsCounter = 0 ;
25- Map <int , RealtimeSubscription > _subscriptions = {};
26- bool _notifyDone = true ;
27- StreamSubscription ? _websocketSubscription;
28- bool _creatingSocket = false ;
2924
3025 Future <dynamic > _closeConnection () async {
31- await _websocketSubscription? .cancel ();
32- await _websok? .sink.close (status.normalClosure, 'Ending session' );
26+ await _websok? .sink.close (normalClosure);
3327 _lastUrl = null ;
3428 }
3529
3630 _createSocket () async {
37- if (_creatingSocket || _channels.isEmpty) return ;
38- _creatingSocket = true ;
3931 final uri = _prepareUri ();
4032 if (_websok == null ) {
4133 _websok = await getWebSocket (uri);
4234 _lastUrl = uri.toString ();
4335 } else {
4436 if (_lastUrl == uri.toString () && _websok? .closeCode == null ) {
45- _creatingSocket = false ;
4637 return ;
4738 }
48- _notifyDone = false ;
4939 await _closeConnection ();
5040 _lastUrl = uri.toString ();
5141 _websok = await getWebSocket (uri);
52- _notifyDone = true ;
5342 }
5443 debugPrint ('subscription: $_lastUrl ' );
5544
5645 try {
57- _websocketSubscription = _websok? .stream.listen ((response) {
46+ _websok? .stream.listen ((response) {
5847 final data = RealtimeResponse .fromJson (response);
5948 switch (data.type) {
6049 case 'error' :
@@ -78,25 +67,28 @@ mixin RealtimeMixin {
7867 break ;
7968 case 'event' :
8069 final message = RealtimeMessage .fromMap (data.data);
81- for (var subscription in _subscriptions.values ) {
82- for ( var channel in message.channels ) {
83- if (subscription.channels. contains ( channel) ) {
84- subscription.controller .add (message);
70+ for (var channel in message.channels ) {
71+ if (_channels[ channel] != null ) {
72+ for ( var stream in _channels[ channel] ! ) {
73+ stream.sink .add (message);
8574 }
8675 }
8776 }
8877 break ;
8978 }
9079 }, onDone: () {
91- if (! _notifyDone || _creatingSocket) return ;
92- for (var subscription in _subscriptions.values) {
93- subscription.close ();
80+ for (var list in _channels.values) {
81+ for (var stream in list) {
82+ stream.close ();
83+ }
9484 }
9585 _channels.clear ();
9686 _closeConnection ();
9787 }, onError: (err, stack) {
98- for (var subscription in _subscriptions.values) {
99- subscription.controller.addError (err, stack);
88+ for (var list in _channels.values) {
89+ for (var stream in list) {
90+ stream.sink.addError (err, stack);
91+ }
10092 }
10193 if (_websok? .closeCode != null && _websok? .closeCode != 1008 ) {
10294 debugPrint ("Reconnecting in one second." );
@@ -111,8 +103,6 @@ mixin RealtimeMixin {
111103 throw AppwriteException (e.message);
112104 }
113105 throw AppwriteException (e.toString ());
114- } finally {
115- _creatingSocket = false ;
116106 }
117107 }
118108
@@ -128,46 +118,40 @@ mixin RealtimeMixin {
128118 port: uri.port,
129119 queryParameters: {
130120 "project" : client.config['project' ],
131- "channels[]" : _channels.toList (),
121+ "channels[]" : _channels.keys. toList (),
132122 },
133123 path: uri.path + "/realtime" ,
134124 );
135125 }
136126
137127 RealtimeSubscription subscribeTo (List <String > channels) {
138128 StreamController <RealtimeMessage > controller = StreamController .broadcast ();
139- _channels.addAll (channels);
129+ for (var channel in channels) {
130+ if (! _channels.containsKey (channel)) {
131+ _channels[channel] = [];
132+ }
133+ _channels[channel]! .add (controller);
134+ }
140135 Future .delayed (Duration .zero, () => _createSocket ());
141- int id = DateTime .now ().microsecondsSinceEpoch;
142136 RealtimeSubscription subscription = RealtimeSubscription (
143- controller: controller,
144- channels: channels,
137+ stream: controller.stream,
145138 close: () async {
146- _subscriptions.remove (id);
147- _subscriptionsCounter-- ;
148139 controller.close ();
149- _cleanup (channels);
150-
151- if (_channels.isNotEmpty) {
140+ for (var channel in channels) {
141+ _channels[channel]! .remove (controller);
142+ if (_channels[channel]! .isEmpty) {
143+ _channels.remove (channel);
144+ }
145+ }
146+ if (_channels.isNotEmpty) {
152147 await Future .delayed (Duration .zero, () => _createSocket ());
153148 } else {
154149 await _closeConnection ();
155150 }
156151 });
157- _subscriptions[id] = subscription;
158152 return subscription;
159153 }
160154
161- void _cleanup (List <String > channels) {
162- for (var channel in channels) {
163- bool found = _subscriptions.values
164- .any ((subscription) => subscription.channels.contains (channel));
165- if (! found) {
166- _channels.remove (channel);
167- }
168- }
169- }
170-
171155 void handleError (RealtimeResponse response) {
172156 if (response.data['code' ] == 1008 ) {
173157 throw AppwriteException (response.data["message" ], response.data["code" ]);
0 commit comments