1818from datetime import datetime , timedelta , timezone
1919from urllib import parse
2020import re
21+ import os
2122import json
2223from base64 import b64encode
2324from typing import Any , Optional , Dict
4950 'https://cloudtasks.googleapis.com/v2/' + _CLOUD_TASKS_API_RESOURCE_PATH
5051_FIREBASE_FUNCTION_URL_FORMAT = \
5152 'https://{location_id}-{project_id}.cloudfunctions.net/{resource_id}'
53+ _EMULATOR_HOST_ENV_VAR = 'CLOUD_TASKS_EMULATOR_HOST'
54+ _EMULATED_SERVICE_ACCOUNT_DEFAULT = 'emulated-service-acct@email.com'
5255
5356_FUNCTIONS_HEADERS = {
5457 'X-GOOG-API-FORMAT-VERSION' : '2' ,
5861# Default canonical location ID of the task queue.
5962_DEFAULT_LOCATION = 'us-central1'
6063
64+ def _get_emulator_host () -> Optional [str ]:
65+ emulator_host = os .environ .get (_EMULATOR_HOST_ENV_VAR )
66+ if emulator_host :
67+ if '//' in emulator_host :
68+ raise ValueError (
69+ f'Invalid { _EMULATOR_HOST_ENV_VAR } : "{ emulator_host } ". It must follow format '
70+ '"host:port".' )
71+ return emulator_host
72+ return None
73+
74+
6175def _get_functions_service (app ) -> _FunctionsService :
6276 return _utils .get_app_service (app , _FUNCTIONS_ATTRIBUTE , _FunctionsService )
6377
@@ -103,13 +117,19 @@ def __init__(self, app: App):
103117 'projectId option, or use service account credentials. Alternatively, set the '
104118 'GOOGLE_CLOUD_PROJECT environment variable.' )
105119
106- self ._credential = app .credential .get_credential ()
120+ self ._emulator_host = _get_emulator_host ()
121+ if self ._emulator_host :
122+ self ._credential = _utils .EmulatorAdminCredentials ()
123+ else :
124+ self ._credential = app .credential .get_credential ()
125+
107126 self ._http_client = _http_client .JsonHttpClient (credential = self ._credential )
108127
109128 def task_queue (self , function_name : str , extension_id : Optional [str ] = None ) -> TaskQueue :
110129 """Creates a TaskQueue instance."""
111130 return TaskQueue (
112- function_name , extension_id , self ._project_id , self ._credential , self ._http_client )
131+ function_name , extension_id , self ._project_id , self ._credential , self ._http_client ,
132+ self ._emulator_host )
113133
114134 @classmethod
115135 def handle_functions_error (cls , error : Any ):
@@ -125,7 +145,8 @@ def __init__(
125145 extension_id : Optional [str ],
126146 project_id ,
127147 credential ,
128- http_client
148+ http_client ,
149+ emulator_host : Optional [str ] = None
129150 ) -> None :
130151
131152 # Validate function_name
@@ -134,6 +155,7 @@ def __init__(
134155 self ._project_id = project_id
135156 self ._credential = credential
136157 self ._http_client = http_client
158+ self ._emulator_host = emulator_host
137159 self ._function_name = function_name
138160 self ._extension_id = extension_id
139161 # Parse resources from function_name
@@ -167,16 +189,26 @@ def enqueue(self, task_data: Any, opts: Optional[TaskOptions] = None) -> str:
167189 str: The ID of the task relative to this queue.
168190 """
169191 task = self ._validate_task_options (task_data , self ._resource , opts )
170- service_url = self ._get_url (self ._resource , _CLOUD_TASKS_API_URL_FORMAT )
192+ emulator_url = self ._get_emulator_url (self ._resource )
193+ service_url = emulator_url or self ._get_url (self ._resource , _CLOUD_TASKS_API_URL_FORMAT )
171194 task_payload = self ._update_task_payload (task , self ._resource , self ._extension_id )
172195 try :
173196 resp = self ._http_client .body (
174197 'post' ,
175198 url = service_url ,
176199 headers = _FUNCTIONS_HEADERS ,
177- json = {'task' : task_payload .__dict__ }
200+ json = {'task' : task_payload .to_api_dict () }
178201 )
179- task_name = resp .get ('name' , None )
202+ if self ._is_emulated ():
203+ # Emulator returns a response with format {task: {name: <task_name>}}
204+ # The task name also has an extra '/' at the start compared to prod
205+ task_info = resp .get ('task' ) or {}
206+ task_name = task_info .get ('name' )
207+ if task_name :
208+ task_name = task_name [1 :]
209+ else :
210+ # Production returns a response with format {name: <task_name>}
211+ task_name = resp .get ('name' )
180212 task_resource = \
181213 self ._parse_resource_name (task_name , f'queues/{ self ._resource .resource_id } /tasks' )
182214 return task_resource .resource_id
@@ -197,7 +229,11 @@ def delete(self, task_id: str) -> None:
197229 ValueError: If the input arguments are invalid.
198230 """
199231 _Validators .check_non_empty_string ('task_id' , task_id )
200- service_url = self ._get_url (self ._resource , _CLOUD_TASKS_API_URL_FORMAT + f'/{ task_id } ' )
232+ emulator_url = self ._get_emulator_url (self ._resource )
233+ if emulator_url :
234+ service_url = emulator_url + f'/{ task_id } '
235+ else :
236+ service_url = self ._get_url (self ._resource , _CLOUD_TASKS_API_URL_FORMAT + f'/{ task_id } ' )
201237 try :
202238 self ._http_client .body (
203239 'delete' ,
@@ -235,8 +271,8 @@ def _validate_task_options(
235271 """Validate and create a Task from optional ``TaskOptions``."""
236272 task_http_request = {
237273 'url' : '' ,
238- 'oidc_token ' : {
239- 'service_account_email ' : ''
274+ 'oidcToken ' : {
275+ 'serviceAccountEmail ' : ''
240276 },
241277 'body' : b64encode (json .dumps (data ).encode ()).decode (),
242278 'headers' : {
@@ -250,7 +286,7 @@ def _validate_task_options(
250286 task .http_request ['headers' ] = {** task .http_request ['headers' ], ** opts .headers }
251287 if opts .schedule_time is not None and opts .schedule_delay_seconds is not None :
252288 raise ValueError (
253- 'Both sechdule_delay_seconds and schedule_time cannot be set at the same time.' )
289+ 'Both schedule_delay_seconds and schedule_time cannot be set at the same time.' )
254290 if opts .schedule_time is not None and opts .schedule_delay_seconds is None :
255291 if not isinstance (opts .schedule_time , datetime ):
256292 raise ValueError ('schedule_time should be UTC datetime.' )
@@ -288,7 +324,10 @@ def _update_task_payload(self, task: Task, resource: Resource, extension_id: str
288324 """Prepares task to be sent with credentials."""
289325 # Get function url from task or generate from resources
290326 if not _Validators .is_non_empty_string (task .http_request ['url' ]):
291- task .http_request ['url' ] = self ._get_url (resource , _FIREBASE_FUNCTION_URL_FORMAT )
327+ if self ._is_emulated ():
328+ task .http_request ['url' ] = ''
329+ else :
330+ task .http_request ['url' ] = self ._get_url (resource , _FIREBASE_FUNCTION_URL_FORMAT )
292331
293332 # Refresh the credential to ensure all attributes (e.g. service_account_email, id_token)
294333 # are populated, preventing cold start errors.
@@ -298,20 +337,40 @@ def _update_task_payload(self, task: Task, resource: Resource, extension_id: str
298337 except RefreshError as err :
299338 raise ValueError (f'Initial task payload credential refresh failed: { err } ' ) from err
300339
301- # If extension id is provided, it emplies that it is being run from a deployed extension.
340+ # If extension id is provided, it implies that it is being run from a deployed extension.
302341 # Meaning that it's credential should be a Compute Engine Credential.
303342 if _Validators .is_non_empty_string (extension_id ) and \
304343 isinstance (self ._credential , ComputeEngineCredentials ):
305344 id_token = self ._credential .token
306345 task .http_request ['headers' ] = \
307346 {** task .http_request ['headers' ], 'Authorization' : f'Bearer { id_token } ' }
308347 # Delete oidc token
309- del task .http_request ['oidc_token ' ]
348+ del task .http_request ['oidcToken ' ]
310349 else :
311- task .http_request ['oidc_token' ] = \
312- {'service_account_email' : self ._credential .service_account_email }
350+ try :
351+ task .http_request ['oidcToken' ] = \
352+ {'serviceAccountEmail' : self ._credential .service_account_email }
353+ except AttributeError as error :
354+ if self ._is_emulated ():
355+ task .http_request ['oidcToken' ] = \
356+ {'serviceAccountEmail' : _EMULATED_SERVICE_ACCOUNT_DEFAULT }
357+ else :
358+ raise ValueError (
359+ 'Failed to determine service account. Initialize the SDK with service '
360+ 'account credentials or set service account ID as an app option.'
361+ ) from error
313362 return task
314363
364+ def _get_emulator_url (self , resource : Resource ):
365+ if self ._emulator_host :
366+ emulator_url_format = f'http://{ self ._emulator_host } /' + _CLOUD_TASKS_API_RESOURCE_PATH
367+ url = self ._get_url (resource , emulator_url_format )
368+ return url
369+ return None
370+
371+ def _is_emulated (self ):
372+ return self ._emulator_host is not None
373+
315374
316375class _Validators :
317376 """A collection of data validation utilities."""
@@ -436,6 +495,14 @@ class Task:
436495 schedule_time : Optional [str ] = None
437496 dispatch_deadline : Optional [str ] = None
438497
498+ def to_api_dict (self ) -> dict :
499+ """Converts the Task object to a dictionary suitable for the Cloud Tasks API."""
500+ return {
501+ 'httpRequest' : self .http_request ,
502+ 'name' : self .name ,
503+ 'scheduleTime' : self .schedule_time ,
504+ 'dispatchDeadline' : self .dispatch_deadline ,
505+ }
439506
440507@dataclass
441508class Resource :
0 commit comments