|
1 | 1 | import copy |
2 | | -from typing import Any, Awaitable, Callable, Mapping, Optional, Union |
| 2 | +from collections.abc import Awaitable, Callable, Mapping |
| 3 | +from typing import Any |
3 | 4 |
|
4 | 5 | from fastapi import FastAPI, Request |
5 | 6 | from starlette.requests import HTTPConnection |
|
9 | 10 |
|
10 | 11 | def startup_event_generator( |
11 | 12 | broker: AsyncBroker, |
12 | | - app_or_path: Union[str, FastAPI], |
| 13 | + app_or_path: str | FastAPI, |
13 | 14 | ) -> Callable[[TaskiqState], Awaitable[None]]: |
14 | 15 | """ |
15 | 16 | Generate shutdown event. |
@@ -67,7 +68,7 @@ async def shutdown(state: TaskiqState) -> None: |
67 | 68 | return shutdown |
68 | 69 |
|
69 | 70 |
|
70 | | -def init(broker: AsyncBroker, app_or_path: Union[str, FastAPI]) -> None: |
| 71 | +def init(broker: AsyncBroker, app_or_path: str | FastAPI) -> None: |
71 | 72 | """ |
72 | 73 | Add taskiq startup events. |
73 | 74 |
|
@@ -95,7 +96,7 @@ def init(broker: AsyncBroker, app_or_path: Union[str, FastAPI]) -> None: |
95 | 96 | def populate_dependency_context( |
96 | 97 | broker: AsyncBroker, |
97 | 98 | app: FastAPI, |
98 | | - asgi_state: Optional[Mapping[str, Any]] = None, |
| 99 | + asgi_state: Mapping[str, Any] | None = None, |
99 | 100 | ) -> None: |
100 | 101 | """ |
101 | 102 | Populate dependency context. |
|
0 commit comments