This example demonstrates how to use the Rewire library for managing the lifecycle of Python functions, both synchronous and asynchronous.
First, let's import the required modules and create a LifecycleModule instance:
from contextlib import asynccontextmanager
import anyio
from rewire import LifecycleModule
lm = LifecycleModule()  # create the lifecycle manager

Here are two example functions: an asynchronous function, my_async_function(), and a synchronous function, my_sync_function().
async def my_async_function():
    print("My async function ran")

def my_sync_function():
    print("My sync function ran")

To start the functions, pass them to the run method of the lifecycle manager:
lm.run(my_async_function()) # you can pass awaitables
lm.run(my_sync_function)  # you can also pass callables (they will run in a thread)

You can define stop callbacks that will be executed when the lifecycle manager is stopped:
async def my_function_with_stop_callback():
    print("Running function with stop callback")

    async def stop():
        print("Stop callback called")

    async def stop_sync():
        print("Sync stop callback called")

    lm.on_stop(stop)  # stop hooks can be added after start
    lm.on_stop(stop_sync)

Define an asynchronous context manager that is entered before any run() functions start and exited after all stop callbacks have finished:
@asynccontextmanager
async def my_context_manager():
    print("Context manager entered")
    try:
        yield
    finally:
        print("Context manager exited")

Apply the context manager to the lifecycle manager:
lm.contextmanager(my_context_manager())

Finally, use anyio.run() to start the lifecycle manager:
anyio.run(lm.start, True)
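
To make the ordering described above easier to picture, here is a minimal standard-library sketch of the same flow: context managers are entered first, run targets execute (awaitables are awaited, plain callables go to a worker thread), stop callbacks fire, and the context managers exit last. TinyLifecycle and every helper name in it are hypothetical stand-ins written for this illustration. This is not Rewire's implementation, and it assumes that stopping happens as soon as all run targets have finished, which may differ from how Rewire decides when to stop.

```python
import asyncio
import inspect
from contextlib import AsyncExitStack, asynccontextmanager


class TinyLifecycle:
    """Hypothetical stand-in for a lifecycle manager (not Rewire's code)."""

    def __init__(self):
        self._context_managers = []
        self._targets = []
        self._stop_hooks = []

    def contextmanager(self, cm):
        self._context_managers.append(cm)

    def run(self, target):
        self._targets.append(target)

    def on_stop(self, hook):
        self._stop_hooks.append(hook)

    async def _execute(self, target):
        # Awaitables are awaited directly; plain callables run in a worker thread.
        if inspect.isawaitable(target):
            await target
        else:
            await asyncio.to_thread(target)

    async def start(self):
        async with AsyncExitStack() as stack:
            # 1. Enter every registered context manager.
            for cm in self._context_managers:
                await stack.enter_async_context(cm)
            # 2. Run all targets concurrently.
            await asyncio.gather(*(self._execute(t) for t in self._targets))
            # 3. Call stop hooks (assumption: stop right after the targets finish).
            for hook in self._stop_hooks:
                result = hook()
                if inspect.isawaitable(result):
                    await result
        # 4. Leaving the AsyncExitStack exits the context managers.


events = []
lm = TinyLifecycle()


async def async_job():
    events.append("async job")
    # Hooks registered while running are still called at stop time.
    lm.on_stop(lambda: events.append("stop hook"))


def sync_job():
    events.append("sync job")


@asynccontextmanager
async def recording_context():
    events.append("enter")
    try:
        yield
    finally:
        events.append("exit")


lm.contextmanager(recording_context())
lm.run(async_job())
lm.run(sync_job)
asyncio.run(lm.start())
print(events)
```

The recorded events start with "enter" and end with "stop hook" followed by "exit", which mirrors the order the walkthrough describes for the context manager and the stop callbacks.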