-
Notifications
You must be signed in to change notification settings - Fork 53
Expand file tree
/
Copy pathCH_04_05.py
More file actions
80 lines (56 loc) · 1.98 KB
/
CH_04_05.py
File metadata and controls
80 lines (56 loc) · 1.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import json
import asyncio
from pprint import pp
from quart import Quart, websocket, render_template
from quart import g
from quart_redis import RedisHandler, get_redis
# Quart application object; the hooks and routes further down register on it.
app = Quart(__name__)
# Redis connection URI kept in the app config
# (presumably read by quart_redis's RedisHandler -- TODO confirm).
app.config["REDIS_URI"] = "redis://localhost/"
# Attach the quart_redis handler to the application.
redis_handler = RedisHandler(app)
# Websocket objects that are currently connected; ws() adds and removes entries.
connections = set()
import json
import asyncio
import click
import aioredis
class Chat:
    """Redis-backed message log for a single chat room.

    The room name is stored in Redis under the key ``"room_name"`` and the
    room's messages live in a Redis list keyed by that name.  ``start_db``
    must be awaited before any other method, since it opens the pool.
    """

    def __init__(self, room_name):
        """Remember *room_name*; no connection is opened until ``start_db``."""
        self.room_name = room_name
        # Assigned by start_db(); declared here so the attribute always exists.
        self.redis = None

    async def start_db(self):
        """Open the Redis pool and store the room name under ``"room_name"``."""
        self.redis = await aioredis.create_redis_pool("redis://localhost")
        await self.redis.set("room_name", self.room_name)

    async def save_message(self, message_json):
        """Append *message_json* to the end of the room's message list.

        The value is stored as given; ``get_all_messages`` later parses each
        stored entry with ``json.loads``, so it should be a JSON document.
        """
        room_name = await self.redis.get("room_name")
        await self.redis.rpush(room_name, message_json)

    async def clear_db(self):
        """Flush Redis and re-register the room name.

        ``flushall`` also removes the ``"room_name"`` key that every other
        method looks up, so it is written back immediately.  Without this,
        ``get_name`` would return ``None`` after a clear and the list
        commands would be called with ``None`` as their key.
        """
        await self.redis.flushall()
        await self.redis.set("room_name", self.room_name)

    async def get_all_messages(self):
        """Return every stored message of the room as a list of parsed JSON values."""
        room_name = await self.redis.get("room_name")
        message_jsons = await self.redis.lrange(room_name, 0, -1, encoding="utf-8")
        return [json.loads(message) for message in message_jsons]

    async def get_name(self):
        """Return the room name stored in Redis, decoded as UTF-8."""
        return await self.redis.get("room_name", encoding="utf-8")
# Module-level Chat instance for the room named "chat_room"; its Redis pool
# is not opened here -- init_db() awaits chat_db.start_db() for that.
chat_db = Chat("chat_room")
@app.before_serving
async def init_db():
    """Open the chat store's Redis connection via ``chat_db.start_db()``.

    Registered with ``app.before_serving``.
    """
    await chat_db.start_db()
@app.websocket("/ws")
async def ws():
    """Relay each message received on this socket to every connected socket.

    The socket is added to ``connections`` on entry and removed again in the
    ``finally`` clause, whichever way the receive loop ends.
    """
    current_socket = websocket._get_current_object()
    connections.add(current_socket)
    try:
        while True:
            incoming = await websocket.receive()
            # save the message
            # Fan the message out to all registered sockets, including this one.
            await asyncio.gather(*(peer.send(incoming) for peer in connections))
    finally:
        connections.remove(current_socket)
@app.route("/")
async def chat():
    """Render ``chat_redis.html`` with the list of messages to display.

    The message list is currently an empty placeholder.
    """
    redis_client = get_redis()
    history = []  # replace the empty list with your code
    page = await render_template("chat_redis.html", messages=history)
    return page
if __name__ == "__main__":
    # Start the development server only when this file is executed directly;
    # importing the module (for example from an ASGI server or a test) must
    # not block on a running server.
    app.run(use_reloader=True, port=3000)