diff --git a/azkaban_cli/api.py b/azkaban_cli/api.py index 04c8a09..84b03b9 100755 --- a/azkaban_cli/api.py +++ b/azkaban_cli/api.py @@ -9,18 +9,19 @@ import logging import os -def upload_request(session, host, session_id, project, zip_path): - """Upload request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name on Azkaban - :param str zip_path: Local path from zip that will be uploaded - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def upload_request(session: requests.Session, host: str, session_id: str, project: str, zip_path: str) -> requests.Response: + """ + Upload request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name on Azkaban + zip_path: Local path from zip that will be uploaded + Raise: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ zip_file = open(zip_path, 'rb') @@ -42,17 +43,18 @@ def upload_request(session, host, session_id, project, zip_path): return response -def login_request(session, host, user, password): - """Login request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str user: The user name - :param str password: The user password - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def login_request(session: requests.Session, host: str, user: str, password: str) -> requests.Response: + """ + Login request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + user: The user name + password: The user password + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Retunrs: + The response from the request made """ response = session.post( @@ -68,20 +70,23 @@ def login_request(session, host, user, password): return response -def schedule_request(session, host, session_id, project, flow, cron, **execution_options): - r"""Schedule request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that contains the flow that will be scheduled on Azkaban - :param str flow: Flow name to be scheduled on Azkaban - :param str cron: Cron expression in quartz format used to schedule - :param \*\*execution_options: Optional parameters to execution - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def schedule_request( + session: requests.Session, host: str, session_id: str, project: str, flow: str, cron: str, **execution_options) -> requests.Response: + """ + Schedule request for the Azkaban API + Args: + session: A session for creating the request + session: requests.Session + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name that contains the flow that will be scheduled on Azkaban + flow: Flow name to be scheduled on Azkaban + cron: Cron expression in quartz format used to schedule + \*\*execution_options: Optional parameters to execution + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ data = { @@ -104,17 +109,18 @@ def schedule_request(session, host, session_id, project, flow, cron, **execution return response -def fetch_flows_request(session, host, session_id, project): - """Fetch flows of a project request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name whose flows will be fetched on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_flows_request(session: requests.Session, host: str, session_id: str, project: str) -> requests.Response: + """ + Fetch flows of a project request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name whose flows will be fetched on Azkaban + Raises: + raises requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -130,19 +136,21 @@ def fetch_flows_request(session, host, session_id, project): return response -def fetch_executions_of_a_flow_request(session, session_id, project, flow, start, length): - """fetch executions of a flow on a given project - - :param session: A session for creating the request - :type session: requests.Session - :param str session_id: An id that the user should have when is logged in - :param str project: Project name whose flows will be fetched on Azkaban - :param str flow: Flow name whose schedule will be fetched on Azkaban - :param int start: The start index of the returned list (inclusive) - :param int length: The length of the returned list - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_executions_of_a_flow_request(session: requests.Session, session_id: str, project: str, flow: str, start: int, length:int) -> requests.Response: + """ + fetch executions of a flow on a given project + Args: + session: A session for creating the request + session: requests.Session + session_id: An id that the user should have when is logged in + project: Project name whose flows will be fetched on Azkaban + flow: Flow name whose schedule will be fetched on Azkaban + start: The start index of the returned list (inclusive) + length: The length of the returned list + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -161,18 +169,19 @@ def fetch_executions_of_a_flow_request(session, session_id, project, flow, start return response -def fetch_jobs_from_flow_request(session, host, session_id, project, flow): - """Fetch jobs of a flow of a project request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name whose flow's jobs will be fetched on Azkaban - :param str project: Flow id whose jobs will be fetched on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_jobs_from_flow_request(session: requests.Session, host: str, session_id: str, project: str, flow: str) -> requests.Response: + """ + Fetch jobs of a flow of a project request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name whose flow's jobs will be fetched on Azkaban + flow: Flow id whose jobs will be fetched on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -189,18 +198,18 @@ def fetch_jobs_from_flow_request(session, host, session_id, project, flow): return response -def fetch_schedule_request(session, host, session_id, project_id, flow): +def fetch_schedule_request(session: requests.Session, host: str, session_id: str, project_id: str, flow: str) -> requests.Response: """Fetch flow of a project request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project_id: Project ID whose flow schedule will be fetched on Azkaban - :param str flow: Flow name whose schedule will be fetched on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name whose flow's jobs will be fetched on Azkaban + project: Flow id whose jobs will be fetched on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -217,17 +226,18 @@ def fetch_schedule_request(session, host, session_id, project_id, flow): return response -def unschedule_request(session, host, session_id, schedule_id): - """Unschedule request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str schedule_id: Schedule id of the flow that will be unscheduled on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def unschedule_request(session: requests.Session, host: str, session_id: str, schedule_id: str) -> requests.Response: + """ + Unschedule request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + schedule_id: Schedule id of the flow that will be unscheduled on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ data = { @@ -247,19 +257,20 @@ def unschedule_request(session, host, session_id, schedule_id): return response -def execute_request(session, host, session_id, project, flow, **execution_options): - """Execute request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that contains the flow that will be executed on Azkaban - :param str flow: Flow name to be executed on Azkaban - :param \*\*execution_options: Optional parameters to execution - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def execute_request(session: requests.Session, host: str, session_id: str, project: str, flow: str, **execution_options) -> requests.Response: + """ + Execute request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name that contains the flow that will be executed on Azkaban + flow: Flow name to be executed on Azkaban + \*\*execution_options: Optional parameters to execution + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ params = { @@ -280,17 +291,18 @@ def execute_request(session, host, session_id, project, flow, **execution_option return response -def cancel_request(session, host, session_id, exec_id): - """Cancel an running flow for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str exec_id: Execution id to be canceled - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def cancel_request(session: requests.Session, host: str, session_id: str, exec_id: str) -> requests.Response: + """ + Cancel an running flow for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + exec_id: Execution id to be canceled + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -307,17 +319,19 @@ def cancel_request(session, host, session_id, exec_id): return response -def create_request(session, host, session_id, project, description): - """Create a Project request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str session_id: An id that the user should have when is logged in - :param str project: Project name to be created on Azkaban - :param str description: The description for the project - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def create_request(session: requests.Session, host: str, session_id: str, project: str, description: str) -> requests.Response: + """ + Create a Project request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name to be created on Azkaban + description: The description for the project + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.post( @@ -334,15 +348,18 @@ def create_request(session, host, session_id, project, description): return response -def delete_request(session, host, session_id, project): - """Delete a Project request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str session_id: An id that the user should have when is logged in - :param str project: Project name to be deleted on Azkaban:return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def delete_request(session: requests.Session, host: str, session_id: str, project: str) -> requests.Response: + """ + Delete a Project request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name to be deleted on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -358,16 +375,17 @@ def delete_request(session, host, session_id, project): return response -def fetch_projects_request(session, host, session_id): - """Fetch all projects request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_projects_request(session: requests.Session, host: str, session_id: str) -> requests.Response: + """ + Fetch all projects request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -381,19 +399,21 @@ def fetch_projects_request(session, host, session_id): return response -def add_permission_request(session, host, session_id, project, group, permission_options): - """Add permission request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that will receive group permissions on Azkaban - :param str group: Group name on Azkaban - :param Dictionary permission_options: The permissions options added to group in the project on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def add_permission_request(session: requests.Session, host: str, session_id: str, project: str, group: str, permission_options: dict) -> requests.Response: + """ + Add permission request for the Azkaban API + Fetch all projects request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name that will receive group permissions on Azkaban + group: Group name on Azkaban + permission_options: The permissions options added to group in the project on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = __call_permission_api(session, host, session_id, 'addPermission', project, group, permission_options) @@ -402,18 +422,19 @@ def add_permission_request(session, host, session_id, project, group, permission return response -def remove_permission_request(session, host, session_id, project, group): - """Remove permission request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that will lose group permissions on Azkaban - :param str group: Group name on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def remove_permission_request(session: requests.Session, host: str, session_id: str, project: str, group: str) -> requests.Response: + """ + Remove permission request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name that will lose group permissions on Azkaban + group: Group name on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ #to remove a group permission, we have to pass all permissions as False @@ -425,19 +446,20 @@ def remove_permission_request(session, host, session_id, project, group): return response -def change_permission_request(session, host, session_id, project, group, permission_options): - """Change permission request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that will receive the newly updated group permissions on Azkaban - :param str group: Group name on Azkaban - :param Dictionary permission_options: The permissions options to replace old permission for the group in the project on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def change_permission_request(session: requests.Session, host: str, session_id: str, project: str, group: str, permission_options: dict) -> requests.Response: + """ + Change permission request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name that will receive the newly updated group permissions on Azkaban + group: Group name on Azkaban + permission_options: The permissions options to replace old permission for the group in the project on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = __call_permission_api(session, host, session_id, 'changePermission', project, group, permission_options) @@ -447,17 +469,18 @@ def change_permission_request(session, host, session_id, project, group, permiss return response -def fetch_sla_request(session, host, session_id, schedule_id): - """Fetch flow of a SLA request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str schedule_id: The id of the shchedule. - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_sla_request(session: requests.Session, host: str, session_id: str, schedule_id: str) -> requests.Response: + """ + Fetch flow of a SLA request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + schedule_id: The id of the shchedule + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -473,21 +496,23 @@ def fetch_sla_request(session, host, session_id, schedule_id): return response -def __call_permission_api(session, host, session_id, operation, project, group, permission_options ): +def __call_permission_api( + session: requests.Response, host: str, session_id: str, operation: str, project: str, group: str, permission_options: str + ) -> requests.Response: """ This function is a utility to call permission API in Azkaban. - - :param str operation:The action to be executed in Azkaban, can be with values [addPermission OU changePermission] - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that will receive the newly updated group permissions on Azkaban - :param str group: Group name on Azkaban - :param Dictionary permission_options: The permissions options to replace old permission for the group in the project on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + operation: The action to be executed in Azkaban, can be with values [addPermission OU changePermission] + project: Project name that will receive the newly updated group permissions on Azkaban + group: Group name on Azkaban + permission_options: The permissions options to replace old permission for the group in the project on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made Sample request to Azkaban #https://azkaban.qa.globoi.com/manager?project=teste-permission-api-20190806&name=time-dmp&ajax=addPermission&permissions%5Badmin%5D=false&permissions%5Bread%5D=true&permissions%5Bwrite%5D=false&permissions%5Bexecute%5D=true&permissions%5Bschedule%5D=false&group=true @@ -509,17 +534,18 @@ def __call_permission_api(session, host, session_id, operation, project, group, } ) -def fetch_flow_execution_request(session, host, session_id, exec_id): - """Fetch a flow execution request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str exec_id: Execution id to be fetched - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_flow_execution_request(session: requests.Session, host: str, session_id: str, exec_id: str) -> requests.Response: + """ + Fetch a flow execution request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + exec_id: Execution id to be fetched + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -535,21 +561,22 @@ def fetch_flow_execution_request(session, host, session_id, exec_id): return response -def fetch_flow_execution_updates_request(session, host, session_id, exec_id, last_update_time): - """Fetch a flow execution updates request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str exec_id: Execution id to be fetched - :return: The response from the request made - :param last_update_time: The criteria to filter by last update time. Set the - value to be -1 if all job information are needed. Use -lt="value" to - subscribe the default value, defaults to -1 - :type last_update_time: str, optional - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_flow_execution_updates_request( + session: requests.Session, host: str, session_id: str, exec_id: str, last_update_time: str = None + ) -> requests.Response: + """ + Fetch a flow execution updates request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + exec_id: Execution id to be fetched + last_update_time: The criteria to filter by last update time. Set the value to be -1 if all + job information are needed. Use -lt="value" to subscribe the default value, defaults to -1 + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( @@ -566,25 +593,28 @@ def fetch_flow_execution_updates_request(session, host, session_id, exec_id, las return response -def fetch_execution_job_log_request(session, host, session_id, exec_id, jobid, offset, length): - """Fetches the correponding job logs. - +def fetch_execution_job_log_request( + session: requests.Response, host: str, session_id: str, exec_id: str, jobid: str, offset: str, length: str + ) -> requests.Response: + """ + Fetches the correponding job logs. This method receives the execution id, jobid, offset and lenght, makes a fetch request to get the correponding job logs and evaluates the response. - - Returns the json response from the request. - - :param execution_id: Execution id on Azkaban - :type execution_id: str - :param jobid: The unique id for the job to be fetched. - :type jobid: str - :param offset: The offset for the log data. - :type offset: str - :param length: The length of the log data. For example, if the offset set is - 10 and the length is 1000, the returned log will starts from the 10th character - and has a length of 1000 (less if the remaining log is less than 1000 long) - :type length: str - :raises FetchExecutionJobsLogError: when Azkaban api returns error in response + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + exec_id: Execution id to be fetched + jobid: The unique id for the job to be fetched + offset: The offset for the log data + length: The length of the log data. For example, if the offset set is 10 and the length is 1000, + the returned log will starts from the 10th character and has a length of 1000 + (less if the remaining log is less than 1000 long) + + Raises: + FetchExecutionJobsLogError: when Azkaban api returns error in response + Returns: + The json response from the request """ response = session.get( @@ -603,20 +633,18 @@ def fetch_execution_job_log_request(session, host, session_id, exec_id, jobid, o return response -def resume_flow_execution(session, host, session_id, exec_id): - """Resume a flow execution request for the Azkaban API - - :param session: A session for creating the request - :type session: requests.Session - :param str host: Hostname where the request should go - :param str session_id: An id that the user should have when is logged in - :param str exec_id: Execution id to be fetched - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that will receive the newly updated group permissions on Azkaban - :param str project: Flow id whose executions will be fetched on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def resume_flow_execution(session: requests.Session, host: str, session_id: str , exec_id: str) -> requests.Response: + """ + Resume a flow execution request for the Azkaban API + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + exec_id: Execution id to be fetched + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( host + '/executor', @@ -631,16 +659,19 @@ def resume_flow_execution(session, host, session_id, exec_id): return response -def fetch_running_executions_of_a_flow_request(session, host, session_id, project, flow): - - """Fetch running executions of a flow - - :param str session_id: An id that the user should have when is logged in - :param str project: Project name that will receive the newly updated group permissions on Azkaban - :param str project: Flow id whose executions will be fetched on Azkaban - :return: The response from the request made - :rtype: requests.Response - :raises requests.exceptions.ConnectionError: if cannot connect to host +def fetch_running_executions_of_a_flow_request(session: requests.Session, host: str, session_id: str, project: str, flow: str) -> requests.Response: + """ + Fetch running executions of a flow + Args: + session: A session for creating the request + host: Hostname where the request should go + session_id: An id that the user should have when is logged in + project: Project name that will receive the newly updated group permissions on Azkaban + Flow: Flow id whose executions will be fetched on Azkaban + Raises: + requests.exceptions.ConnectionError: if cannot connect to host + Returns: + The response from the request made """ response = session.get( diff --git a/azkaban_cli/azkaban.py b/azkaban_cli/azkaban.py index 0bb23de..3e23e8d 100755 --- a/azkaban_cli/azkaban.py +++ b/azkaban_cli/azkaban.py @@ -49,12 +49,13 @@ def __init__(self): self.__user = None self.__session_id = None - def __validate_host(self, host): + def __validate_host(self, host: str) -> str: """ PRIVATE Receives a host and when the host ends with '/', will we return a host without the '/'. - :param host: - :return: host: - :rtype: str: + Args: + host: azkaban host + Returns: + host without the '/' """ valid_host = host @@ -145,12 +146,11 @@ def __catch_response_error(self, response, exception, ignore_empty_responses=Fal if not ignore_empty_responses: self.__catch_empty_response(exception, response_json) - def get_logged_session(self): + def get_logged_session(self) -> dict: """ Method for return the host and session id of the logged session saved on the class - - :return: A dictionary containing host, user and session_id as keys - :rtype: dict + Returns: + A dictionary containing host, user and session_id as keys """ logged_session = { @@ -161,13 +161,13 @@ def get_logged_session(self): return logged_session - def set_logged_session(self, host, user, session_id): + def set_logged_session(self, host: str, user: str, session_id: str): """ Method for set host, user and session_id, attributes of the class - - :param str host: Azkaban hostname - :param str user: Azkaban username - :param str session_id: session.id received from a login request + Args: + host: Azkaban hostname + user: Azkaban username + session_id: session.id received from a login request """ self.__host = host @@ -179,7 +179,7 @@ def logout(self): self.set_logged_session(None, None, None) - def login(self, host, user, password): + def login(self, host: str, user: str, password: str): """ Login command, intended to make the request to Azkaban and treat the response properly @@ -187,11 +187,12 @@ def login(self, host, user, password): password is wrong or could not connect to host, it returns false and do not change the host and session_id attribute from the class. If everything is fine, saves the new session_id and corresponding host as attributes in the class and returns True - - :param str host: Azkaban hostname - :param str user: Username to login - :param str password: Password from user - :raises LoginError: when Azkaban api returns error in response + Args: + host: Azkaban hostname + user: Username to login + password: Password from user + Raises: + LoginError: when Azkaban api returns error in response """ valid_host = self.__validate_host(host) @@ -205,7 +206,7 @@ def login(self, host, user, password): logging.info('Logged as %s' % (user)) - def upload(self, path, project=None, zip_name=None): + def upload(self, path: str, project: str=None, zip_name: str=None): """ Upload command, intended to make the request to Azkaban and treat the response properly @@ -217,11 +218,12 @@ def upload(self, path, project=None, zip_name=None): passed. If zip name is not passed as argument, the project name will be used for the zip. If project or path is wrong or if there is no session_id, it returns false. If everything is fine, returns True. - - :param str path: path to be zipped and uploaded - :param str project: Project name on Azkaban, optional. - :param str zip_name: Zip name that will be created and uploaded, optional. - :raises UploadError: when Azkaban api returns error in response + Args: + path: path to be zipped and uploaded + project: Project name on Azkaban, optional + zip_name: Zip name that will be created and uploaded, optional + Raises: + UploadError: when Azkaban api returns error in response """ self.__check_if_logged() @@ -249,7 +251,7 @@ def upload(self, path, project=None, zip_name=None): response_json = response.json() logging.info('Project %s updated to version %s' % (project, response_json[u'version'])) - def schedule(self, project, flow, cron, **execution_options): + def schedule(self, project: str, flow: str, cron: str, **execution_options: dict): """ Schedule command, intended to make the request to Azkaban and treat the response properly. @@ -258,11 +260,12 @@ def schedule(self, project, flow, cron, **execution_options): If project, flow or cron is wrong or if there is no session_id, it returns false. If everything is fine, returns True. - - :param str project: Project name on Azkaban - :param str flow: Flow name on Azkaban - :param str cron: Cron expression, in quartz format [Eg.: '0*/10*?**' -> Every 10 minutes] - :raises ScheduleError: when Azkaban api returns error in response + Args: + project: Project name on Azkaban + flow: flow name on Azkaban + cron: Cron expression, in quartz format [Eg.: '0*/10*?**' -> Every 10 minutes] + Raises: + ScheduleError: when Azkaban api returns error in response """ self.__check_if_logged() @@ -285,7 +288,7 @@ def schedule(self, project, flow, cron, **execution_options): logging.info(response_json[u'message']) logging.info('scheduleId: %s' % (response_json[u'scheduleId'])) - def fetch_flows(self, project): + def fetch_flows(self, project: str) -> json: """ Fetch flows command, intended to make the request to Azkaban and treat the response properly. @@ -294,9 +297,12 @@ def fetch_flows(self, project): If project is wrong or there is no session_id, it returns false. If everything is fine, returns True. - - :param str project: project name on Azkaban - :raises FetchFlowsError: when Azkaban api returns error in response + Args: + project_: project name on Azkaban + Raises: + FetchJobsFromFlowError: when Azkaban api returns error in response + Returns: + the json response from the request """ self.__check_if_logged() @@ -314,19 +320,20 @@ def fetch_flows(self, project): logging.info('Project ID: %s' % (response_json[u'projectId'])) return response_json - def fetch_jobs_from_flow(self, project, flow): + def fetch_jobs_from_flow(self, project: str, flow: str): """ Fetch jobs of a flow command, intended to make the request to Azkaban and return the response. This method receives the project name and flow id, makes the fetch jobs of a flow request to fetch the jobs of a flow and evaluates the response. - - Returns the json response from the request. - - :param str project: project name on Azkaban - :param str flow: flow id on Azkaban - :raises FetchJobsFromFlowError: when Azkaban api returns error in response + Args: + project_id: project id on Azkaban + flow: flow name on Azkaban + Raises: + FetchJobsFromFlowError: when Azkaban api returns error in response + Returns: + the json response from the request """ self.__check_if_logged() @@ -343,7 +350,7 @@ def fetch_jobs_from_flow(self, project, flow): return response.json() - def fetch_schedule(self, project_id, flow): + def fetch_schedule(self, project_id: str, flow: str) -> json: """ Fetch schedule command, intended to make the request to Azkaban and treat the response properly. @@ -352,10 +359,13 @@ def fetch_schedule(self, project_id, flow): If project_id or flow is wrong or there is no session_id, it returns false. If everything is fine, returns True. - - :param str project_id: project id on Azkaban - :param str flow: flow name on Azkaban - :raises FetchScheduleError: when Azkaban api returns error in response + Args: + project_id: project id on Azkaban + flow: flow name on Azkaban + Raises: + FetchScheduleError: when Azkaban api returns error in response + Returns: + the json response from the request """ self.__check_if_logged() @@ -374,7 +384,7 @@ def fetch_schedule(self, project_id, flow): logging.info('Schedule ID: %s' % (response_json[u'schedule'][u'scheduleId'])) return response_json - def unschedule(self, schedule_id): + def unschedule(self, schedule_id: str): """ Unschedule command, intended to make the request to Azkaban and treat the response properly. @@ -383,9 +393,10 @@ def unschedule(self, schedule_id): If schedule_id is wrong or there is no session_id, it returns false. If everything is fine, returns True. - - :param str schedule_id: Schedule id on Azkaban - :raises UnscheduleError: when Azkaban api returns error in response + Args: + schedule_id: Schedule id on Azkaban + Raises: + UnscheduleError: when Azkaban api returns error in response """ self.__check_if_logged() @@ -402,7 +413,7 @@ def unschedule(self, schedule_id): response_json = response.json() logging.info(response_json[u'message']) - def execute(self, project, flow, **execution_options): + def execute(self, project: str, flow: str, **execution_options: dict): """ Execute command, intended to make the request to Azkaban and treat the response properly. @@ -410,10 +421,11 @@ def execute(self, project, flow, **execution_options): response. If project or flow is wrong or if there is no session_id, it returns false. If everything is fine, returns True. - - :param str project: Project name on Azkaban - :param str flow: Flow name on Azkaban - :raises ExecuteError: when Azkaban api returns error in response + Args: + project: Project name on Azkaban + flow: Flow name on Azkaban + Raises: + ExecuteError: when Azkaban api returns error in response """ self.__check_if_logged() @@ -434,7 +446,7 @@ def execute(self, project, flow, **execution_options): response_json = response.json() logging.info('%s' % (response_json[u'message'])) - def cancel(self, execution_id): + def cancel(self, execution_id: str): """ Execute command, intended to make the request to Azkaban and treat the response properly. @@ -442,9 +454,10 @@ def cancel(self, execution_id): evaluate the response. If the flow is not running, it will return an error message. - - :param str execution_id: Execution id on Azkaban - :raises CancelError: when Azkaban api returns error in response + Args: + execution_id: Execution id on Azkaban + Raises: + CancelError: when Azkaban api returns error in response """ self.__check_if_logged() @@ -458,15 +471,15 @@ def cancel(self, execution_id): self.__catch_response_error(response, CancelError) - def create(self, project, description): + def create(self, project: str, description: str): """ Create command, intended to make the request to Azkaban and treat the response properly. This method receives the project name and the description, make the execute request to create the project and evaluate the response. - - :param str project: Project name on Azkaban - :param str description: Description for the project + Args: + project: Project name on Azkaban + description: Description for the project """ self.__check_if_logged() @@ -483,14 +496,14 @@ def create(self, project, description): logging.info('Project %s created successfully' % (project)) - def delete(self, project): + def delete(self, project: str): """ Delete command, intended to make the request to Azkaban and treat the response properly. This method receives the project name, make the execute request to delete the project and evaluate the response. - - :param str project: Project name on Azkaban + Args: + project: Project name on Azkaban """ self.__check_if_logged() @@ -504,10 +517,12 @@ def delete(self, project): # The delete request does not return any message - def fetch_projects(self): + def fetch_projects(self) -> str: """ Fetch all projects command, intended to make the request to Azkaban and treat the response properly. This method makes the fetch projects request to fetch all the projects and evaluates the response. + Returns: + Request response content in text """ self.__check_if_logged() @@ -523,16 +538,16 @@ def fetch_projects(self): return response.text - def add_permission(self, project, group, permission_options): + def add_permission(self, project: str, group: str, permission_options: dict): """ Add permission command, intended to make the request to Azkaban and treat the response properly. This method receives the project name, the group name, and the permission options and execute request to add a group permission to the project and evaluate the response. - - :param str project: Project name on Azkaban - :param str group: Group name on Azkaban - :param Dictionary permission_options: The group permissions in the project on Azkaban + Args: + project: Project name on Azkaban + group: Group name on Azkaban + permission_options: The group permissions in the project on Azkaban """ self.__check_if_logged() @@ -552,15 +567,15 @@ def add_permission(self, project, group, permission_options): logging.info('Group [%s] add with permission [%s] in the Project [%s] successfully' % (group, permission_options, project)) - def remove_permission(self, project, group): + def remove_permission(self, project: str, group: str): """ Remove permission command, intended to make the request to Azkaban and treat the response properly. This method receives the project name and the group name and execute request to remove a group permission from the project and evaluate the response. - - :param str project: Project name on Azkaban - :param str group: Group name on Azkaban + Args: + project: Project name on Azkaban + group: Group name on Azkaban """ self.__check_if_logged() @@ -577,16 +592,16 @@ def remove_permission(self, project, group): logging.info('Group [%s] permission removed from the Project [%s] successfully' % (group, project)) - def change_permission(self, project, group, permission_options): + def change_permission(self, project: str, group: str, permission_options: dict): """ Change permission command, intended to make the request to Azkaban and treat the response properly. This method receives the project name, the group name, and the permission options and execute request to change a existing group permission in a project and evaluate the response. - - :param str project: Project name on Azkaban - :param str group: Group name on Azkaban - :param Dictionary permission_options: The group permissions in the project on Azkaban + Args: + project: Project name on Azkaban + group: Group name on Azkaban + permission_options: The group permissions in the project on Azkaban """ self.__check_if_logged() @@ -606,12 +621,16 @@ def change_permission(self, project, group, permission_options): logging.info('Group [%s] AAA received new permissions [%s] in the Project [%s] successfully' % (group, permission_options, project)) - def fetch_sla(self, schedule_id): + def fetch_sla(self, schedule_id: str) -> json: """ Fetch SLA command, intended to make the request to Azkaban and treat the response properly. Given a schedule id, this API call fetches the SLA. - - :param str schedule_id: Schedule ID on Azkaban (Find on fetch_schedule) + Args: + schedule_id: Schedule ID on Azkaban (Find on fetch_schedule) + Raises: + FetchSLAError: when Azkaban api returns error in response + Returns: + Returns the json response from the request """ self.__check_if_logged() @@ -625,10 +644,9 @@ def fetch_sla(self, schedule_id): self.__catch_response_error(response, FetchSLAError) - response_json = response.json() - return response_json + return response.json() - def __check_group_permissions(self, permission_options): + def __check_group_permissions(self, permission_options: dict) -> dict: """ Check the group permissions for the project. Catch all permission from the dict and set as True, if the option dont exists in this dict, set False. If no permissions are found, just set the Read default to true, as in the @@ -640,8 +658,10 @@ def __check_group_permissions(self, permission_options): EXECUTE...: The user is allowed to execute, pause, cancel jobs. SCHEDULE..: The user is allowed to add, modify and remove a flow from the schedule. - :param Dictionary permission_options: The group permissions in the project on Azkaban - :return: Dictionary filled_permission_options: Dictionary containing filled permissions. + Args: + permission_options: The group permissions in the project on Azkaban + Returns: + Dictionary containing filled permissions """ __options = ["admin", "write", "read", "execute", "schedule"] filled_permission_options = { @@ -663,18 +683,19 @@ def __check_group_permissions(self, permission_options): return filled_permission_options - def fetch_flow_execution(self, execution_id): + def fetch_flow_execution(self, execution_id: str) -> json: """ Fetch a flow execution command, intended to make the request to Azkaban and treat the response properly. This method receives the execution id, makes the fetch a flow execution request to fetch the flow execution details and evaluates the response. - - Returns the json response from the request. - - :param str execution_id: Execution id on Azkaban - :raises FetchFlowExecutionError: when Azkaban api returns error in response + Args: + execution_id: Execution id on Azkaban + Raises: + FetchExecutionJobsLogError: when Azkaban api returns error in response + Returns: + Returns the json response from the request """ self.__check_if_logged() @@ -690,22 +711,22 @@ def fetch_flow_execution(self, execution_id): return response.json() - def fetch_flow_execution_updates(self, execution_id, last_update_time): + def fetch_flow_execution_updates(self, execution_id: str, last_update_time: str) -> json: """ Fetch a flow execution updates command, intended to make the request to Azkaban and treat the response properly. This method receives the execution id and the last_update_time , makes the fetch a flow execution request to fetch the flow execution update details and evaluates the response. - - Returns the json response from the request. - - :param str execution_id: Execution id on Azkaban - :param str last_update_time: (optional) The criteria to filter by last update time. - Set the value to be -1 if all job information are needed. Use -lt="value" - to subscribe the default value, defaults to -1 - :raises FetchFlowExecutionError: when Azkaban api returns error in response - :return class:`Response` object as json friendly. + Args: + execution_id: Execution id on Azkaban + last_update_time: (optional) The criteria to filter by last update time. + Set the value to be -1 if all job information are needed. Use -lt="value" + to subscribe the default value, defaults to -1 + Raises: + FetchExecutionJobsLogError: when Azkaban api returns error in response + Returns: + Returns `Response` object as json friendly """ self.__check_if_logged() @@ -722,22 +743,21 @@ def fetch_flow_execution_updates(self, execution_id, last_update_time): return response.json() - def fetch_executions_of_a_flow(self, project, flow, start, length): + def fetch_executions_of_a_flow(self, project: str, flow: str, start: int, length: int) -> json: """ Fetch executions of a flow command, intended to make the request to Azkaban and treat the response properly. - This method receives the project name, the flow id, the start index of the returned list and the length of the returned list, it makes the fetch and evaluates the response. - - Returns the json response from the request. - - :param str project: Project name on Azkaban - :param str flow: Flow id on Azkaban - :param int start: Start index of the returned list - :param int length: Length of the returned list - :raises FetchExecutionsOfAFlowError: when Azkaban api returns error in response - :return class:`Response` object as json friendly. + Args: + project: Project name on Azkaban + flow: Flow id on Azkaban + start: Start index of the returned list + length: Length of the returned list + Raises: + FetchExecutionJobsLogError: when Azkaban api returns error in response + Returns: + Returns `Response` object as json friendly """ self.__check_if_logged() @@ -755,25 +775,22 @@ def fetch_executions_of_a_flow(self, project, flow, start, length): return response.json() - def fetch_execution_job_log(self, execution_id, jobid, offset, length): - """Fetches the correponding job logs. - + def fetch_execution_job_log(self, execution_id: str, jobid: str, offset: str, length: str) -> json: + """ + Fetches the correponding job logs. This method receives the execution id, jobid, offset and lenght, makes a fetch request to get the correponding job logs and evaluates the response. - - Returns the json response from the request. - - :param execution_id: Execution id on Azkaban - :type execution_id: str - :param jobid: The unique id for the job to be fetched. - :type jobid: str - :param offset: The offset for the log data. - :type offset: str - :param length: The length of the log data. For example, if the offset set is - 10 and the length is 1000, the returned log will starts from the 10th character - and has a length of 1000 (less if the remaining log is less than 1000 long) - :type length: str - :raises FetchExecutionJobsLogError: when Azkaban api returns error in response + Args: + exec_id: Execution id on Azkaban + jobid: The unique id for the job to be fetched + offset: The offset for the log data + length: The length of the log data. For example, if the offset set is 10 and the length is 1000, + the returned log will starts from the 10th character and has a length of 1000 + (less if the remaining log is less than 1000 long) + Raises: + FetchExecutionJobsLogError: when Azkaban api returns error in response + Returns: + The json response from the request """ self.__check_if_logged() @@ -791,13 +808,15 @@ def fetch_execution_job_log(self, execution_id, jobid, offset, length): return response.json() - def resume_flow_execution(self, execution_id): - """Resume a flow execution for the Azkaban API - - :param str execution_id: Execution id to be resumed - :return: The response from the request made - :rtype: requests.Response - :raises ResumeFlowExecutionError: when Azkaban api returns error in response + def resume_flow_execution(self, execution_id: str) -> json: + """ + Resume a flow execution for the Azkaban API + Args: + execution_id: Execution id to be resumed + Raise: + ResumeFlowExecutionError: when Azkaban api returns error in response + Returns: + The response from the request made """ self.__check_if_logged() @@ -812,19 +831,19 @@ def resume_flow_execution(self, execution_id): return response.json() - def fetch_running_executions_of_a_flow(self, project, flow): - """Fetch running executions of a flow command, intended to make the request to Azkaban + def fetch_running_executions_of_a_flow(self, project: str, flow: str) -> json: + """ + Fetch running executions of a flow command, intended to make the request to Azkaban and treat the response properly. This method receives the project name and the flow id, making the fetch and evaluating the response. - - Returns the json response from the request. - - :param project: Project name on Azkaban - :type project: str - :param flow: Flow id on Azkaban - :type flow: str - :raises FetchRunningExecutionsOfAFlowError: when Azkaban api returns error in response + Args: + project: Project name on Azkaban + flow: Flow id on Azkaban + Raise: + FetchRunningExecutionsOfAFlowError: when Azkaban api returns error in response + Returns: + Returns the json response from the request """ self.__check_if_logged()