diff --git a/.devcontainer/postCreateCommand.sh b/.devcontainer/postCreateCommand.sh index e2ee4587d..9278287eb 100755 --- a/.devcontainer/postCreateCommand.sh +++ b/.devcontainer/postCreateCommand.sh @@ -3,6 +3,7 @@ pip3 install -e . pip3 install -e ./ext/dapr-ext-grpc/ pip3 install -e ./ext/dapr-ext-fastapi/ pip3 install -e ./ext/dapr-ext-workflow/ +pip3 install -e ./ext/dapr-ext-crewai/ # Install required packages pip3 install -r ./dev-requirements.txt diff --git a/.github/dependabot.yml b/.github/dependabot.yml index bbfe6e18d..dc01b9b54 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -16,6 +16,10 @@ updates: directory: "/ext/flask_dapr" schedule: interval: "daily" + - package-ecosystem: "pip" + directory: "/ext/dapr-ext-crewai" + schedule: + interval: "daily" - package-ecosystem: "github-actions" directory: "/" schedule: diff --git a/.github/workflows/build-push-to-main.yaml b/.github/workflows/build-push-to-main.yaml index 499daebde..dfff5ba5b 100644 --- a/.github/workflows/build-push-to-main.yaml +++ b/.github/workflows/build-push-to-main.yaml @@ -120,3 +120,10 @@ jobs: cd ext/dapr-ext-strands python setup.py sdist bdist_wheel twine upload dist/* + - name: Build and publish dapr-ext-crewai + env: + TWINE_PASSWORD: ${{ secrets.PYPI_UPLOAD_PASS }} + run: | + cd ext/dapr-ext-crewai + python setup.py sdist bdist_wheel + twine upload dist/* diff --git a/.github/workflows/build-tag.yaml b/.github/workflows/build-tag.yaml index aabcbca23..a867e48fd 100644 --- a/.github/workflows/build-tag.yaml +++ b/.github/workflows/build-tag.yaml @@ -10,6 +10,7 @@ on: - fastapi-v* - langgraph-v* - strands-v* + - crewai-v* workflow_dispatch: jobs: @@ -133,3 +134,11 @@ jobs: cd ext/dapr-ext-strands python setup.py sdist bdist_wheel twine upload dist/* + - name: Build and publish dapr-ext-crewai + if: startsWith(github.ref_name, 'crewai-v') + env: + TWINE_PASSWORD: ${{ secrets.PYPI_UPLOAD_PASS }} + run: | + cd ext/dapr-ext-crewai + python setup.py sdist bdist_wheel + twine upload dist/* diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 7e03d1b80..6d65e634d 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -60,5 +60,8 @@ jobs: - name: Run unit-tests run: | tox -e py`echo "${{ matrix.python_ver }}" | sed 's/\.//g'` + - name: Run crewai unit-tests + run: | + tox -e crewai - name: Upload test coverage uses: codecov/codecov-action@v5 diff --git a/.github/workflows/fossa.yaml b/.github/workflows/fossa.yaml index fda00b65c..f853a84f8 100644 --- a/.github/workflows/fossa.yaml +++ b/.github/workflows/fossa.yaml @@ -28,6 +28,7 @@ on: - fastapi-v* - langgraph-v* - strands-v* + - crewai-v* pull_request: branches: - main diff --git a/.github/workflows/validate_examples.yaml b/.github/workflows/validate_examples.yaml index a003bb025..87c03c0c9 100644 --- a/.github/workflows/validate_examples.yaml +++ b/.github/workflows/validate_examples.yaml @@ -13,6 +13,7 @@ on: - flask-v* - langgraph-v* - strands-v* + - crewai-v* pull_request: branches: - main diff --git a/ext/dapr-ext-crewai/LICENSE b/ext/dapr-ext-crewai/LICENSE new file mode 100644 index 000000000..be033a7fd --- /dev/null +++ b/ext/dapr-ext-crewai/LICENSE @@ -0,0 +1,203 @@ +Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. + + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. 
+ + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. + + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. 
This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. + + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright 2021 The Dapr Authors. + + and others that have contributed code to the public domain. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
\ No newline at end of file diff --git a/ext/dapr-ext-crewai/README.rst b/ext/dapr-ext-crewai/README.rst new file mode 100644 index 000000000..e886451e3 --- /dev/null +++ b/ext/dapr-ext-crewai/README.rst @@ -0,0 +1,112 @@ +Dapr CrewAI Extension +===================== + +This is the Dapr Workflow extension for CrewAI agents. + +This extension enables durable execution of CrewAI agents using Dapr Workflows. +Each tool execution in an agent runs as a separate Dapr Workflow activity, providing: + +- **Fault tolerance**: Agents automatically resume from the last successful activity on failure +- **Durability**: Agent state is persisted and can survive process restarts +- **Observability**: Full visibility into agent execution through Dapr's workflow APIs + +Installation +------------ + +.. code-block:: bash + + pip install dapr-ext-crewai + +Prerequisites +------------- + +1. Dapr installed and initialized (``dapr init``) +2. A running Dapr sidecar (``dapr run`` or Kubernetes) + +Quick Start +----------- + +.. code-block:: python + + from crewai import Agent, Task + from crewai.tools import tool + from dapr.ext.crewai import DaprWorkflowAgentRunner + + @tool("Search the web for information") + def search_web(query: str) -> str: + """Search the web and return results.""" + return f"Results for: {query}" + + # Create your CrewAI agent + agent = Agent( + role="Research Assistant", + goal="Help users find accurate information", + backstory="An expert researcher with access to various tools", + tools=[search_web], + llm="openai/gpt-4o-mini", + ) + + # Define a task + task = Task( + description="Research the latest developments in AI agents", + expected_output="A comprehensive summary of recent AI agent news", + agent=agent, + ) + + # Create runner and start the workflow runtime + runner = DaprWorkflowAgentRunner(agent=agent) + runner.start() + + # Run the agent - each tool call is now durable + async for event in runner.run_async(task=task): + print(event) + + runner.shutdown() + +Running with Dapr +----------------- + +.. code-block:: bash + + dapr run --app-id crewai-agent --dapr-grpc-port 50001 -- python your_script.py + +How It Works +------------ + +The extension wraps CrewAI agent execution in a Dapr Workflow: + +1. **Workflow**: ``crewai_agent_workflow`` orchestrates the agent's execution loop +2. **Activities**: + - ``call_llm_activity``: Calls the LLM to decide the next action + - ``execute_tool_activity``: Executes a single tool durably + +Each iteration of the agent loop is checkpointed, so if the process fails, +the workflow resumes from the last successful activity. + +Advanced Usage +-------------- + +Synchronous execution: + +.. code-block:: python + + result = runner.run_sync(task=task, timeout=300) + print(result.final_response) + +Check workflow status: + +.. code-block:: python + + status = runner.get_workflow_status(workflow_id) + print(status) + +Terminate a workflow: + +.. code-block:: python + + runner.terminate_workflow(workflow_id) + +License +------- + +Apache License 2.0 diff --git a/ext/dapr-ext-crewai/dapr/ext/crewai/__init__.py b/ext/dapr-ext-crewai/dapr/ext/crewai/__init__.py new file mode 100644 index 000000000..c630d8ea8 --- /dev/null +++ b/ext/dapr-ext-crewai/dapr/ext/crewai/__init__.py @@ -0,0 +1,69 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +from .models import ( + AgentConfig, + AgentWorkflowInput, + AgentWorkflowOutput, + CallLlmInput, + CallLlmOutput, + ExecuteToolInput, + ExecuteToolOutput, + Message, + MessageRole, + TaskConfig, + ToolCall, + ToolDefinition, + ToolResult, +) +from .runner import DaprWorkflowAgentRunner +from .version import __version__ +from .workflow import ( + call_llm_activity, + clear_tool_registry, + crewai_agent_workflow, + execute_tool_activity, + get_registered_tool, + register_tool, +) + +__all__ = [ + # Main runner class + 'DaprWorkflowAgentRunner', + # Data models + 'AgentConfig', + 'AgentWorkflowInput', + 'AgentWorkflowOutput', + 'CallLlmInput', + 'CallLlmOutput', + 'ExecuteToolInput', + 'ExecuteToolOutput', + 'Message', + 'MessageRole', + 'TaskConfig', + 'ToolCall', + 'ToolDefinition', + 'ToolResult', + # Workflow and activities (for advanced usage) + 'crewai_agent_workflow', + 'call_llm_activity', + 'execute_tool_activity', + # Tool registry (for advanced usage) + 'register_tool', + 'get_registered_tool', + 'clear_tool_registry', + # Version + '__version__', +] diff --git a/ext/dapr-ext-crewai/dapr/ext/crewai/models.py b/ext/dapr-ext-crewai/dapr/ext/crewai/models.py new file mode 100644 index 000000000..acea281bb --- /dev/null +++ b/ext/dapr-ext-crewai/dapr/ext/crewai/models.py @@ -0,0 +1,397 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Serializable data models for CrewAI Dapr Workflow integration. 
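+#
+# All models below expose matching to_dict()/from_dict() methods so that
+# workflow inputs, outputs, and intermediate messages round-trip through
+# Dapr's JSON-serialized workflow state. For example:
+#
+#     msg = Message(role=MessageRole.USER, content='hello')
+#     assert Message.from_dict(msg.to_dict()) == msg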
+ +from dataclasses import dataclass, field +from enum import Enum +from typing import Any, Optional + + +class MessageRole(str, Enum): + """Role of a message in the conversation.""" + + USER = 'user' + ASSISTANT = 'assistant' + TOOL = 'tool' + SYSTEM = 'system' + + +@dataclass +class ToolCall: + """Represents a tool call from the LLM.""" + + id: str + name: str + args: dict[str, Any] + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'id': self.id, + 'name': self.name, + 'args': self.args, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'ToolCall': + """Create from dictionary.""" + return cls( + id=data['id'], + name=data['name'], + args=data['args'], + ) + + +@dataclass +class ToolResult: + """Represents the result of a tool execution.""" + + tool_call_id: str + tool_name: str + result: Any + error: Optional[str] = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'tool_call_id': self.tool_call_id, + 'tool_name': self.tool_name, + 'result': self.result, + 'error': self.error, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'ToolResult': + """Create from dictionary.""" + return cls( + tool_call_id=data['tool_call_id'], + tool_name=data['tool_name'], + result=data['result'], + error=data.get('error'), + ) + + +@dataclass +class Message: + """A serializable message in the conversation. + + This is a simplified representation that can be serialized for Dapr workflow state. + """ + + role: MessageRole + content: Optional[str] = None + tool_calls: list[ToolCall] = field(default_factory=list) + tool_results: list[ToolResult] = field(default_factory=list) + tool_call_id: Optional[str] = None # For tool response messages + name: Optional[str] = None # Tool name for tool response messages + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'role': self.role.value, + 'content': self.content, + 'tool_calls': [tc.to_dict() for tc in self.tool_calls], + 'tool_results': [tr.to_dict() for tr in self.tool_results], + 'tool_call_id': self.tool_call_id, + 'name': self.name, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'Message': + """Create from dictionary.""" + return cls( + role=MessageRole(data['role']), + content=data.get('content'), + tool_calls=[ToolCall.from_dict(tc) for tc in data.get('tool_calls', [])], + tool_results=[ToolResult.from_dict(tr) for tr in data.get('tool_results', [])], + tool_call_id=data.get('tool_call_id'), + name=data.get('name'), + ) + + +@dataclass +class ToolDefinition: + """Serializable tool definition.""" + + name: str + description: str + parameters: Optional[dict[str, Any]] = None + result_as_answer: bool = False + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'name': self.name, + 'description': self.description, + 'parameters': self.parameters, + 'result_as_answer': self.result_as_answer, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'ToolDefinition': + """Create from dictionary.""" + return cls( + name=data['name'], + description=data['description'], + parameters=data.get('parameters'), + result_as_answer=data.get('result_as_answer', False), + ) + + +@dataclass +class AgentConfig: + """Serializable agent configuration.""" + + role: str + goal: str + backstory: str + model: str + tool_definitions: list[ToolDefinition] = field(default_factory=list) + max_iter: int = 25 + verbose: 
bool = False + allow_delegation: bool = False + system_template: Optional[str] = None + prompt_template: Optional[str] = None + response_template: Optional[str] = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'role': self.role, + 'goal': self.goal, + 'backstory': self.backstory, + 'model': self.model, + 'tool_definitions': [td.to_dict() for td in self.tool_definitions], + 'max_iter': self.max_iter, + 'verbose': self.verbose, + 'allow_delegation': self.allow_delegation, + 'system_template': self.system_template, + 'prompt_template': self.prompt_template, + 'response_template': self.response_template, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'AgentConfig': + """Create from dictionary.""" + return cls( + role=data['role'], + goal=data['goal'], + backstory=data['backstory'], + model=data['model'], + tool_definitions=[ + ToolDefinition.from_dict(td) for td in data.get('tool_definitions', []) + ], + max_iter=data.get('max_iter', 25), + verbose=data.get('verbose', False), + allow_delegation=data.get('allow_delegation', False), + system_template=data.get('system_template'), + prompt_template=data.get('prompt_template'), + response_template=data.get('response_template'), + ) + + +@dataclass +class TaskConfig: + """Serializable task configuration.""" + + description: str + expected_output: str + context: Optional[str] = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'description': self.description, + 'expected_output': self.expected_output, + 'context': self.context, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'TaskConfig': + """Create from dictionary.""" + return cls( + description=data['description'], + expected_output=data['expected_output'], + context=data.get('context'), + ) + + +@dataclass +class AgentWorkflowInput: + """Input for the CrewAI agent workflow.""" + + agent_config: AgentConfig + task_config: TaskConfig + messages: list[Message] + session_id: str + iteration: int = 0 + max_iterations: int = 25 + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'agent_config': self.agent_config.to_dict(), + 'task_config': self.task_config.to_dict(), + 'messages': [m.to_dict() for m in self.messages], + 'session_id': self.session_id, + 'iteration': self.iteration, + 'max_iterations': self.max_iterations, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'AgentWorkflowInput': + """Create from dictionary.""" + return cls( + agent_config=AgentConfig.from_dict(data['agent_config']), + task_config=TaskConfig.from_dict(data['task_config']), + messages=[Message.from_dict(m) for m in data['messages']], + session_id=data['session_id'], + iteration=data.get('iteration', 0), + max_iterations=data.get('max_iterations', 25), + ) + + +@dataclass +class AgentWorkflowOutput: + """Output from the CrewAI agent workflow.""" + + final_response: Optional[str] + messages: list[Message] + iterations: int + status: str = 'completed' + error: Optional[str] = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'final_response': self.final_response, + 'messages': [m.to_dict() for m in self.messages], + 'iterations': self.iterations, + 'status': self.status, + 'error': self.error, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'AgentWorkflowOutput': + """Create from dictionary.""" + return cls( + 
final_response=data.get('final_response'), + messages=[Message.from_dict(m) for m in data['messages']], + iterations=data['iterations'], + status=data.get('status', 'completed'), + error=data.get('error'), + ) + + +@dataclass +class CallLlmInput: + """Input for the call_llm activity.""" + + agent_config: AgentConfig + task_config: TaskConfig + messages: list[Message] + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'agent_config': self.agent_config.to_dict(), + 'task_config': self.task_config.to_dict(), + 'messages': [m.to_dict() for m in self.messages], + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'CallLlmInput': + """Create from dictionary.""" + return cls( + agent_config=AgentConfig.from_dict(data['agent_config']), + task_config=TaskConfig.from_dict(data['task_config']), + messages=[Message.from_dict(m) for m in data['messages']], + ) + + +@dataclass +class CallLlmOutput: + """Output from the call_llm activity.""" + + message: Message + is_final: bool + error: Optional[str] = None + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'message': self.message.to_dict(), + 'is_final': self.is_final, + 'error': self.error, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'CallLlmOutput': + """Create from dictionary.""" + return cls( + message=Message.from_dict(data['message']), + is_final=data['is_final'], + error=data.get('error'), + ) + + +@dataclass +class ExecuteToolInput: + """Input for the execute_tool activity.""" + + tool_call: ToolCall + agent_role: str + session_id: str + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'tool_call': self.tool_call.to_dict(), + 'agent_role': self.agent_role, + 'session_id': self.session_id, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'ExecuteToolInput': + """Create from dictionary.""" + return cls( + tool_call=ToolCall.from_dict(data['tool_call']), + agent_role=data['agent_role'], + session_id=data['session_id'], + ) + + +@dataclass +class ExecuteToolOutput: + """Output from the execute_tool activity.""" + + tool_result: ToolResult + result_as_answer: bool = False + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + 'tool_result': self.tool_result.to_dict(), + 'result_as_answer': self.result_as_answer, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> 'ExecuteToolOutput': + """Create from dictionary.""" + return cls( + tool_result=ToolResult.from_dict(data['tool_result']), + result_as_answer=data.get('result_as_answer', False), + ) diff --git a/ext/dapr-ext-crewai/dapr/ext/crewai/py.typed b/ext/dapr-ext-crewai/dapr/ext/crewai/py.typed new file mode 100644 index 000000000..7632ecf77 --- /dev/null +++ b/ext/dapr-ext-crewai/dapr/ext/crewai/py.typed @@ -0,0 +1 @@ +# Marker file for PEP 561 diff --git a/ext/dapr-ext-crewai/dapr/ext/crewai/runner.py b/ext/dapr-ext-crewai/dapr/ext/crewai/runner.py new file mode 100644 index 000000000..bfb40ac18 --- /dev/null +++ b/ext/dapr-ext-crewai/dapr/ext/crewai/runner.py @@ -0,0 +1,607 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Runner for executing CrewAI agents as Dapr Workflows. + +import json +import logging +import uuid +from typing import TYPE_CHECKING, Any, AsyncIterator, Optional + +from dapr.ext.workflow import DaprWorkflowClient, WorkflowRuntime, WorkflowStatus + +from .models import ( + AgentConfig, + AgentWorkflowInput, + AgentWorkflowOutput, + Message, + MessageRole, + TaskConfig, + ToolDefinition, +) +from .workflow import ( + call_llm_activity, + clear_tool_registry, + crewai_agent_workflow, + execute_tool_activity, + register_tool, +) + +if TYPE_CHECKING: + from crewai import Agent, Task + +logger = logging.getLogger(__name__) + + +class DaprWorkflowAgentRunner: + """Runner that executes CrewAI agents as Dapr Workflows. + + This runner wraps a CrewAI Agent and executes it using Dapr Workflows, + making each tool execution a durable activity. This provides: + + - Fault tolerance: Agents automatically resume from the last successful activity + - Durability: Agent state persists and survives process restarts + - Observability: Full visibility into agent execution through Dapr's workflow APIs + + Example: + ```python + from crewai import Agent, Task + from crewai.tools import tool + from dapr.ext.crewai import DaprWorkflowAgentRunner + + @tool("Search the web") + def search_web(query: str) -> str: + return f"Results for: {query}" + + # Create your CrewAI agent + agent = Agent( + role="Research Assistant", + goal="Help users find information", + backstory="An expert researcher", + tools=[search_web], + llm="openai/gpt-4o-mini", + ) + + # Define a task + task = Task( + description="Research the latest AI developments", + expected_output="A summary of recent AI news", + agent=agent, + ) + + # Create runner and start the workflow runtime + runner = DaprWorkflowAgentRunner(agent=agent) + runner.start() + + # Run the agent - each tool call is now durable + async for event in runner.run_async(task=task): + print(event) + + # Shutdown when done + runner.shutdown() + ``` + + Attributes: + agent: The CrewAI Agent to execute + workflow_runtime: The Dapr WorkflowRuntime instance + workflow_client: The Dapr WorkflowClient for managing workflows + """ + + def __init__( + self, + agent: 'Agent', + *, + host: Optional[str] = None, + port: Optional[str] = None, + max_iterations: Optional[int] = None, + ): + """Initialize the runner. 
+ + Args: + agent: The CrewAI Agent to execute + host: Dapr sidecar host (default: localhost) + port: Dapr sidecar port (default: 50001) + max_iterations: Maximum number of LLM call iterations + (default: uses agent's max_iter) + """ + self._agent = agent + self._max_iterations = max_iterations or getattr(agent, 'max_iter', 25) + self._host = host + self._port = port + + # Create workflow runtime + self._workflow_runtime = WorkflowRuntime(host=host, port=port) + + # Register workflow and activities + self._workflow_runtime.register_workflow( + crewai_agent_workflow, name='crewai_agent_workflow' + ) + self._workflow_runtime.register_activity(call_llm_activity, name='call_llm_activity') + self._workflow_runtime.register_activity( + execute_tool_activity, name='execute_tool_activity' + ) + + # Register agent's tools in the global registry + self._register_agent_tools() + + # Create workflow client (for starting/managing workflows) + self._workflow_client: Optional[DaprWorkflowClient] = None + self._started = False + + def _register_agent_tools(self) -> None: + """Register the agent's tools in the global tool registry.""" + clear_tool_registry() + + # Get tools from agent + tools = getattr(self._agent, 'tools', []) or [] + + for tool in tools: + tool_name = self._get_tool_name(tool) + tool_def = self._create_tool_definition(tool) + register_tool(tool_name, tool, tool_def) + logger.info(f'Registered tool: {tool_name}') + + def _get_tool_name(self, tool: Any) -> str: + """Get the name of a tool, sanitized for OpenAI API compatibility. + + OpenAI requires tool names to match pattern: ^[a-zA-Z0-9_-]+$ + CrewAI's @tool decorator sets `name` to the description string, + so we prefer the underlying function name when available. + """ + import re + + name = None + + # Try to get the underlying function name first (for @tool decorated functions) + if hasattr(tool, 'func') and hasattr(tool.func, '__name__'): + name = tool.func.__name__ + elif hasattr(tool, '_run') and hasattr(tool._run, '__name__'): + # For BaseTool subclasses, try to get a meaningful name + name = getattr(tool, 'name', None) + elif hasattr(tool, 'name'): + name = tool.name + elif hasattr(tool, '__name__'): + name = tool.__name__ + else: + name = str(type(tool).__name__) + + # Sanitize the name to match OpenAI's pattern: ^[a-zA-Z0-9_-]+$ + if name: + # Replace spaces and invalid chars with underscores + sanitized = re.sub(r'[^a-zA-Z0-9_-]', '_', name) + # Remove consecutive underscores + sanitized = re.sub(r'_+', '_', sanitized) + # Remove leading/trailing underscores + sanitized = sanitized.strip('_') + if sanitized: + return sanitized + + return 'unknown_tool' + + def _create_tool_definition(self, tool: Any) -> ToolDefinition: + """Create a serializable tool definition from a CrewAI tool.""" + name = self._get_tool_name(tool) + + # Get description - CrewAI @tool decorator stores it in 'description' attribute + # but also the 'name' attribute might contain the description string + description = getattr(tool, 'description', '') or '' + if not description and hasattr(tool, 'name'): + # If name looks like a description (has spaces), use it as description + tool_name = getattr(tool, 'name', '') + if ' ' in tool_name: + description = tool_name + + # Also try to get docstring from underlying function + if not description: + if hasattr(tool, 'func') and tool.func.__doc__: + description = tool.func.__doc__ + elif hasattr(tool, '_run') and tool._run.__doc__: + description = tool._run.__doc__ + + result_as_answer = getattr(tool, 
'result_as_answer', False) + + # Try to extract parameters schema + parameters = None + if hasattr(tool, 'args_schema'): + schema = tool.args_schema + if hasattr(schema, 'model_json_schema'): + try: + parameters = schema.model_json_schema() + except Exception: + pass + elif hasattr(schema, 'schema'): + try: + parameters = schema.schema() + except Exception: + pass + + return ToolDefinition( + name=name, + description=description, + parameters=parameters, + result_as_answer=result_as_answer, + ) + + def _get_agent_config(self) -> AgentConfig: + """Extract serializable agent configuration.""" + # Get tools + tools = getattr(self._agent, 'tools', []) or [] + tool_definitions = [] + + for tool in tools: + tool_definitions.append(self._create_tool_definition(tool)) + + # Get model name + model = 'gpt-4o-mini' # Default + if hasattr(self._agent, 'llm'): + llm = self._agent.llm + if isinstance(llm, str): + model = llm + elif hasattr(llm, 'model_name'): + model = llm.model_name + elif hasattr(llm, 'model'): + model = str(llm.model) + + return AgentConfig( + role=self._safe_str(self._agent.role) or '', + goal=self._safe_str(self._agent.goal) or '', + backstory=self._safe_str(self._agent.backstory) or '', + model=model, + tool_definitions=tool_definitions, + max_iter=self._safe_int(getattr(self._agent, 'max_iter', 25), 25), + verbose=self._safe_bool(getattr(self._agent, 'verbose', False)), + allow_delegation=self._safe_bool(getattr(self._agent, 'allow_delegation', False)), + system_template=self._safe_str(getattr(self._agent, 'system_template', None)), + prompt_template=self._safe_str(getattr(self._agent, 'prompt_template', None)), + response_template=self._safe_str(getattr(self._agent, 'response_template', None)), + ) + + def _get_task_config(self, task: 'Task') -> TaskConfig: + """Extract serializable task configuration.""" + return TaskConfig( + description=self._safe_str(task.description), + expected_output=self._safe_str(task.expected_output), + context=self._safe_str(getattr(task, 'context', None)), + ) + + def _safe_str(self, value: Any) -> Optional[str]: + """Safely convert a value to string, handling CrewAI's _NotSpecified sentinel. + + CrewAI uses _NotSpecified as a sentinel value for unset optional fields. + This method converts such values to None for JSON serialization. + """ + if value is None: + return None + # Check for CrewAI's _NotSpecified sentinel + if type(value).__name__ == '_NotSpecified': + return None + if isinstance(value, str): + return value + # For other types, convert to string + return str(value) + + def _safe_int(self, value: Any, default: int) -> int: + """Safely convert a value to int, handling CrewAI's _NotSpecified sentinel.""" + if value is None or type(value).__name__ == '_NotSpecified': + return default + if isinstance(value, int): + return value + try: + return int(value) + except (ValueError, TypeError): + return default + + def _safe_bool(self, value: Any) -> bool: + """Safely convert a value to bool, handling CrewAI's _NotSpecified sentinel.""" + if value is None or type(value).__name__ == '_NotSpecified': + return False + return bool(value) + + def start(self) -> None: + """Start the workflow runtime. + + This must be called before running any workflows. It starts listening + for workflow work items in the background. 
+ """ + if self._started: + return + + self._workflow_runtime.start() + self._workflow_client = DaprWorkflowClient(host=self._host, port=self._port) + self._started = True + logger.info('Dapr Workflow runtime started') + + def shutdown(self) -> None: + """Shutdown the workflow runtime. + + Call this when you're done running workflows to clean up resources. + """ + if not self._started: + return + + self._workflow_runtime.shutdown() + self._started = False + logger.info('Dapr Workflow runtime stopped') + + async def run_async( + self, + task: 'Task', + *, + session_id: Optional[str] = None, + workflow_id: Optional[str] = None, + poll_interval: float = 0.5, + ) -> AsyncIterator[dict[str, Any]]: + """Run the agent with a task. + + This starts a new Dapr Workflow for the agent execution. Each tool + execution becomes a durable activity within the workflow. + + Args: + task: The CrewAI Task to execute + session_id: Session ID for the execution + workflow_id: Optional workflow instance ID (generated if not provided) + poll_interval: How often to poll for workflow status (seconds) + + Yields: + Event dictionaries with workflow progress updates + + Raises: + RuntimeError: If the runner hasn't been started + """ + if not self._started: + raise RuntimeError('Runner not started. Call start() first.') + + # Generate session ID if not provided + if session_id is None: + session_id = uuid.uuid4().hex[:8] + + # Generate workflow ID if not provided + if workflow_id is None: + workflow_id = f'crewai-{session_id}-{uuid.uuid4().hex[:8]}' + + # Create initial user message from task + messages = [ + Message( + role=MessageRole.USER, + content=f'Please complete the following task:\n\n{task.description}\n\nExpected output: {task.expected_output}', + ) + ] + + # Create workflow input + workflow_input = AgentWorkflowInput( + agent_config=self._get_agent_config(), + task_config=self._get_task_config(task), + messages=messages, + session_id=session_id, + iteration=0, + max_iterations=self._max_iterations, + ) + + # Convert to dict and verify JSON serializable + workflow_input_dict = workflow_input.to_dict() + json.dumps(workflow_input_dict) # Validate serialization + + # Start the workflow + logger.info(f'Starting workflow: {workflow_id}') + self._workflow_client.schedule_new_workflow( + workflow=crewai_agent_workflow, + input=workflow_input_dict, + instance_id=workflow_id, + ) + + # Yield start event + yield { + 'type': 'workflow_started', + 'workflow_id': workflow_id, + 'session_id': session_id, + 'agent_role': self._agent.role, + } + + # Poll for workflow completion + import asyncio + + previous_status = None + + while True: + await asyncio.sleep(poll_interval) + + state = self._workflow_client.get_workflow_state(instance_id=workflow_id) + + if state is None: + yield { + 'type': 'workflow_error', + 'workflow_id': workflow_id, + 'error': 'Workflow state not found', + } + break + + # Yield status change events + if state.runtime_status != previous_status: + yield { + 'type': 'workflow_status_changed', + 'workflow_id': workflow_id, + 'status': str(state.runtime_status), + 'custom_status': state.serialized_custom_status, + } + previous_status = state.runtime_status + + # Check for completion + if state.runtime_status == WorkflowStatus.COMPLETED: + output_data = state.serialized_output + if output_data: + try: + output_dict = ( + json.loads(output_data) if isinstance(output_data, str) else output_data + ) + output = AgentWorkflowOutput.from_dict(output_dict) + + yield { + 'type': 'workflow_completed', + 'workflow_id': 
workflow_id, + 'final_response': output.final_response, + 'iterations': output.iterations, + 'status': output.status, + } + except Exception as e: + yield { + 'type': 'workflow_completed', + 'workflow_id': workflow_id, + 'raw_output': output_data, + 'parse_error': str(e), + } + else: + yield { + 'type': 'workflow_completed', + 'workflow_id': workflow_id, + } + break + + elif state.runtime_status == WorkflowStatus.FAILED: + error_info = None + if state.failure_details: + fd = state.failure_details + error_info = { + 'message': getattr(fd, 'message', str(fd)), + 'error_type': getattr(fd, 'error_type', None), + 'stack_trace': getattr(fd, 'stack_trace', None), + } + yield { + 'type': 'workflow_failed', + 'workflow_id': workflow_id, + 'error': error_info, + } + break + + elif state.runtime_status == WorkflowStatus.TERMINATED: + yield { + 'type': 'workflow_terminated', + 'workflow_id': workflow_id, + } + break + + def run_sync( + self, + task: 'Task', + *, + session_id: Optional[str] = None, + workflow_id: Optional[str] = None, + timeout: float = 300.0, + ) -> AgentWorkflowOutput: + """Run the agent synchronously and wait for completion. + + This is a convenience method that wraps run_async and waits for + the workflow to complete. + + Args: + task: The CrewAI Task to execute + session_id: Session ID for the execution + workflow_id: Optional workflow instance ID (generated if not provided) + timeout: Maximum time to wait for completion (seconds) + + Returns: + AgentWorkflowOutput with the final result + + Raises: + RuntimeError: If the runner hasn't been started + TimeoutError: If the workflow doesn't complete in time + """ + import asyncio + + async def _run(): + result = None + async for event in self.run_async( + task=task, + session_id=session_id, + workflow_id=workflow_id, + ): + if event['type'] == 'workflow_completed': + result = AgentWorkflowOutput( + final_response=event.get('final_response'), + messages=[], # Not included in event + iterations=event.get('iterations', 0), + status=event.get('status', 'completed'), + ) + elif event['type'] == 'workflow_failed': + error = event.get('error', {}) + raise RuntimeError(f'Workflow failed: {error.get("message", "Unknown error")}') + elif event['type'] == 'workflow_error': + raise RuntimeError(f'Workflow error: {event.get("error")}') + return result + + loop = asyncio.new_event_loop() + try: + return loop.run_until_complete(asyncio.wait_for(_run(), timeout=timeout)) + finally: + loop.close() + + def get_workflow_status(self, workflow_id: str) -> Optional[dict[str, Any]]: + """Get the status of a workflow. + + Args: + workflow_id: The workflow instance ID + + Returns: + Dictionary with workflow status or None if not found + """ + if not self._started: + raise RuntimeError('Runner not started. Call start() first.') + + state = self._workflow_client.get_workflow_state(instance_id=workflow_id) + if state is None: + return None + + return { + 'workflow_id': workflow_id, + 'status': str(state.runtime_status), + 'custom_status': state.serialized_custom_status, + 'created_at': str(state.created_at) if state.created_at else None, + 'last_updated_at': str(state.last_updated_at) if state.last_updated_at else None, + } + + def terminate_workflow(self, workflow_id: str) -> None: + """Terminate a running workflow. + + Args: + workflow_id: The workflow instance ID + """ + if not self._started: + raise RuntimeError('Runner not started. 
Call start() first.') + + self._workflow_client.terminate_workflow(instance_id=workflow_id) + logger.info(f'Terminated workflow: {workflow_id}') + + def purge_workflow(self, workflow_id: str) -> None: + """Purge a completed or terminated workflow. + + This removes all workflow state from the state store. + + Args: + workflow_id: The workflow instance ID + """ + if not self._started: + raise RuntimeError('Runner not started. Call start() first.') + + self._workflow_client.purge_workflow(instance_id=workflow_id) + logger.info(f'Purged workflow: {workflow_id}') + + @property + def agent(self) -> 'Agent': + """The CrewAI agent being executed.""" + return self._agent + + @property + def is_running(self) -> bool: + """Whether the workflow runtime is running.""" + return self._started diff --git a/ext/dapr-ext-crewai/dapr/ext/crewai/version.py b/ext/dapr-ext-crewai/dapr/ext/crewai/version.py new file mode 100644 index 000000000..88bd3bb82 --- /dev/null +++ b/ext/dapr-ext-crewai/dapr/ext/crewai/version.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +__version__ = '0.1.0.dev' diff --git a/ext/dapr-ext-crewai/dapr/ext/crewai/workflow.py b/ext/dapr-ext-crewai/dapr/ext/crewai/workflow.py new file mode 100644 index 000000000..1d6efb2a5 --- /dev/null +++ b/ext/dapr-ext-crewai/dapr/ext/crewai/workflow.py @@ -0,0 +1,506 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Dapr Workflow definitions for durable CrewAI agent execution. + +import json +import logging +from datetime import timedelta +from typing import Any, Optional + +from dapr.ext.workflow import ( + DaprWorkflowContext, + RetryPolicy, + WorkflowActivityContext, +) + +from .models import ( + AgentWorkflowInput, + AgentWorkflowOutput, + CallLlmInput, + CallLlmOutput, + ExecuteToolInput, + ExecuteToolOutput, + Message, + MessageRole, + ToolCall, + ToolDefinition, + ToolResult, +) + +logger = logging.getLogger(__name__) + + +# Global tool registry - tools are registered by the runner +_tool_registry: dict[str, Any] = {} +_tool_definitions: dict[str, ToolDefinition] = {} + + +def register_tool(name: str, tool: Any, definition: Optional[ToolDefinition] = None) -> None: + """Register a tool for use by the execute_tool activity. 
+ + Args: + name: The tool name + tool: The actual tool object (CrewAI BaseTool or callable) + definition: Optional serializable tool definition + """ + _tool_registry[name] = tool + if definition: + _tool_definitions[name] = definition + + +def get_registered_tool(name: str) -> Optional[Any]: + """Get a registered tool by name.""" + return _tool_registry.get(name) + + +def get_tool_definition(name: str) -> Optional[ToolDefinition]: + """Get a tool definition by name.""" + return _tool_definitions.get(name) + + +def clear_tool_registry() -> None: + """Clear all registered tools.""" + _tool_registry.clear() + _tool_definitions.clear() + + +def crewai_agent_workflow(ctx: DaprWorkflowContext, input_data: dict[str, Any]): + """Dapr Workflow that orchestrates a CrewAI agent execution. + + This workflow: + 1. Calls the LLM to get the next action (as an activity) + 2. If the LLM returns tool calls, executes each tool (as separate activities) + 3. Loops back until the LLM returns a final response + + All iterations run within a single workflow instance, making the entire + agent execution durable and resumable. + + Args: + ctx: The Dapr workflow context + input_data: Dictionary containing AgentWorkflowInput data + + Returns: + AgentWorkflowOutput as a dictionary + """ + # Deserialize input + workflow_input = AgentWorkflowInput.from_dict(input_data) + + # Retry policy for activities + retry_policy = RetryPolicy( + max_number_of_attempts=3, + first_retry_interval=timedelta(seconds=1), + backoff_coefficient=2.0, + max_retry_interval=timedelta(seconds=30), + ) + + iteration = workflow_input.iteration + + # Main agent loop - runs until final response or max iterations + while iteration < workflow_input.max_iterations: + # Activity: Call LLM to get next action + llm_input = CallLlmInput( + agent_config=workflow_input.agent_config, + task_config=workflow_input.task_config, + messages=workflow_input.messages, + ) + + llm_output_data = yield ctx.call_activity( + call_llm_activity, + input=llm_input.to_dict(), + retry_policy=retry_policy, + ) + llm_output = CallLlmOutput.from_dict(llm_output_data) + + # Handle LLM errors + if llm_output.error: + return AgentWorkflowOutput( + final_response=None, + messages=workflow_input.messages, + iterations=iteration, + status='error', + error=llm_output.error, + ).to_dict() + + # Add LLM response to messages + workflow_input.messages.append(llm_output.message) + + # If this is a final response (no tool calls), return + if llm_output.is_final: + return AgentWorkflowOutput( + final_response=llm_output.message.content, + messages=workflow_input.messages, + iterations=iteration + 1, + status='completed', + ).to_dict() + + # Execute each tool call sequentially (CrewAI executes one at a time + # to allow the LLM to reflect on each result) + for tool_call in llm_output.message.tool_calls: + tool_input = ExecuteToolInput( + tool_call=tool_call, + agent_role=workflow_input.agent_config.role, + session_id=workflow_input.session_id, + ) + + tool_output_data = yield ctx.call_activity( + execute_tool_activity, + input=tool_input.to_dict(), + retry_policy=retry_policy, + ) + tool_output = ExecuteToolOutput.from_dict(tool_output_data) + + # Create tool result message + tool_result_message = Message( + role=MessageRole.TOOL, + content=str(tool_output.tool_result.result) + if tool_output.tool_result.result + else None, + tool_call_id=tool_output.tool_result.tool_call_id, + name=tool_output.tool_result.tool_name, + ) + workflow_input.messages.append(tool_result_message) + + # If tool has 
result_as_answer, return immediately + if tool_output.result_as_answer: + return AgentWorkflowOutput( + final_response=str(tool_output.tool_result.result), + messages=workflow_input.messages, + iterations=iteration + 1, + status='completed', + ).to_dict() + + # Increment iteration counter + iteration += 1 + + # Max iterations reached + return AgentWorkflowOutput( + final_response=None, + messages=workflow_input.messages, + iterations=iteration, + status='max_iterations_reached', + error=f'Max iterations ({workflow_input.max_iterations}) reached', + ).to_dict() + + +def call_llm_activity(ctx: WorkflowActivityContext, input_data: dict[str, Any]) -> dict[str, Any]: + """Activity that calls the LLM model to decide the next action. + + This activity uses the LiteLLM library (via CrewAI's LLM class) to call + the configured model with the conversation history and tool definitions. + + Args: + ctx: The workflow activity context + input_data: Dictionary containing CallLlmInput data + + Returns: + CallLlmOutput as a dictionary + """ + llm_input = CallLlmInput.from_dict(input_data) + + try: + # Import LiteLLM for model calls + from litellm import completion + + # Build messages for the LLM + messages = [] + + # Add system message with agent context + system_content = _build_system_prompt(llm_input.agent_config, llm_input.task_config) + messages.append( + { + 'role': 'system', + 'content': system_content, + } + ) + + # Convert workflow messages to LiteLLM format + for msg in llm_input.messages: + if msg.role == MessageRole.USER: + messages.append( + { + 'role': 'user', + 'content': msg.content or '', + } + ) + elif msg.role == MessageRole.ASSISTANT: + assistant_msg: dict[str, Any] = { + 'role': 'assistant', + } + if msg.content: + assistant_msg['content'] = msg.content + if msg.tool_calls: + assistant_msg['tool_calls'] = [ + { + 'id': tc.id, + 'type': 'function', + 'function': { + 'name': tc.name, + 'arguments': json.dumps(tc.args), + }, + } + for tc in msg.tool_calls + ] + messages.append(assistant_msg) + elif msg.role == MessageRole.TOOL: + messages.append( + { + 'role': 'tool', + 'tool_call_id': msg.tool_call_id, + 'name': msg.name, + 'content': msg.content or '', + } + ) + + # Build tool definitions for the LLM + tools = [] + for tool_def in llm_input.agent_config.tool_definitions: + tool_schema = { + 'type': 'function', + 'function': { + 'name': tool_def.name, + 'description': tool_def.description, + }, + } + if tool_def.parameters: + tool_schema['function']['parameters'] = tool_def.parameters + else: + # Default to empty object schema if no parameters defined + tool_schema['function']['parameters'] = { + 'type': 'object', + 'properties': {}, + } + tools.append(tool_schema) + + # Call the LLM + response = completion( + model=llm_input.agent_config.model, + messages=messages, + tools=tools if tools else None, + ) + + # Parse response + choice = response.choices[0] + response_message = choice.message + + # Extract tool calls + tool_calls = [] + if response_message.tool_calls: + for tc in response_message.tool_calls: + args = tc.function.arguments + if isinstance(args, str): + try: + args = json.loads(args) + except json.JSONDecodeError: + args = {} + + tool_calls.append( + ToolCall( + id=tc.id, + name=tc.function.name, + args=args, + ) + ) + + # Create response message + output_message = Message( + role=MessageRole.ASSISTANT, + content=response_message.content, + tool_calls=tool_calls, + ) + + # Determine if this is a final response (no tool calls) + is_final = len(tool_calls) == 0 + + return 
CallLlmOutput( + message=output_message, + is_final=is_final, + ).to_dict() + + except Exception as e: + logger.error(f'Error calling LLM: {e}') + import traceback + + traceback.print_exc() + return CallLlmOutput( + message=Message(role=MessageRole.ASSISTANT), + is_final=True, + error=str(e), + ).to_dict() + + +def _build_system_prompt(agent_config, task_config) -> str: + """Build the system prompt for the agent.""" + # Use custom template if provided, otherwise build default + if agent_config.system_template: + return agent_config.system_template.format( + role=agent_config.role, + goal=agent_config.goal, + backstory=agent_config.backstory, + task_description=task_config.description, + expected_output=task_config.expected_output, + ) + + return f"""You are {agent_config.role}. + +Your Goal: {agent_config.goal} + +Background: {agent_config.backstory} + +Current Task: +{task_config.description} + +Expected Output: +{task_config.expected_output} + +{f'Additional Context: {task_config.context}' if task_config.context else ''} + +Instructions: +- Use the available tools when needed to complete the task +- Think step by step about what you need to do +- When you have the final answer, provide it clearly without using tools +- Be concise but thorough in your responses""" + + +def execute_tool_activity( + ctx: WorkflowActivityContext, input_data: dict[str, Any] +) -> dict[str, Any]: + """Activity that executes a single CrewAI tool. + + This activity: + 1. Gets the tool from the registry + 2. Executes the tool with the provided arguments + 3. Returns the result + + Args: + ctx: The workflow activity context + input_data: Dictionary containing ExecuteToolInput data + + Returns: + ExecuteToolOutput as a dictionary + """ + tool_input = ExecuteToolInput.from_dict(input_data) + tool_call = tool_input.tool_call + + # Get the tool from registry + tool = get_registered_tool(tool_call.name) + tool_def = get_tool_definition(tool_call.name) + + if tool is None: + return ExecuteToolOutput( + tool_result=ToolResult( + tool_call_id=tool_call.id, + tool_name=tool_call.name, + result=None, + error=f"Tool '{tool_call.name}' not found in registry", + ) + ).to_dict() + + try: + # Execute the tool based on its type + result = _execute_tool(tool, tool_call.args) + + # Serialize result + if hasattr(result, 'model_dump'): + result = result.model_dump() + elif hasattr(result, 'to_dict'): + result = result.to_dict() + elif not isinstance(result, (str, int, float, bool, list, dict, type(None))): + result = str(result) + + # Check if this tool's result should be the final answer + result_as_answer = tool_def.result_as_answer if tool_def else False + + return ExecuteToolOutput( + tool_result=ToolResult( + tool_call_id=tool_call.id, + tool_name=tool_call.name, + result=result, + ), + result_as_answer=result_as_answer, + ).to_dict() + + except Exception as e: + logger.error(f"Error executing tool '{tool_call.name}': {e}") + import traceback + + traceback.print_exc() + return ExecuteToolOutput( + tool_result=ToolResult( + tool_call_id=tool_call.id, + tool_name=tool_call.name, + result=None, + error=str(e), + ) + ).to_dict() + + +def _execute_tool(tool: Any, args: dict[str, Any]) -> Any: + """Execute a tool and return the result. 
+ + Handles different tool types: + - CrewAI BaseTool instances (with run() or _run() methods) + - Callable functions + - LangChain-style tools + """ + import asyncio + + # Check if it's a CrewAI BaseTool + if hasattr(tool, '_run'): + # CrewAI BaseTool - call _run directly + result = tool._run(**args) + if asyncio.iscoroutine(result): + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(result) + finally: + loop.close() + return result + + elif hasattr(tool, 'run'): + # Tool with run method + result = tool.run(**args) + if asyncio.iscoroutine(result): + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(result) + finally: + loop.close() + return result + + elif hasattr(tool, 'invoke'): + # LangChain-style tool + result = tool.invoke(args) + if asyncio.iscoroutine(result): + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(result) + finally: + loop.close() + return result + + elif callable(tool): + # Plain callable + result = tool(**args) + if asyncio.iscoroutine(result): + loop = asyncio.new_event_loop() + try: + result = loop.run_until_complete(result) + finally: + loop.close() + return result + + else: + raise TypeError(f'Tool {tool} is not callable and has no run/_run/invoke method') diff --git a/ext/dapr-ext-crewai/examples/simple_agent.py b/ext/dapr-ext-crewai/examples/simple_agent.py new file mode 100644 index 000000000..8b45fc53c --- /dev/null +++ b/ext/dapr-ext-crewai/examples/simple_agent.py @@ -0,0 +1,147 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Example: Simple CrewAI Agent with Dapr Workflows +# +# This example demonstrates how to run a CrewAI agent with durable tool execution +# using Dapr Workflows. Each tool call is executed as a workflow activity, +# providing fault tolerance and durability. +# +# Prerequisites: +# 1. Dapr installed and initialized: dapr init +# 2. Required packages: pip install dapr-ext-crewai crewai +# +# Run with Dapr: +# dapr run --app-id crewai-agent --dapr-grpc-port 50001 -- python simple_agent.py + +import asyncio +import os + +from crewai import Agent, Task +from crewai.tools import tool +from dapr.ext.crewai import DaprWorkflowAgentRunner + + +# Define tools using CrewAI's @tool decorator +@tool('Get the current weather for a city') +def get_weather(city: str) -> str: + """Get the current weather for a specified city.""" + # In a real application, this would call a weather API + weather_data = { + 'Tokyo': 'Sunny, 22°C', + 'London': 'Cloudy, 15°C', + 'New York': 'Partly cloudy, 18°C', + 'Paris': 'Rainy, 12°C', + } + return weather_data.get(city, f'Weather data not available for {city}') + + +@tool('Search for information on the web') +def search_web(query: str) -> str: + """Search the web for information on a given topic.""" + # In a real application, this would call a search API + return f"Search results for '{query}': Found 10 relevant articles about {query}." 
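+
+
+# Illustrative addition, not part of the original example: tool parameters are
+# not limited to strings. A typed argument such as `celsius: float` is expected
+# to be captured in the tool's serializable definition so the workflow's LLM
+# activity can advertise a matching JSON schema to the model. Add the tool to
+# the Agent's `tools` list below to enable it.
+@tool('Convert a temperature from Celsius to Fahrenheit')
+def celsius_to_fahrenheit(celsius: float) -> str:
+    """Convert a temperature in Celsius to Fahrenheit."""
+    return f'{celsius * 9 / 5 + 32:.1f}°F'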
+ + +@tool('Get the current date and time') +def get_datetime() -> str: + """Get the current date and time.""" + from datetime import datetime + + return datetime.now().strftime('%Y-%m-%d %H:%M:%S') + + +async def main(): + # Create a CrewAI agent with tools + agent = Agent( + role='Research Assistant', + goal='Help users find accurate and up-to-date information', + backstory="""You are an expert research assistant with access to various + information sources. You excel at finding and synthesizing information + to provide comprehensive answers to user queries.""", + tools=[get_weather, search_web, get_datetime], + llm=os.getenv('CREWAI_LLM', 'openai/gpt-4o-mini'), + verbose=True, + ) + + # Define a task for the agent + task = Task( + description="""Find out the current weather in Tokyo and search for + recent news about AI developments. Provide a brief summary.""", + expected_output="""A summary containing: + 1. Current weather in Tokyo + 2. Key recent AI news highlights""", + agent=agent, + ) + + # Create the Dapr Workflow runner + runner = DaprWorkflowAgentRunner( + agent=agent, + max_iterations=10, + ) + + try: + # Start the workflow runtime + print('Starting Dapr Workflow runtime...') + runner.start() + print('Runtime started successfully!') + + # Run the agent + print('\nExecuting agent task...') + print('=' * 60) + + # Process events as they come + async for event in runner.run_async(task=task): + event_type = event['type'] + + if event_type == 'workflow_started': + print(f'\nWorkflow started: {event.get("workflow_id")}') + print(f'Agent role: {event.get("agent_role")}') + + elif event_type == 'workflow_status_changed': + print(f'Status: {event.get("status")}') + + elif event_type == 'workflow_completed': + print('\n' + '=' * 60) + print('AGENT COMPLETED') + print('=' * 60) + print(f'Iterations: {event.get("iterations")}') + print(f'Status: {event.get("status")}') + print('\nFinal Response:') + print('-' * 40) + print(event.get('final_response', 'No response')) + + elif event_type == 'workflow_failed': + print(f'\nWorkflow FAILED: {event.get("error")}') + + elif event_type == 'workflow_error': + print(f'\nWorkflow ERROR: {event.get("error")}') + + except Exception as e: + print(f'Error: {e}') + import traceback + + traceback.print_exc() + + finally: + # Shutdown the runtime + print('\nShutting down Dapr Workflow runtime...') + runner.shutdown() + print('Done!') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/ext/dapr-ext-crewai/examples/test_crash_recovery.py b/ext/dapr-ext-crewai/examples/test_crash_recovery.py new file mode 100644 index 000000000..4f4c6e522 --- /dev/null +++ b/ext/dapr-ext-crewai/examples/test_crash_recovery.py @@ -0,0 +1,279 @@ +# Copyright 2025 The Dapr Authors +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Test script to verify Dapr workflow crash recovery for CrewAI agents. + +This test: +1. Creates an agent with a prompt that requires calling 3 tools in sequence +2. Crashes the process during tool 2 execution (after tool 1 completes) +3. 
On restart, Dapr automatically resumes the workflow and completes it + +Usage: + # Clean up any previous test state first: + rm -f /tmp/crewai_crash_test_state.json + + # First run (will crash during tool 2): + dapr run --app-id crewai-crash-test --dapr-grpc-port 50001 -- python test_crash_recovery.py + + # Second run (Dapr auto-resumes and completes): + dapr run --app-id crewai-crash-test --dapr-grpc-port 50001 -- python test_crash_recovery.py +""" + +import asyncio +import json +import os +from pathlib import Path + +from crewai import Agent, Task +from crewai.tools import tool +from dapr.ext.crewai import DaprWorkflowAgentRunner +from dapr.ext.workflow import WorkflowStatus + + +def log(msg: str): + """Print with immediate flush.""" + print(msg, flush=True) + + +# State file to track execution across crashes +STATE_FILE = Path('/tmp/crewai_crash_test_state.json') + + +def load_state() -> dict: + """Load the test state from file.""" + if STATE_FILE.exists(): + with open(STATE_FILE, 'r') as f: + return json.load(f) + return { + 'run_count': 0, + 'tool1_executed': False, + 'tool2_executed': False, + 'tool3_executed': False, + 'workflow_scheduled': False, + 'workflow_id': None, # Store the actual workflow ID + } + + +def save_state(state: dict): + """Save the test state to file.""" + with open(STATE_FILE, 'w') as f: + json.dump(state, f, indent=2) + + +# Global state for this run +state = load_state() +state['run_count'] += 1 +save_state(state) + +log(f'\n{"=" * 60}') +log(f'RUN #{state["run_count"]}') +log(f'{"=" * 60}') +log( + f'Previous state: tool1={state["tool1_executed"]}, ' + f'tool2={state["tool2_executed"]}, tool3={state["tool3_executed"]}' +) +log(f'Workflow previously scheduled: {state["workflow_scheduled"]}') +log(f'Saved workflow_id: {state.get("workflow_id")}') +log(f'{"=" * 60}\n') + + +@tool('Step one - Initialize data') +def tool_step_one(input_data: str) -> str: + """Initialize data for the workflow. This is the first step.""" + log(f'\n>>> TOOL 1 EXECUTING: input={input_data}') + state['tool1_executed'] = True + save_state(state) + log('>>> TOOL 1 COMPLETED SUCCESSFULLY') + return f"Step 1 completed: Initialized with '{input_data}'. Now call step_two___process_data." + + +@tool('Step two - Process data') +def tool_step_two(data: str) -> str: + """Process the data from step one. This is the second step.""" + log(f'\n>>> TOOL 2 EXECUTING: data={data}') + + # On first run, crash during tool 2 (after tool 1 completed) + if state['run_count'] == 1: + log('>>> TOOL 2: SIMULATING CRASH!') + log('>>> The process will now terminate...') + log('>>> Run the program again to test recovery.\n') + os._exit(1) + + state['tool2_executed'] = True + save_state(state) + log('>>> TOOL 2 COMPLETED SUCCESSFULLY') + return f"Step 2 completed: Processed '{data}'. Now call step_three___finalize_results." + + +@tool('Step three - Finalize results') +def tool_step_three(processed_data: str) -> str: + """Finalize and return the results. This is the third and final step.""" + log(f'\n>>> TOOL 3 EXECUTING: processed_data={processed_data}') + state['tool3_executed'] = True + save_state(state) + log('>>> TOOL 3 COMPLETED SUCCESSFULLY') + return f"Step 3 completed: Final result based on '{processed_data}'. All steps done!" + + +# Create the agent with all three tools +agent = Agent( + role='Sequential Task Processor', + goal='Execute exactly three tools in sequence: step_one, step_two, step_three', + backstory="""You are a sequential task processor that MUST call tools in a specific order. 
+ You MUST call all three tools in sequence: + 1. First call 'step_one___initialize_data' with the input + 2. Then call 'step_two___process_data' with the output from step 1 + 3. Finally call 'step_three___finalize_results' with the output from step 2 + + Do NOT skip any steps. Each tool must be called exactly once in order.""", + tools=[tool_step_one, tool_step_two, tool_step_three], + llm='openai/gpt-4o-mini', + verbose=True, +) + +# Create a task that requires all three tools +task = Task( + description="""Process the input "test_data_123" through all three steps. + + You MUST: + 1. Call step_one___initialize_data with "test_data_123" + 2. Call step_two___process_data with the result from step 1 + 3. Call step_three___finalize_results with the result from step 2 + + Call each tool exactly once in sequence.""", + expected_output="""A summary confirming all three steps completed.""", + agent=agent, +) + + +async def main(): + """Run the crash recovery test.""" + + runner = DaprWorkflowAgentRunner( + agent=agent, + max_iterations=10, + ) + + try: + # Start the runtime - this will auto-resume any in-progress agents + runner.start() + log('agent runtime started') + await asyncio.sleep(1) + + # Only schedule a new run if we haven't already + if not state['workflow_scheduled']: + log('Scheduling new workflow...') + async for event in runner.run_async(task=task): + event_type = event['type'] + log(f'Event: {event_type}') + if event_type == 'workflow_started': + # Save the actual workflow_id for polling on restart + actual_workflow_id = event.get('workflow_id') + state['workflow_scheduled'] = True + state['workflow_id'] = actual_workflow_id + save_state(state) + log(f'Workflow started: {actual_workflow_id}') + elif event_type == 'workflow_status_changed': + log(f'Status: {event.get("status")}') + elif event_type == 'workflow_completed': + print_completion(event) + break + elif event_type == 'workflow_failed': + log(f'\nWorkflow FAILED: {event.get("error")}') + break + else: + # Workflow was already scheduled - poll using the saved workflow_id + saved_workflow_id = state.get('workflow_id') + log(f'Workflow already scheduled. 
Polling for completion: {saved_workflow_id}') + await poll_for_completion(runner, saved_workflow_id) + + except KeyboardInterrupt: + log('\nInterrupted by user') + finally: + runner.shutdown() + log('Workflow runtime stopped') + + +async def poll_for_completion(runner: DaprWorkflowAgentRunner, workflow_id: str): + """Poll an existing workflow until it completes.""" + from dapr.ext.crewai.models import AgentWorkflowOutput + + if not workflow_id: + log('No workflow_id saved - cannot poll!') + return + + previous_status = None + while True: + await asyncio.sleep(1.0) + workflow_state = runner._workflow_client.get_workflow_state(instance_id=workflow_id) + + if workflow_state is None: + log('Workflow state not found!') + break + + if workflow_state.runtime_status != previous_status: + log(f'Workflow status: {workflow_state.runtime_status}') + previous_status = workflow_state.runtime_status + + if workflow_state.runtime_status == WorkflowStatus.COMPLETED: + output_data = workflow_state.serialized_output + if output_data: + output_dict = ( + json.loads(output_data) if isinstance(output_data, str) else output_data + ) + output = AgentWorkflowOutput.from_dict(output_dict) + print_completion( + { + 'final_response': output.final_response, + 'iterations': output.iterations, + } + ) + break + elif workflow_state.runtime_status == WorkflowStatus.FAILED: + log(f'\nWorkflow FAILED: {workflow_state.failure_details}') + break + elif workflow_state.runtime_status == WorkflowStatus.TERMINATED: + log('\nWorkflow was TERMINATED') + break + + +def print_completion(event: dict): + """Print completion summary and verification.""" + log(f'\n{"=" * 60}') + log('WORKFLOW COMPLETED!') + log(f'{"=" * 60}') + log(f'Final Response:\n{event.get("final_response")}') + log(f'Iterations: {event.get("iterations")}') + + # Reload state to get latest + final_state = load_state() + log(f'\n{"=" * 60}') + log('VERIFICATION:') + log(f'{"=" * 60}') + log(f'Tool 1 executed: {final_state["tool1_executed"]}') + log(f'Tool 2 executed: {final_state["tool2_executed"]}') + log(f'Tool 3 executed: {final_state["tool3_executed"]}') + log(f'Total runs: {final_state["run_count"]}') + + if final_state['run_count'] >= 2 and all( + [ + final_state['tool1_executed'], + final_state['tool2_executed'], + final_state['tool3_executed'], + ] + ): + log('\n>>> TEST PASSED: Crash recovery worked!') + log('>>> Workflow resumed after crash and completed all tools.') + + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/ext/dapr-ext-crewai/setup.cfg b/ext/dapr-ext-crewai/setup.cfg new file mode 100644 index 000000000..ab5452ffc --- /dev/null +++ b/ext/dapr-ext-crewai/setup.cfg @@ -0,0 +1,41 @@ +[metadata] +url = https://dapr.io/ +author = Dapr Authors +author_email = daprweb@microsoft.com +license = Apache +license_file = LICENSE +classifiers = + Development Status :: 3 - Alpha + Intended Audience :: Developers + License :: OSI Approved :: Apache Software License + Operating System :: OS Independent + Programming Language :: Python + Programming Language :: Python :: 3.10 + Programming Language :: Python :: 3.11 + Programming Language :: Python :: 3.12 + Programming Language :: Python :: 3.13 +project_urls = + Documentation = https://github.com/dapr/docs + Source = https://github.com/dapr/python-sdk + +[options] +python_requires = >=3.10 +packages = find_namespace: +include_package_data = True +install_requires = + dapr >= 1.16.0 + dapr-ext-workflow >= 1.16.0 + crewai >= 0.80.0 + litellm >= 1.0.0 + protobuf >= 6.31.1 + +[options.packages.find] 
+include = + dapr.* + +exclude = + tests + +[options.package_data] +dapr.ext.crewai = + py.typed diff --git a/ext/dapr-ext-crewai/setup.py b/ext/dapr-ext-crewai/setup.py new file mode 100644 index 000000000..0e3bdb0f5 --- /dev/null +++ b/ext/dapr-ext-crewai/setup.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +import os + +from setuptools import setup + +# Load version in dapr package. +version_info = {} +with open('dapr/ext/crewai/version.py') as fp: + exec(fp.read(), version_info) +__version__ = version_info['__version__'] + + +def is_release(): + return '.dev' not in __version__ + + +name = 'dapr-ext-crewai' +version = __version__ +description = 'The official release of Dapr Python SDK CrewAI Extension.' +long_description = """ +This is the Dapr Workflow extension for CrewAI agents. + +This extension enables durable execution of CrewAI agents using Dapr Workflows. +Each tool execution in an agent runs as a separate Dapr Workflow activity, providing: + +- Fault tolerance: Agents automatically resume from the last successful activity on failure +- Durability: Agent state is persisted and can survive process restarts +- Observability: Full visibility into agent execution through Dapr's workflow APIs + +Dapr is a portable, serverless, event-driven runtime that makes it easy for developers to +build resilient, stateless and stateful microservices that run on the cloud and edge and +embraces the diversity of languages and developer frameworks. + +CrewAI is a framework for orchestrating role-playing, autonomous AI agents that work +together to accomplish complex tasks. +""".lstrip() + +# Get build number from GITHUB_RUN_NUMBER environment variable +build_number = os.environ.get('GITHUB_RUN_NUMBER', '0') + +if not is_release(): + name += '-dev' + version = f'{__version__}{build_number}' + description = 'The developmental release for the Dapr Workflow extension for CrewAI' + long_description = ( + 'This is the developmental release for the Dapr Workflow extension for CrewAI' + ) + +print(f'package name: {name}, version: {version}', flush=True) + + +setup( + name=name, + version=version, + description=description, + long_description=long_description, +) diff --git a/ext/dapr-ext-crewai/tests/__init__.py b/ext/dapr-ext-crewai/tests/__init__.py new file mode 100644 index 000000000..a67f5aa9b --- /dev/null +++ b/ext/dapr-ext-crewai/tests/__init__.py @@ -0,0 +1,16 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +""" + +# Tests for dapr-ext-crewai extension. diff --git a/ext/dapr-ext-crewai/tests/test_models.py b/ext/dapr-ext-crewai/tests/test_models.py new file mode 100644 index 000000000..8ff721951 --- /dev/null +++ b/ext/dapr-ext-crewai/tests/test_models.py @@ -0,0 +1,252 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Tests for data models. + +import json +import unittest + +from dapr.ext.crewai.models import ( + AgentConfig, + AgentWorkflowInput, + AgentWorkflowOutput, + CallLlmInput, + ExecuteToolInput, + Message, + MessageRole, + TaskConfig, + ToolCall, + ToolDefinition, + ToolResult, +) + + +class TestToolCall(unittest.TestCase): + def test_to_dict(self): + tc = ToolCall(id='call_123', name='search', args={'query': 'test'}) + result = tc.to_dict() + assert result == { + 'id': 'call_123', + 'name': 'search', + 'args': {'query': 'test'}, + } + + def test_from_dict(self): + data = {'id': 'call_123', 'name': 'search', 'args': {'query': 'test'}} + tc = ToolCall.from_dict(data) + assert tc.id == 'call_123' + assert tc.name == 'search' + assert tc.args == {'query': 'test'} + + def test_roundtrip(self): + original = ToolCall(id='call_123', name='search', args={'query': 'test'}) + roundtripped = ToolCall.from_dict(original.to_dict()) + assert original.id == roundtripped.id + assert original.name == roundtripped.name + assert original.args == roundtripped.args + + +class TestToolResult(unittest.TestCase): + def test_to_dict(self): + tr = ToolResult( + tool_call_id='call_123', + tool_name='search', + result='Found 10 results', + ) + result = tr.to_dict() + assert result['tool_call_id'] == 'call_123' + assert result['tool_name'] == 'search' + assert result['result'] == 'Found 10 results' + assert result['error'] is None + + def test_from_dict_with_error(self): + data = { + 'tool_call_id': 'call_123', + 'tool_name': 'search', + 'result': None, + 'error': 'Tool not found', + } + tr = ToolResult.from_dict(data) + assert tr.error == 'Tool not found' + assert tr.result is None + + +class TestMessage(unittest.TestCase): + def test_user_message(self): + msg = Message(role=MessageRole.USER, content='Hello') + data = msg.to_dict() + assert data['role'] == 'user' + assert data['content'] == 'Hello' + assert data['tool_calls'] == [] + + def test_assistant_message_with_tool_calls(self): + tc = ToolCall(id='call_123', name='search', args={'query': 'test'}) + msg = Message( + role=MessageRole.ASSISTANT, + content=None, + tool_calls=[tc], + ) + data = msg.to_dict() + assert data['role'] == 'assistant' + assert len(data['tool_calls']) == 1 + assert data['tool_calls'][0]['name'] == 'search' + + def test_tool_message(self): + msg = Message( + role=MessageRole.TOOL, + content='Result: success', + tool_call_id='call_123', + name='search', + ) + data = msg.to_dict() + assert data['role'] == 'tool' + assert data['tool_call_id'] == 'call_123' + assert data['name'] == 'search' + 
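+
+    # Illustrative addition, not part of the original suite: Dapr Workflow
+    # persists activity inputs and outputs, so message dictionaries are
+    # expected to survive a JSON round trip unchanged.
+    def test_tool_message_json_roundtrip(self):
+        msg = Message(
+            role=MessageRole.TOOL,
+            content='Result: success',
+            tool_call_id='call_123',
+            name='search',
+        )
+        data = json.loads(json.dumps(msg.to_dict()))
+        assert data['role'] == 'tool'
+        assert data['tool_call_id'] == 'call_123'
+        assert data['name'] == 'search'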
+ def test_roundtrip(self): + tc = ToolCall(id='call_123', name='search', args={'query': 'test'}) + original = Message( + role=MessageRole.ASSISTANT, + content='Let me search', + tool_calls=[tc], + ) + roundtripped = Message.from_dict(original.to_dict()) + assert original.role == roundtripped.role + assert original.content == roundtripped.content + assert len(original.tool_calls) == len(roundtripped.tool_calls) + + +class TestAgentConfig(unittest.TestCase): + def test_to_dict(self): + config = AgentConfig( + role='Research Assistant', + goal='Help users', + backstory='An expert', + model='gpt-4o-mini', + tool_definitions=[ + ToolDefinition(name='search', description='Search the web'), + ], + ) + data = config.to_dict() + assert data['role'] == 'Research Assistant' + assert data['model'] == 'gpt-4o-mini' + assert len(data['tool_definitions']) == 1 + + def test_from_dict(self): + data = { + 'role': 'Assistant', + 'goal': 'Help', + 'backstory': 'Expert', + 'model': 'gpt-4', + 'tool_definitions': [], + 'max_iter': 30, + 'verbose': True, + } + config = AgentConfig.from_dict(data) + assert config.role == 'Assistant' + assert config.max_iter == 30 + assert config.verbose is True + + +class TestAgentWorkflowInput(unittest.TestCase): + def test_json_serializable(self): + """Test that the entire input can be JSON serialized.""" + agent_config = AgentConfig( + role='Assistant', + goal='Help', + backstory='Expert', + model='gpt-4', + ) + task_config = TaskConfig( + description='Do something', + expected_output='A result', + ) + workflow_input = AgentWorkflowInput( + agent_config=agent_config, + task_config=task_config, + messages=[Message(role=MessageRole.USER, content='Hello')], + session_id='test-session', + ) + + # Should not raise + json_str = json.dumps(workflow_input.to_dict()) + assert json_str is not None + + # Should roundtrip + parsed = AgentWorkflowInput.from_dict(json.loads(json_str)) + assert parsed.session_id == 'test-session' + assert parsed.agent_config.role == 'Assistant' + + +class TestAgentWorkflowOutput(unittest.TestCase): + def test_completed_output(self): + output = AgentWorkflowOutput( + final_response='The answer is 42', + messages=[], + iterations=3, + status='completed', + ) + data = output.to_dict() + assert data['final_response'] == 'The answer is 42' + assert data['iterations'] == 3 + assert data['status'] == 'completed' + + def test_error_output(self): + output = AgentWorkflowOutput( + final_response=None, + messages=[], + iterations=1, + status='error', + error='Something went wrong', + ) + data = output.to_dict() + assert data['error'] == 'Something went wrong' + + +class TestCallLlmInput(unittest.TestCase): + def test_to_dict(self): + agent_config = AgentConfig( + role='Assistant', + goal='Help', + backstory='Expert', + model='gpt-4', + ) + task_config = TaskConfig( + description='Do something', + expected_output='A result', + ) + llm_input = CallLlmInput( + agent_config=agent_config, + task_config=task_config, + messages=[], + ) + data = llm_input.to_dict() + assert 'agent_config' in data + assert 'task_config' in data + assert 'messages' in data + + +class TestExecuteToolInput(unittest.TestCase): + def test_to_dict(self): + tc = ToolCall(id='call_123', name='search', args={'query': 'test'}) + tool_input = ExecuteToolInput( + tool_call=tc, + agent_role='Assistant', + session_id='test-session', + ) + data = tool_input.to_dict() + assert data['tool_call']['id'] == 'call_123' + assert data['agent_role'] == 'Assistant' + assert data['session_id'] == 'test-session' diff 
--git a/ext/dapr-ext-crewai/tests/test_workflow.py b/ext/dapr-ext-crewai/tests/test_workflow.py new file mode 100644 index 000000000..7f70d2d23 --- /dev/null +++ b/ext/dapr-ext-crewai/tests/test_workflow.py @@ -0,0 +1,157 @@ +# -*- coding: utf-8 -*- + +""" +Copyright 2025 The Dapr Authors +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +""" + +# Tests for workflow and activities. + +import unittest + +from dapr.ext.crewai.models import ( + AgentConfig, + TaskConfig, + ToolDefinition, +) +from dapr.ext.crewai.workflow import ( + _build_system_prompt, + _execute_tool, + clear_tool_registry, + get_registered_tool, + get_tool_definition, + register_tool, +) + + +class TestToolRegistry(unittest.TestCase): + def setUp(self): + """Clear registry before each test.""" + clear_tool_registry() + + def test_register_and_get_tool(self): + def my_tool(x: str) -> str: + return f'result: {x}' + + tool_def = ToolDefinition(name='my_tool', description='A test tool') + register_tool('my_tool', my_tool, tool_def) + + retrieved = get_registered_tool('my_tool') + assert retrieved is my_tool + + retrieved_def = get_tool_definition('my_tool') + assert retrieved_def.name == 'my_tool' + + def test_get_nonexistent_tool(self): + result = get_registered_tool('nonexistent') + assert result is None + + def test_clear_registry(self): + def my_tool(): + pass + + register_tool('my_tool', my_tool) + assert get_registered_tool('my_tool') is not None + + clear_tool_registry() + assert get_registered_tool('my_tool') is None + + +class TestExecuteTool(unittest.TestCase): + def test_execute_callable(self): + def add(a: int, b: int) -> int: + return a + b + + result = _execute_tool(add, {'a': 1, 'b': 2}) + assert result == 3 + + def test_execute_tool_with_run_method(self): + class MyTool: + def run(self, query: str) -> str: + return f'searched: {query}' + + tool = MyTool() + result = _execute_tool(tool, {'query': 'test'}) + assert result == 'searched: test' + + def test_execute_tool_with_private_run_method(self): + class MyTool: + def _run(self, query: str) -> str: + return f'searched: {query}' + + tool = MyTool() + result = _execute_tool(tool, {'query': 'test'}) + assert result == 'searched: test' + + def test_execute_tool_with_invoke_method(self): + class MyTool: + def invoke(self, args: dict) -> str: + return f'invoked with: {args}' + + tool = MyTool() + result = _execute_tool(tool, {'query': 'test'}) + assert 'invoked with' in result + + +class TestBuildSystemPrompt(unittest.TestCase): + def test_default_prompt(self): + agent_config = AgentConfig( + role='Research Assistant', + goal='Help users find information', + backstory='An expert researcher', + model='gpt-4', + ) + task_config = TaskConfig( + description='Find info about AI', + expected_output='A summary', + ) + + prompt = _build_system_prompt(agent_config, task_config) + + assert 'Research Assistant' in prompt + assert 'Help users find information' in prompt + assert 'An expert researcher' in prompt + assert 'Find info about AI' in prompt + assert 'A summary' in prompt + + def 
test_custom_template(self): + agent_config = AgentConfig( + role='Assistant', + goal='Help', + backstory='Expert', + model='gpt-4', + system_template='You are {role}. Goal: {goal}. Task: {task_description}', + ) + task_config = TaskConfig( + description='Do the thing', + expected_output='Result', + ) + + prompt = _build_system_prompt(agent_config, task_config) + + assert prompt == 'You are Assistant. Goal: Help. Task: Do the thing' + + def test_prompt_with_context(self): + agent_config = AgentConfig( + role='Assistant', + goal='Help', + backstory='Expert', + model='gpt-4', + ) + task_config = TaskConfig( + description='Do the thing', + expected_output='Result', + context='Previous results showed X', + ) + + prompt = _build_system_prompt(agent_config, task_config) + + assert 'Previous results showed X' in prompt diff --git a/tox.ini b/tox.ini index 1bdb17921..9f705bb43 100644 --- a/tox.ini +++ b/tox.ini @@ -108,6 +108,18 @@ commands_pre = pip3 install -e {toxinidir}/ext/dapr-ext-fastapi/ pip3 install -e {toxinidir}/ext/dapr-ext-langgraph/ pip3 install -e {toxinidir}/ext/dapr-ext-strands/ + +[testenv:crewai] +basepython = python3 +usedevelop = False +deps = +commands = + python -m unittest discover -v ./ext/dapr-ext-crewai/tests +commands_pre = + pip3 install -e {toxinidir}/ + pip3 install -e {toxinidir}/ext/dapr-ext-workflow/ + pip3 install --no-deps -e {toxinidir}/ext/dapr-ext-crewai/ + [testenv:doc] basepython = python3 usedevelop = False