diff --git a/README.md b/README.md index f44f6ef..ea44274 100755 --- a/README.md +++ b/README.md @@ -34,9 +34,9 @@ Commands: add_permission Add a group with permission in a project change_permission Change a group permission in a project create Create a new project + cancel Cancel a flow execution delete Delete a project execute Execute a flow from a project - cancel Cancel a flow execution fetch_running_executions_of_a_flow Fetch the running executions of a flow fetch_flow_execution Fetch a flow execution fetch_flow_execution_updates Fetch flow execution updates @@ -47,6 +47,7 @@ Commands: fetch_sla Fetch the SLA from a schedule login Login to an Azkaban server logout Logout from Azkaban session + pause_a_flow_execution Pause a flow execution remove_permission Remove group permission from a project schedule Schedule a flow from a project with specified cron... unschedule Unschedule a flow from a project diff --git a/azkaban_cli/api.py b/azkaban_cli/api.py index 04c8a09..5621a08 100755 --- a/azkaban_cli/api.py +++ b/azkaban_cli/api.py @@ -9,6 +9,7 @@ import logging import os + def upload_request(session, host, session_id, project, zip_path): """Upload request for the Azkaban API @@ -23,25 +24,22 @@ def upload_request(session, host, session_id, project, zip_path): :raises requests.exceptions.ConnectionError: if cannot connect to host """ - zip_file = open(zip_path, 'rb') + zip_file = open(zip_path, "rb") zip_name = os.path.basename(zip_path) response = session.post( - host + '/manager', - data={ - u'session.id': session_id, - u'ajax': u'upload', - u'project': project - }, + host + "/manager", + data={u"session.id": session_id, u"ajax": u"upload", u"project": project}, files={ - u'file': (zip_name, zip_file, 'application/zip'), - } + u"file": (zip_name, zip_file, "application/zip"), + }, ) logging.debug("Response: \n%s", response.text) return response + def login_request(session, host, user, password): """Login request for the Azkaban API @@ -55,19 +53,13 @@ def login_request(session, host, user, password): :raises requests.exceptions.ConnectionError: if cannot connect to host """ - response = session.post( - host, - data={ - u'action': u'login', - u'username': user, - u'password': password - } - ) + response = session.post(host, data={u"action": u"login", u"username": user, u"password": password}) logging.debug("Response: \n%s", response.text) return response + def schedule_request(session, host, session_id, project, flow, cron, **execution_options): r"""Schedule request for the Azkaban API @@ -85,25 +77,23 @@ def schedule_request(session, host, session_id, project, flow, cron, **execution """ data = { - u'session.id': session_id, - u'ajax': u'scheduleCronFlow', - u'projectName': project, - u'flow': flow, - u'cronExpression': cron + u"session.id": session_id, + u"ajax": u"scheduleCronFlow", + u"projectName": project, + u"flow": flow, + u"cronExpression": cron, } data.update(execution_options) logging.debug("Request data: \n%s", data) - response = session.post( - host + '/schedule', - data=data - ) + response = session.post(host + "/schedule", data=data) logging.debug("Response: \n%s", response.text) return response + def fetch_flows_request(session, host, session_id, project): """Fetch flows of a project request for the Azkaban API @@ -118,18 +108,14 @@ def fetch_flows_request(session, host, session_id, project): """ response = session.get( - host + '/manager', - params={ - u'session.id': session_id, - u'ajax': 'fetchprojectflows', - u'project': project - } + host + "/manager", params={u"session.id": session_id, u"ajax": "fetchprojectflows", u"project": project} ) logging.debug("Response: \n%s", response.text) return response + def fetch_executions_of_a_flow_request(session, session_id, project, flow, start, length): """fetch executions of a flow on a given project @@ -146,21 +132,22 @@ def fetch_executions_of_a_flow_request(session, session_id, project, flow, start """ response = session.get( - host + '/manager', + host + "/manager", params={ - u'session.id': session_id, - u'ajax':'fetchFlowExecutions', - u'project': project, - u'flow': flow, - u'start': start, - u'length': length, - } + u"session.id": session_id, + u"ajax": "fetchFlowExecutions", + u"project": project, + u"flow": flow, + u"start": start, + u"length": length, + }, ) logging.debug("Response: \n%s", response.text) return response + def fetch_jobs_from_flow_request(session, host, session_id, project, flow): """Fetch jobs of a flow of a project request for the Azkaban API @@ -176,19 +163,15 @@ def fetch_jobs_from_flow_request(session, host, session_id, project, flow): """ response = session.get( - host + '/manager', - params={ - u'session.id': session_id, - u'ajax': 'fetchflowgraph', - u'project': project, - u'flow': flow - } + host + "/manager", + params={u"session.id": session_id, u"ajax": "fetchflowgraph", u"project": project, u"flow": flow}, ) logging.debug("Response: \n%s", response.text) return response + def fetch_schedule_request(session, host, session_id, project_id, flow): """Fetch flow of a project request for the Azkaban API @@ -204,19 +187,15 @@ def fetch_schedule_request(session, host, session_id, project_id, flow): """ response = session.get( - host + '/schedule', - params={ - u'session.id': session_id, - u'ajax': 'fetchSchedule', - u'projectId': project_id, - u'flowId': flow - } + host + "/schedule", + params={u"session.id": session_id, u"ajax": "fetchSchedule", u"projectId": project_id, u"flowId": flow}, ) logging.debug("Response: \n%s", response.text) return response + def unschedule_request(session, host, session_id, schedule_id): """Unschedule request for the Azkaban API @@ -230,25 +209,19 @@ def unschedule_request(session, host, session_id, schedule_id): :raises requests.exceptions.ConnectionError: if cannot connect to host """ - data = { - u'session.id': session_id, - u'action': u'removeSched', - u'scheduleId': schedule_id - } + data = {u"session.id": session_id, u"action": u"removeSched", u"scheduleId": schedule_id} logging.debug("Request data: \n%s", data) - response = session.post( - host + '/schedule', - data=data - ) + response = session.post(host + "/schedule", data=data) logging.debug("Response: \n%s", response.text) return response + def execute_request(session, host, session_id, project, flow, **execution_options): - """Execute request for the Azkaban API + r"""Execute request for the Azkaban API :param session: A session for creating the request :type session: requests.Session @@ -262,24 +235,17 @@ def execute_request(session, host, session_id, project, flow, **execution_option :raises requests.exceptions.ConnectionError: if cannot connect to host """ - params = { - u'session.id': session_id, - u'ajax': 'executeFlow', - u'project': project, - u'flow': flow - } + params = {u"session.id": session_id, u"ajax": "executeFlow", u"project": project, u"flow": flow} params.update(execution_options) - response = session.get( - host + '/executor', - params=params - ) + response = session.get(host + "/executor", params=params) logging.debug("Response: \n%s", response.text) return response + def cancel_request(session, host, session_id, exec_id): """Cancel an running flow for the Azkaban API @@ -294,12 +260,7 @@ def cancel_request(session, host, session_id, exec_id): """ response = session.get( - host + '/executor', - params={ - u'session.id': session_id, - u'ajax': 'cancelFlow', - u'execid': exec_id - } + host + "/executor", params={u"session.id": session_id, u"ajax": "cancelFlow", u"execid": exec_id} ) logging.debug("Response: \n%s", response.text) @@ -321,19 +282,15 @@ def create_request(session, host, session_id, project, description): """ response = session.post( - host + '/manager', - data={ - u'session.id': session_id, - u'action': u'create', - u'name': project, - u'description': description - } + host + "/manager", + data={u"session.id": session_id, u"action": u"create", u"name": project, u"description": description}, ) logging.debug("Response: \n%s", response.text) return response + def delete_request(session, host, session_id, project): """Delete a Project request for the Azkaban API @@ -346,18 +303,14 @@ def delete_request(session, host, session_id, project): """ response = session.get( - host + '/manager', - params={ - u'session.id': session_id, - u'delete': 'true', - u'project': project - } + host + "/manager", params={u"session.id": session_id, u"delete": "true", u"project": project} ) logging.debug("Response: \n%s", response.text) return response + def fetch_projects_request(session, host, session_id): """Fetch all projects request for the Azkaban API @@ -370,17 +323,13 @@ def fetch_projects_request(session, host, session_id): :raises requests.exceptions.ConnectionError: if cannot connect to host """ - response = session.get( - host + '/index?all', - params={ - u'session.id': session_id - } - ) + response = session.get(host + "/index?all", params={u"session.id": session_id}) logging.debug("Response: \n%s", response.text) return response + def add_permission_request(session, host, session_id, project, group, permission_options): """Add permission request for the Azkaban API @@ -396,12 +345,13 @@ def add_permission_request(session, host, session_id, project, group, permission :raises requests.exceptions.ConnectionError: if cannot connect to host """ - response = __call_permission_api(session, host, session_id, 'addPermission', project, group, permission_options) + response = __call_permission_api(session, host, session_id, "addPermission", project, group, permission_options) logging.debug("Response: \n%s", response.text) return response + def remove_permission_request(session, host, session_id, project, group): """Remove permission request for the Azkaban API @@ -416,15 +366,16 @@ def remove_permission_request(session, host, session_id, project, group): :raises requests.exceptions.ConnectionError: if cannot connect to host """ - #to remove a group permission, we have to pass all permissions as False - permission_options = {'admin': False, 'read': False, 'write': False, 'execute': False, 'schedule': False} + # to remove a group permission, we have to pass all permissions as False + permission_options = {"admin": False, "read": False, "write": False, "execute": False, "schedule": False} - response = __call_permission_api(session, host, session_id, 'changePermission', project, group, permission_options) + response = __call_permission_api(session, host, session_id, "changePermission", project, group, permission_options) logging.debug("Response: \n%s", response.text) return response + def change_permission_request(session, host, session_id, project, group, permission_options): """Change permission request for the Azkaban API @@ -440,7 +391,7 @@ def change_permission_request(session, host, session_id, project, group, permiss :raises requests.exceptions.ConnectionError: if cannot connect to host """ - response = __call_permission_api(session, host, session_id, 'changePermission', project, group, permission_options) + response = __call_permission_api(session, host, session_id, "changePermission", project, group, permission_options) logging.debug("Response: \n%s", response.text) @@ -461,19 +412,20 @@ def fetch_sla_request(session, host, session_id, schedule_id): """ response = session.get( - host + '/schedule', + host + "/schedule", params={ - u'session.id': session_id, - u'ajax': 'slaInfo', - u'scheduleId': schedule_id, - } + u"session.id": session_id, + u"ajax": "slaInfo", + u"scheduleId": schedule_id, + }, ) logging.debug("Response: \n%s", response.text) return response -def __call_permission_api(session, host, session_id, operation, project, group, permission_options ): + +def __call_permission_api(session, host, session_id, operation, project, group, permission_options): """ This function is a utility to call permission API in Azkaban. @@ -494,21 +446,22 @@ def __call_permission_api(session, host, session_id, operation, project, group, """ return session.get( - host + '/manager', - params = { - u'session.id': session_id, - u'ajax': operation, - u'project': project, - u'name': group, - u'permissions[admin]': permission_options['admin'], - u'permissions[write]': permission_options['write'], - u'permissions[read]': permission_options['read'], - u'permissions[execute]': permission_options['execute'], - u'permissions[schedule]': permission_options['schedule'], - u'group': True - } + host + "/manager", + params={ + u"session.id": session_id, + u"ajax": operation, + u"project": project, + u"name": group, + u"permissions[admin]": permission_options["admin"], + u"permissions[write]": permission_options["write"], + u"permissions[read]": permission_options["read"], + u"permissions[execute]": permission_options["execute"], + u"permissions[schedule]": permission_options["schedule"], + u"group": True, + }, ) + def fetch_flow_execution_request(session, host, session_id, exec_id): """Fetch a flow execution request for the Azkaban API @@ -523,18 +476,14 @@ def fetch_flow_execution_request(session, host, session_id, exec_id): """ response = session.get( - host + '/executor', - params={ - u'session.id': session_id, - u'ajax': 'fetchexecflow', - u'execid': exec_id - } + host + "/executor", params={u"session.id": session_id, u"ajax": "fetchexecflow", u"execid": exec_id} ) logging.debug("Response: \n%s", response.text) return response + def fetch_flow_execution_updates_request(session, host, session_id, exec_id, last_update_time): """Fetch a flow execution updates request for the Azkaban API @@ -553,19 +502,20 @@ def fetch_flow_execution_updates_request(session, host, session_id, exec_id, las """ response = session.get( - host + '/executor', + host + "/executor", params={ - u'session.id': session_id, - u'ajax': 'fetchexecflowupdate', - u'execid': exec_id, - u'lastUpdateTime': last_update_time - } + u"session.id": session_id, + u"ajax": "fetchexecflowupdate", + u"execid": exec_id, + u"lastUpdateTime": last_update_time, + }, ) logging.debug("Response: \n%s", response.text) return response + def fetch_execution_job_log_request(session, host, session_id, exec_id, jobid, offset, length): """Fetches the correponding job logs. @@ -588,21 +538,22 @@ def fetch_execution_job_log_request(session, host, session_id, exec_id, jobid, o """ response = session.get( - host + '/executor', + host + "/executor", params={ - u'session.id': session_id, - u'ajax': 'fetchExecJobLogs', - u'execid': exec_id, - u'jobId': jobid, - u'offset': offset, - u'length': length - } + u"session.id": session_id, + u"ajax": "fetchExecJobLogs", + u"execid": exec_id, + u"jobId": jobid, + u"offset": offset, + u"length": length, + }, ) logging.debug("Response: \n%s", response.text) return response + def resume_flow_execution(session, host, session_id, exec_id): """Resume a flow execution request for the Azkaban API @@ -619,18 +570,14 @@ def resume_flow_execution(session, host, session_id, exec_id): :raises requests.exceptions.ConnectionError: if cannot connect to host """ response = session.get( - host + '/executor', - params={ - u'session.id': session_id, - u'ajax': 'resumeFlow', - u'execid': exec_id - } + host + "/executor", params={u"session.id": session_id, u"ajax": "resumeFlow", u"execid": exec_id} ) logging.debug("Response: \n%s", response.text) return response + def fetch_running_executions_of_a_flow_request(session, host, session_id, project, flow): """Fetch running executions of a flow @@ -644,13 +591,28 @@ def fetch_running_executions_of_a_flow_request(session, host, session_id, projec """ response = session.get( - host + '/executor', - params={ - u'session.id': session_id, - u'ajax': 'getRunning', - u'project': project, - u'flow': flow - } + host + "/executor", + params={u"session.id": session_id, u"ajax": "getRunning", u"project": project, u"flow": flow}, + ) + + logging.debug("Response: \n%s", response.text) + + return response + + +def pause_a_flow_execution(session, host, session_id, exec_id): + """Pause a flow execution for the Azkaban API + + :param session: A session for creating the request + :param str host: Hostname where the request should go + :param str session_id: An id that the user should have when is logged in + :param str exec_id: Execution id to be fetched + :return: The response from the request made + :rtype: requests.Response + :raises requests.exceptions.ConnectionError: if cannot connect to host + """ + response = session.get( + host + "/executor", params={u"session.id": session_id, u"ajax": "pauseFlow", u"execid": exec_id} ) logging.debug("Response: \n%s", response.text) diff --git a/azkaban_cli/azkaban.py b/azkaban_cli/azkaban.py index 0bb23de..8fe5a6f 100755 --- a/azkaban_cli/azkaban.py +++ b/azkaban_cli/azkaban.py @@ -33,7 +33,8 @@ FetchExecutionsOfAFlowError, FetchExecutionJobsLogError, ResumeFlowExecutionError, - FetchRunningExecutionsOfAFlowError + FetchRunningExecutionsOfAFlowError, + PauseAFlowExecutionError, ) @@ -50,7 +51,7 @@ def __init__(self): self.__session_id = None def __validate_host(self, host): - """ PRIVATE + """PRIVATE Receives a host and when the host ends with '/', will we return a host without the '/'. :param host: :return: host: @@ -58,13 +59,13 @@ def __validate_host(self, host): """ valid_host = host - while valid_host.endswith(u'/'): + while valid_host.endswith(u"/"): valid_host = valid_host[:-1] return valid_host def __check_if_logged(self): - """ PRIVATE + """PRIVATE Checks if the instance created has a valid session. :raise: NotLoggedOnError when __session_id not exists. """ @@ -72,45 +73,44 @@ def __check_if_logged(self): raise NotLoggedOnError() def __catch_login_html(self, response): - """ PRIVATE + """PRIVATE Checks the content in the verification is in at least one line of the response. :raise: SessionError when content not in response lines. """ - if " " in \ - response.text.splitlines(): + if ' ' in response.text.splitlines(): raise SessionError(response.text) def __catch_response_status_error(self, exception, response_json): - """ PRIVATE + """PRIVATE Verify error in response, catch response status. :raise: exception(error_msg), when error exists and status equals a error, with the 'error_msg'. """ - response_status = response_json.get('status') - if response_status == u'error': - error_msg = response_json[u'message'] + response_status = response_json.get("status") + if response_status == u"error": + error_msg = response_json[u"message"] raise exception(error_msg) def __catch_response_error_msg(self, exception, response_json): - """ PRIVATE + """PRIVATE Catches the error message when 'error' exists in the response keys. :raise: SessionError, when error_msg equals a 'sessions', or exception(error_msg). """ - if u'error' in response_json.keys(): - error_msg = response_json[u'error'] + if u"error" in response_json.keys(): + error_msg = response_json[u"error"] if error_msg == "session": raise SessionError(error_msg) raise exception(error_msg) def __catch_empty_response(self, exception, response_json): - """ PRIVATE + """PRIVATE Does not allow an empty response. :raise: exception """ if response_json == {}: - raise exception('Empty response') + raise exception("Empty response") def __catch_login_text(self, response): - """ PRIVATE + """PRIVATE Do not allow an empty login attempt. :raise: SessionError("Login error. Need username and password") """ @@ -118,14 +118,14 @@ def __catch_login_text(self, response): raise SessionError(response.text) def __catch_login(self, response): - """ PRIVATE + """PRIVATE Private method to call login_text and login_html from response. """ self.__catch_login_text(response) self.__catch_login_html(response) def __catch_response_error(self, response, exception, ignore_empty_responses=False): - """ PRIVATE + """PRIVATE Try to get the answer json. If an error occurs, define response_json as an empty json, send it together with the input to the error functions. """ @@ -153,11 +153,7 @@ def get_logged_session(self): :rtype: dict """ - logged_session = { - u'host': self.__host, - u'user': self.__user, - u'session_id': self.__session_id - } + logged_session = {u"host": self.__host, u"user": self.__user, u"session_id": self.__session_id} return logged_session @@ -201,9 +197,9 @@ def login(self, host, user, password): self.__catch_response_error(response, LoginError) response_json = response.json() - self.set_logged_session(valid_host, user, response_json['session.id']) + self.set_logged_session(valid_host, user, response_json["session.id"]) - logging.info('Logged as %s' % (user)) + logging.info("Logged as %s" % (user)) def upload(self, path, project=None, zip_name=None): """ @@ -235,7 +231,7 @@ def upload(self, path, project=None, zip_name=None): zip_name = project try: - zip_path = make_archive(zip_name, 'zip', path) + zip_path = make_archive(zip_name, "zip", path) except FileNotFoundError as e: raise UploadError(str(e)) @@ -247,7 +243,7 @@ def upload(self, path, project=None, zip_name=None): self.__catch_response_error(response, UploadError) response_json = response.json() - logging.info('Project %s updated to version %s' % (project, response_json[u'version'])) + logging.info("Project %s updated to version %s" % (project, response_json[u"version"])) def schedule(self, project, flow, cron, **execution_options): """ @@ -270,20 +266,14 @@ def schedule(self, project, flow, cron, **execution_options): execution_options = {k: v for (k, v) in execution_options.items() if v} response = api.schedule_request( - self.__session, - self.__host, - self.__session_id, - project, - flow, - cron, - **execution_options + self.__session, self.__host, self.__session_id, project, flow, cron, **execution_options ) self.__catch_response_error(response, ScheduleError) response_json = response.json() - logging.info(response_json[u'message']) - logging.info('scheduleId: %s' % (response_json[u'scheduleId'])) + logging.info(response_json[u"message"]) + logging.info("scheduleId: %s" % (response_json[u"scheduleId"])) def fetch_flows(self, project): """ @@ -301,17 +291,12 @@ def fetch_flows(self, project): self.__check_if_logged() - response = api.fetch_flows_request( - self.__session, - self.__host, - self.__session_id, - project - ) + response = api.fetch_flows_request(self.__session, self.__host, self.__session_id, project) self.__catch_response_error(response, FetchFlowsError) response_json = response.json() - logging.info('Project ID: %s' % (response_json[u'projectId'])) + logging.info("Project ID: %s" % (response_json[u"projectId"])) return response_json def fetch_jobs_from_flow(self, project, flow): @@ -331,13 +316,7 @@ def fetch_jobs_from_flow(self, project, flow): self.__check_if_logged() - response = api.fetch_jobs_from_flow_request( - self.__session, - self.__host, - self.__session_id, - project, - flow - ) + response = api.fetch_jobs_from_flow_request(self.__session, self.__host, self.__session_id, project, flow) self.__catch_response_error(response, FetchJobsFromFlowError) @@ -360,18 +339,12 @@ def fetch_schedule(self, project_id, flow): self.__check_if_logged() - response = api.fetch_schedule_request( - self.__session, - self.__host, - self.__session_id, - project_id, - flow - ) + response = api.fetch_schedule_request(self.__session, self.__host, self.__session_id, project_id, flow) self.__catch_response_error(response, FetchScheduleError) response_json = response.json() - logging.info('Schedule ID: %s' % (response_json[u'schedule'][u'scheduleId'])) + logging.info("Schedule ID: %s" % (response_json[u"schedule"][u"scheduleId"])) return response_json def unschedule(self, schedule_id): @@ -390,17 +363,12 @@ def unschedule(self, schedule_id): self.__check_if_logged() - response = api.unschedule_request( - self.__session, - self.__host, - self.__session_id, - schedule_id - ) + response = api.unschedule_request(self.__session, self.__host, self.__session_id, schedule_id) self.__catch_response_error(response, UnscheduleError) response_json = response.json() - logging.info(response_json[u'message']) + logging.info(response_json[u"message"]) def execute(self, project, flow, **execution_options): """ @@ -421,18 +389,13 @@ def execute(self, project, flow, **execution_options): execution_options = {k: v for (k, v) in execution_options.items() if v} response = api.execute_request( - self.__session, - self.__host, - self.__session_id, - project, - flow, - **execution_options + self.__session, self.__host, self.__session_id, project, flow, **execution_options ) self.__catch_response_error(response, ExecuteError) response_json = response.json() - logging.info('%s' % (response_json[u'message'])) + logging.info("%s" % (response_json[u"message"])) def cancel(self, execution_id): """ @@ -471,17 +434,11 @@ def create(self, project, description): self.__check_if_logged() - response = api.create_request( - self.__session, - self.__host, - self.__session_id, - project, - description - ) + response = api.create_request(self.__session, self.__host, self.__session_id, project, description) self.__catch_response_error(response, CreateError) - logging.info('Project %s created successfully' % (project)) + logging.info("Project %s created successfully" % (project)) def delete(self, project): """ @@ -495,12 +452,7 @@ def delete(self, project): self.__check_if_logged() - api.delete_request( - self.__session, - self.__host, - self.__session_id, - project - ) + api.delete_request(self.__session, self.__host, self.__session_id, project) # The delete request does not return any message @@ -512,11 +464,7 @@ def fetch_projects(self): self.__check_if_logged() - response = api.fetch_projects_request( - self.__session, - self.__host, - self.__session_id - ) + response = api.fetch_projects_request(self.__session, self.__host, self.__session_id) # The fetch projects request returns an html content, so we only catch login errors self.__catch_login(response) @@ -540,17 +488,15 @@ def add_permission(self, project, group, permission_options): permission_options = self.__check_group_permissions(permission_options) response = api.add_permission_request( - self.__session, - self.__host, - self.__session_id, - project, - group, - permission_options + self.__session, self.__host, self.__session_id, project, group, permission_options ) self.__catch_response_error(response, AddPermissionError, True) - logging.info('Group [%s] add with permission [%s] in the Project [%s] successfully' % (group, permission_options, project)) + logging.info( + "Group [%s] add with permission [%s] in the Project [%s] successfully" + % (group, permission_options, project) + ) def remove_permission(self, project, group): """ @@ -565,17 +511,11 @@ def remove_permission(self, project, group): self.__check_if_logged() - response = api.remove_permission_request( - self.__session, - self.__host, - self.__session_id, - project, - group - ) + response = api.remove_permission_request(self.__session, self.__host, self.__session_id, project, group) self.__catch_response_error(response, RemovePermissionError, True) - logging.info('Group [%s] permission removed from the Project [%s] successfully' % (group, project)) + logging.info("Group [%s] permission removed from the Project [%s] successfully" % (group, project)) def change_permission(self, project, group, permission_options): """ @@ -594,17 +534,15 @@ def change_permission(self, project, group, permission_options): permission_options = self.__check_group_permissions(permission_options) response = api.change_permission_request( - self.__session, - self.__host, - self.__session_id, - project, - group, - permission_options + self.__session, self.__host, self.__session_id, project, group, permission_options ) self.__catch_response_error(response, ChangePermissionError, True) - logging.info('Group [%s] AAA received new permissions [%s] in the Project [%s] successfully' % (group, permission_options, project)) + logging.info( + "Group [%s] AAA received new permissions [%s] in the Project [%s] successfully" + % (group, permission_options, project) + ) def fetch_sla(self, schedule_id): """ @@ -616,12 +554,7 @@ def fetch_sla(self, schedule_id): self.__check_if_logged() - response = api.fetch_sla_request( - self.__session, - self.__host, - self.__session_id, - schedule_id - ) + response = api.fetch_sla_request(self.__session, self.__host, self.__session_id, schedule_id) self.__catch_response_error(response, FetchSLAError) @@ -648,18 +581,19 @@ def __check_group_permissions(self, permission_options): option: permission_options[option] if option in permission_options else False for option in __options } - have_declared_options = \ - filled_permission_options['admin'] and \ - filled_permission_options['read'] and \ - filled_permission_options['write'] and \ - filled_permission_options['execute'] and \ - filled_permission_options['schedule'] + have_declared_options = ( + filled_permission_options["admin"] + and filled_permission_options["read"] + and filled_permission_options["write"] + and filled_permission_options["execute"] + and filled_permission_options["schedule"] + ) - if filled_permission_options['admin']: + if filled_permission_options["admin"]: filled_permission_options = {option: True for option in filled_permission_options} elif not have_declared_options: - filled_permission_options['read'] = True + filled_permission_options["read"] = True return filled_permission_options @@ -679,12 +613,7 @@ def fetch_flow_execution(self, execution_id): self.__check_if_logged() - response = api.fetch_flow_execution_request( - self.__session, - self.__host, - self.__session_id, - execution_id - ) + response = api.fetch_flow_execution_request(self.__session, self.__host, self.__session_id, execution_id) self.__catch_response_error(response, FetchFlowExecutionError) @@ -711,11 +640,7 @@ def fetch_flow_execution_updates(self, execution_id, last_update_time): self.__check_if_logged() response = api.fetch_flow_execution_updates_request( - self.__session, - self.__host, - self.__session_id, - execution_id, - last_update_time + self.__session, self.__host, self.__session_id, execution_id, last_update_time ) self.__catch_response_error(response, FetchFlowExecutionUpdatesError) @@ -743,12 +668,7 @@ def fetch_executions_of_a_flow(self, project, flow, start, length): self.__check_if_logged() response = api.fetch_executions_of_a_flow_request( - self.__session, - self.__session_id, - project, - flow, - start, - length + self.__session, self.__session_id, project, flow, start, length ) self.__catch_response_error(response, FetchExecutionsOfAFlowError) @@ -778,13 +698,7 @@ def fetch_execution_job_log(self, execution_id, jobid, offset, length): self.__check_if_logged() response = api.fetch_execution_job_log_request( - self.__session, - self.__host, - self.__session_id, - execution_id, - jobid, - offset, - length + self.__session, self.__host, self.__session_id, execution_id, jobid, offset, length ) self.__catch_response_error(response, FetchExecutionJobsLogError) @@ -801,12 +715,7 @@ def resume_flow_execution(self, execution_id): """ self.__check_if_logged() - response = api.resume_flow_execution( - self.__session, - self.__host, - self.__session_id, - execution_id - ) + response = api.resume_flow_execution(self.__session, self.__host, self.__session_id, execution_id) self.__catch_response_error(response, ResumeFlowExecutionError, ignore_empty_responses=True) @@ -840,3 +749,20 @@ def fetch_running_executions_of_a_flow(self, project, flow): self.__catch_response_error(response, FetchRunningExecutionsOfAFlowError) return response.json() + + def pause_a_flow_execution(self, exec_id): + """Pause a flow execution for the Azkaban API + + :param str exec_id: The Execution id + :return: The response from the request made + :rtype: requests.Response + :raises PauseAFlowExecutionError: when Azkaban api returns error in response + """ + + self.__check_if_logged() + + response = api.pause_flow_execution(self.__session, self.__host, self.__session_id, self.__exec_id) + + self.__catch_response_error(response, PauseAFlowExecutionError) + + return response.json() diff --git a/azkaban_cli/azkaban_cli.py b/azkaban_cli/azkaban_cli.py index e4e303f..2db2036 100755 --- a/azkaban_cli/azkaban_cli.py +++ b/azkaban_cli/azkaban_cli.py @@ -33,6 +33,7 @@ FetchExecutionJobsLogError, ResumeFlowExecutionError, FetchRunningExecutionsOfAFlowError, + PauseAFlowExecutionError, ) from azkaban_cli.__version__ import __version__ @@ -160,7 +161,7 @@ def __unschedule(ctx, project, flow): @login_required def __execute(ctx, project, flow, **execution_options): - azkaban = ctx.obj[u'azkaban'] + azkaban = ctx.obj[u"azkaban"] try: azkaban.execute(project, flow, **execution_options) @@ -215,7 +216,7 @@ def __delete(ctx, project): try: schedule = azkaban.fetch_schedule(project_id, flow_name) - except: + except Exception: logging.debug("Schedule not found") schedule = None @@ -519,6 +520,17 @@ def _fetch_running_executions_of_a_flow(ctx, project, flow): logging.error(str(e)) +@login_required +def __pause_a_flow_execution(ctx, exec_id): + azkaban = ctx.obj[u"azkaban"] + + try: + azkaban.pause_flow_execution(exec_id) + logging.info("Flow successfully paused") + except PauseAFlowExecutionError as e: + logging.error(str(e)) + + # ---------------------------------------------------------------------------------------------------------------------- # Interface # ---------------------------------------------------------------------------------------------------------------------- @@ -604,32 +616,75 @@ def unschedule(ctx, project, flow): @click.command(u"execute") @click.pass_context -@click.argument(u'project', type=click.STRING) -@click.argument(u'flow', type=click.STRING) -@click.option(u'--disabled', type=click.STRING, help=u'A list of job names that should be disabled for this execution. Should be formatted as a JSON Array String. Example Values: ["job_name_1", "job_name_2", "job_name_N"]') -@click.option(u'--success-emails', type=click.STRING, help=u'A list of emails to be notified if the execution succeeds. All emails are delimitted with [,|;|\s+]. Example Values: foo@email.com,bar@email.com') -@click.option(u'--failure-emails', type=click.STRING, help=u'A list of emails to be notified if the execution fails. All emails are delimitted with [,|;|\s+]. Example Values: foo@email.com,bar@email.com') -@click.option(u'--success-emails-override/--no-success-emails-override', help=u'Whether uses system default email settings to override successEmails.') -@click.option(u'--failure-emails-override/--no-failure-emails-override', help=u'Whether uses system default email settings to override failureEmails.') -@click.option(u'--notify-failure-first/--no-notify-failure-first', help=u'Whether sends out email notifications as long as the first failure occurs.') -@click.option(u'--notify-failure-last/--no-notify-failure-last', help=u'Whether sends out email notifications as long as the last failure occurs.') -@click.option(u'--failure-action', type=click.STRING, help=u'If a failure occurs, how should the execution behaves. Possible Values: finishCurrent, cancelImmediately, finishPossible') -@click.option(u'--concurrent-option', type=click.STRING, help=u'If you wanna specify concurrent option for scheduling flow. Possible values: ignore, pipeline, skip') -def execute(ctx, project, flow, disabled, success_emails, failure_emails, - success_emails_override, failure_emails_override, notify_failure_first, - notify_failure_last, failure_action, concurrent_option): +@click.argument(u"project", type=click.STRING) +@click.argument(u"flow", type=click.STRING) +@click.option( + u"--disabled", + type=click.STRING, + help=u'A list of job names that should be disabled for this execution. Should be formatted as a JSON Array String. Example Values: ["job_name_1", "job_name_2", "job_name_N"]', +) +@click.option( + u"--success-emails", + type=click.STRING, + help=r"A list of emails to be notified if the execution succeeds. All emails are delimitted with [,|;|\s+]. Example Values: foo@email.com,bar@email.com", +) +@click.option( + u"--failure-emails", + type=click.STRING, + help=r"A list of emails to be notified if the execution fails. All emails are delimitted with [,|;|\s+]. Example Values: foo@email.com,bar@email.com", +) +@click.option( + u"--success-emails-override/--no-success-emails-override", + help=u"Whether uses system default email settings to override successEmails.", +) +@click.option( + u"--failure-emails-override/--no-failure-emails-override", + help=u"Whether uses system default email settings to override failureEmails.", +) +@click.option( + u"--notify-failure-first/--no-notify-failure-first", + help=u"Whether sends out email notifications as long as the first failure occurs.", +) +@click.option( + u"--notify-failure-last/--no-notify-failure-last", + help=u"Whether sends out email notifications as long as the last failure occurs.", +) +@click.option( + u"--failure-action", + type=click.STRING, + help=u"If a failure occurs, how should the execution behaves. Possible Values: finishCurrent, cancelImmediately, finishPossible", +) +@click.option( + u"--concurrent-option", + type=click.STRING, + help=u"If you wanna specify concurrent option for scheduling flow. Possible values: ignore, pipeline, skip", +) +def execute( + ctx, + project, + flow, + disabled, + success_emails, + failure_emails, + success_emails_override, + failure_emails_override, + notify_failure_first, + notify_failure_last, + failure_action, + concurrent_option, +): """Execute a flow from a project""" execution_options = { - 'disabled': disabled, - 'successEmails': success_emails, - 'failureEmails': failure_emails, - 'successEmailsOverride': success_emails_override, - 'failureEmailsOverride': failure_emails_override, - 'notifyFailureFirst': notify_failure_first, - 'notifyFailureLast': notify_failure_last, - 'failureAction': failure_action, - 'concurrentOption': concurrent_option, + "disabled": disabled, + "successEmails": success_emails, + "failureEmails": failure_emails, + "successEmailsOverride": success_emails_override, + "failureEmailsOverride": failure_emails_override, + "notifyFailureFirst": notify_failure_first, + "notifyFailureLast": notify_failure_last, + "failureAction": failure_action, + "concurrentOption": concurrent_option, } __execute(ctx, project, flow, **execution_options) diff --git a/azkaban_cli/exceptions.py b/azkaban_cli/exceptions.py index efb566f..6dd04fd 100755 --- a/azkaban_cli/exceptions.py +++ b/azkaban_cli/exceptions.py @@ -1,70 +1,97 @@ # -*- coding: utf-8 -*- + class NotLoggedOnError(Exception): pass + class SessionError(Exception): pass + class LoginError(Exception): pass + class UploadError(Exception): pass + class ExecuteError(Exception): pass + class CancelError(Exception): pass + class ScheduleError(Exception): pass + class FetchFlowsError(Exception): pass + class FetchScheduleError(Exception): pass + class FetchSLAError(Exception): pass + class UnscheduleError(Exception): pass + class CreateError(Exception): pass + class FetchProjectsError(Exception): pass + class AddPermissionError(Exception): pass + class RemovePermissionError(Exception): pass + class ChangePermissionError(Exception): pass + class FetchFlowExecutionError(Exception): pass + class FetchJobsFromFlowError(Exception): pass + class FetchFlowExecutionUpdatesError(Exception): pass + class FetchExecutionsOfAFlowError(Exception): pass + class FetchExecutionJobsLogError(Exception): pass + class ResumeFlowExecutionError(Exception): pass + class FetchRunningExecutionsOfAFlowError(Exception): pass + + +class PauseAFlowExecutionError(Exception): + pass