From e7e3e122611340454739116b56753761c8b7e66e Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 17 Jul 2025 09:50:21 -0400 Subject: [PATCH 1/3] Minor cleanup --- openai_agents/workflows/research_agents/research_manager.py | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/openai_agents/workflows/research_agents/research_manager.py b/openai_agents/workflows/research_agents/research_manager.py index 19bdd224b..8dc087be7 100644 --- a/openai_agents/workflows/research_agents/research_manager.py +++ b/openai_agents/workflows/research_agents/research_manager.py @@ -46,16 +46,13 @@ async def _plan_searches(self, query: str) -> WebSearchPlan: async def _perform_searches(self, search_plan: WebSearchPlan) -> list[str]: with custom_span("Search the web"): - num_completed = 0 tasks = [ asyncio.create_task(self._search(item)) for item in search_plan.searches ] results = [] for task in workflow.as_completed(tasks): - result = await task - if result is not None: + if result := await task: results.append(result) - num_completed += 1 return results async def _search(self, item: WebSearchItem) -> str | None: From 9a0f59f16085127fd248860f2801ae134fd5c839 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 17 Jul 2025 10:36:21 -0400 Subject: [PATCH 2/3] User super() instead of .next in interceptor --- context_propagation/interceptor.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/context_propagation/interceptor.py b/context_propagation/interceptor.py index b90585484..19f0d3c9b 100644 --- a/context_propagation/interceptor.py +++ b/context_propagation/interceptor.py @@ -113,7 +113,7 @@ async def start_workflow_update( self, input: temporalio.client.StartWorkflowUpdateInput ) -> temporalio.client.WorkflowUpdateHandle[Any]: set_header_from_context(input, self._payload_converter) - return await self.next.start_workflow_update(input) + return await super().start_workflow_update(input) class _ContextPropagationActivityInboundInterceptor( @@ -123,40 +123,40 @@ async def execute_activity( self, input: temporalio.worker.ExecuteActivityInput ) -> Any: with context_from_header(input, temporalio.activity.payload_converter()): - return await self.next.execute_activity(input) + return await super().execute_activity(input) class _ContextPropagationWorkflowInboundInterceptor( temporalio.worker.WorkflowInboundInterceptor ): def init(self, outbound: temporalio.worker.WorkflowOutboundInterceptor) -> None: - self.next.init(_ContextPropagationWorkflowOutboundInterceptor(outbound)) + super().init(_ContextPropagationWorkflowOutboundInterceptor(outbound)) async def execute_workflow( self, input: temporalio.worker.ExecuteWorkflowInput ) -> Any: with context_from_header(input, temporalio.workflow.payload_converter()): - return await self.next.execute_workflow(input) + return await super().execute_workflow(input) async def handle_signal(self, input: temporalio.worker.HandleSignalInput) -> None: with context_from_header(input, temporalio.workflow.payload_converter()): - return await self.next.handle_signal(input) + return await super().handle_signal(input) async def handle_query(self, input: temporalio.worker.HandleQueryInput) -> Any: with context_from_header(input, temporalio.workflow.payload_converter()): - return await self.next.handle_query(input) + return await super().handle_query(input) def handle_update_validator( self, input: temporalio.worker.HandleUpdateInput ) -> None: with context_from_header(input, temporalio.workflow.payload_converter()): - self.next.handle_update_validator(input) + super().handle_update_validator(input) async def handle_update_handler( self, input: temporalio.worker.HandleUpdateInput ) -> Any: with context_from_header(input, temporalio.workflow.payload_converter()): - return await self.next.handle_update_handler(input) + return await super().handle_update_handler(input) class _ContextPropagationWorkflowOutboundInterceptor( @@ -166,28 +166,28 @@ async def signal_child_workflow( self, input: temporalio.worker.SignalChildWorkflowInput ) -> None: set_header_from_context(input, temporalio.workflow.payload_converter()) - return await self.next.signal_child_workflow(input) + return await super().signal_child_workflow(input) async def signal_external_workflow( self, input: temporalio.worker.SignalExternalWorkflowInput ) -> None: set_header_from_context(input, temporalio.workflow.payload_converter()) - return await self.next.signal_external_workflow(input) + return await super().signal_external_workflow(input) def start_activity( self, input: temporalio.worker.StartActivityInput ) -> temporalio.workflow.ActivityHandle: set_header_from_context(input, temporalio.workflow.payload_converter()) - return self.next.start_activity(input) + return super().start_activity(input) async def start_child_workflow( self, input: temporalio.worker.StartChildWorkflowInput ) -> temporalio.workflow.ChildWorkflowHandle: set_header_from_context(input, temporalio.workflow.payload_converter()) - return await self.next.start_child_workflow(input) + return await super().start_child_workflow(input) def start_local_activity( self, input: temporalio.worker.StartLocalActivityInput ) -> temporalio.workflow.ActivityHandle: set_header_from_context(input, temporalio.workflow.payload_converter()) - return self.next.start_local_activity(input) + return super().start_local_activity(input) From 7f0e55193fabb3a78443564551fae206482f6611 Mon Sep 17 00:00:00 2001 From: Dan Davison Date: Thu, 17 Jul 2025 12:03:17 -0400 Subject: [PATCH 3/3] Revert "Minor cleanup" This reverts commit e7e3e122611340454739116b56753761c8b7e66e. --- openai_agents/workflows/research_agents/research_manager.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/openai_agents/workflows/research_agents/research_manager.py b/openai_agents/workflows/research_agents/research_manager.py index 8dc087be7..19bdd224b 100644 --- a/openai_agents/workflows/research_agents/research_manager.py +++ b/openai_agents/workflows/research_agents/research_manager.py @@ -46,13 +46,16 @@ async def _plan_searches(self, query: str) -> WebSearchPlan: async def _perform_searches(self, search_plan: WebSearchPlan) -> list[str]: with custom_span("Search the web"): + num_completed = 0 tasks = [ asyncio.create_task(self._search(item)) for item in search_plan.searches ] results = [] for task in workflow.as_completed(tasks): - if result := await task: + result = await task + if result is not None: results.append(result) + num_completed += 1 return results async def _search(self, item: WebSearchItem) -> str | None: