diff --git a/.gitignore b/.gitignore index 3129a9a..ca44206 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.pyc +*.pyo build dist +tags diff --git a/asyncmongo/backends/tornado_backend.py b/asyncmongo/backends/tornado_backend.py index 91af0df..0c7d1a3 100644 --- a/asyncmongo/backends/tornado_backend.py +++ b/asyncmongo/backends/tornado_backend.py @@ -28,6 +28,13 @@ def __init__(self, socket, **kwargs): """ self.__stream = tornado.iostream.IOStream(socket, **kwargs) + @property + def io_loop(self): + return self.__stream.io_loop + + def connect(self, address, callback=None): + self.__stream.connect(address, callback) + def write(self, data): self.__stream.write(data) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index 500e57d..b7a97ed 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -19,11 +19,15 @@ import struct import logging from types import NoneType +import time from errors import ProgrammingError, IntegrityError, InterfaceError import helpers import asyncjobs +ASYNC_BACKEND_TORNADO = 'tornado' +ASYNC_BACKEND_GLIB2 = 'glib2' +ASYNC_BACKEND_GLIB3 = 'glib3' class Connection(object): """ @@ -35,6 +39,10 @@ class Connection(object): - `autoreconnect` (optional): auto reconnect on interface errors - `rs`: replica set name (required when replica sets are used) - `seed`: seed list to connect to a replica set (required when replica sets are used) + - `connect_timeout`: timeout for initial connection to mongodb, float data, in seconds + - `request_timeout`: timeout for entire request to mongodb, float data, in seconds + - `life_time`: life time for connection to mongodb, float data, in + seconds, 0 or None for unlimited - `secondary_only`: (optional, only useful for replica set connections) if true, connect to a secondary member only - `**kwargs`: passed to `backends.AsyncBackend.register_stream` @@ -47,9 +55,12 @@ def __init__(self, dbpass=None, autoreconnect=True, pool=None, - backend="tornado", + backend=ASYNC_BACKEND_TORNADO, rs=None, seed=None, + connect_timeout=20.0, + request_timeout=20.0, + life_time=60.0, secondary_only=False, **kwargs): assert isinstance(autoreconnect, bool) @@ -68,6 +79,9 @@ def __init__(self, assert isinstance(port, int) assert seed is None + assert connect_timeout > 0 + assert isinstance(request_timeout, (float, int, NoneType)) + self._host = host self._port = port self.__rs = rs @@ -83,7 +97,13 @@ def __init__(self, self.__kwargs = kwargs self.__backend = self.__load_backend(backend) self.__job_queue = [] + self.__backend_class = backend self.usage_count = 0 + self.__request_timeout = request_timeout + self.__min_timeout = min(connect_timeout, request_timeout) + self.__life_time = life_time + self.__timeout = None + self.__start_time = time.time() self.__connect() def __load_backend(self, name): @@ -106,13 +126,47 @@ def _socket_connect(self): """create a socket, connect, register a stream with the async backend""" self.usage_count = 0 try: + self.__start_time = time.time() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - s.connect((self._host, self._port)) - self.__stream = self.__backend.register_stream(s, **self.__kwargs) + + if ASYNC_BACKEND_TORNADO == self.__backend_class: + self.__stream = self.__backend.register_stream(s, **self.__kwargs) + if self.__min_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + self.__start_time + self.__min_timeout, + self._on_timeout) + self.__stream.connect((self._host, self._port), self._on_connect) + else: + s.connect((self._host, self._port)) + self.__stream = self.__backend.register_stream(s, **self.__kwargs) + self.__stream.set_close_callback(self._socket_close) self.__alive = True except socket.error, error: raise InterfaceError(error) + + def _release_timeout(self): + if self.__timeout is not None: + self.__stream.io_loop.remove_timeout(self.__timeout) + self.__timeout = None + + def _on_timeout(self): + self.__timeout = None + self.close() + + def _on_connect(self): + self._release_timeout() + + if self.__request_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + self.__start_time + self.__request_timeout, + self._on_timeout) + + if self.__life_time: + self.__stream.io_loop.add_timeout( + self.__start_time + self.__life_time, + self._on_timeout) def _socket_close(self): """cleanup after the socket is closed by the other end""" @@ -151,6 +205,11 @@ def send_message(self, message, callback): self._put_job(asyncjobs.AsyncMessage(self, message, callback), 0) self._next_job() + if self.__request_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + time.time() + self.__request_timeout, + self._on_timeout) + def _put_job(self, job, pos=None): if pos is None: pos = len(self.__job_queue) @@ -186,6 +245,8 @@ def _send_message(self, message, callback): # return self.__request_id def _parse_header(self, header): + self._release_timeout() + # return self.__receive_data_on_socket(length - 16, sock) length = int(struct.unpack("