From e279966eafd87d4975fae113841c9d0d8303ddd0 Mon Sep 17 00:00:00 2001 From: GSmithApps Date: Tue, 22 Jul 2025 08:57:50 -0500 Subject: [PATCH] context vars --- custom_metric/activity.py | 5 ++++- custom_metric/shared.py | 4 ++++ custom_metric/worker.py | 10 ++++++++-- custom_metric/workflow.py | 18 +++++++++++++++++- 4 files changed, 33 insertions(+), 4 deletions(-) create mode 100644 custom_metric/shared.py diff --git a/custom_metric/activity.py b/custom_metric/activity.py index 7f2ee1163..476d72379 100644 --- a/custom_metric/activity.py +++ b/custom_metric/activity.py @@ -1,9 +1,12 @@ +import threading import time from temporalio import activity +from custom_metric.shared import user_id @activity.defn def print_and_sleep(): - print("In the activity.") + print(f"In the activity. in thread {threading.current_thread().name}") + print(f"User ID: {user_id.get()} in activity {activity.info().activity_id}") time.sleep(1) diff --git a/custom_metric/shared.py b/custom_metric/shared.py new file mode 100644 index 000000000..4c10439ae --- /dev/null +++ b/custom_metric/shared.py @@ -0,0 +1,4 @@ +from contextvars import ContextVar +from typing import Optional + +user_id: ContextVar[Optional[str]] = ContextVar("user_id", default=None) diff --git a/custom_metric/worker.py b/custom_metric/worker.py index 9ffad2077..4fe2949de 100644 --- a/custom_metric/worker.py +++ b/custom_metric/worker.py @@ -1,5 +1,6 @@ import asyncio from concurrent.futures import ThreadPoolExecutor +import threading from temporalio import activity from temporalio.client import Client @@ -13,12 +14,14 @@ from custom_metric.activity import print_and_sleep from custom_metric.workflow import StartTwoActivitiesWorkflow +from custom_metric.shared import user_id class SimpleWorkerInterceptor(Interceptor): def intercept_activity( self, next: ActivityInboundInterceptor ) -> ActivityInboundInterceptor: + user_id.set(activity.info().activity_id) # Set a user ID for the activity context return CustomScheduleToStartInterceptor(next) @@ -32,6 +35,9 @@ async def execute_activity(self, input: ExecuteActivityInput): # Could do the original schedule time instead of current attempt # schedule_to_start_second_option = activity.info().started_time - activity.info().scheduled_time + # print the thread name for debugging + print(f"In the activity interceptor. in thread {threading.current_thread().name}") + meter = activity.metric_meter() histogram = meter.create_histogram_timedelta( "custom_activity_schedule_to_start_latency", @@ -60,8 +66,8 @@ async def main(): activities=[print_and_sleep], # only one activity executor with two concurrently scheduled activities # to force a nontrivial schedule to start times - activity_executor=ThreadPoolExecutor(1), - max_concurrent_activities=1, + activity_executor=ThreadPoolExecutor(10), + max_concurrent_activities=10, ) await worker.run() diff --git a/custom_metric/workflow.py b/custom_metric/workflow.py index cf37823bd..1185c95b8 100644 --- a/custom_metric/workflow.py +++ b/custom_metric/workflow.py @@ -21,5 +21,21 @@ async def run(self): print_and_sleep, start_to_close_timeout=timedelta(seconds=5), ) - await asyncio.gather(activity1, activity2) + activity3 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + activity4 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + activity5 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + activity6 = workflow.execute_activity( + print_and_sleep, + start_to_close_timeout=timedelta(seconds=5), + ) + await asyncio.gather(activity1, activity2, activity3, activity4, activity5, activity6) return None