140140 "source" : [
141141 " import asyncio\n " ,
142142 " \n " ,
143- " import async_timeout\n " ,
144- " \n " ,
145143 " import redis.asyncio as redis\n " ,
146144 " \n " ,
147145 " STOPWORD = \" STOP\"\n " ,
148146 " \n " ,
149147 " \n " ,
150148 " async def reader(channel: redis.client.PubSub):\n " ,
151149 " while True:\n " ,
152- " try:\n " ,
153- " async with async_timeout.timeout(1):\n " ,
154- " message = await channel.get_message(ignore_subscribe_messages=True)\n " ,
155- " if message is not None:\n " ,
156- " print(f\" (Reader) Message Received: {message}\" )\n " ,
157- " if message[\" data\" ].decode() == STOPWORD:\n " ,
158- " print(\" (Reader) STOP\" )\n " ,
159- " break\n " ,
160- " await asyncio.sleep(0.01)\n " ,
161- " except asyncio.TimeoutError:\n " ,
162- " pass\n " ,
150+ " message = await channel.get_message(ignore_subscribe_messages=True)\n " ,
151+ " if message is not None:\n " ,
152+ " print(f\" (Reader) Message Received: {message}\" )\n " ,
153+ " if message[\" data\" ].decode() == STOPWORD:\n " ,
154+ " print(\" (Reader) STOP\" )\n " ,
155+ " break\n " ,
163156 " \n " ,
164157 " r = redis.from_url(\" redis://localhost\" )\n " ,
165- " pubsub = r.pubsub()\n " ,
166- " await pubsub.subscribe(\" channel:1\" , \" channel:2\" )\n " ,
158+ " async with r.pubsub() as pubsub: \n " ,
159+ " await pubsub.subscribe(\" channel:1\" , \" channel:2\" )\n " ,
167160 " \n " ,
168- " future = asyncio.create_task(reader(pubsub))\n " ,
161+ " future = asyncio.create_task(reader(pubsub))\n " ,
169162 " \n " ,
170- " await r.publish(\" channel:1\" , \" Hello\" )\n " ,
171- " await r.publish(\" channel:2\" , \" World\" )\n " ,
172- " await r.publish(\" channel:1\" , STOPWORD)\n " ,
163+ " await r.publish(\" channel:1\" , \" Hello\" )\n " ,
164+ " await r.publish(\" channel:2\" , \" World\" )\n " ,
165+ " await r.publish(\" channel:1\" , STOPWORD)\n " ,
173166 " \n " ,
174- " await future"
167+ " await future"
175168 ]
176169 },
177170 {
204197 "source" : [
205198 " import asyncio\n " ,
206199 " \n " ,
207- " import async_timeout\n " ,
208- " \n " ,
209200 " import redis.asyncio as redis\n " ,
210201 " \n " ,
211202 " STOPWORD = \" STOP\"\n " ,
212203 " \n " ,
213204 " \n " ,
214205 " async def reader(channel: redis.client.PubSub):\n " ,
215206 " while True:\n " ,
216- " try:\n " ,
217- " async with async_timeout.timeout(1):\n " ,
218- " message = await channel.get_message(ignore_subscribe_messages=True)\n " ,
219- " if message is not None:\n " ,
220- " print(f\" (Reader) Message Received: {message}\" )\n " ,
221- " if message[\" data\" ].decode() == STOPWORD:\n " ,
222- " print(\" (Reader) STOP\" )\n " ,
223- " break\n " ,
224- " await asyncio.sleep(0.01)\n " ,
225- " except asyncio.TimeoutError:\n " ,
226- " pass\n " ,
207+ " message = await channel.get_message(ignore_subscribe_messages=True)\n " ,
208+ " if message is not None:\n " ,
209+ " print(f\" (Reader) Message Received: {message}\" )\n " ,
210+ " if message[\" data\" ].decode() == STOPWORD:\n " ,
211+ " print(\" (Reader) STOP\" )\n " ,
212+ " break\n " ,
227213 " \n " ,
228214 " \n " ,
229215 " r = await redis.from_url(\" redis://localhost\" )\n " ,
230- " pubsub = r.pubsub()\n " ,
231- " await pubsub.psubscribe(\" channel:*\" )\n " ,
216+ " async with r.pubsub() as pubsub: \n " ,
217+ " await pubsub.psubscribe(\" channel:*\" )\n " ,
232218 " \n " ,
233- " future = asyncio.create_task(reader(pubsub))\n " ,
219+ " future = asyncio.create_task(reader(pubsub))\n " ,
234220 " \n " ,
235- " await r.publish(\" channel:1\" , \" Hello\" )\n " ,
236- " await r.publish(\" channel:2\" , \" World\" )\n " ,
237- " await r.publish(\" channel:1\" , STOPWORD)\n " ,
221+ " await r.publish(\" channel:1\" , \" Hello\" )\n " ,
222+ " await r.publish(\" channel:2\" , \" World\" )\n " ,
223+ " await r.publish(\" channel:1\" , STOPWORD)\n " ,
238224 " \n " ,
239- " await future"
225+ " await future"
240226 ]
241227 },
242228 {
298284 },
299285 "nbformat" : 4 ,
300286 "nbformat_minor" : 1
301- }
287+ }
0 commit comments