From 3613b176eab2132f0d89b5173befb11e36dec802 Mon Sep 17 00:00:00 2001 From: bufferx Date: Tue, 16 Apr 2013 10:51:55 +0800 Subject: [PATCH 1/6] Non-Blocking Connection and Timeout Support --- .gitignore | 2 ++ asyncmongo/backends/tornado_backend.py | 7 ++++ asyncmongo/connection.py | 49 ++++++++++++++++++++++++-- 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/.gitignore b/.gitignore index 3129a9a..ca44206 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.pyc +*.pyo build dist +tags diff --git a/asyncmongo/backends/tornado_backend.py b/asyncmongo/backends/tornado_backend.py index 91af0df..0c7d1a3 100644 --- a/asyncmongo/backends/tornado_backend.py +++ b/asyncmongo/backends/tornado_backend.py @@ -28,6 +28,13 @@ def __init__(self, socket, **kwargs): """ self.__stream = tornado.iostream.IOStream(socket, **kwargs) + @property + def io_loop(self): + return self.__stream.io_loop + + def connect(self, address, callback=None): + self.__stream.connect(address, callback) + def write(self, data): self.__stream.write(data) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index b0774d6..2d5bb69 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -19,11 +19,15 @@ import struct import logging from types import NoneType +import time from errors import ProgrammingError, IntegrityError, InterfaceError import helpers import asyncjobs +ASYNC_BACKEND_TORNADO = 'tornado' +ASYNC_BACKEND_GLIB2 = 'glib2' +ASYNC_BACKEND_GLIB3 = 'glib3' class Connection(object): """ @@ -35,6 +39,8 @@ class Connection(object): - `autoreconnect` (optional): auto reconnect on interface errors - `rs`: replica set name (required when replica sets are used) - `seed`: seed list to connect to a replica set (required when replica sets are used) + - `connect_timeout`: timeout for initial connection to mongodb, float data, in seconds + - `request_timeout`: timeout for entire request to mongodb, float data, in seconds - `**kwargs`: passed to `backends.AsyncBackend.register_stream` """ @@ -45,9 +51,11 @@ def __init__(self, dbpass=None, autoreconnect=True, pool=None, - backend="tornado", + backend=ASYNC_BACKEND_TORNADO, rs=None, seed=None, + connect_timeout=20.0, + request_timeout=20.0, **kwargs): assert isinstance(autoreconnect, bool) assert isinstance(dbuser, (str, unicode, NoneType)) @@ -78,7 +86,12 @@ def __init__(self, self.__kwargs = kwargs self.__backend = self.__load_backend(backend) self.__job_queue = [] + self.__backend_class = backend self.usage_count = 0 + self.__request_timeout = request_timeout + self.__min_timeout = min(connect_timeout, request_timeout) + self.__timeout = None + self.__start_time = time.time() self.__connect() def __load_backend(self, name): @@ -101,13 +114,40 @@ def _socket_connect(self): """create a socket, connect, register a stream with the async backend""" self.usage_count = 0 try: + self.__start_time = time.time() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - s.connect((self._host, self._port)) - self.__stream = self.__backend.register_stream(s, **self.__kwargs) + + if ASYNC_BACKEND_TORNADO == self.__backend_class: + self.__stream = self.__backend.register_stream(s, **self.__kwargs) + if self.__min_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + self.__start_time + self.__min_timeout, + self._on_timeout) + self.__stream.connect((self._host, self._port), self._on_connect) + else: + s.connect((self._host, self._port)) + self.__stream = self.__backend.register_stream(s, **self.__kwargs) + self.__stream.set_close_callback(self._socket_close) self.__alive = True except socket.error, error: raise InterfaceError(error) + + def _on_timeout(self): + self.__timeout = None + self.close() + + def _on_connect(self): + if self.__timeout is not None: + #self.__timeout.callback = None + self.__stream.io_loop.remove_timeout(self.__timeout) + self.__timeout = None + + if self.__request_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + self.__start_time + self.__request_timeout, + self._on_timeout) def _socket_close(self): """cleanup after the socket is closed by the other end""" @@ -196,6 +236,9 @@ def _parse_header(self, header): raise def _parse_response(self, response): + if self.__callback is None: + return + callback = self.__callback request_id = self.__request_id self.__request_id = None From 630e434b8641594fdb17a7cb0948be774408408b Mon Sep 17 00:00:00 2001 From: bufferx Date: Tue, 16 Apr 2013 15:38:09 +0800 Subject: [PATCH 2/6] bugfix: the request-timeout was not removed timely --- asyncmongo/connection.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index 2d5bb69..35003b8 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -140,7 +140,6 @@ def _on_timeout(self): def _on_connect(self): if self.__timeout is not None: - #self.__timeout.callback = None self.__stream.io_loop.remove_timeout(self.__timeout) self.__timeout = None @@ -221,6 +220,10 @@ def _send_message(self, message, callback): # return self.__request_id def _parse_header(self, header): + if self.__timeout is not None: + self.__stream.io_loop.remove_timeout(self.__timeout) + self.__timeout = None + # return self.__receive_data_on_socket(length - 16, sock) length = int(struct.unpack(" Date: Fri, 19 Jul 2013 14:53:57 +0800 Subject: [PATCH 3/6] add life-time for long connection --- asyncmongo/connection.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index dcc42d9..a165eb2 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -41,6 +41,8 @@ class Connection(object): - `seed`: seed list to connect to a replica set (required when replica sets are used) - `connect_timeout`: timeout for initial connection to mongodb, float data, in seconds - `request_timeout`: timeout for entire request to mongodb, float data, in seconds + - `life_time`: life time for connection to mongodb, float data, in + seconds, 0 or None for unlimited - `secondary_only`: (optional, only useful for replica set connections) if true, connect to a secondary member only - `**kwargs`: passed to `backends.AsyncBackend.register_stream` @@ -58,6 +60,7 @@ def __init__(self, seed=None, connect_timeout=20.0, request_timeout=20.0, + life_time=60.0, secondary_only=False, **kwargs): assert isinstance(autoreconnect, bool) @@ -76,6 +79,9 @@ def __init__(self, assert isinstance(port, int) assert seed is None + assert connect_timeout > 0 + assert request_timeout > 0 + self._host = host self._port = port self.__rs = rs @@ -95,6 +101,7 @@ def __init__(self, self.usage_count = 0 self.__request_timeout = request_timeout self.__min_timeout = min(connect_timeout, request_timeout) + self.__life_time = life_time self.__timeout = None self.__start_time = time.time() self.__connect() @@ -139,19 +146,27 @@ def _socket_connect(self): except socket.error, error: raise InterfaceError(error) + def _release_timeout(self): + if self.__timeout is not None: + self.__stream.io_loop.remove_timeout(self.__timeout) + self.__timeout = None + def _on_timeout(self): self.__timeout = None self.close() def _on_connect(self): - if self.__timeout is not None: - self.__stream.io_loop.remove_timeout(self.__timeout) - self.__timeout = None + self._release_timeout() if self.__request_timeout: self.__timeout = self.__stream.io_loop.add_timeout( self.__start_time + self.__request_timeout, self._on_timeout) + + if self.__life_time: + self.__stream.io_loop.add_timeout( + self.__start_time + self.__life_time, + self._on_timeout) def _socket_close(self): """cleanup after the socket is closed by the other end""" @@ -225,9 +240,7 @@ def _send_message(self, message, callback): # return self.__request_id def _parse_header(self, header): - if self.__timeout is not None: - self.__stream.io_loop.remove_timeout(self.__timeout) - self.__timeout = None + self._release_timeout() # return self.__receive_data_on_socket(length - 16, sock) length = int(struct.unpack(" Date: Fri, 19 Jul 2013 14:56:41 +0800 Subject: [PATCH 4/6] set start-time when connect --- asyncmongo/connection.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index a165eb2..8ac2a5f 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -103,7 +103,6 @@ def __init__(self, self.__min_timeout = min(connect_timeout, request_timeout) self.__life_time = life_time self.__timeout = None - self.__start_time = time.time() self.__connect() def __load_backend(self, name): @@ -112,6 +111,8 @@ def __load_backend(self, name): return mod.AsyncBackend() def __connect(self): + self.__start_time = time.time() + if self.__dbuser and self.__dbpass: self._put_job(asyncjobs.AuthorizeJob(self, self.__dbuser, self.__dbpass, self.__pool)) From f1a25aaf7f84e688e77f05b2a1c00c2b18d2b148 Mon Sep 17 00:00:00 2001 From: "bufferx (Zhang ZY)" Date: Fri, 19 Jul 2013 15:13:47 +0800 Subject: [PATCH 5/6] set request_timeout to ioloop when send_message --- asyncmongo/connection.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index 8ac2a5f..0f67502 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -80,7 +80,7 @@ def __init__(self, assert seed is None assert connect_timeout > 0 - assert request_timeout > 0 + assert isinstance(request_timeout, (float, int, NoneType) self._host = host self._port = port @@ -103,6 +103,7 @@ def __init__(self, self.__min_timeout = min(connect_timeout, request_timeout) self.__life_time = life_time self.__timeout = None + self.__start_time = time.time() self.__connect() def __load_backend(self, name): @@ -111,8 +112,6 @@ def __load_backend(self, name): return mod.AsyncBackend() def __connect(self): - self.__start_time = time.time() - if self.__dbuser and self.__dbpass: self._put_job(asyncjobs.AuthorizeJob(self, self.__dbuser, self.__dbpass, self.__pool)) @@ -206,6 +205,11 @@ def send_message(self, message, callback): self._put_job(asyncjobs.AsyncMessage(self, message, callback), 0) self._next_job() + if self.__request_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + time.time() + self.__request_timeout, + self._on_timeout) + def _put_job(self, job, pos=None): if pos is None: pos = len(self.__job_queue) From a3cdab97c6f7fd75cae375a6f9131b6c3d9a613b Mon Sep 17 00:00:00 2001 From: "bufferx (Zhang ZY)" Date: Fri, 19 Jul 2013 16:00:55 +0800 Subject: [PATCH 6/6] bugfix: invalid syntax @line:85 --- asyncmongo/connection.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index 0f67502..b7a97ed 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -80,7 +80,7 @@ def __init__(self, assert seed is None assert connect_timeout > 0 - assert isinstance(request_timeout, (float, int, NoneType) + assert isinstance(request_timeout, (float, int, NoneType)) self._host = host self._port = port