diff --git a/.gitignore b/.gitignore index 3129a9a..ca44206 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,5 @@ *.pyc +*.pyo build dist +tags diff --git a/asyncmongo/backends/tornado_backend.py b/asyncmongo/backends/tornado_backend.py index 91af0df..0c7d1a3 100644 --- a/asyncmongo/backends/tornado_backend.py +++ b/asyncmongo/backends/tornado_backend.py @@ -28,6 +28,13 @@ def __init__(self, socket, **kwargs): """ self.__stream = tornado.iostream.IOStream(socket, **kwargs) + @property + def io_loop(self): + return self.__stream.io_loop + + def connect(self, address, callback=None): + self.__stream.connect(address, callback) + def write(self, data): self.__stream.write(data) diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index d1d8abe..b184379 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -26,12 +26,17 @@ import socket import struct import logging +import time from bson import SON from errors import ProgrammingError, IntegrityError, InterfaceError, AuthenticationError import message import helpers +ASYNC_BACKEND_TORNADO = 'tornado' +ASYNC_BACKEND_GLIB2 = 'glib2' +ASYNC_BACKEND_GLIB3 = 'glib3' + class Connection(object): """ :Parameters: @@ -43,8 +48,9 @@ class Connection(object): - `**kwargs`: passed to `backends.AsyncBackend.register_stream` """ - def __init__(self, host, port, dbuser=None, dbpass=None, autoreconnect=True, pool=None, - backend="tornado", **kwargs): + def __init__(self, host, port, connect_timeout=20, request_timeout=20, + dbuser=None, dbpass=None, autoreconnect=True, pool=None, + backend=ASYNC_BACKEND_TORNADO, **kwargs): assert isinstance(host, (str, unicode)) assert isinstance(port, int) assert isinstance(autoreconnect, bool) @@ -66,7 +72,12 @@ def __init__(self, host, port, dbuser=None, dbpass=None, autoreconnect=True, poo self.__deferred_callback = None self.__kwargs = kwargs self.__backend = self.__load_backend(backend) + self.__backend_class = backend self.usage_count = 0 + self.__request_timeout = request_timeout + self.__min_timeout = min(connect_timeout, request_timeout) + self.__timeout = None + self.__start_time = time.time() self.__connect() def __load_backend(self, name): @@ -77,9 +88,21 @@ def __load_backend(self, name): def __connect(self): self.usage_count = 0 try: + self.__start_time = time.time() + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) - s.connect((self.__host, self.__port)) - self.__stream = self.__backend.register_stream(s, **self.__kwargs) + + if ASYNC_BACKEND_TORNADO == self.__backend_class: + self.__stream = self.__backend.register_stream(s, **self.__kwargs) + if self.__min_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + self.__start_time + self.__min_timeout, + self._on_timeout) + self.__stream.connect((self.__host, self.__port), self._on_connect) + else: + s.connect((self.__host, self.__port)) + self.__stream = self.__backend.register_stream(s, **self.__kwargs) + self.__stream.set_close_callback(self._socket_close) self.__alive = True except socket.error, error: @@ -87,6 +110,21 @@ def __connect(self): if self.__dbuser and self.__dbpass: self.__authenticate = True + + def _on_timeout(self): + self.__timeout = None + self.close() + + def _on_connect(self): + if self.__timeout is not None: + #self.__timeout.callback = None + self.__stream.io_loop.remove_timeout(self.__timeout) + self.__timeout = None + + if self.__request_timeout: + self.__timeout = self.__stream.io_loop.add_timeout( + self.__start_time + self.__request_timeout, + self._on_timeout) def _socket_close(self): """cleanup after the socket is closed by the other end""" @@ -162,6 +200,9 @@ def _parse_header(self, header): raise def _parse_response(self, response): + if self.__callback is None: + return + callback = self.__callback request_id = self.__request_id self.__request_id = None