diff --git a/etcd3/client.py b/etcd3/client.py index 5c7c7d20..0b25979d 100644 --- a/etcd3/client.py +++ b/etcd3/client.py @@ -1,6 +1,7 @@ import functools import inspect import threading +import time import grpc import grpc._channel @@ -24,6 +25,42 @@ } +# this is used for gRPC proxy compatibility so that we do not +# mark as finished writing until we've received a response +# For more details see: https://github.com/davissp14/etcdv3-ruby/pull/117 +class BlockingRequest: + def __init__(self, request): + self.request = request + self.proceed = False + self.blocked = False + self.returned = False + + def read_done(self): + self.proceed = True + + def is_blocked(self): + return self.blocked + + def __iter__(self): + return self + + def __next__(self): + if not self.returned: + self.returned = True + return self.request + self.blocked = True + try: + raise StopIteration + except StopIteration: + raise StopIteration + finally: + while not self.proceed: + time.sleep(0.001) + self.blocked = False + + next = __next__ # Python 2 compatibility + + def _translate_exception(exc): code = exc.code() exception = _EXCEPTIONS_BY_CODE.get(code) @@ -898,13 +935,25 @@ def revoke_lease(self, lease_id): @_handle_errors def refresh_lease(self, lease_id): - keep_alive_request = etcdrpc.LeaseKeepAliveRequest(ID=lease_id) - request_stream = [keep_alive_request] - for response in self.leasestub.LeaseKeepAlive( - iter(request_stream), - self.timeout, - credentials=self.call_credentials, - metadata=self.metadata): + request_stream = BlockingRequest( + etcdrpc.LeaseKeepAliveRequest(ID=lease_id)) + responses = [] + try: + for response in self.leasestub.LeaseKeepAlive( + request_stream, + self.timeout, + credentials=self.call_credentials, + metadata=self.metadata): + responses.append(response) + break + except BaseException: + raise + finally: + request_stream.read_done() + while request_stream.is_blocked(): + time.sleep(0.001) + + for response in responses: yield response @_handle_errors diff --git a/setup.py b/setup.py index 110578db..5f546f2f 100644 --- a/setup.py +++ b/setup.py @@ -25,7 +25,7 @@ def load_reqs(filename): setup( name='etcd3', - version='0.10.0', + version='0.10.1', description="Python client for the etcd3 API", long_description=readme + '\n\n' + history, author="Louis Taylor", diff --git a/test-requirements.txt b/test-requirements.txt index c2e1b375..65221cf5 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -1,16 +1,16 @@ PyYAML==5.1 Sphinx==1.8.2 bumpversion==0.5.3 -coverage==4.5.1 +coverage==4.5.4 flake8-import-order==0.18 -flake8==3.6.0 +flake8==3.7.8 grpcio-tools>=1.2.0 -hypothesis==3.82.1 +hypothesis==4.36.2 pip==18.1 pytest==4.0.1 wheel==0.31.1 -pycodestyle==2.4.0 +pycodestyle==2.5.0 tox==3.5.3 -flake8-docstrings==1.3.0 +flake8-docstrings==1.4.0 mock==2.0.0 pifpaf>=0.27.1