Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/crewai/src/crewai/events/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ class BaseEvent(BaseModel):
parent_event_id: str | None = None
previous_event_id: str | None = None
triggered_by_event_id: str | None = None
started_event_id: str | None = None
emission_sequence: int | None = None

def to_json(self, exclude: set[str] | None = None) -> Serializable:
Expand Down
59 changes: 44 additions & 15 deletions lib/crewai/src/crewai/events/event_bus.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,8 @@ def emit(self, source: Any, event: BaseEvent) -> Future[None] | None:
if popped is None:
handle_empty_pop(event_type_name)
else:
_, popped_type = popped
popped_event_id, popped_type = popped
event.started_event_id = popped_event_id
expected_start = VALID_EVENT_PAIRS.get(event_type_name)
if expected_start and popped_type and popped_type != expected_start:
handle_mismatch(event_type_name, popped_type, expected_start)
Expand Down Expand Up @@ -569,24 +570,52 @@ def scoped_handlers(self) -> Generator[None, Any, None]:
... # Do stuff...
... # Handlers are cleared after the context
"""
with self._rwlock.w_locked():
prev_sync = self._sync_handlers
prev_async = self._async_handlers
prev_deps = self._handler_dependencies
prev_cache = self._execution_plan_cache
self._sync_handlers = {}
self._async_handlers = {}
self._handler_dependencies = {}
self._execution_plan_cache = {}
with self._rwlock.r_locked():
saved_sync: dict[type[BaseEvent], frozenset[SyncHandler]] = dict(
self._sync_handlers
)
saved_async: dict[type[BaseEvent], frozenset[AsyncHandler]] = dict(
self._async_handlers
)
saved_deps: dict[type[BaseEvent], dict[Handler, list[Depends[Any]]]] = {
event_type: dict(handlers)
for event_type, handlers in self._handler_dependencies.items()
}

for event_type, sync_handlers in saved_sync.items():
for sync_handler in sync_handlers:
self.off(event_type, sync_handler)

for event_type, async_handlers in saved_async.items():
for async_handler in async_handlers:
self.off(event_type, async_handler)

try:
yield
finally:
with self._rwlock.w_locked():
self._sync_handlers = prev_sync
self._async_handlers = prev_async
self._handler_dependencies = prev_deps
self._execution_plan_cache = prev_cache
with self._rwlock.r_locked():
current_sync = dict(self._sync_handlers)
current_async = dict(self._async_handlers)

for event_type, cur_sync in current_sync.items():
orig_sync = saved_sync.get(event_type, frozenset())
for new_handler in cur_sync - orig_sync:
self.off(event_type, new_handler)

for event_type, cur_async in current_async.items():
orig_async = saved_async.get(event_type, frozenset())
for new_async_handler in cur_async - orig_async:
self.off(event_type, new_async_handler)

for event_type, sync_handlers in saved_sync.items():
for sync_handler in sync_handlers:
deps = saved_deps.get(event_type, {}).get(sync_handler)
self._register_handler(event_type, sync_handler, deps)

for event_type, async_handlers in saved_async.items():
for async_handler in async_handlers:
deps = saved_deps.get(event_type, {}).get(async_handler)
self._register_handler(event_type, async_handler, deps)

def shutdown(self, wait: bool = True) -> None:
"""Gracefully shutdown the event loop and wait for all tasks to finish.
Expand Down
4 changes: 4 additions & 0 deletions lib/crewai/tests/memory/test_external_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,7 @@ def on_search_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"limit": 3,
Expand All @@ -330,6 +331,7 @@ def on_search_completed(source, event):
"parent_event_id": ANY,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"results": [],
Expand Down Expand Up @@ -390,6 +392,7 @@ def on_save_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "saving value",
"metadata": {"task": "test_task"},
Expand All @@ -411,6 +414,7 @@ def on_save_completed(source, event):
"parent_event_id": ANY,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "saving value",
"metadata": {"task": "test_task"},
Expand Down
4 changes: 4 additions & 0 deletions lib/crewai/tests/memory/test_long_term_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ def on_save_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test_task",
"metadata": {"task": "test_task", "quality": 0.5},
Expand All @@ -94,6 +95,7 @@ def on_save_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test_task",
"metadata": {
Expand Down Expand Up @@ -153,6 +155,7 @@ def on_search_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test query",
"limit": 5,
Expand All @@ -175,6 +178,7 @@ def on_search_completed(source, event):
"parent_event_id": ANY,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test query",
"results": None,
Expand Down
4 changes: 4 additions & 0 deletions lib/crewai/tests/memory/test_short_term_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ def on_search_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"limit": 3,
Expand All @@ -107,6 +108,7 @@ def on_search_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"query": "test value",
"results": [],
Expand Down Expand Up @@ -164,6 +166,7 @@ def on_save_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test value",
"metadata": {"task": "test_task"},
Expand All @@ -185,6 +188,7 @@ def on_save_completed(source, event):
"parent_event_id": None,
"previous_event_id": ANY,
"triggered_by_event_id": None,
"started_event_id": ANY,
"emission_sequence": ANY,
"value": "test value",
"metadata": {"task": "test_task"},
Expand Down
Loading