Skip to content

Commit 9496a69

Browse files
author
Tom Softreck
committed
update
1 parent 77b1135 commit 9496a69

File tree

17 files changed

+2327
-0
lines changed

17 files changed

+2327
-0
lines changed

src/dialogchain/engine/__init__.py

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
"""
2+
DialogChain Engine Package
3+
4+
This package contains the core engine components for DialogChain,
5+
split into modular components for better maintainability.
6+
"""
7+
8+
from .core import DialogChainEngine
9+
from .route import Route, RouteConfig
10+
from .processor import ProcessorManager, ProcessorConfig
11+
from .connector import ConnectorManager, default_connector_manager
12+
from .utils import parse_uri, merge_dicts, get_nested_value, set_nested_value, deep_update, format_template
13+
14+
__all__ = [
15+
'DialogChainEngine',
16+
'Route',
17+
'RouteConfig',
18+
'ProcessorManager',
19+
'ProcessorConfig',
20+
'ConnectorManager',
21+
'default_connector_manager',
22+
'parse_uri',
23+
'merge_dicts',
24+
'get_nested_value',
25+
'set_nested_value',
26+
'deep_update',
27+
'format_template',
28+
]
Lines changed: 280 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,280 @@
1+
"""
2+
Connector management for DialogChain engine.
3+
4+
This module handles the creation and management of source and destination connectors.
5+
"""
6+
7+
from typing import Dict, Any, Optional, Type, Union
8+
import logging
9+
import importlib
10+
from urllib.parse import urlparse, parse_qs
11+
12+
from ..connectors import Source, Destination, ConnectorError
13+
14+
logger = logging.getLogger(__name__)
15+
16+
class ConnectorManager:
17+
"""Manages the creation and lifecycle of connectors."""
18+
19+
def __init__(
20+
self,
21+
source_types: Optional[Dict[str, Type[Source]]] = None,
22+
destination_types: Optional[Dict[str, Type[Destination]]] = None
23+
):
24+
"""Initialize the connector manager.
25+
26+
Args:
27+
source_types: Optional dictionary of source types to source classes
28+
destination_types: Optional dictionary of destination types to destination classes
29+
"""
30+
self.source_types = source_types or {}
31+
self.destination_types = destination_types or {}
32+
33+
# Register built-in connectors
34+
self._register_builtin_connectors()
35+
36+
def _register_builtin_connectors(self) -> None:
37+
"""Register built-in source and destination connectors."""
38+
from ..connectors.sources import (
39+
RTSPSource, TimerSource, FileSource, IMAPSource
40+
)
41+
from ..connectors.destinations import (
42+
HTTPDestination, EmailDestination, FileDestination, LogDestination
43+
)
44+
45+
# Register built-in sources
46+
self.register_source('rtsp', RTSPSource)
47+
self.register_source('timer', TimerSource)
48+
self.register_source('file', FileSource)
49+
self.register_source('imap', IMAPSource)
50+
51+
# Register built-in destinations
52+
self.register_destination('http', HTTPDestination)
53+
self.register_destination('https', HTTPDestination)
54+
self.register_destination('smtp', EmailDestination)
55+
self.register_destination('file', FileDestination)
56+
self.register_destination('log', LogDestination)
57+
58+
def register_source(self, scheme: str, source_class: Type[Source]) -> None:
59+
"""Register a source connector class.
60+
61+
Args:
62+
scheme: URI scheme (e.g., 'http', 'file')
63+
source_class: Source class to register
64+
"""
65+
self.source_types[scheme] = source_class
66+
logger.debug(f"Registered source connector: {scheme} -> {source_class.__name__}")
67+
68+
def register_destination(self, scheme: str, dest_class: Type[Destination]) -> None:
69+
"""Register a destination connector class.
70+
71+
Args:
72+
scheme: URI scheme (e.g., 'http', 'file')
73+
dest_class: Destination class to register
74+
"""
75+
self.destination_types[scheme] = dest_class
76+
logger.debug(f"Registered destination connector: {scheme} -> {dest_class.__name__}")
77+
78+
def create_source(self, config: Union[str, Dict[str, Any]]) -> Source:
79+
"""Create a source connector from a URI or config dictionary.
80+
81+
Args:
82+
config: Either a URI string or a config dictionary
83+
84+
Returns:
85+
Configured Source instance
86+
87+
Raises:
88+
ValueError: If the source type is unknown
89+
ConnectorError: If there's an error creating the connector
90+
"""
91+
if isinstance(config, str):
92+
return self._create_source_from_uri(config)
93+
elif isinstance(config, dict):
94+
return self._create_source_from_config(config)
95+
else:
96+
raise ValueError(f"Invalid source config type: {type(config)}")
97+
98+
def create_destination(self, config: Union[str, Dict[str, Any]]) -> Destination:
99+
"""Create a destination connector from a URI or config dictionary.
100+
101+
Args:
102+
config: Either a URI string or a config dictionary
103+
104+
Returns:
105+
Configured Destination instance
106+
107+
Raises:
108+
ValueError: If the destination type is unknown
109+
ConnectorError: If there's an error creating the connector
110+
"""
111+
if isinstance(config, str):
112+
return self._create_destination_from_uri(config)
113+
elif isinstance(config, dict):
114+
return self._create_destination_from_config(config)
115+
else:
116+
raise ValueError(f"Invalid destination config type: {type(config)}")
117+
118+
def _create_source_from_uri(self, uri: str) -> Source:
119+
"""Create a source connector from a URI.
120+
121+
Args:
122+
uri: Source URI (e.g., 'timer:5s', 'imap://user:pass@server')
123+
124+
Returns:
125+
Configured Source instance
126+
"""
127+
if '://' not in uri and ':' in uri:
128+
# Handle URIs without slashes (e.g., 'timer:5s')
129+
scheme, path = uri.split(':', 1)
130+
uri = f"{scheme}://{path}"
131+
132+
parsed = self._parse_uri(uri)
133+
scheme = parsed['scheme']
134+
135+
if scheme not in self.source_types:
136+
raise ValueError(f"Unsupported source type: {scheme}")
137+
138+
source_class = self.source_types[scheme]
139+
try:
140+
return source_class(uri, **parsed['options'])
141+
except Exception as e:
142+
raise ConnectorError(f"Failed to create source {scheme}: {e}") from e
143+
144+
def _create_destination_from_uri(self, uri: str) -> Destination:
145+
"""Create a destination connector from a URI.
146+
147+
Args:
148+
uri: Destination URI (e.g., 'http://example.com', 'file:/path/to/file')
149+
150+
Returns:
151+
Configured Destination instance
152+
"""
153+
parsed = self._parse_uri(uri)
154+
scheme = parsed['scheme']
155+
156+
if scheme not in self.destination_types:
157+
raise ValueError(f"Unsupported destination type: {scheme}")
158+
159+
dest_class = self.destination_types[scheme]
160+
try:
161+
return dest_class(uri, **parsed['options'])
162+
except Exception as e:
163+
raise ConnectorError(f"Failed to create destination {scheme}: {e}") from e
164+
165+
def _create_source_from_config(self, config: Dict[str, Any]) -> Source:
166+
"""Create a source connector from a config dictionary.
167+
168+
Args:
169+
config: Source configuration dictionary
170+
171+
Returns:
172+
Configured Source instance
173+
"""
174+
source_type = config.get('type')
175+
if not source_type:
176+
raise ValueError("Source config must include 'type'")
177+
178+
if source_type not in self.source_types:
179+
# Try to dynamically import the source
180+
try:
181+
module_name, _, class_name = source_type.rpartition('.')
182+
if not module_name:
183+
raise ValueError(f"Invalid source type format: {source_type}")
184+
185+
module = importlib.import_module(module_name)
186+
source_class = getattr(module, class_name)
187+
self.register_source(source_type, source_class)
188+
except (ImportError, AttributeError) as e:
189+
raise ValueError(f"Unknown source type: {source_type}") from e
190+
191+
source_class = self.source_types[source_type]
192+
try:
193+
return source_class(**{k: v for k, v in config.items() if k != 'type'})
194+
except Exception as e:
195+
raise ConnectorError(f"Failed to create source {source_type}: {e}") from e
196+
197+
def _create_destination_from_config(self, config: Dict[str, Any]) -> Destination:
198+
"""Create a destination connector from a config dictionary.
199+
200+
Args:
201+
config: Destination configuration dictionary
202+
203+
Returns:
204+
Configured Destination instance
205+
"""
206+
dest_type = config.get('type')
207+
if not dest_type:
208+
raise ValueError("Destination config must include 'type'")
209+
210+
if dest_type not in self.destination_types:
211+
# Try to dynamically import the destination
212+
try:
213+
module_name, _, class_name = dest_type.rpartition('.')
214+
if not module_name:
215+
raise ValueError(f"Invalid destination type format: {dest_type}")
216+
217+
module = importlib.import_module(module_name)
218+
dest_class = getattr(module, class_name)
219+
self.register_destination(dest_type, dest_class)
220+
except (ImportError, AttributeError) as e:
221+
raise ValueError(f"Unknown destination type: {dest_type}") from e
222+
223+
dest_class = self.destination_types[dest_type]
224+
try:
225+
return dest_class(**{k: v for k, v in config.items() if k != 'type'})
226+
except Exception as e:
227+
raise ConnectorError(f"Failed to create destination {dest_type}: {e}") from e
228+
229+
@staticmethod
230+
def _parse_uri(uri: str) -> Dict[str, Any]:
231+
"""Parse a URI into its components and query parameters.
232+
233+
Args:
234+
uri: URI to parse
235+
236+
Returns:
237+
Dictionary with 'scheme', 'netloc', 'path', 'params', 'query', 'fragment',
238+
and 'options' (query parameters as a dict)
239+
"""
240+
if '://' not in uri and ':' in uri:
241+
# Handle simple scheme:path format
242+
scheme, path = uri.split(':', 1)
243+
return {
244+
'scheme': scheme,
245+
'netloc': '',
246+
'path': path,
247+
'params': '',
248+
'query': '',
249+
'fragment': '',
250+
'options': {}
251+
}
252+
253+
parsed = urlparse(uri)
254+
options = {}
255+
256+
# Parse query parameters
257+
if parsed.query:
258+
query_params = parse_qs(parsed.query, keep_blank_values=True)
259+
options = {k: v[0] if len(v) == 1 else v for k, v in query_params.items()}
260+
261+
# Add username/password from netloc if present
262+
if '@' in parsed.netloc:
263+
auth_part, netloc = parsed.netloc.rsplit('@', 1)
264+
if ':' in auth_part:
265+
username, password = auth_part.split(':', 1)
266+
options['username'] = username
267+
options['password'] = password
268+
269+
return {
270+
'scheme': parsed.scheme,
271+
'netloc': parsed.netloc,
272+
'path': parsed.path,
273+
'params': parsed.params,
274+
'query': parsed.query,
275+
'fragment': parsed.fragment,
276+
'options': options
277+
}
278+
279+
# Default connector manager instance
280+
default_connector_manager = ConnectorManager()

0 commit comments

Comments
 (0)