|
3 | 3 | import dis |
4 | 4 | import inspect |
5 | 5 | from collections.abc import Callable, Sequence |
| 6 | +from functools import lru_cache |
| 7 | +from types import CodeType |
6 | 8 | from typing import Any, Literal, cast, overload |
7 | 9 |
|
8 | 10 | from anyio import create_task_group |
@@ -105,29 +107,14 @@ def __init__( |
105 | 107 | while hasattr(func_to_inspect, "__wrapped__"): |
106 | 108 | func_to_inspect = func_to_inspect.__wrapped__ |
107 | 109 |
|
108 | | - code = func_to_inspect.__code__ |
109 | | - if code.co_argcount > 0: |
110 | | - event_arg_name = code.co_varnames[0] |
111 | | - last_was_event = False |
| 110 | + found_prevent_default, found_stop_propagation = _inspect_event_handler_code( |
| 111 | + func_to_inspect.__code__ |
| 112 | + ) |
112 | 113 |
|
113 | | - for instr in dis.get_instructions(func_to_inspect): |
114 | | - if ( |
115 | | - instr.opname in ("LOAD_FAST", "LOAD_FAST_BORROW") |
116 | | - and instr.argval == event_arg_name |
117 | | - ): |
118 | | - last_was_event = True |
119 | | - continue |
120 | | - |
121 | | - if last_was_event and instr.opname in ( |
122 | | - "LOAD_METHOD", |
123 | | - "LOAD_ATTR", |
124 | | - ): |
125 | | - if instr.argval == "preventDefault": |
126 | | - self.prevent_default = True |
127 | | - elif instr.argval == "stopPropagation": |
128 | | - self.stop_propagation = True |
129 | | - |
130 | | - last_was_event = False |
| 114 | + if found_prevent_default: |
| 115 | + self.prevent_default = True |
| 116 | + if found_stop_propagation: |
| 117 | + self.stop_propagation = True |
131 | 118 |
|
132 | 119 | __hash__ = None # type: ignore |
133 | 120 |
|
@@ -242,3 +229,46 @@ async def await_all_event_handlers(data: Sequence[Any]) -> None: |
242 | 229 | group.start_soon(func, data) |
243 | 230 |
|
244 | 231 | return await_all_event_handlers |
| 232 | + |
| 233 | + |
| 234 | +@lru_cache(maxsize=4096) |
| 235 | +def _inspect_event_handler_code(code: CodeType) -> tuple[bool, bool]: |
| 236 | + prevent_default = False |
| 237 | + stop_propagation = False |
| 238 | + |
| 239 | + if code.co_argcount > 0: |
| 240 | + names = code.co_names |
| 241 | + check_prevent_default = "preventDefault" in names |
| 242 | + check_stop_propagation = "stopPropagation" in names |
| 243 | + |
| 244 | + if not (check_prevent_default or check_stop_propagation): |
| 245 | + return False, False |
| 246 | + |
| 247 | + event_arg_name = code.co_varnames[0] |
| 248 | + last_was_event = False |
| 249 | + |
| 250 | + for instr in dis.get_instructions(code): |
| 251 | + if ( |
| 252 | + instr.opname in ("LOAD_FAST", "LOAD_FAST_BORROW") |
| 253 | + and instr.argval == event_arg_name |
| 254 | + ): |
| 255 | + last_was_event = True |
| 256 | + continue |
| 257 | + |
| 258 | + if last_was_event and instr.opname in ( |
| 259 | + "LOAD_METHOD", |
| 260 | + "LOAD_ATTR", |
| 261 | + ): |
| 262 | + if check_prevent_default and instr.argval == "preventDefault": |
| 263 | + prevent_default = True |
| 264 | + check_prevent_default = False |
| 265 | + elif check_stop_propagation and instr.argval == "stopPropagation": |
| 266 | + stop_propagation = True |
| 267 | + check_stop_propagation = False |
| 268 | + |
| 269 | + if not (check_prevent_default or check_stop_propagation): |
| 270 | + break |
| 271 | + |
| 272 | + last_was_event = False |
| 273 | + |
| 274 | + return prevent_default, stop_propagation |
0 commit comments