@@ -752,11 +752,12 @@ def call_activity(
752752 * ,
753753 input : Optional [TInput ] = None ,
754754 retry_policy : Optional [task .RetryPolicy ] = None ,
755+ tags : Optional [dict [str , str ]] = None ,
755756 ) -> task .Task [TOutput ]:
756757 id = self .next_sequence_number ()
757758
758759 self .call_activity_function_helper (
759- id , activity , input = input , retry_policy = retry_policy , is_sub_orch = False
760+ id , activity , input = input , retry_policy = retry_policy , is_sub_orch = False , tags = tags
760761 )
761762 return self ._pending_tasks .get (id , task .CompletableTask ())
762763
@@ -787,6 +788,7 @@ def call_activity_function_helper(
787788 * ,
788789 input : Optional [TInput ] = None ,
789790 retry_policy : Optional [task .RetryPolicy ] = None ,
791+ tags : Optional [dict [str , str ]] = None ,
790792 is_sub_orch : bool = False ,
791793 instance_id : Optional [str ] = None ,
792794 fn_task : Optional [task .CompletableTask [TOutput ]] = None ,
@@ -806,7 +808,7 @@ def call_activity_function_helper(
806808 if isinstance (activity_function , str )
807809 else task .get_name (activity_function )
808810 )
809- action = ph .new_schedule_task_action (id , name , encoded_input )
811+ action = ph .new_schedule_task_action (id , name , encoded_input , tags )
810812 else :
811813 if instance_id is None :
812814 # Create a deteministic instance ID based on the parent instance ID
0 commit comments