1+ # MicroPython asyncio module, for use with webassembly port
2+ # MIT license; Copyright (c) 2019-2024 Damien P. George
3+
4+ from time import ticks_ms as ticks , ticks_diff , ticks_add
5+ import sys , js , jsffi
6+
7+ # Import TaskQueue and Task from built-in C code.
8+ from _asyncio import TaskQueue , Task
9+
10+
11+ ################################################################################
12+ # Exceptions
13+
14+
15+ class CancelledError (BaseException ):
16+ pass
17+
18+
19+ class TimeoutError (Exception ):
20+ pass
21+
22+
23+ # Used when calling Loop.call_exception_handler.
24+ _exc_context = {"message" : "Task exception wasn't retrieved" , "exception" : None , "future" : None }
25+
26+
27+ ################################################################################
28+ # Sleep functions
29+
30+
31+ # "Yield" once, then raise StopIteration
32+ class SingletonGenerator :
33+ def __init__ (self ):
34+ self .state = None
35+ self .exc = StopIteration ()
36+
37+ def __iter__ (self ):
38+ return self
39+
40+ def __next__ (self ):
41+ if self .state is not None :
42+ _task_queue .push (cur_task , self .state )
43+ self .state = None
44+ return None
45+ else :
46+ self .exc .__traceback__ = None
47+ raise self .exc
48+
49+
50+ # Pause task execution for the given time (integer in milliseconds, MicroPython extension)
51+ # Use a SingletonGenerator to do it without allocating on the heap
52+ def sleep_ms (t , sgen = SingletonGenerator ()):
53+ assert sgen .state is None
54+ sgen .state = ticks_add (ticks (), max (0 , t ))
55+ return sgen
56+
57+
58+ # Pause task execution for the given time (in seconds)
59+ def sleep (t ):
60+ return sleep_ms (int (t * 1000 ))
61+
62+
63+ ################################################################################
64+ # Main run loop
65+
66+ asyncio_timer = None
67+
68+
69+ class TopLevelCoro :
70+ @staticmethod
71+ def set (resolve , reject ):
72+ TopLevelCoro .resolve = resolve
73+ TopLevelCoro .reject = reject
74+
75+ @staticmethod
76+ def send (value ):
77+ TopLevelCoro .resolve ()
78+
79+
80+ class ThenableEvent :
81+ def __init__ (self , thenable ):
82+ self .waiting = None # Task waiting on completion of this thenable
83+ thenable .then (self .set , self .cancel )
84+
85+ def set (self , value = None ):
86+ # Thenable/Promise is fulfilled, set result and schedule any waiting task.
87+ self .result = value
88+ if self .waiting :
89+ _task_queue .push (self .waiting )
90+ self .waiting = None
91+
92+ def cancel (self , value = None ):
93+ # Thenable/Promise is rejected, set error and schedule any waiting task.
94+ self .error = jsffi .JsException (
95+ value , getattr (value , "name" , None ), getattr (value , "message" , None )
96+ )
97+ if self .waiting :
98+ _task_queue .push (self .waiting )
99+ self .waiting = None
100+
101+ def remove (self , task ):
102+ self .waiting = None
103+
104+ # async
105+ def wait (self ):
106+ # Set the calling task as the task waiting on this thenable.
107+ self .waiting = cur_task
108+ # Set calling task's data to this object so it can be removed if needed.
109+ cur_task .data = self
110+ # Wait for the thenable to fulfill.
111+ yield
112+ # Raise the error, or return the result, of the thenable.
113+ if hasattr (self , "error" ):
114+ raise self .error
115+ return self .result
116+
117+
118+ # Ensure the awaitable is a task
119+ def _promote_to_task (aw ):
120+ return aw if isinstance (aw , Task ) else create_task (aw )
121+
122+
123+ def _schedule_run_iter (dt ):
124+ global asyncio_timer
125+ if asyncio_timer is not None :
126+ js .clearTimeout (asyncio_timer )
127+ asyncio_timer = js .setTimeout (_run_iter , dt )
128+
129+
130+ def _run_iter ():
131+ global cur_task
132+ excs_all = (CancelledError , Exception ) # To prevent heap allocation in loop
133+ excs_stop = (CancelledError , StopIteration ) # To prevent heap allocation in loop
134+ while True :
135+ # Wait until the head of _task_queue is ready to run
136+ t = _task_queue .peek ()
137+ if t :
138+ # A task waiting on _task_queue; "ph_key" is time to schedule task at
139+ dt = max (0 , ticks_diff (t .ph_key , ticks ()))
140+ else :
141+ # No tasks can be woken so finished running
142+ cur_task = _top_level_task
143+ return
144+
145+ if dt > 0 :
146+ # schedule to call again later
147+ cur_task = _top_level_task
148+ _schedule_run_iter (dt )
149+ return
150+
151+ # Get next task to run and continue it
152+ t = _task_queue .pop ()
153+ cur_task = t
154+ try :
155+ # Continue running the coroutine, it's responsible for rescheduling itself
156+ exc = t .data
157+ if not exc :
158+ t .coro .send (None )
159+ else :
160+ # If the task is finished and on the run queue and gets here, then it
161+ # had an exception and was not await'ed on. Throwing into it now will
162+ # raise StopIteration and the code below will catch this and run the
163+ # call_exception_handler function.
164+ t .data = None
165+ t .coro .throw (exc )
166+ except excs_all as er :
167+ # Check the task is not on any event queue
168+ assert t .data is None
169+ # This task is done.
170+ if t .state :
171+ # Task was running but is now finished.
172+ waiting = False
173+ if t .state is True :
174+ # "None" indicates that the task is complete and not await'ed on (yet).
175+ t .state = None
176+ elif callable (t .state ):
177+ # The task has a callback registered to be called on completion.
178+ t .state (t , er )
179+ t .state = False
180+ waiting = True
181+ else :
182+ # Schedule any other tasks waiting on the completion of this task.
183+ while t .state .peek ():
184+ _task_queue .push (t .state .pop ())
185+ waiting = True
186+ # "False" indicates that the task is complete and has been await'ed on.
187+ t .state = False
188+ if not waiting and not isinstance (er , excs_stop ):
189+ # An exception ended this detached task, so queue it for later
190+ # execution to handle the uncaught exception if no other task retrieves
191+ # the exception in the meantime (this is handled by Task.throw).
192+ _task_queue .push (t )
193+ # Save return value of coro to pass up to caller.
194+ t .data = er
195+ elif t .state is None :
196+ # Task is already finished and nothing await'ed on the task,
197+ # so call the exception handler.
198+
199+ # Save exception raised by the coro for later use.
200+ t .data = exc
201+
202+ # Create exception context and call the exception handler.
203+ _exc_context ["exception" ] = exc
204+ _exc_context ["future" ] = t
205+ Loop .call_exception_handler (_exc_context )
206+
207+
208+ # Create and schedule a new task from a coroutine.
209+ def create_task (coro ):
210+ if not hasattr (coro , "send" ):
211+ raise TypeError ("coroutine expected" )
212+ t = Task (coro , globals ())
213+ _task_queue .push (t )
214+ return t
215+
216+
217+ # Task used to suspend and resume top-level await.
218+ _top_level_task = Task (TopLevelCoro , globals ())
219+
220+ ################################################################################
221+ # Event loop wrapper
222+
223+
224+ cur_task = _top_level_task
225+
226+
227+ class Loop :
228+ _exc_handler = None
229+
230+ def create_task (coro ):
231+ return create_task (coro )
232+
233+ def close ():
234+ pass
235+
236+ def set_exception_handler (handler ):
237+ Loop ._exc_handler = handler
238+
239+ def get_exception_handler ():
240+ return Loop ._exc_handler
241+
242+ def default_exception_handler (loop , context ):
243+ print (context ["message" ], file = sys .stderr )
244+ print ("future:" , context ["future" ], "coro=" , context ["future" ].coro , file = sys .stderr )
245+ sys .print_exception (context ["exception" ], sys .stderr )
246+
247+ def call_exception_handler (context ):
248+ (Loop ._exc_handler or Loop .default_exception_handler )(Loop , context )
249+
250+
251+ def get_event_loop ():
252+ return Loop
253+
254+
255+ def current_task ():
256+ assert cur_task is not None
257+ return cur_task
258+
259+
260+ def new_event_loop ():
261+ global _task_queue
262+ _task_queue = TaskQueue (_schedule_run_iter ) # TaskQueue of Task instances.
263+ return Loop
264+
265+
266+ # Initialise default event loop.
267+ new_event_loop ()
0 commit comments