Skip to content

Commit 28682bb

Browse files
author
Jeroen van der Heijden
authored
Change reconnect behavior (#27)
* change reconnect behavior * retry connect on auth failure * remove not required sleep
1 parent 002222d commit 28682bb

3 files changed

Lines changed: 50 additions & 35 deletions

File tree

thingsdb/client/client.py

Lines changed: 47 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,13 @@
55
from ssl import SSLContext, PROTOCOL_TLS
66
from typing import Optional, Union, Any
77
from deprecation import deprecated
8+
from concurrent.futures import CancelledError
89
from .buildin import Buildin
910
from .protocol import Proto
1011
from .protocol import Protocol
1112
from .abc.events import Events
1213
from ..util.convert import convert
13-
from ..exceptions import ForbiddenError
14+
from ..exceptions import ForbiddenError, NodeError, AuthError
1415

1516

1617

@@ -57,6 +58,7 @@ def __init__(
5758
self._pool = None
5859
self._protocol = None
5960
self._pid = 0
61+
self._write_pkg = self._ensure_write if auto_reconnect else self._write
6062
self._reconnect = auto_reconnect
6163
self._scope = '@t' # default to thingsdb scope
6264
self._pool_idx = 0
@@ -354,9 +356,6 @@ def query(
354356
raised. See thingsdb.exceptions for all possible exceptions and
355357
https://docs.thingsdb.net/v0/errors/ for info on the error codes.
356358
"""
357-
if self._protocol is None:
358-
raise ConnectionError('no connection')
359-
360359
if scope is None:
361360
scope = self._scope
362361

@@ -367,7 +366,42 @@ def query(
367366
else:
368367
data = [scope, code]
369368

370-
return self._protocol.write(Proto.REQ_QUERY, data, timeout=timeout)
369+
return self._write_pkg(Proto.REQ_QUERY, data, timeout=timeout)
370+
371+
async def _ensure_write(
372+
self,
373+
tp: Proto,
374+
data: Any = None,
375+
is_bin: bool = False,
376+
timeout: Optional[int] = None
377+
) -> asyncio.Future:
378+
while True:
379+
if not self.is_connected():
380+
logging.info('wait for a connection')
381+
await asyncio.sleep(1.0)
382+
continue
383+
384+
try:
385+
res = await self._protocol.write(tp, data, is_bin, timeout)
386+
except (CancelledError, NodeError, AuthError) as e:
387+
logging.error(
388+
f'error sending package: '
389+
f'{e}({e.__class__.__name__}) (will try again)')
390+
await asyncio.sleep(1.0)
391+
continue
392+
393+
return res
394+
395+
async def _write(
396+
self,
397+
tp: Proto,
398+
data: Any = None,
399+
is_bin: bool = False,
400+
timeout: Optional[int] = None
401+
) -> asyncio.Future:
402+
if not self.is_connected():
403+
raise ConnectionError('no connection')
404+
return await self._protocol.write(tp, data, is_bin, timeout)
371405

372406
def run(
373407
self,
@@ -421,9 +455,6 @@ def run(
421455
raised. See thingsdb.exceptions for all possible exceptions and
422456
https://docs.thingsdb.net/v0/errors/ for info on the error codes.
423457
"""
424-
if self._protocol is None:
425-
raise ConnectionError('no connection')
426-
427458
if scope is None:
428459
scope = self._scope
429460

@@ -435,7 +466,7 @@ def run(
435466
elif args and convert_args:
436467
args = [convert(arg) for arg in args]
437468

438-
return self._protocol.write(
469+
return self._write_pkg(
439470
Proto.REQ_RUN,
440471
[scope, procedure, args],
441472
timeout=timeout)
@@ -468,13 +499,10 @@ def watch(self, *ids: int, scope: Optional[str] = None) -> asyncio.Future:
468499
asyncio.Future (None):
469500
Future which result will be set to `None` if successful.
470501
"""
471-
if self._protocol is None:
472-
raise ConnectionError('no connection')
473-
474502
if scope is None:
475503
scope = self._scope
476504

477-
return self._protocol.write(Proto.REQ_WATCH, [scope, *ids])
505+
return self._write_pkg(Proto.REQ_WATCH, [scope, *ids])
478506

479507
def unwatch(
480508
self,
@@ -503,13 +531,10 @@ def unwatch(
503531
asyncio.Future (None):
504532
Future which result will be set to `None` if successful.
505533
"""
506-
if self._protocol is None:
507-
raise ConnectionError('no connection')
508-
509534
if scope is None:
510535
scope = self._scope
511536

512-
return self._protocol.write(Proto.REQ_UNWATCH, [scope, *ids])
537+
return self._write_pkg(Proto.REQ_UNWATCH, [scope, *ids])
513538

514539
@staticmethod
515540
def _auth_check(auth):
@@ -556,7 +581,6 @@ def _on_event(self, pkg):
556581
def _on_connection_lost(self, protocol, exc):
557582
if self._protocol is not protocol:
558583
return
559-
560584
self._protocol = None
561585

562586
if self._reconnect:
@@ -571,9 +595,11 @@ async def _reconnect_loop(self):
571595
try:
572596
await self._connect(timeout=timeout)
573597
await self._ping(timeout=2)
598+
await self._authenticate(timeout=5)
574599
except Exception as e:
575600
logging.error(
576-
f'connecting to {host}:{port} failed ({e}), '
601+
f'connecting to {host}:{port} failed: '
602+
f'{e}({e.__class__.__name__}), '
577603
f'try next connect in {wait_time} seconds'
578604
)
579605
else:
@@ -587,8 +613,6 @@ async def _reconnect_loop(self):
587613
wait_time = min(wait_time, self.MAX_RECONNECT_WAIT_TIME)
588614
timeout = min(timeout+1, self.MAX_RECONNECT_TIMEOUT)
589615

590-
await self._authenticate(timeout=5)
591-
592616
if self._reconnect:
593617
try:
594618
await self.watch(scope='@n')
@@ -599,19 +623,10 @@ async def _reconnect_loop(self):
599623
event_handler.on_reconnect()
600624

601625
def _ping(self, timeout):
602-
if self._protocol is None:
603-
raise ConnectionError('no connection')
604-
605-
return self._protocol.write(Proto.REQ_PING, timeout=timeout)
626+
return self._write(Proto.REQ_PING, timeout=timeout)
606627

607628
def _authenticate(self, timeout):
608-
if self._protocol is None:
609-
raise ConnectionError('no connection')
610-
611-
return self._protocol.write(
612-
Proto.REQ_AUTH,
613-
data=self._auth,
614-
timeout=timeout)
629+
return self._write(Proto.REQ_AUTH, data=self._auth, timeout=timeout)
615630

616631
@deprecated(details='Use `set_default_scope` instead')
617632
def use(self, scope):

thingsdb/client/protocol.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ def data_received(self, data: bytes) -> None:
188188
try:
189189
self.package.extract_data_from(self._buffered_data)
190190
except KeyError as e:
191-
logging.error(f'Unsupported package received: {e}')
191+
logging.error(f'unsupported package received: {e}')
192192
except Exception as e:
193193
logging.exception(e)
194194
# empty the byte-array to recover from this error
@@ -203,7 +203,7 @@ def data_received(self, data: bytes) -> None:
203203
except Exception as e:
204204
logging.exception(e)
205205
else:
206-
logging.error(f'Unsupported package type received: {tp}')
206+
logging.error(f'unsupported package type received: {tp}')
207207

208208
self.package = None
209209

thingsdb/version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
__version__ = '0.6.18'
1+
__version__ = '0.6.19'

0 commit comments

Comments
 (0)