diff --git a/asyncmongo/connection.py b/asyncmongo/connection.py index 804ab13..89efcbe 100644 --- a/asyncmongo/connection.py +++ b/asyncmongo/connection.py @@ -253,4 +253,4 @@ def _parse_response(self, response): if response and response['data'] and response['data'][0].get('err') and response['data'][0].get('code'): callback(response, IntegrityError(response['data'][0]['err'], code=response['data'][0]['code'])) return - callback(response) + callback(response, None) diff --git a/asyncmongo/cursor.py b/asyncmongo/cursor.py index e098544..b81beb9 100644 --- a/asyncmongo/cursor.py +++ b/asyncmongo/cursor.py @@ -17,6 +17,7 @@ import logging from bson.son import SON +from tornado import gen import helpers import message @@ -266,6 +267,7 @@ def find_one(self, spec_or_id, **kwargs): kwargs['limit'] = -1 self.find(spec_or_id, **kwargs) + @gen.engine def find(self, spec=None, fields=None, skip=0, limit=0, timeout=True, snapshot=False, tailable=False, sort=None, max_scan=None, slave_okay=False, @@ -382,40 +384,69 @@ def find(self, spec=None, fields=None, skip=0, limit=0, if self.__debug: logging.debug('QUERY_SPEC: %r' % self.__query_spec()) - connection.send_message( - message.query(self.__query_options(), - self.full_collection_name, - self.__skip, - self.__limit, - self.__query_spec(), - self.__fields), - callback=functools.partial(self._handle_response, orig_callback=callback)) + tp, _ = yield gen.Task(connection.send_message, + message.query(self.__query_options(), + self.full_collection_name, + self.__skip, + self.__limit, + self.__query_spec(), + self.__fields),) + + response, error = tp + if error is not None: + self._handle_response(response, error, callback) + + cursor_id = response['cursor_id'] + retrieved = response['number_returned'] + while cursor_id != 0: + _limit = 0 + if self.__limit: + _limit = self.__limit - retrieved + + if _limit == 0: + try: + connection.send_message( + message.kill_cursors((cursor_id,)), + callback=None) + except Exception, e: + logging.debug('Error killing cursor %s: %s' % + (cursor_id, e), exc_info=True) + connection.close() + + break + + tp, _ = yield gen.Task(connection.send_message, + message.get_more(self.full_collection_name, + _limit, + cursor_id)) + result, error = tp + if error is not None: + callback(None, error=error) + raise StopIteration + + response['data'].extend(result['data']) + + cursor_id = result['cursor_id'] + retrieved += result['number_returned'] + + if self.__limit == -1 and len(response['data']) == 1: + # handle the find_one() call + callback(response['data'][0], error=None) + else: + callback(response['data'], error=None) + except StopIteration: + raise except Exception, e: logging.debug('Error sending query %s' % e) connection.close() raise def _handle_response(self, result, error=None, orig_callback=None): - if result and result.get('cursor_id'): - connection = self.__pool.connection() - try: - connection.send_message( - message.kill_cursors([result['cursor_id']]), - callback=None) - except Exception, e: - logging.debug('Error killing cursor %s: %s' % (result['cursor_id'], e)) - connection.close() - raise - if error: logging.debug('%s %s' % (self.full_collection_name , error)) orig_callback(None, error=error) else: - if self.__limit == -1 and len(result['data']) == 1: - # handle the find_one() call - orig_callback(result['data'][0], error=None) - else: - orig_callback(result['data'], error=None) + orig_callback(result['data'], error=None) def __query_options(self):