-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathmessage_service.py
More file actions
103 lines (91 loc) · 3.81 KB
/
message_service.py
File metadata and controls
103 lines (91 loc) · 3.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
import asyncio
import json
from prompt_toolkit.application import get_app
class MessageService:
"""
Service to exchange data with server using websocket.
"""
def __init__(self, app_state, websocket):
self.app_state = app_state
self.send_queue = asyncio.Queue()
self.websocket = websocket
def prepare_send_request(self, message) -> bytes:
"""
Add credentials to message, serialize to json and encode.
:param message: message to send
:type message: dict
:return: bytes of encoded message
"""
message_data = {"username": self.app_state.username,
"password": self.app_state.password,
"filename": self.app_state.current_filename,
**message}
if self.app_state.current_file_owner:
message_data["owner"] = self.app_state.current_file_owner
if self.app_state.current_file_id:
message_data["file_id"] = self.app_state.current_file_id
return json.dumps(message_data).encode("utf-8")
async def send_request(self, message) -> None:
"""
Immediately send a request to websocket.
:param message: message to send
:type message: dict
"""
await self.websocket.send(self.prepare_send_request(message))
def put_message(self, message) -> None:
"""
Put message to send queue
:param message: message to send
:type message: bytes
"""
self.send_queue.put_nowait(message)
async def get_response(self) -> dict:
"""
Wait for closest message on websocket, deserialize and return it.
:return: message object as dict
"""
response = await self.websocket.recv()
return json.loads(response.decode("utf-8"))
async def receive_worker(self, notify, doc_editor) -> None:
"""
Waits for messages on websocket and apply updates to internal
application state.
:param notify: application notification function
:param doc_editor: document editor
:type notify: functions
:type doc_editor: DocumentEditor
"""
try:
async for message in self.websocket:
packet = json.loads(message.decode("utf-8"))
if packet["type"] == "patch" and packet["content"] not in \
doc_editor.patch_set:
doc_editor.update_text(packet["content"])
doc_editor.patch_set.append(packet["content"])
if packet["type"] == "save_file_response":
self.app_state.is_saving = False
if packet["success"]:
notify("File save",
"File has been saved on server.")
else:
notify("File save",
"Error saving a file :(")
get_app().invalidate()
if packet["type"] == "file_share_response":
if packet["success"]:
notify("File share", "File has been shared.")
else:
notify("File share", "Error sharing a file :( Try "
"another username?")
get_app().invalidate()
finally:
return
async def send_worker(self) -> None:
"""
Sends messages from send_queue to websocket.
"""
while True:
next_message = await self.send_queue.get()
await self.websocket.send(next_message)
# Notify the queue that the item has been processed.
self.send_queue.task_done()