-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbasic_usage.py
More file actions
89 lines (79 loc) · 2.61 KB
/
basic_usage.py
File metadata and controls
89 lines (79 loc) · 2.61 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
#!/usr/bin/env python3
"""
Basic usage example for DialogChain.
This example demonstrates how to create a simple DialogChain application
with a timer source and a logging destination.
"""
import asyncio
import logging
from datetime import datetime
# Set up basic logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
async def timer_source(interval_seconds=3):
"""A simple timer source that yields messages at regular intervals."""
while True:
try:
# Create a message with the current time
message = {
'timestamp': datetime.now().isoformat(),
'message': 'Hello from Timer Source!'
}
yield message
await asyncio.sleep(interval_seconds)
except asyncio.CancelledError:
logger.info("Timer source cancelled")
break
except Exception as e:
logger.error(f"Error in timer source: {e}")
await asyncio.sleep(1) # Prevent tight loop on errors
async def log_destination(message):
"""A simple logging destination."""
try:
logger.info(f"Received message: {message}")
except Exception as e:
logger.error(f"Error in log destination: {e}")
async def main():
"""Main function to run the example."""
logger.info("Starting DialogChain example...")
# Create an async task for the timer source
timer_task = asyncio.create_task(
process_messages(timer_source(3), log_destination)
)
try:
# Run for 15 seconds
await asyncio.sleep(15)
# Cancel the timer task
timer_task.cancel()
try:
await timer_task
except asyncio.CancelledError:
pass
except KeyboardInterrupt:
logger.info("Shutting down...")
timer_task.cancel()
try:
await timer_task
except asyncio.CancelledError:
pass
async def process_messages(source, destination):
"""Process messages from source and send to destination."""
try:
async for message in source:
await destination(message)
except asyncio.CancelledError:
logger.info("Message processing cancelled")
raise
except Exception as e:
logger.error(f"Error in message processing: {e}")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
logger.info("Application stopped by user")
except Exception as e:
logger.error(f"Application error: {e}")
raise