Skip to content

Commit 65d3160

Browse files
committed
WIP
1 parent 4586abb commit 65d3160

5 files changed

Lines changed: 472 additions & 166 deletions

File tree

matrix_client/api.py

Lines changed: 9 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
# Copyright 2015 OpenMarket Ltd
3+
# Copyright 2017 Adam Beckmeyer
34
#
45
# Licensed under the Apache License, Version 2.0 (the "License");
56
# you may not use this file except in compliance with the License.
@@ -553,22 +554,16 @@ def _send(self, method, path, content=None, query_params={}, headers={},
553554
if headers["Content-Type"] == "application/json" and content is not None:
554555
content = json.dumps(content)
555556

556-
response = None
557-
while True:
558-
response = requests.request(
559-
method, endpoint,
560-
params=query_params,
561-
data=content,
562-
headers=headers,
563-
verify=self.validate_cert
564-
)
565-
566-
if response.status_code == 429:
567-
sleep(response.json()['retry_after_ms'] / 1000)
568-
else:
569-
break
557+
response = requests.request(
558+
method, endpoint,
559+
params=query_params,
560+
data=content,
561+
headers=headers,
562+
verify=self.validate_cert
563+
)
570564

571565
if response.status_code < 200 or response.status_code >= 300:
566+
# Error raised with status_code == 429 should be handled separately
572567
raise MatrixRequestError(
573568
code=response.status_code, content=response.text
574569
)

matrix_client/client.py

Lines changed: 108 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
# -*- coding: utf-8 -*-
22
# Copyright 2015 OpenMarket Ltd
3+
# Copyright 2017 Adam Beckmeyer
34
#
45
# Licensed under the Apache License, Version 2.0 (the "License");
56
# you may not use this file except in compliance with the License.
@@ -12,12 +13,17 @@
1213
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1314
# See the License for the specific language governing permissions and
1415
# limitations under the License.
16+
from gevent import monkey; monkey.patch_all()
17+
1518
from .api import MatrixHttpApi
1619
from .errors import MatrixRequestError, MatrixUnexpectedResponse
1720
from .room import Room
1821
from .user import User
19-
from threading import Thread
20-
from time import sleep
22+
from .queue import RequestQueue
23+
import gevent
24+
import gevent.pool
25+
from gevent.event import AsyncResult
26+
from functools import partial
2127
from uuid import uuid4
2228
import logging
2329
import sys
@@ -59,8 +65,8 @@ def global_callback(incoming_event):
5965
6066
"""
6167

62-
def __init__(self, base_url, token=None, user_id=None,
63-
valid_cert_check=True, sync_filter_limit=20):
68+
def __init__(self, base_url, token=None, user_id=None, valid_cert_check=True,
69+
sync_filter_limit=20, async=False, num_threads=10):
6470
""" Create a new Matrix Client object.
6571
6672
Args:
@@ -73,13 +79,18 @@ def __init__(self, base_url, token=None, user_id=None,
7379
the token) if supplying a token; otherwise, ignored.
7480
valid_cert_check (bool): Check the homeservers
7581
certificate on connections?
82+
async (bool): Run the client in async mode; if `True`, methods
83+
return `AsyncResult`s instead of blocking on api calls.
84+
num_threads (int): Number of greenlets with which to make
85+
matrix requests. Only evaluated if `async`.
7686
7787
Returns:
7888
MatrixClient
7989
8090
Raises:
8191
MatrixRequestError, ValueError
8292
"""
93+
# Set properties that may be overwritten if async
8394
if token is not None and user_id is None:
8495
raise ValueError("must supply user_id along with token")
8596

@@ -96,6 +107,22 @@ def __init__(self, base_url, token=None, user_id=None,
96107
self.sync_thread = None
97108
self.should_listen = False
98109

110+
# Both call methods accept two callbacks. First one is called without
111+
# any arguments. Second is called with output of first callback as an arg
112+
if async:
113+
# _async_call pushses callbacks onto `self.queue` and returns an
114+
# AsyncResult promising the output of the second callback
115+
self._call = self._async_call
116+
self.queue = RequestQueue()
117+
self.thread_pool = gevent.pool.Pool(size=num_threads)
118+
while not self.thread_pool.full():
119+
self.thread_pool.add(gevent.spawn(self.queue.call_forever))
120+
else:
121+
# _sync_call immediately calls both callbacks and blocks until complete
122+
self._call = self._sync_call
123+
self.queue = None
124+
self.thread_pool = None
125+
99126
""" Time to wait before attempting a /sync request after failing."""
100127
self.bad_sync_timeout_limit = 60 * 60
101128
self.rooms = {
@@ -116,9 +143,14 @@ def set_user_id(self, user_id):
116143

117144
def register_as_guest(self):
118145
""" Register a guest account on this HS.
146+
147+
Note: Registration and login methods are always synchronous.
148+
119149
Note: HS must have guest registration enabled.
150+
120151
Returns:
121152
str: Access Token
153+
122154
Raises:
123155
MatrixRequestError
124156
"""
@@ -128,6 +160,8 @@ def register_as_guest(self):
128160
def register_with_password(self, username, password):
129161
""" Register for a new account on this HS.
130162
163+
Note: Registration and login methods are always synchronous.
164+
131165
Args:
132166
username (str): Account username
133167
password (str): Account password
@@ -158,6 +192,8 @@ def _post_registration(self, response):
158192
def login_with_password_no_sync(self, username, password):
159193
""" Login to the homeserver.
160194
195+
Note: Registration and login methods are always synchronous.
196+
161197
Args:
162198
username (str): Account username
163199
password (str): Account password
@@ -182,6 +218,8 @@ def login_with_password_no_sync(self, username, password):
182218
def login_with_password(self, username, password, limit=10):
183219
""" Login to the homeserver.
184220
221+
Note: Registration and login methods are always synchronous.
222+
185223
Args:
186224
username (str): Account username
187225
password (str): Account password
@@ -203,6 +241,8 @@ def login_with_password(self, username, password, limit=10):
203241

204242
def logout(self):
205243
""" Logout from the homeserver.
244+
245+
Note: Registration and login methods are synchronous.
206246
"""
207247
self.stop_listener_thread()
208248
self.api.logout()
@@ -217,12 +257,17 @@ def create_room(self, alias=None, is_public=False, invitees=()):
217257
218258
Returns:
219259
Room
260+
or
261+
AsyncResult(Room)
220262
221263
Raises:
222264
MatrixRequestError
223265
"""
224-
response = self.api.create_room(alias, is_public, invitees)
225-
return self._mkroom(response["room_id"])
266+
out = self._call(
267+
partial(self.api.create_room, alias, is_public, invitees),
268+
self._mkroom
269+
)
270+
return out
226271

227272
def join_room(self, room_id_or_alias):
228273
""" Join a room.
@@ -232,15 +277,17 @@ def join_room(self, room_id_or_alias):
232277
233278
Returns:
234279
Room
280+
or
281+
AsyncResult(Room)
235282
236283
Raises:
237284
MatrixRequestError
238285
"""
239-
response = self.api.join_room(room_id_or_alias)
240-
room_id = (
241-
response["room_id"] if "room_id" in response else room_id_or_alias
286+
out = self._call(
287+
partial(self.api.join_room, room_id_or_alias),
288+
partial(self._mkroom, room_id_or_alias=room_id_or_alias)
242289
)
243-
return self._mkroom(room_id)
290+
return out
244291

245292
def get_rooms(self):
246293
""" Return a dict of {room_id: Room objects} that the user has joined.
@@ -360,7 +407,7 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None):
360407
if e.code >= 500:
361408
logger.warning("Problem occured serverside. Waiting %i seconds",
362409
bad_sync_timeout)
363-
sleep(bad_sync_timeout)
410+
gevent.sleep(bad_sync_timeout)
364411
bad_sync_timeout = min(bad_sync_timeout * 2,
365412
self.bad_sync_timeout_limit)
366413
else:
@@ -375,6 +422,9 @@ def listen_forever(self, timeout_ms=30000, exception_handler=None):
375422
def start_listener_thread(self, timeout_ms=30000, exception_handler=None):
376423
""" Start a listener thread to listen for events in the background.
377424
425+
Note that as of right now this thread is responsible for calling
426+
listener callbacks as well as for syncing with the homeserver.
427+
378428
Args:
379429
timeout (int): How long to poll the Home Server for before
380430
retrying.
@@ -383,12 +433,10 @@ def start_listener_thread(self, timeout_ms=30000, exception_handler=None):
383433
thread.
384434
"""
385435
try:
386-
thread = Thread(target=self.listen_forever,
387-
args=(timeout_ms, exception_handler))
388-
thread.daemon = True
436+
thread = gevent.spawn(self.listen_forever,
437+
timeout_ms, exception_handler)
389438
self.sync_thread = thread
390439
self.should_listen = True
391-
thread.start()
392440
except:
393441
e = sys.exc_info()[0]
394442
logger.error("Error: unable to start thread. %s", str(e))
@@ -413,21 +461,40 @@ def upload(self, content, content_type):
413461
MatrixRequestError: If the upload failed for some reason.
414462
"""
415463
try:
416-
response = self.api.media_upload(content, content_type)
417-
if "content_uri" in response:
418-
return response["content_uri"]
419-
else:
420-
raise MatrixUnexpectedResponse(
421-
"The upload was successful, but content_uri wasn't found."
422-
)
464+
# If not async, exceptions can be handled and logged
465+
return self._call(
466+
partial(self._media_upload, content, content_type),
467+
self._upload
468+
)
469+
except MatrixRequestError as e:
470+
raise MatrixRequestError(
471+
code=e.code,
472+
content="Upload failed: %s" % e
473+
)
474+
475+
def _media_upload(self, content, content_type):
476+
"""Wraps `self.api.media_upload` to allow error handling."""
477+
try:
478+
return self.api.media_upload(content, content_type)
423479
except MatrixRequestError as e:
424480
raise MatrixRequestError(
425481
code=e.code,
426482
content="Upload failed: %s" % e
427483
)
428484

429-
def _mkroom(self, room_id):
430-
self.rooms[room_id] = Room(self, room_id)
485+
def _upload(self, response):
486+
"""Helper function to be used as callback by `self.upload`"""
487+
if "content_uri" in response:
488+
return response["content_uri"]
489+
else:
490+
raise MatrixUnexpectedResponse(
491+
"The upload was successful, but content_uri wasn't found."
492+
)
493+
494+
def _mkroom(self, response=None, room_id_or_alias=None):
495+
if response and "room_id" in response:
496+
room_id_or_alias = response["room_id"]
497+
self.rooms[room_id_or_alias] = Room(self, room_id)
431498
return self.rooms[room_id]
432499

433500
def _process_state_event(self, state_event, current_room):
@@ -447,11 +514,12 @@ def _process_state_event(self, state_event, current_room):
447514
listener['event_type'] is None or
448515
listener['event_type'] == state_event['type']
449516
):
450-
listener['callback'](state_event)
517+
gevent.spawn(listener['callback'], state_event)
451518

452519
def _sync(self, timeout_ms=30000):
453520
# TODO: Deal with presence
454521
# TODO: Deal with left rooms
522+
# TODO: Use gevent pool with queue to call listeners
455523
response = self.api.sync(self.sync_token, timeout_ms, filter=self.sync_filter)
456524
self.sync_token = response["next_batch"]
457525

@@ -467,7 +535,7 @@ def _sync(self, timeout_ms=30000):
467535

468536
for room_id, sync_room in response['rooms']['join'].items():
469537
if room_id not in self.rooms:
470-
self._mkroom(room_id)
538+
self._mkroom(room_id_or_alias=room_id)
471539
room = self.rooms[room_id]
472540
room.prev_batch = sync_room["timeline"]["prev_batch"]
473541

@@ -507,8 +575,7 @@ def get_user(self, user_id):
507575
Args:
508576
user_id (str): The matrix user id of a user.
509577
"""
510-
511-
return User(self.api, user_id)
578+
return User(self.api, user_id, self._call)
512579

513580
def remove_room_alias(self, room_alias):
514581
"""Remove mapping of an alias
@@ -524,3 +591,16 @@ def remove_room_alias(self, room_alias):
524591
return True
525592
except MatrixRequestError:
526593
return False
594+
595+
def _async_call(self, first_callback, final_callback):
596+
"""Call `final_callback` on result of `first_callback` asynchronously"""
597+
first_result = AsyncResult()
598+
self.queue.put((first_callback, first_result))
599+
final_result = AsyncResult()
600+
# lambda function will wait for first_result to be fulfilled
601+
self.queue.put(lambda: final_callback(first_result.get()), final_result)
602+
return final_result
603+
604+
def _sync_call(self, first_callback, final_callback):
605+
"""Call `final_callback` on result of `first_callback` synchronously"""
606+
return final_callback(first_callback())

0 commit comments

Comments
 (0)