From 6f049c6836c9983f2a75badf3dd3658ecf6df6bf Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Mon, 17 Mar 2025 07:39:26 +0100 Subject: [PATCH 01/21] feat: Adding JWT support alongside X509 auth --- Pilot/pilotTools.py | 40 +++++++++------- Pilot/proxyTools.py | 112 +++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 133 insertions(+), 19 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 8afe0f62..9c6b34ec 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -18,6 +18,15 @@ from functools import partial, wraps from threading import RLock +try: + from Pilot.proxyTools import ( + X509BasedRequest + ) +except ImportError: + from proxyTools import ( + X509BasedRequest + ) + ############################ # python 2 -> 3 "hacks" try: @@ -701,26 +710,21 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage): caPath = os.getenv("X509_CERT_DIR") cert = os.getenv("X509_USER_PROXY") - context = ssl.create_default_context() - context.load_verify_locations(capath=caPath) - + message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO)) - try: - context.load_cert_chain(cert) # this is a proxy - raw_data = {"method": method, "args": message} - except IsADirectoryError: # assuming it'a dir containing cert and key - context.load_cert_chain(os.path.join(cert, "hostcert.pem"), os.path.join(cert, "hostkey.pem")) - raw_data = {"method": method, "args": message, "extraCredentials": '"hosts"'} - - if sys.version_info.major == 3: - data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 - else: - # Python2 - data = urlencode(raw_data) - - res = urlopen(url, data, context=context) - res.close() + raw_data = {"method": method, "args": message} + + + X509config = X509BasedRequest(url=url, + caPath=caPath, + certEnv=cert) + + # Do the request + _res = X509config.executeRequest(raw_data=raw_data, + headers={ + "User-Agent": X509config.generateUserAgent(pilotUUID=pilotUUID) + }) class CommandBase(object): diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index a5fa652e..5a2a2a81 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -1,10 +1,27 @@ -"""few functions for dealing with proxies""" +"""few functions for dealing with proxies and authentication""" from __future__ import absolute_import, division, print_function import re from base64 import b16decode from subprocess import PIPE, Popen +import ssl +import sys +import os + +try: + IsADirectoryError # pylint: disable=used-before-assignment +except NameError: + IsADirectoryError = OSError + +try: + from urllib.parse import urlencode + from urllib.request import Request, urlopen +except ImportError: + from urllib import urlencode + + from urllib2 import urlopen + VOMS_FQANS_OID = b"1.3.6.1.4.1.8005.100.100.4" VOMS_EXTENSION_OID = b"1.3.6.1.4.1.8005.100.100.5" @@ -65,3 +82,96 @@ def getVO(proxy_data): if match: return match.groups()[0].decode() raise NotImplementedError("Something went very wrong") + + +class BaseConnectedRequest(object): + """This class helps supporting multiple kinds of requests that requires connections""" + + def __init__(self, url, caPath, name="unknown"): + self.name = name + self.url = url + self.caPath = caPath + # We assume we have only one context, so this variable could be shared to avoid opening n times a cert. + # On the contrary, to avoid race conditions, we do avoid using "self.data" and "self.headers" + self._context = None + + self._prepareRequest() + + def generateUserAgent(self, pilotUUID): + """To analyse the traffic, we can send a taylor-made User-Agent + + Args: + pilotUUID (str): Unique ID of the Pilot + + Returns: + str: The generated user agent + """ + return "Dirac Pilot [%s]" % pilotUUID + + def _prepareRequest(self): + """As previously, loads the SSL certificates of the server (to avoid "unknown issuer")""" + # Load the SSL context + self._context = ssl.create_default_context() + self._context.load_verify_locations(capath=self.caPath) + + def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): + """Execute a HTTP request with the data, headers, and the pre-defined data (SSL + auth) + + Args: + raw_data (dict): Data to send + headers (dict, optional): Headers to send, helps to track requests. Defaults to {"User-Agent": "Dirac Pilot [Unknown ID]"}. + + Returns: + str: Response of the HTTP request + """ + if sys.version_info[0] == 3: + data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 + else: + # Python2 + data = urlencode(raw_data) + + request = Request(self.url, data=data, headers=headers) + + res = urlopen(request, context=self._context) + res.close() + + return res.read() + + +class TokenBasedRequest(BaseConnectedRequest): + """Connected Request with JWT support""" + + def __init__(self, url, caPath, jwtData): + super(TokenBasedRequest, self).__init__(url, caPath, "TokenBasedConnection") + + self.jwtData = jwtData + + def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): + # Adds the JWT in the HTTP request (in the Bearer field) + headers["Bearer"] = self.jwtData + return super(TokenBasedRequest, self).executeRequest(raw_data, headers) + + +class X509BasedRequest(BaseConnectedRequest): + """Connected Request with X509 support""" + + def __init__(self, url, caPath, certEnv): + super(X509BasedRequest, self).__init__(url, caPath, "X509BasedConnection") + + self.certEnv = certEnv + self._hasExtraCredentials = False + + # Load X509 once + try: + self._context.load_cert_chain(self.certEnv) + except IsADirectoryError: # assuming it'a dir containing cert and key + self._context.load_cert_chain( + os.path.join(self.certEnv, "hostcert.pem"), os.path.join(self.certEnv, "hostkey.pem") + ) + self._hasExtraCredentials = True + + def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): + # Adds a flag if the passed cert is a Directory + if self._hasExtraCredentials: + raw_data["extraCredentials"] = '"hosts"' + return super(X509BasedRequest, self).executeRequest(raw_data, headers) From bad13321b69ccb8e3fd23abbfcf4183cd62bd70b Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 19 Mar 2025 11:12:12 +0100 Subject: [PATCH 02/21] fix: Isort fixed, authorization header and doc --- Pilot/pilotTools.py | 13 ++----------- Pilot/proxyTools.py | 43 +++++++++++++++++++------------------------ 2 files changed, 21 insertions(+), 35 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 9c6b34ec..30bdb060 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -18,15 +18,6 @@ from functools import partial, wraps from threading import RLock -try: - from Pilot.proxyTools import ( - X509BasedRequest - ) -except ImportError: - from proxyTools import ( - X509BasedRequest - ) - ############################ # python 2 -> 3 "hacks" try: @@ -78,9 +69,9 @@ def load_module_from_path(module_name, path_to_module): basestring = str try: - from Pilot.proxyTools import getVO + from Pilot.proxyTools import X509BasedRequest, getVO except ImportError: - from proxyTools import getVO + from proxyTools import X509BasedRequest, getVO try: FileNotFoundError # pylint: disable=used-before-assignment diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index 5a2a2a81..d9eec0f5 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -2,12 +2,12 @@ from __future__ import absolute_import, division, print_function +import os import re -from base64 import b16decode -from subprocess import PIPE, Popen import ssl import sys -import os +from base64 import b16decode +from subprocess import PIPE, Popen try: IsADirectoryError # pylint: disable=used-before-assignment @@ -47,15 +47,10 @@ def findExtension(oid, lines): def getVO(proxy_data): """Fetches the VO in a chain certificate - Args: - proxy_data (bytes): Bytes for the proxy chain - - Raises: - Exception: Any error related to openssl - NotImplementedError: Not documented error - - Returns: - str: A VO + :param proxy_data: Bytes for the proxy chain + :type proxy_data: bytes + :return: A VO + :rtype: str """ chain = re.findall(br"-----BEGIN CERTIFICATE-----\n.+?\n-----END CERTIFICATE-----", proxy_data, flags=re.DOTALL) @@ -100,11 +95,11 @@ def __init__(self, url, caPath, name="unknown"): def generateUserAgent(self, pilotUUID): """To analyse the traffic, we can send a taylor-made User-Agent - Args: - pilotUUID (str): Unique ID of the Pilot - - Returns: - str: The generated user agent + :param pilotUUID: Unique ID of the Pilot + :type pilotUUID: str + :type param_name: param_type + :return: The generated user agent + :rtype: str """ return "Dirac Pilot [%s]" % pilotUUID @@ -117,12 +112,12 @@ def _prepareRequest(self): def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): """Execute a HTTP request with the data, headers, and the pre-defined data (SSL + auth) - Args: - raw_data (dict): Data to send - headers (dict, optional): Headers to send, helps to track requests. Defaults to {"User-Agent": "Dirac Pilot [Unknown ID]"}. - - Returns: - str: Response of the HTTP request + :param raw_data: Data to send + :type raw_data: dict + :param headers: Headers to send, helps to track requests. Defaults to {"User-Agent": "Dirac Pilot [Unknown ID]"}. + :type headers: dict, optional + :return: Response of the HTTP request + :rtype: str """ if sys.version_info[0] == 3: data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 @@ -148,7 +143,7 @@ def __init__(self, url, caPath, jwtData): def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): # Adds the JWT in the HTTP request (in the Bearer field) - headers["Bearer"] = self.jwtData + headers["Authorization"] = "Bearer: %s" % self.jwtData return super(TokenBasedRequest, self).executeRequest(raw_data, headers) From 2843494b9c564eb4c9db9ea0fbf18672bf8061dd Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 19 Mar 2025 11:24:30 +0100 Subject: [PATCH 03/21] refactor: Changing the request header to a private field --- Pilot/pilotTools.py | 7 +++---- Pilot/proxyTools.py | 23 +++++++++++------------ 2 files changed, 14 insertions(+), 16 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 30bdb060..2d04222c 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -711,11 +711,10 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage): caPath=caPath, certEnv=cert) + X509config.generateUserAgent(pilotUUID=pilotUUID) + # Do the request - _res = X509config.executeRequest(raw_data=raw_data, - headers={ - "User-Agent": X509config.generateUserAgent(pilotUUID=pilotUUID) - }) + _res = X509config.executeRequest(raw_data=raw_data) class CommandBase(object): diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index d9eec0f5..43e1c697 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -86,6 +86,9 @@ def __init__(self, url, caPath, name="unknown"): self.name = name self.url = url self.caPath = caPath + self.headers = { + "User-Agent": "Dirac Pilot [Unknown ID]" + } # We assume we have only one context, so this variable could be shared to avoid opening n times a cert. # On the contrary, to avoid race conditions, we do avoid using "self.data" and "self.headers" self._context = None @@ -97,11 +100,8 @@ def generateUserAgent(self, pilotUUID): :param pilotUUID: Unique ID of the Pilot :type pilotUUID: str - :type param_name: param_type - :return: The generated user agent - :rtype: str """ - return "Dirac Pilot [%s]" % pilotUUID + self.headers["User-Agent"] = "Dirac Pilot [%s]" % pilotUUID def _prepareRequest(self): """As previously, loads the SSL certificates of the server (to avoid "unknown issuer")""" @@ -109,7 +109,7 @@ def _prepareRequest(self): self._context = ssl.create_default_context() self._context.load_verify_locations(capath=self.caPath) - def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): + def executeRequest(self, raw_data): """Execute a HTTP request with the data, headers, and the pre-defined data (SSL + auth) :param raw_data: Data to send @@ -125,7 +125,7 @@ def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown # Python2 data = urlencode(raw_data) - request = Request(self.url, data=data, headers=headers) + request = Request(self.url, data=data, headers=self.headers) res = urlopen(request, context=self._context) res.close() @@ -140,11 +140,10 @@ def __init__(self, url, caPath, jwtData): super(TokenBasedRequest, self).__init__(url, caPath, "TokenBasedConnection") self.jwtData = jwtData - - def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): + + def addJwtToHeader(self): # Adds the JWT in the HTTP request (in the Bearer field) - headers["Authorization"] = "Bearer: %s" % self.jwtData - return super(TokenBasedRequest, self).executeRequest(raw_data, headers) + self.headers["Authorization"] = "Bearer: %s" % self.jwtData class X509BasedRequest(BaseConnectedRequest): @@ -165,8 +164,8 @@ def __init__(self, url, caPath, certEnv): ) self._hasExtraCredentials = True - def executeRequest(self, raw_data, headers={"User-Agent": "Dirac Pilot [Unknown ID]"}): + def executeRequest(self, raw_data): # Adds a flag if the passed cert is a Directory if self._hasExtraCredentials: raw_data["extraCredentials"] = '"hosts"' - return super(X509BasedRequest, self).executeRequest(raw_data, headers) + return super(X509BasedRequest, self).executeRequest(raw_data) From 8ba74c8ae14dd30ebda845f4cfec4bd39941eb8c Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 26 Mar 2025 18:09:26 +0100 Subject: [PATCH 04/21] feat: Adding a choice in sendMessage to allow JWT --- Pilot/pilotTools.py | 39 +++++++++++++++++++++++++++------------ 1 file changed, 27 insertions(+), 12 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 2d04222c..419819a8 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -69,9 +69,9 @@ def load_module_from_path(module_name, path_to_module): basestring = str try: - from Pilot.proxyTools import X509BasedRequest, getVO + from Pilot.proxyTools import X509BasedRequest, getVO, TokenBasedRequest except ImportError: - from proxyTools import X509BasedRequest, getVO + from proxyTools import X509BasedRequest, getVO, TokenBasedRequest try: FileNotFoundError # pylint: disable=used-before-assignment @@ -687,7 +687,7 @@ def cancelTimer(self): self._timer.cancel() -def sendMessage(url, pilotUUID, wnVO, method, rawMessage): +def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False): """ Invoke a remote method on a Tornado server and pass a JSON message to it. @@ -696,25 +696,40 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage): :param str wnVO: VO name, relevant only if not contained in a proxy :param str method: a method to be invoked :param str rawMessage: a message to be sent, in JSON format + :param bool withJWT: tells if we use or not JWT :return: None. """ - caPath = os.getenv("X509_CERT_DIR") - cert = os.getenv("X509_USER_PROXY") - - message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO)) + caPath = os.getenv("X509_CERT_DIR") + message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO)) raw_data = {"method": method, "args": message} + config = None + + if withJWT: + jwtData = os.getenv("JWT_TOKEN") + + config = TokenBasedRequest( + url=url, + caPath=caPath, + jwtData=jwtData + ) + + else: + cert = os.getenv("X509_USER_PROXY") - X509config = X509BasedRequest(url=url, - caPath=caPath, - certEnv=cert) + config = X509BasedRequest( + url=url, + caPath=caPath, + certEnv=cert + ) - X509config.generateUserAgent(pilotUUID=pilotUUID) + # Config the header, will help debugging + config.generateUserAgent(pilotUUID=pilotUUID) # Do the request - _res = X509config.executeRequest(raw_data=raw_data) + _res = config.executeRequest(raw_data=raw_data) class CommandBase(object): From 5bc40fde39e7cb5b9c6353598ce4346fb239a5b6 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Tue, 1 Apr 2025 15:20:51 +0200 Subject: [PATCH 05/21] feat: Adding DiracX config and fetching JWT --- Pilot/pilotTools.py | 49 ++++++++++++++++++++++++++++++++++++++++++--- Pilot/proxyTools.py | 40 +++++++++++++++++++++++++----------- 2 files changed, 74 insertions(+), 15 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 419819a8..8ba4c12d 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -69,9 +69,9 @@ def load_module_from_path(module_name, path_to_module): basestring = str try: - from Pilot.proxyTools import X509BasedRequest, getVO, TokenBasedRequest + from Pilot.proxyTools import X509BasedRequest, getVO, TokenBasedRequest, BaseRequest except ImportError: - from proxyTools import X509BasedRequest, getVO, TokenBasedRequest + from proxyTools import X509BasedRequest, getVO, TokenBasedRequest, BaseRequest try: FileNotFoundError # pylint: disable=used-before-assignment @@ -732,6 +732,25 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False): _res = config.executeRequest(raw_data=raw_data) + +def retrieveJWT(diracXURL, pilotUUID, pilotSecret): + + caPath = os.getenv("X509_CERT_DIR") + + data_in_url = "?pilot_job_reference=%s&pilot_secret=%s" % (pilotUUID, pilotSecret) + + config = BaseRequest( + "%s/api/auth/pilot-login%s" % (diracXURL, data_in_url), + caPath=caPath + ) + + config.addHeader("Content-Type", "application/json") + + config.generateUserAgent(pilotUUID=pilotUUID) + + # TODO: Add new cert for DiracX (different from Dirac) + return config.executeRequest(raw_data={}, insecure=True) + class CommandBase(object): """CommandBase is the base class for every command in the pilot commands toolbox""" @@ -917,10 +936,16 @@ def __init__(self): self.site = "" self.setup = "" self.configServer = "" + self.diracXServer = "" self.ceName = "" self.ceType = "" self.queueName = "" self.gridCEType = "" + self.pilotSecret = "" + self.jwt = { + "access_token": "", + "refresh_token": "" + } # maxNumberOfProcessors: the number of # processors allocated to the pilot which the pilot can allocate to one payload # used to set payloadProcessors unless other limits are reached (like the number of processors on the WN) @@ -1005,6 +1030,7 @@ def __init__(self): ("y:", "CEType=", "CE Type (normally InProcess)"), ("z", "pilotLogging", "Activate pilot logging system"), ("C:", "configurationServer=", "Configuration servers to use"), + ("", "diracXServer=", "DiracX server to use"), ("D:", "disk=", "Require at least MB available"), ("E:", "commandExtensions=", "Python modules with extra commands"), ("F:", "pilotCFGFile=", "Specify pilot CFG file"), @@ -1030,6 +1056,7 @@ def __init__(self): ("", "preinstalledEnvPrefix=", "preinstalled pilot environment area prefix"), ("", "architectureScript=", "architecture script to use"), ("", "CVMFS_locations=", "comma-separated list of CVMS locations"), + ("", "pilotSecret=", "secret that the pilot uses with DiracX"), ) # Possibly get Setup and JSON URL/filename from command line @@ -1055,7 +1082,9 @@ def __init__(self): if self.useServerCertificate: self.installEnv["X509_USER_PROXY"] = self.certsLocation os.environ["X509_USER_PROXY"] = self.certsLocation - + + self.__retrieveIfNeededJWT() + def __setSecurityDir(self, envName, dirLocation): """Set the environment variable of the `envName`, and add it also to the Pilot Parameters @@ -1110,6 +1139,16 @@ def __checkSecurityDir(self, envName, dirName): self.log.error("Could not find/set %s" % envName) sys.exit(1) + def __retrieveIfNeededJWT(self): + + if self.diracXServer != "": + if self.pilotSecret == "": + raise ValueError("PilotSecret has to be defined") + if self.pilotUUID == "": + raise ValueError("PilotUUID has to be defined") + self.jwt = retrieveJWT(self.diracXServer, self.pilotUUID, self.pilotSecret) + self.log.debug("Retrieved JWT from DiracX") + def __initCommandLine1(self): """Parses and interpret options on the command line: first pass (essential things)""" @@ -1160,6 +1199,8 @@ def __initCommandLine2(self): self.keepPythonPath = True elif o in ("-C", "--configurationServer"): self.configServer = v + elif o == "--diracXServer": + self.diracXServer = v elif o in ("-G", "--Group"): self.userGroup = v elif o in ("-x", "--execute"): @@ -1233,6 +1274,8 @@ def __initCommandLine2(self): self.architectureScript = v elif o == "--CVMFS_locations": self.CVMFS_locations = v.split(",") + elif o == "--pilotSecret": + self.pilotSecret = v def __loadJSON(self): """ diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index 43e1c697..e54c1b9d 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, division, print_function +import json import os import re import ssl @@ -79,7 +80,7 @@ def getVO(proxy_data): raise NotImplementedError("Something went very wrong") -class BaseConnectedRequest(object): +class BaseRequest(object): """This class helps supporting multiple kinds of requests that requires connections""" def __init__(self, url, caPath, name="unknown"): @@ -108,17 +109,21 @@ def _prepareRequest(self): # Load the SSL context self._context = ssl.create_default_context() self._context.load_verify_locations(capath=self.caPath) + + def addHeader(self, key, value): + """Add a header (key, value) into the request header""" + self.headers[key] = value - def executeRequest(self, raw_data): + def executeRequest(self, raw_data, insecure=False): """Execute a HTTP request with the data, headers, and the pre-defined data (SSL + auth) :param raw_data: Data to send :type raw_data: dict - :param headers: Headers to send, helps to track requests. Defaults to {"User-Agent": "Dirac Pilot [Unknown ID]"}. - :type headers: dict, optional - :return: Response of the HTTP request - :rtype: str - """ + :param insecure: Deactivate proxy verification /!\ Debug ONLY + :type insecure: bool + :return: Parsed JSON response + :rtype: dict + """ if sys.version_info[0] == 3: data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 else: @@ -127,13 +132,24 @@ def executeRequest(self, raw_data): request = Request(self.url, data=data, headers=self.headers) - res = urlopen(request, context=self._context) - res.close() + ctx = self._context # Save in case of an insecure request + + if insecure: + # DEBUG ONLY + # Overrides context + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE - return res.read() + with urlopen(request, context=ctx) as res: + response_data = res.read().decode("utf-8") # Decode response bytes + try: + return json.loads(response_data) # Parse JSON response + except json.JSONDecodeError: + raise Exception("Invalid JSON response: %s" % response_data) -class TokenBasedRequest(BaseConnectedRequest): +class TokenBasedRequest(BaseRequest): """Connected Request with JWT support""" def __init__(self, url, caPath, jwtData): @@ -146,7 +162,7 @@ def addJwtToHeader(self): self.headers["Authorization"] = "Bearer: %s" % self.jwtData -class X509BasedRequest(BaseConnectedRequest): +class X509BasedRequest(BaseRequest): """Connected Request with X509 support""" def __init__(self, url, caPath, certEnv): From 1ecb0b6ede481e4b308443a87466029b67a32347 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Tue, 1 Apr 2025 16:48:30 +0200 Subject: [PATCH 06/21] fix: Avoid backslash space --- Pilot/proxyTools.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index e54c1b9d..e5c4a9bd 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -119,7 +119,7 @@ def executeRequest(self, raw_data, insecure=False): :param raw_data: Data to send :type raw_data: dict - :param insecure: Deactivate proxy verification /!\ Debug ONLY + :param insecure: Deactivate proxy verification WARNING Debug ONLY :type insecure: bool :return: Parsed JSON response :rtype: dict From bf612eee9d2253b117b09b3ea9d5cb6e77af5119 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 2 Apr 2025 09:51:19 +0200 Subject: [PATCH 07/21] fix: Fix for python2 support --- Pilot/proxyTools.py | 24 +++++++++++++++++------- 1 file changed, 17 insertions(+), 7 deletions(-) diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index e5c4a9bd..4141e709 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -13,7 +13,7 @@ try: IsADirectoryError # pylint: disable=used-before-assignment except NameError: - IsADirectoryError = OSError + IsADirectoryError = IOError try: from urllib.parse import urlencode @@ -21,7 +21,7 @@ except ImportError: from urllib import urlencode - from urllib2 import urlopen + from urllib2 import Request, urlopen VOMS_FQANS_OID = b"1.3.6.1.4.1.8005.100.100.4" @@ -141,12 +141,22 @@ def executeRequest(self, raw_data, insecure=False): ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - with urlopen(request, context=ctx) as res: - response_data = res.read().decode("utf-8") # Decode response bytes + if sys.version_info[0] >= 3: + # Python 3 code + with urlopen(request, context=ctx) as res: + response_data = res.read().decode("utf-8") # Decode response bytes + else: + # Python 2 code + res = urlopen(request, context=ctx) try: - return json.loads(response_data) # Parse JSON response - except json.JSONDecodeError: - raise Exception("Invalid JSON response: %s" % response_data) + response_data = res.read().decode("utf-8") # Decode response bytes + finally: + res.close() + + try: + return json.loads(response_data) # Parse JSON response + except ValueError: # In Python 2, json.JSONDecodeError is a subclass of ValueError + raise Exception("Invalid JSON response: %s" % response_data) class TokenBasedRequest(BaseRequest): From 3cb9b686f065d610748910b6b5f31ca33d0ef476 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 2 Apr 2025 15:21:06 +0200 Subject: [PATCH 08/21] fix: Small fixes and python2/3 compatibility --- Pilot/pilotTools.py | 16 ++++++++++------ Pilot/proxyTools.py | 6 +++--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 8ba4c12d..2da9053d 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -687,7 +687,7 @@ def cancelTimer(self): self._timer.cancel() -def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False): +def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False, jwt={}): """ Invoke a remote method on a Tornado server and pass a JSON message to it. @@ -697,6 +697,7 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False): :param str method: a method to be invoked :param str rawMessage: a message to be sent, in JSON format :param bool withJWT: tells if we use or not JWT + :param dict jwt: JWT for the requests :return: None. """ @@ -708,12 +709,15 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False): config = None if withJWT: - jwtData = os.getenv("JWT_TOKEN") + try: + access_token = jwt["access_token"] + except ValueError as e: + raise ValueError("JWT is needed, with an access_token field") config = TokenBasedRequest( url=url, caPath=caPath, - jwtData=jwtData + jwtData=access_token ) else: @@ -1141,10 +1145,10 @@ def __checkSecurityDir(self, envName, dirName): def __retrieveIfNeededJWT(self): - if self.diracXServer != "": - if self.pilotSecret == "": + if self.diracXServer: + if not self.pilotSecret: raise ValueError("PilotSecret has to be defined") - if self.pilotUUID == "": + if not self.pilotUUID: raise ValueError("PilotUUID has to be defined") self.jwt = retrieveJWT(self.diracXServer, self.pilotUUID, self.pilotSecret) self.log.debug("Retrieved JWT from DiracX") diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index 4141e709..b83e6190 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -124,7 +124,7 @@ def executeRequest(self, raw_data, insecure=False): :return: Parsed JSON response :rtype: dict """ - if sys.version_info[0] == 3: + if sys.version_info.major == 3: data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 else: # Python2 @@ -141,7 +141,7 @@ def executeRequest(self, raw_data, insecure=False): ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - if sys.version_info[0] >= 3: + if sys.version_info.major == 3: # Python 3 code with urlopen(request, context=ctx) as res: response_data = res.read().decode("utf-8") # Decode response bytes @@ -149,7 +149,7 @@ def executeRequest(self, raw_data, insecure=False): # Python 2 code res = urlopen(request, context=ctx) try: - response_data = res.read().decode("utf-8") # Decode response bytes + response_data = res.read() finally: res.close() From 672d30ffaa7ef2ae6b04626c2b1a1a7f3d5d5ac9 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Tue, 8 Apr 2025 11:24:44 +0200 Subject: [PATCH 09/21] fix: Changing diracX url, and parameter in sendMessage --- Pilot/pilotTools.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 2da9053d..50369fe4 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -687,7 +687,7 @@ def cancelTimer(self): self._timer.cancel() -def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False, jwt={}): +def sendMessage(url, pilotUUID, wnVO, method, rawMessage, jwt={}): """ Invoke a remote method on a Tornado server and pass a JSON message to it. @@ -696,7 +696,6 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False, jwt={}) :param str wnVO: VO name, relevant only if not contained in a proxy :param str method: a method to be invoked :param str rawMessage: a message to be sent, in JSON format - :param bool withJWT: tells if we use or not JWT :param dict jwt: JWT for the requests :return: None. """ @@ -708,7 +707,7 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, withJWT=False, jwt={}) config = None - if withJWT: + if jwt: try: access_token = jwt["access_token"] except ValueError as e: @@ -1034,7 +1033,7 @@ def __init__(self): ("y:", "CEType=", "CE Type (normally InProcess)"), ("z", "pilotLogging", "Activate pilot logging system"), ("C:", "configurationServer=", "Configuration servers to use"), - ("", "diracXServer=", "DiracX server to use"), + ("", "diracx_URL=", "DiracX Server URL to use"), ("D:", "disk=", "Require at least MB available"), ("E:", "commandExtensions=", "Python modules with extra commands"), ("F:", "pilotCFGFile=", "Specify pilot CFG file"), From 7eb8ca548db2418fe131f1e2bd8c2a27567471fc Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 9 Apr 2025 09:12:38 +0200 Subject: [PATCH 10/21] feat: Changing retrieveJWTIfNeeded by a PilotLogin command --- Pilot/pilotCommands.py | 27 +++++++++++++++++++++++++++ Pilot/pilotTools.py | 12 ------------ 2 files changed, 27 insertions(+), 12 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 945a6b78..d562b266 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -572,6 +572,33 @@ def execute(self): self.log.error("Could not get execute dirac-admin-add-pilot [ERROR %d]" % retCode) +class PilotLogin(CommandBase): + """The Pilot logins and fetches his jwt, only compatible with DiracX cli command""" + + def __init__(self, pilotParams): + """c'tor""" + super(PilotLogin, self).__init__(pilotParams) + + @logFinalizer + def execute(self): + """Calls dirac-admin-add-pilot""" + + if not self.pp.pilotReference: + self.log.warn("Skipping module, no pilot reference found") + return + + if not self.pp.pilotSecret: + self.log.warn("Skipping module, no pilot secret found") + return + + checkCmd = "dirac pilot-login %s %s" % ( + self.pp.pilotReference, + self.pp.pilotSecret + ) + retCode, _ = self.executeAndGetOutput(checkCmd, self.pp.installEnv) + if retCode: + self.log.error("Could not get execute dirac pilot-login [ERROR %d]" % retCode) + class CheckCECapabilities(CommandBase): """Used to get CE tags and other relevant parameters.""" diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 50369fe4..a04ac701 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -1085,8 +1085,6 @@ def __init__(self): if self.useServerCertificate: self.installEnv["X509_USER_PROXY"] = self.certsLocation os.environ["X509_USER_PROXY"] = self.certsLocation - - self.__retrieveIfNeededJWT() def __setSecurityDir(self, envName, dirLocation): """Set the environment variable of the `envName`, and add it also to the Pilot Parameters @@ -1142,16 +1140,6 @@ def __checkSecurityDir(self, envName, dirName): self.log.error("Could not find/set %s" % envName) sys.exit(1) - def __retrieveIfNeededJWT(self): - - if self.diracXServer: - if not self.pilotSecret: - raise ValueError("PilotSecret has to be defined") - if not self.pilotUUID: - raise ValueError("PilotUUID has to be defined") - self.jwt = retrieveJWT(self.diracXServer, self.pilotUUID, self.pilotSecret) - self.log.debug("Retrieved JWT from DiracX") - def __initCommandLine1(self): """Parses and interpret options on the command line: first pass (essential things)""" From 82bbda3fd938130e7d2aefd05788c11e0c8c1404 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 9 Apr 2025 11:38:17 +0200 Subject: [PATCH 11/21] fix: Fixing comments (removing TODO and adding Dirac requirements) --- Pilot/pilotCommands.py | 8 ++++++-- Pilot/pilotTools.py | 1 - 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index d562b266..b7c814fd 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -573,7 +573,11 @@ def execute(self): class PilotLogin(CommandBase): - """The Pilot logins and fetches his jwt, only compatible with DiracX cli command""" + """The pilot logs in and fetches their JWT. + + .. note:: This command is only compatible with DiracX CLI, and requires Dirac version >= 9.0 + """ + def __init__(self, pilotParams): """c'tor""" @@ -581,7 +585,7 @@ def __init__(self, pilotParams): @logFinalizer def execute(self): - """Calls dirac-admin-add-pilot""" + """Calls dirac pilot-login""" if not self.pp.pilotReference: self.log.warn("Skipping module, no pilot reference found") diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index a04ac701..1ef63a45 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -751,7 +751,6 @@ def retrieveJWT(diracXURL, pilotUUID, pilotSecret): config.generateUserAgent(pilotUUID=pilotUUID) - # TODO: Add new cert for DiracX (different from Dirac) return config.executeRequest(raw_data={}, insecure=True) class CommandBase(object): From 3025d8bed97cf8ed726ad7c58313903715ed068f Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 30 Apr 2025 11:12:35 +0200 Subject: [PATCH 12/21] fix: Fixed pilot argument, and pilot login command --- Pilot/pilotCommands.py | 33 ++++++++++++++++++++++++--------- Pilot/pilotTools.py | 5 +---- Pilot/proxyTools.py | 2 +- 3 files changed, 26 insertions(+), 14 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index b7c814fd..8fd25ef2 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -44,7 +44,7 @@ def __init__(self, pilotParams): from shlex import quote except ImportError: from pipes import quote - + try: from Pilot.pilotTools import ( CommandBase, @@ -61,6 +61,13 @@ def __init__(self, pilotParams): safe_listdir, sendMessage, ) + +try: + from Pilot.proxyTools import BaseRequest +except ImportError: + from proxyTools import BaseRequest + +from urllib.error import HTTPError ############################ @@ -585,7 +592,7 @@ def __init__(self, pilotParams): @logFinalizer def execute(self): - """Calls dirac pilot-login""" + """Calls diracX api""" if not self.pp.pilotReference: self.log.warn("Skipping module, no pilot reference found") @@ -595,14 +602,20 @@ def execute(self): self.log.warn("Skipping module, no pilot secret found") return - checkCmd = "dirac pilot-login %s %s" % ( - self.pp.pilotReference, - self.pp.pilotSecret + config = BaseRequest( + "%s/api/auth/pilot-login" % ( + self.pp.diracXServer + ), + os.getenv("X509_CERT_DIR") ) - retCode, _ = self.executeAndGetOutput(checkCmd, self.pp.installEnv) - if retCode: - self.log.error("Could not get execute dirac pilot-login [ERROR %d]" % retCode) - + + config.generateUserAgent(self.pp.pilotUUID) + + self.pp.jwt = config.executeRequest({ + "pilot_stamp": self.pp.pilotStamp, + "pilot_secret": self.pp.pilotSecret + }, insecure=True) + class CheckCECapabilities(CommandBase): """Used to get CE tags and other relevant parameters.""" @@ -1263,3 +1276,5 @@ def execute(self): """Standard entry point to a pilot command""" self._setNagiosOptions() self._runNagiosProbes() + + diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 1ef63a45..8ca02971 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -22,11 +22,8 @@ # python 2 -> 3 "hacks" try: from urllib.error import HTTPError, URLError - from urllib.parse import urlencode from urllib.request import urlopen except ImportError: - from urllib import urlencode - from urllib2 import HTTPError, URLError, urlopen try: @@ -1189,7 +1186,7 @@ def __initCommandLine2(self): self.keepPythonPath = True elif o in ("-C", "--configurationServer"): self.configServer = v - elif o == "--diracXServer": + elif o == "--diracx_URL": self.diracXServer = v elif o in ("-G", "--Group"): self.userGroup = v diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index b83e6190..12d83223 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -130,7 +130,7 @@ def executeRequest(self, raw_data, insecure=False): # Python2 data = urlencode(raw_data) - request = Request(self.url, data=data, headers=self.headers) + request = Request(self.url, data=data, headers=self.headers, method="POST") ctx = self._context # Save in case of an insecure request From 131080e4c92396918a34b556e69c9ab7f98ceedb Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Fri, 2 May 2025 17:15:19 +0200 Subject: [PATCH 13/21] fix: Fixed to match the current PR endpoint --- Pilot/pilotCommands.py | 31 +++++++++++++++++++++---------- Pilot/pilotTools.py | 21 +++------------------ Pilot/proxyTools.py | 37 ++++++++++++++++++++++++++----------- 3 files changed, 50 insertions(+), 39 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 8fd25ef2..08099868 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -594,13 +594,19 @@ def __init__(self, pilotParams): def execute(self): """Calls diracX api""" - if not self.pp.pilotReference: - self.log.warn("Skipping module, no pilot reference found") - return + if not self.pp.pilotUUID: + self.log.error("PilotUUID not given, exiting...") + sys.exit(-1) if not self.pp.pilotSecret: - self.log.warn("Skipping module, no pilot secret found") - return + self.log.error("PilotSecret not given, exiting...") + sys.exit(-1) + + if not self.pp.diracXServer: + self.log.error("DiracXServer (url) not given, exiting...") + sys.exit(-1) + + self.log.info("Fetching JWT in DiracX (URL: %s)" % self.pp.diracXServer) config = BaseRequest( "%s/api/auth/pilot-login" % ( @@ -611,11 +617,16 @@ def execute(self): config.generateUserAgent(self.pp.pilotUUID) - self.pp.jwt = config.executeRequest({ - "pilot_stamp": self.pp.pilotStamp, - "pilot_secret": self.pp.pilotSecret - }, insecure=True) - + try: + self.pp.jwt = config.executeRequest({ + "pilot_stamp": self.pp.pilotUUID, + "pilot_secret": self.pp.pilotSecret + }, insecure=True) + except HTTPError as e: + self.log.error("Request failed: %s" % str(e)) + + self.log.info("Fetched the pilot token with the pilot secret.") + class CheckCECapabilities(CommandBase): """Used to get CE tags and other relevant parameters.""" diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index 8ca02971..f9ac013d 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -729,26 +729,11 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, jwt={}): config.generateUserAgent(pilotUUID=pilotUUID) # Do the request - _res = config.executeRequest(raw_data=raw_data) - - - -def retrieveJWT(diracXURL, pilotUUID, pilotSecret): - - caPath = os.getenv("X509_CERT_DIR") - - data_in_url = "?pilot_job_reference=%s&pilot_secret=%s" % (pilotUUID, pilotSecret) - - config = BaseRequest( - "%s/api/auth/pilot-login%s" % (diracXURL, data_in_url), - caPath=caPath + _res = config.executeRequest( + raw_data=raw_data, + content_type="x-www-form-urlencoded" ) - - config.addHeader("Content-Type", "application/json") - - config.generateUserAgent(pilotUUID=pilotUUID) - return config.executeRequest(raw_data={}, insecure=True) class CommandBase(object): """CommandBase is the base class for every command in the pilot commands toolbox""" diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index 12d83223..218e0438 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -17,12 +17,12 @@ try: from urllib.parse import urlencode + from urllib.error import HTTPError from urllib.request import Request, urlopen except ImportError: from urllib import urlencode - from urllib2 import Request, urlopen - + from urllib2 import Request, urlopen, HTTPError VOMS_FQANS_OID = b"1.3.6.1.4.1.8005.100.100.4" VOMS_EXTENSION_OID = b"1.3.6.1.4.1.8005.100.100.5" @@ -81,7 +81,7 @@ def getVO(proxy_data): class BaseRequest(object): - """This class helps supporting multiple kinds of requests that requires connections""" + """This class helps supporting multiple kinds of requests that require connections""" def __init__(self, url, caPath, name="unknown"): self.name = name @@ -114,21 +114,32 @@ def addHeader(self, key, value): """Add a header (key, value) into the request header""" self.headers[key] = value - def executeRequest(self, raw_data, insecure=False): + def executeRequest(self, raw_data, insecure=False, content_type="json"): """Execute a HTTP request with the data, headers, and the pre-defined data (SSL + auth) :param raw_data: Data to send :type raw_data: dict :param insecure: Deactivate proxy verification WARNING Debug ONLY :type insecure: bool + :param content_type: Data format to send, either "json" or "x-www-form-urlencoded" + :type content_type: str :return: Parsed JSON response :rtype: dict - """ - if sys.version_info.major == 3: - data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 + """ + if content_type == "json": + data = json.dumps(raw_data).encode("utf-8") + self.headers["Content-Type"] = "application/json" + elif content_type == "x-www-form-urlencoded": + if sys.version_info.major == 3: + data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 + else: + # Python2 + data = urlencode(raw_data) + self.headers["Content-Type"] = "application/x-www-form-urlencoded" else: - # Python2 - data = urlencode(raw_data) + raise ValueError("Invalid content_type. Use 'json' or 'x-www-form-urlencoded'.") + + self.headers["Content-Length"] = str(len(data)) request = Request(self.url, data=data, headers=self.headers, method="POST") @@ -190,8 +201,12 @@ def __init__(self, url, caPath, certEnv): ) self._hasExtraCredentials = True - def executeRequest(self, raw_data): + def executeRequest(self, raw_data, insecure=False, content_type="json"): # Adds a flag if the passed cert is a Directory if self._hasExtraCredentials: raw_data["extraCredentials"] = '"hosts"' - return super(X509BasedRequest, self).executeRequest(raw_data) + return super(X509BasedRequest, self).executeRequest( + raw_data, + insecure=insecure, + content_type=content_type + ) From 5567abbaf3148c238d5fd5488fbfe6e24675afa5 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Mon, 5 May 2025 11:36:48 +0200 Subject: [PATCH 14/21] fix: Fixed lint in pilotCommands (import error) --- Pilot/pilotCommands.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 08099868..fa29b7f4 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -66,8 +66,11 @@ def __init__(self, pilotParams): from Pilot.proxyTools import BaseRequest except ImportError: from proxyTools import BaseRequest - -from urllib.error import HTTPError + +try: + from urllib.error import HTTPError +except ImportError: + from urllib2 import HTTPError ############################ From 33d998eda027b668be7ef9b007600a7145563225 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Mon, 5 May 2025 11:42:24 +0200 Subject: [PATCH 15/21] fix: Fixed the name of the PilotLogin --- Pilot/pilotCommands.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index fa29b7f4..433b1926 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -582,7 +582,7 @@ def execute(self): self.log.error("Could not get execute dirac-admin-add-pilot [ERROR %d]" % retCode) -class PilotLogin(CommandBase): +class PilotLoginX(CommandBase): """The pilot logs in and fetches their JWT. .. note:: This command is only compatible with DiracX CLI, and requires Dirac version >= 9.0 @@ -591,7 +591,7 @@ class PilotLogin(CommandBase): def __init__(self, pilotParams): """c'tor""" - super(PilotLogin, self).__init__(pilotParams) + super(PilotLoginX, self).__init__(pilotParams) @logFinalizer def execute(self): From 42be67595e5cd14f459bc509dff19511fe041861 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Mon, 5 May 2025 12:53:38 +0200 Subject: [PATCH 16/21] feat: Aborting when we fetch a pilot and it does not work --- Pilot/pilotCommands.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 433b1926..83cccfd1 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -68,9 +68,9 @@ def __init__(self, pilotParams): from proxyTools import BaseRequest try: - from urllib.error import HTTPError + from urllib.error import HTTPError, URLError except ImportError: - from urllib2 import HTTPError + from urllib2 import HTTPError, URLError ############################ @@ -625,8 +625,10 @@ def execute(self): "pilot_stamp": self.pp.pilotUUID, "pilot_secret": self.pp.pilotSecret }, insecure=True) - except HTTPError as e: + except (HTTPError, URLError) as e: self.log.error("Request failed: %s" % str(e)) + self.log.error("Could not fetch pilot tokens. Aborting...") + sys.exit(1) self.log.info("Fetched the pilot token with the pilot secret.") From 1412a18dedf294985e479744a57f032736a7d733 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Mon, 5 May 2025 15:52:40 +0200 Subject: [PATCH 17/21] fix: Small fix of comment --- Pilot/pilotCommands.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 83cccfd1..5ff47244 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -585,7 +585,7 @@ def execute(self): class PilotLoginX(CommandBase): """The pilot logs in and fetches their JWT. - .. note:: This command is only compatible with DiracX CLI, and requires Dirac version >= 9.0 + .. note:: This command is only compatible with DiracX, and requires Dirac version >= 9.0 """ From e2dba244dc74e206f995e65b31cfd917205c2f91 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Fri, 16 May 2025 17:05:50 +0200 Subject: [PATCH 18/21] feat: Add multi thread support to refresh tokens --- Pilot/dirac-pilot.py | 17 +++++ Pilot/pilotCommands.py | 36 ++++++++-- Pilot/pilotTools.py | 21 +++--- Pilot/proxyTools.py | 148 ++++++++++++++++++++++++++++++++++++----- 4 files changed, 190 insertions(+), 32 deletions(-) diff --git a/Pilot/dirac-pilot.py b/Pilot/dirac-pilot.py index 9c434c97..af6a36fd 100644 --- a/Pilot/dirac-pilot.py +++ b/Pilot/dirac-pilot.py @@ -49,6 +49,12 @@ getCommand, pythonPathCheck, ) + +try: + from Pilot.proxyTools import revokePilotToken +except ImportError: + from proxyTools import revokePilotToken + ############################ if __name__ == "__main__": @@ -124,3 +130,14 @@ if remote: log.buffer.flush() sys.exit(-1) + + log.info("Pilot tasks finished.") + + if pilotParams.jwt: + log.info("Revoking pilot token.") + revokePilotToken( + pilotParams.diracXServer, + pilotParams.pilotUUID, + pilotParams.jwt, + pilotParams.clientID + ) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 5ff47244..94652ddb 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -21,6 +21,7 @@ def __init__(self, pilotParams): import filecmp import os +import threading import platform import shutil import socket @@ -63,9 +64,9 @@ def __init__(self, pilotParams): ) try: - from Pilot.proxyTools import BaseRequest + from Pilot.proxyTools import BaseRequest, refreshTokenLoop except ImportError: - from proxyTools import BaseRequest + from proxyTools import BaseRequest, refreshTokenLoop try: from urllib.error import HTTPError, URLError @@ -592,6 +593,7 @@ class PilotLoginX(CommandBase): def __init__(self, pilotParams): """c'tor""" super(PilotLoginX, self).__init__(pilotParams) + self.jwt_lock = threading.Lock() @logFinalizer def execute(self): @@ -609,17 +611,20 @@ def execute(self): self.log.error("DiracXServer (url) not given, exiting...") sys.exit(-1) + if not self.pp.clientID: + self.log.error("ClientID not given, exiting...") + sys.exit(-1) + self.log.info("Fetching JWT in DiracX (URL: %s)" % self.pp.diracXServer) config = BaseRequest( - "%s/api/auth/pilot-login" % ( + "%s/api/pilots/token" % ( self.pp.diracXServer ), - os.getenv("X509_CERT_DIR") + os.getenv("X509_CERT_DIR"), + self.pp.pilotUUID ) - config.generateUserAgent(self.pp.pilotUUID) - try: self.pp.jwt = config.executeRequest({ "pilot_stamp": self.pp.pilotUUID, @@ -632,6 +637,25 @@ def execute(self): self.log.info("Fetched the pilot token with the pilot secret.") + self.log.info("Starting the refresh thread.") + self.log.info("Refreshing the token every %d seconds." % self.pp.refreshTokenEvery) + # Start background refresh thread + t = threading.Thread( + target=refreshTokenLoop, + args=( + self.pp.diracXServer, + self.pp.pilotUUID, + self.pp.jwt, + self.jwt_lock, + self.log, + self.pp.clientID, + self.pp.refreshTokenEvery + ) + ) + t.daemon = True + t.start() + + class CheckCECapabilities(CommandBase): """Used to get CE tags and other relevant parameters.""" diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index f9ac013d..b1f98a8d 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -705,15 +705,12 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, jwt={}): config = None if jwt: - try: - access_token = jwt["access_token"] - except ValueError as e: - raise ValueError("JWT is needed, with an access_token field") config = TokenBasedRequest( url=url, caPath=caPath, - jwtData=access_token + jwtData=jwt, + pilotUUID=pilotUUID ) else: @@ -722,12 +719,10 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, jwt={}): config = X509BasedRequest( url=url, caPath=caPath, - certEnv=cert + certEnv=cert, + pilotUUID=pilotUUID ) - # Config the header, will help debugging - config.generateUserAgent(pilotUUID=pilotUUID) - # Do the request _res = config.executeRequest( raw_data=raw_data, @@ -926,6 +921,8 @@ def __init__(self): self.queueName = "" self.gridCEType = "" self.pilotSecret = "" + self.clientID = "" + self.refreshTokenEvery = 300 self.jwt = { "access_token": "", "refresh_token": "" @@ -1041,6 +1038,8 @@ def __init__(self): ("", "architectureScript=", "architecture script to use"), ("", "CVMFS_locations=", "comma-separated list of CVMS locations"), ("", "pilotSecret=", "secret that the pilot uses with DiracX"), + ("", "clientID=", "client id used by DiracX to revoke a token"), + ("", "refreshTokenEvery=", "how often we have to refresh a token (in seconds)") ) # Possibly get Setup and JSON URL/filename from command line @@ -1248,6 +1247,10 @@ def __initCommandLine2(self): self.CVMFS_locations = v.split(",") elif o == "--pilotSecret": self.pilotSecret = v + elif o == "--clientID": + self.clientID = v + elif o == "--refreshTokenEvery": + self.refreshTokenEvery = int(v) def __loadJSON(self): """ diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index 218e0438..ad01544b 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -4,6 +4,7 @@ import json import os +import time import re import ssl import sys @@ -83,26 +84,23 @@ def getVO(proxy_data): class BaseRequest(object): """This class helps supporting multiple kinds of requests that require connections""" - def __init__(self, url, caPath, name="unknown"): + def __init__(self, url, caPath, pilotUUID, name="unknown"): self.name = name self.url = url self.caPath = caPath self.headers = { "User-Agent": "Dirac Pilot [Unknown ID]" } + self.pilotUUID = pilotUUID # We assume we have only one context, so this variable could be shared to avoid opening n times a cert. # On the contrary, to avoid race conditions, we do avoid using "self.data" and "self.headers" self._context = None self._prepareRequest() - def generateUserAgent(self, pilotUUID): - """To analyse the traffic, we can send a taylor-made User-Agent - - :param pilotUUID: Unique ID of the Pilot - :type pilotUUID: str - """ - self.headers["User-Agent"] = "Dirac Pilot [%s]" % pilotUUID + def generateUserAgent(self): + """To analyse the traffic, we can send a taylor-made User-Agent""" + self.addHeader("User-Agent", "Dirac Pilot [%s]" % self.pilotUUID) def _prepareRequest(self): """As previously, loads the SSL certificates of the server (to avoid "unknown issuer")""" @@ -128,18 +126,18 @@ def executeRequest(self, raw_data, insecure=False, content_type="json"): """ if content_type == "json": data = json.dumps(raw_data).encode("utf-8") - self.headers["Content-Type"] = "application/json" + self.addHeader("Content-Type", "application/json") elif content_type == "x-www-form-urlencoded": if sys.version_info.major == 3: data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 else: # Python2 data = urlencode(raw_data) - self.headers["Content-Type"] = "application/x-www-form-urlencoded" + self.addHeader("Content-Type", "application/x-www-form-urlencoded") else: raise ValueError("Invalid content_type. Use 'json' or 'x-www-form-urlencoded'.") - self.headers["Content-Length"] = str(len(data)) + self.addHeader("Content-Length", str(len(data))) request = Request(self.url, data=data, headers=self.headers, method="POST") @@ -173,21 +171,27 @@ def executeRequest(self, raw_data, insecure=False, content_type="json"): class TokenBasedRequest(BaseRequest): """Connected Request with JWT support""" - def __init__(self, url, caPath, jwtData): - super(TokenBasedRequest, self).__init__(url, caPath, "TokenBasedConnection") - + def __init__(self, url, caPath, jwtData, pilotUUID): + super(TokenBasedRequest, self).__init__(url, caPath, pilotUUID, "TokenBasedConnection") self.jwtData = jwtData def addJwtToHeader(self): # Adds the JWT in the HTTP request (in the Bearer field) - self.headers["Authorization"] = "Bearer: %s" % self.jwtData + self.headers["Authorization"] = "Bearer: %s" % self.jwtData["access_token"] + def executeRequest(self, raw_data, insecure=False, content_type="json", is_token_refreshed=False): + + return super(TokenBasedRequest, self).executeRequest( + raw_data, + insecure=insecure, + content_type=content_type + ) class X509BasedRequest(BaseRequest): """Connected Request with X509 support""" - def __init__(self, url, caPath, certEnv): - super(X509BasedRequest, self).__init__(url, caPath, "X509BasedConnection") + def __init__(self, url, caPath, certEnv, pilotUUID): + super(X509BasedRequest, self).__init__(url, caPath, pilotUUID, "X509BasedConnection") self.certEnv = certEnv self._hasExtraCredentials = False @@ -210,3 +214,113 @@ def executeRequest(self, raw_data, insecure=False, content_type="json"): insecure=insecure, content_type=content_type ) + + +def refreshPilotToken(url, pilotUUID, jwt, jwt_lock, clientID): + """ + Refresh the JWT token in a separate thread. + + :param str url: Server URL + :param str pilotUUID: Pilot unique ID + :param dict jwt: Shared dict with current JWT; updated in-place + :param threading.Lock jwt_lock: Lock to safely update the jwt dict + :return: None + """ + + # PRECONDITION: jwt must contain "refresh_token" + if not jwt or "refresh_token" not in jwt: + raise ValueError("To refresh a token, a pilot needs a JWT with refresh_token") + + # Get CA path from environment + caPath = os.getenv("X509_CERT_DIR") + + # Create request object with required configuration + config = BaseRequest( + url="%s/api/auth/token" % url, + caPath=caPath, + pilotUUID=pilotUUID + ) + + # Prepare refresh token payload + payload = { + "grant_type": "refresh_token", + "refresh_token": jwt["refresh_token"], + "client_id": clientID + } + + # Perform the request to refresh the token + response = config.executeRequest( + raw_data=payload, + insecure=True, + content_type="x-www-form-urlencoded" + ) + + # Ensure thread-safe update of the shared jwt dictionary + jwt_lock.acquire() + try: + jwt.update(response) + finally: + jwt_lock.release() + + +def revokePilotToken(url, pilotUUID, jwt, clientID): + """ + Refresh the JWT token in a separate thread. + + :param str url: Server URL + :param str pilotUUID: Pilot unique ID + :param dict jwt: Shared dict with current JWT; + :return: None + """ + + # PRECONDITION: jwt must contain "refresh_token" + if not jwt or "refresh_token" not in jwt: + raise ValueError("To refresh a token, a pilot needs a JWT with refresh_token") + + # Get CA path from environment + caPath = os.getenv("X509_CERT_DIR") + + # Create request object with required configuration + config = BaseRequest( + url="%s/api/auth/revoke" % url, + caPath=caPath, + pilotUUID=pilotUUID + ) + + # Prepare refresh token payload + payload = { + "refresh_token": jwt["refresh_token"], + "client_id": clientID + } + + # Perform the request to revoke the token + _response = config.executeRequest( + raw_data=payload, + insecure=True, + content_type="x-www-form-urlencoded" + ) + +# === Token refresher thread function === +def refreshTokenLoop(url, pilotUUID, jwt, jwt_lock, logger, clientID, interval=600): + """ + Periodically refresh the pilot JWT token. + + :param str url: DiracX server URL + :param str pilotUUID: Pilot UUID + :param dict jwt: Shared JWT dictionary + :param threading.Lock jwt_lock: Lock to safely update JWT + :param Logger logger: Logger to debug + :param str clientID: ClientID used to refresh tokens + :param int interval: Sleep time between refreshes in seconds + :return: None + """ + while True: + time.sleep(interval) + + try: + refreshPilotToken(url, pilotUUID, jwt, jwt_lock, clientID) + + logger.info("Token refreshed.") + except Exception as e: + logger.error("Token refresh failed: %s\n" % str(e)) + continue From 1267c8c30e742feeb24bf55abf72de715f65c382 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Wed, 21 May 2025 11:18:35 +0200 Subject: [PATCH 19/21] feat: Add thread to refresh tokens --- Pilot/pilotCommands.py | 6 +-- Pilot/proxyTools.py | 86 ++++++++++++++++++++++-------------------- 2 files changed, 47 insertions(+), 45 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 94652ddb..7bf09801 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -587,6 +587,7 @@ class PilotLoginX(CommandBase): """The pilot logs in and fetches their JWT. .. note:: This command is only compatible with DiracX, and requires Dirac version >= 9.0 + .. note:: This command will start a new thread to refresh tokens regularly """ @@ -611,10 +612,6 @@ def execute(self): self.log.error("DiracXServer (url) not given, exiting...") sys.exit(-1) - if not self.pp.clientID: - self.log.error("ClientID not given, exiting...") - sys.exit(-1) - self.log.info("Fetching JWT in DiracX (URL: %s)" % self.pp.diracXServer) config = BaseRequest( @@ -648,7 +645,6 @@ def execute(self): self.pp.jwt, self.jwt_lock, self.log, - self.pp.clientID, self.pp.refreshTokenEvery ) ) diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index ad01544b..d490c927 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -127,17 +127,23 @@ def executeRequest(self, raw_data, insecure=False, content_type="json"): if content_type == "json": data = json.dumps(raw_data).encode("utf-8") self.addHeader("Content-Type", "application/json") - elif content_type == "x-www-form-urlencoded": - if sys.version_info.major == 3: - data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 - else: - # Python2 - data = urlencode(raw_data) - self.addHeader("Content-Type", "application/x-www-form-urlencoded") + self.addHeader("Content-Length", str(len(data))) else: - raise ValueError("Invalid content_type. Use 'json' or 'x-www-form-urlencoded'.") - self.addHeader("Content-Length", str(len(data))) + data = urlencode(raw_data) + + if content_type == "x-www-form-urlencoded": + if sys.version_info.major == 3: + data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3 + + self.addHeader("Content-Type", "application/x-www-form-urlencoded") + self.addHeader("Content-Length", str(len(data))) + elif content_type == "query": + self.url = self.url + "?" + data + data = None # No body + else: + raise ValueError("Invalid content_type. Use 'json' or 'x-www-form-urlencoded'.") + request = Request(self.url, data=data, headers=self.headers, method="POST") @@ -150,22 +156,26 @@ def executeRequest(self, raw_data, insecure=False, content_type="json"): ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE - if sys.version_info.major == 3: - # Python 3 code - with urlopen(request, context=ctx) as res: - response_data = res.read().decode("utf-8") # Decode response bytes - else: - # Python 2 code - res = urlopen(request, context=ctx) - try: - response_data = res.read() - finally: - res.close() + + try: + if sys.version_info.major == 3: + # Python 3 code + with urlopen(request, context=ctx) as res: + response_data = res.read().decode("utf-8") # Decode response bytes + else: + # Python 2 code + res = urlopen(request, context=ctx) + try: + response_data = res.read() + finally: + res.close() + except HTTPError as e: + raise RuntimeError("HTTPError : %s" % e.read().decode()) try: return json.loads(response_data) # Parse JSON response except ValueError: # In Python 2, json.JSONDecodeError is a subclass of ValueError - raise Exception("Invalid JSON response: %s" % response_data) + raise ValueError("Invalid JSON response: %s" % response_data) class TokenBasedRequest(BaseRequest): @@ -174,12 +184,13 @@ class TokenBasedRequest(BaseRequest): def __init__(self, url, caPath, jwtData, pilotUUID): super(TokenBasedRequest, self).__init__(url, caPath, pilotUUID, "TokenBasedConnection") self.jwtData = jwtData + self.addJwtToHeader() def addJwtToHeader(self): # Adds the JWT in the HTTP request (in the Bearer field) - self.headers["Authorization"] = "Bearer: %s" % self.jwtData["access_token"] + self.headers["Authorization"] = "Bearer %s" % self.jwtData["access_token"] - def executeRequest(self, raw_data, insecure=False, content_type="json", is_token_refreshed=False): + def executeRequest(self, raw_data, insecure=False, content_type="json"): return super(TokenBasedRequest, self).executeRequest( raw_data, @@ -216,7 +227,7 @@ def executeRequest(self, raw_data, insecure=False, content_type="json"): ) -def refreshPilotToken(url, pilotUUID, jwt, jwt_lock, clientID): +def refreshPilotToken(url, pilotUUID, jwt, jwt_lock): """ Refresh the JWT token in a separate thread. @@ -235,24 +246,19 @@ def refreshPilotToken(url, pilotUUID, jwt, jwt_lock, clientID): caPath = os.getenv("X509_CERT_DIR") # Create request object with required configuration - config = BaseRequest( - url="%s/api/auth/token" % url, + config = TokenBasedRequest( + url="%s/api/pilots/refresh-token" % url, caPath=caPath, - pilotUUID=pilotUUID + pilotUUID=pilotUUID, + jwtData=jwt ) - # Prepare refresh token payload - payload = { - "grant_type": "refresh_token", - "refresh_token": jwt["refresh_token"], - "client_id": clientID - } - # Perform the request to refresh the token response = config.executeRequest( - raw_data=payload, + raw_data={ + "refresh_token": jwt["refresh_token"] + }, insecure=True, - content_type="x-www-form-urlencoded" ) # Ensure thread-safe update of the shared jwt dictionary @@ -269,6 +275,7 @@ def revokePilotToken(url, pilotUUID, jwt, clientID): :param str url: Server URL :param str pilotUUID: Pilot unique ID + :param str clientID: ClientID used to revoke tokens :param dict jwt: Shared dict with current JWT; :return: None """ @@ -297,11 +304,11 @@ def revokePilotToken(url, pilotUUID, jwt, clientID): _response = config.executeRequest( raw_data=payload, insecure=True, - content_type="x-www-form-urlencoded" + content_type="query" ) # === Token refresher thread function === -def refreshTokenLoop(url, pilotUUID, jwt, jwt_lock, logger, clientID, interval=600): +def refreshTokenLoop(url, pilotUUID, jwt, jwt_lock, logger, interval=600): """ Periodically refresh the pilot JWT token. @@ -310,7 +317,6 @@ def refreshTokenLoop(url, pilotUUID, jwt, jwt_lock, logger, clientID, interval=6 :param dict jwt: Shared JWT dictionary :param threading.Lock jwt_lock: Lock to safely update JWT :param Logger logger: Logger to debug - :param str clientID: ClientID used to refresh tokens :param int interval: Sleep time between refreshes in seconds :return: None """ @@ -318,7 +324,7 @@ def refreshTokenLoop(url, pilotUUID, jwt, jwt_lock, logger, clientID, interval=6 time.sleep(interval) try: - refreshPilotToken(url, pilotUUID, jwt, jwt_lock, clientID) + refreshPilotToken(url, pilotUUID, jwt, jwt_lock) logger.info("Token refreshed.") except Exception as e: From 244a06436d31c9b9901dfe75a1ff20b332eb5828 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Thu, 22 May 2025 14:40:48 +0200 Subject: [PATCH 20/21] fix: PilotLogin is not a command anymore, to log-in before the remote logger --- Pilot/pilotCommands.py | 71 ------------------------------------------ Pilot/pilotTools.py | 55 +++++++++++++++++++++++++++++--- 2 files changed, 51 insertions(+), 75 deletions(-) diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 7bf09801..1b684df2 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -21,7 +21,6 @@ def __init__(self, pilotParams): import filecmp import os -import threading import platform import shutil import socket @@ -29,7 +28,6 @@ def __init__(self, pilotParams): import sys import time import traceback -import subprocess from collections import Counter ############################ @@ -583,75 +581,6 @@ def execute(self): self.log.error("Could not get execute dirac-admin-add-pilot [ERROR %d]" % retCode) -class PilotLoginX(CommandBase): - """The pilot logs in and fetches their JWT. - - .. note:: This command is only compatible with DiracX, and requires Dirac version >= 9.0 - .. note:: This command will start a new thread to refresh tokens regularly - """ - - - def __init__(self, pilotParams): - """c'tor""" - super(PilotLoginX, self).__init__(pilotParams) - self.jwt_lock = threading.Lock() - - @logFinalizer - def execute(self): - """Calls diracX api""" - - if not self.pp.pilotUUID: - self.log.error("PilotUUID not given, exiting...") - sys.exit(-1) - - if not self.pp.pilotSecret: - self.log.error("PilotSecret not given, exiting...") - sys.exit(-1) - - if not self.pp.diracXServer: - self.log.error("DiracXServer (url) not given, exiting...") - sys.exit(-1) - - self.log.info("Fetching JWT in DiracX (URL: %s)" % self.pp.diracXServer) - - config = BaseRequest( - "%s/api/pilots/token" % ( - self.pp.diracXServer - ), - os.getenv("X509_CERT_DIR"), - self.pp.pilotUUID - ) - - try: - self.pp.jwt = config.executeRequest({ - "pilot_stamp": self.pp.pilotUUID, - "pilot_secret": self.pp.pilotSecret - }, insecure=True) - except (HTTPError, URLError) as e: - self.log.error("Request failed: %s" % str(e)) - self.log.error("Could not fetch pilot tokens. Aborting...") - sys.exit(1) - - self.log.info("Fetched the pilot token with the pilot secret.") - - self.log.info("Starting the refresh thread.") - self.log.info("Refreshing the token every %d seconds." % self.pp.refreshTokenEvery) - # Start background refresh thread - t = threading.Thread( - target=refreshTokenLoop, - args=( - self.pp.diracXServer, - self.pp.pilotUUID, - self.pp.jwt, - self.jwt_lock, - self.log, - self.pp.refreshTokenEvery - ) - ) - t.daemon = True - t.start() - - class CheckCECapabilities(CommandBase): """Used to get CE tags and other relevant parameters.""" diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index b1f98a8d..a5ad854a 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -9,7 +9,7 @@ import re import select import signal -import ssl +import threading import subprocess import sys import threading @@ -66,9 +66,9 @@ def load_module_from_path(module_name, path_to_module): basestring = str try: - from Pilot.proxyTools import X509BasedRequest, getVO, TokenBasedRequest, BaseRequest + from Pilot.proxyTools import X509BasedRequest, getVO, TokenBasedRequest, BaseRequest, refreshTokenLoop except ImportError: - from proxyTools import X509BasedRequest, getVO, TokenBasedRequest, BaseRequest + from proxyTools import X509BasedRequest, getVO, TokenBasedRequest, BaseRequest, refreshTokenLoop try: FileNotFoundError # pylint: disable=used-before-assignment @@ -927,6 +927,7 @@ def __init__(self): "access_token": "", "refresh_token": "" } + self.jwt_lock = threading.Lock() # maxNumberOfProcessors: the number of # processors allocated to the pilot which the pilot can allocate to one payload # used to set payloadProcessors unless other limits are reached (like the number of processors on the WN) @@ -1065,7 +1066,53 @@ def __init__(self): if self.useServerCertificate: self.installEnv["X509_USER_PROXY"] = self.certsLocation os.environ["X509_USER_PROXY"] = self.certsLocation - + + + + if self.pilotUUID or not self.pilotSecret or not self.diracXServer: + self.log.info("Fetching JWT in DiracX (URL: %s)" % self.diracXServer) + + config = BaseRequest( + "%s/api/pilots/token" % ( + self.diracXServer + ), + os.getenv("X509_CERT_DIR"), + self.pilotUUID + ) + + try: + self.jwt = config.executeRequest({ + "pilot_stamp": self.pilotUUID, + "pilot_secret": self.pilotSecret + }, insecure=True) + except (HTTPError, URLError) as e: + self.log.error("Request failed: %s" % str(e)) + self.log.error("Could not fetch pilot tokens. Aborting...") + sys.exit(1) + + self.log.info("Fetched the pilot token with the pilot secret.") + + self.log.info("Starting the refresh thread.") + self.log.info("Refreshing the token every %d seconds." % self.refreshTokenEvery) + # Start background refresh thread + t = threading.Thread( + target=refreshTokenLoop, + args=( + self.diracXServer, + self.pilotUUID, + self.jwt, + self.jwt_lock, + self.log, + self.refreshTokenEvery + ) + ) + t.daemon = True + t.start() + else: + self.log.info("PilotUUID, pilotSecret, and diracXServer are needed to support DiracX.") + + + def __setSecurityDir(self, envName, dirLocation): """Set the environment variable of the `envName`, and add it also to the Pilot Parameters From 82f3fa120e912540ddf2b3d364248cd123a43bb3 Mon Sep 17 00:00:00 2001 From: Robin Van de Merghel Date: Fri, 23 May 2025 16:26:38 +0200 Subject: [PATCH 21/21] feat: Add working logging system --- Pilot/dirac-pilot.py | 24 ++++++--- Pilot/pilotCommands.py | 33 +++++++++--- Pilot/pilotTools.py | 118 ++++++++++++++++++++--------------------- Pilot/proxyTools.py | 26 +++++---- 4 files changed, 119 insertions(+), 82 deletions(-) diff --git a/Pilot/dirac-pilot.py b/Pilot/dirac-pilot.py index af6a36fd..a43de292 100644 --- a/Pilot/dirac-pilot.py +++ b/Pilot/dirac-pilot.py @@ -70,7 +70,7 @@ sys.stdout.write(bufContent) # now the remote logger. remote = pilotParams.pilotLogging and (pilotParams.loggerURL is not None) - if remote: + if remote and pilotParams.jwt: # In a remote logger enabled Dirac version we would have some classic logger content from a wrapper, # which we passed in: receivedContent = "" @@ -82,12 +82,18 @@ bufsize=pilotParams.loggerBufsize, pilotUUID=pilotParams.pilotUUID, debugFlag=pilotParams.debugFlag, - wnVO=pilotParams.wnVO, + jwt=pilotParams.jwt ) log.info("Remote logger activated") - log.buffer.write(receivedContent) + log.buffer.write(log.format_to_json( + "INFO", + receivedContent, + )) log.buffer.flush() - log.buffer.write(bufContent) + log.buffer.write(log.format_to_json( + "INFO", + bufContent, + )) else: log = Logger("Pilot", debugFlag=pilotParams.debugFlag) @@ -110,7 +116,7 @@ log.info("Executing commands: %s" % str(pilotParams.commands)) - if remote: + if remote and pilotParams.jwt: # It's safer to cancel the timer here. Each command has got its own logger object with a timer cancelled by the # finaliser. No need for a timer in the "else" code segment below. try: @@ -128,12 +134,18 @@ log.error("Command %s could not be instantiated" % commandName) # send the last message and abandon ship. if remote: - log.buffer.flush() + log.buffer.flush(force=True) sys.exit(-1) log.info("Pilot tasks finished.") + + if not remote: + log.buffer.flush() if pilotParams.jwt: + if remote: + log.buffer.flush(force=True) + log.info("Revoking pilot token.") revokePilotToken( pilotParams.diracXServer, diff --git a/Pilot/pilotCommands.py b/Pilot/pilotCommands.py index 1b684df2..20ccd014 100644 --- a/Pilot/pilotCommands.py +++ b/Pilot/pilotCommands.py @@ -101,16 +101,37 @@ def wrapper(self): self.log.info( "Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode)) ) - self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit()). - try: - sendMessage(self.log.url, self.log.pilotUUID, self.log.wnVO, "finaliseLogs", {"retCode": str(exCode)}) - except Exception as exc: - self.log.error("Remote logger couldn't be finalised %s " % str(exc)) + if self.pp.jwt: + try: + sendMessage(self.log.url, self.log.pilotUUID, self.pp.jwt, [ + { + "severity": "ERROR", + "message": str(exCode) + }, + { + "severity": "ERROR", + "message": traceback.format_exc() + } + ]) + + self.log.buffer.flush(force=True) + except Exception as exc: + self.log.error("Remote logger couldn't be finalised %s " % str(exc)) + raise + + # No force here because there's no remote logger if we're here + self.log.buffer.flush() raise except Exception as exc: # unexpected exit: document it and bail out. self.log.error(str(exc)) self.log.error(traceback.format_exc()) + + if self.pp.jwt: + # Force flush if it's a remote logger + self.log.buffer.flush(force=True) + else: + self.log.buffer.flush() raise finally: self.log.buffer.cancelTimer() @@ -141,7 +162,7 @@ def __init__(self, pilotParams): @logFinalizer def execute(self): """Get host and local user info, and other basic checks, e.g. space available""" - + self.log.info("Uname = %s" % " ".join(os.uname())) self.log.info("Host Name = %s" % socket.gethostname()) self.log.info("Host FQDN = %s" % socket.getfqdn()) diff --git a/Pilot/pilotTools.py b/Pilot/pilotTools.py index a5ad854a..e52953f0 100644 --- a/Pilot/pilotTools.py +++ b/Pilot/pilotTools.py @@ -2,6 +2,7 @@ from __future__ import absolute_import, division, print_function +import enum import fcntl import getopt import json @@ -523,7 +524,7 @@ def __init__( pilotUUID="unknown", flushInterval=10, bufsize=1000, - wnVO="unknown", + jwt = {} ): """ c'tor @@ -533,36 +534,45 @@ def __init__( super(RemoteLogger, self).__init__(name, debugFlag, pilotOutput) self.url = url self.pilotUUID = pilotUUID - self.wnVO = wnVO self.isPilotLoggerOn = isPilotLoggerOn - sendToURL = partial(sendMessage, url, pilotUUID, wnVO, "sendMessage") - self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval) + sendToURL = partial(sendMessage, url, pilotUUID) + self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval, jwt=jwt) + + def format_to_json(self, level, message): + splitted_message = message.split("\n") + + output = [] + for mess in splitted_message: + if mess: + output.append({ + "timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "severity": level, + "message": mess, + "scope": self.name + }) + return output def debug(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? super(RemoteLogger, self).debug(msg, header) if ( self.isPilotLoggerOn and self.debugFlag ): # the -d flag activates this debug flag in CommandBase via PilotParams - self.sendMessage(self.messageTemplate.format(level="DEBUG", message=msg)) + self.sendMessage(self.format_to_json(level="DEBUG", message=msg)) def error(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? super(RemoteLogger, self).error(msg, header) if self.isPilotLoggerOn: - self.sendMessage(self.messageTemplate.format(level="ERROR", message=msg)) + self.sendMessage(self.format_to_json(level="ERROR", message=msg)) def warn(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? super(RemoteLogger, self).warn(msg, header) if self.isPilotLoggerOn: - self.sendMessage(self.messageTemplate.format(level="WARNING", message=msg)) + self.sendMessage(self.format_to_json(level="WARNING", message=msg)) def info(self, msg, header=True, _sendPilotLog=False): - # TODO: Send pilot log remotely? super(RemoteLogger, self).info(msg, header) if self.isPilotLoggerOn: - self.sendMessage(self.messageTemplate.format(level="INFO", message=msg)) + self.sendMessage(self.format_to_json(level="INFO", message=msg)) def sendMessage(self, msg): """ @@ -574,7 +584,7 @@ def sendMessage(self, msg): :rtype: None """ try: - self.buffer.write(msg + "\n") + self.buffer.write(msg) except Exception as err: super(RemoteLogger, self).error("Message not sent") super(RemoteLogger, self).error(str(err)) @@ -601,7 +611,7 @@ class FixedSizeBuffer(object): Once it's full, a message is sent to a remote server and the buffer is renewed. """ - def __init__(self, senderFunc, bufsize=1000, autoflush=10): + def __init__(self, senderFunc, bufsize=1000, autoflush=10, jwt={}): """ Constructor. @@ -619,34 +629,32 @@ def __init__(self, senderFunc, bufsize=1000, autoflush=10): self._timer.start() else: self._timer = None - self.output = StringIO() + self.output = [] self.bufsize = bufsize self._nlines = 0 self.senderFunc = senderFunc + self.jwt = jwt @synchronized - def write(self, text): + def write(self, content_json): """ Write text to a string buffer. Newline characters are counted and number of lines in the buffer is increased accordingly. - :param text: text string to write - :type text: str + :param content_json: Json to send, format following format_to_json + :type content_json: list[dict] :return: None :rtype: None """ - # reopen the buffer in a case we had to flush a partially filled buffer - if self.output.closed: - self.output = StringIO() - self.output.write(text) - self._nlines += max(1, text.count("\n")) + + self.output.extend(content_json) + + try: + self._nlines += max(1, len(content_json)) + except Exception: + raise ValueError(content_json) self.sendFullBuffer() - @synchronized - def getValue(self): - content = self.output.getvalue() - return content - @synchronized def sendFullBuffer(self): """ @@ -656,22 +664,19 @@ def sendFullBuffer(self): if self._nlines >= self.bufsize: self.flush() - self.output = StringIO() + self.output = [] @synchronized - def flush(self): + def flush(self, force=False): """ Flush the buffer and send log records to a remote server. The buffer is closed as well. :return: None :rtype: None """ - if not self.output.closed and self._nlines > 0: - self.output.flush() - buf = self.getValue() - self.senderFunc(buf) + if force or (self.output and self._nlines > 0): + self.senderFunc(self.jwt, self.output) self._nlines = 0 - self.output.close() def cancelTimer(self): """ @@ -684,13 +689,12 @@ def cancelTimer(self): self._timer.cancel() -def sendMessage(url, pilotUUID, wnVO, method, rawMessage, jwt={}): +def sendMessage(url, pilotUUID, jwt={}, rawMessage=[]): """ Invoke a remote method on a Tornado server and pass a JSON message to it. :param str url: Server URL :param str pilotUUID: pilot unique ID - :param str wnVO: VO name, relevant only if not contained in a proxy :param str method: a method to be invoked :param str rawMessage: a message to be sent, in JSON format :param dict jwt: JWT for the requests @@ -699,34 +703,25 @@ def sendMessage(url, pilotUUID, wnVO, method, rawMessage, jwt={}): caPath = os.getenv("X509_CERT_DIR") - message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO)) - raw_data = {"method": method, "args": message} + raw_data = { + "pilot_stamp": pilotUUID, + "lines": rawMessage + } config = None - if jwt: - - config = TokenBasedRequest( - url=url, - caPath=caPath, - jwtData=jwt, - pilotUUID=pilotUUID - ) - - else: - cert = os.getenv("X509_USER_PROXY") + config = TokenBasedRequest( + url=url, + caPath=caPath, + jwtData=jwt, + pilotUUID=pilotUUID + ) - config = X509BasedRequest( - url=url, - caPath=caPath, - certEnv=cert, - pilotUUID=pilotUUID - ) - # Do the request _res = config.executeRequest( raw_data=raw_data, - content_type="x-www-form-urlencoded" + insecure=True, + json_output=False ) @@ -762,7 +757,7 @@ def __init__(self, pilotParams): debugFlag=self.debugFlag, flushInterval=interval, bufsize=bufsize, - wnVO=pilotParams.wnVO, + jwt=pilotParams.jwt ) self.log.isPilotLoggerOn = isPilotLoggerOn @@ -813,7 +808,10 @@ def executeAndGetOutput(self, cmd, environDict=None): sys.stdout.write(outChunk) sys.stdout.flush() if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn: - self.log.buffer.write(outChunk) + self.log.buffer.write(self.log.format_to_json( + "COMMAND", + outChunk + )) outData += outChunk # If no data was read on any of the pipes then the process has finished if not dataWasRead: diff --git a/Pilot/proxyTools.py b/Pilot/proxyTools.py index d490c927..b7c882b2 100644 --- a/Pilot/proxyTools.py +++ b/Pilot/proxyTools.py @@ -3,6 +3,7 @@ from __future__ import absolute_import, division, print_function import json +from multiprocessing import Value import os import time import re @@ -112,15 +113,17 @@ def addHeader(self, key, value): """Add a header (key, value) into the request header""" self.headers[key] = value - def executeRequest(self, raw_data, insecure=False, content_type="json"): + def executeRequest(self, raw_data, insecure=False, content_type="json", json_output=True): """Execute a HTTP request with the data, headers, and the pre-defined data (SSL + auth) :param raw_data: Data to send :type raw_data: dict :param insecure: Deactivate proxy verification WARNING Debug ONLY :type insecure: bool - :param content_type: Data format to send, either "json" or "x-www-form-urlencoded" + :param content_type: Data format to send, either "json" or "x-www-form-urlencoded" or "query" :type content_type: str + :param json_output: If we have an output + :type json_output: bool :return: Parsed JSON response :rtype: dict """ @@ -172,10 +175,11 @@ def executeRequest(self, raw_data, insecure=False, content_type="json"): except HTTPError as e: raise RuntimeError("HTTPError : %s" % e.read().decode()) - try: - return json.loads(response_data) # Parse JSON response - except ValueError: # In Python 2, json.JSONDecodeError is a subclass of ValueError - raise ValueError("Invalid JSON response: %s" % response_data) + if json_output: + try: + return json.loads(response_data) # Parse JSON response + except ValueError: # In Python 2, json.JSONDecodeError is a subclass of ValueError + raise ValueError("Invalid JSON response: %s" % response_data) class TokenBasedRequest(BaseRequest): @@ -190,12 +194,13 @@ def addJwtToHeader(self): # Adds the JWT in the HTTP request (in the Bearer field) self.headers["Authorization"] = "Bearer %s" % self.jwtData["access_token"] - def executeRequest(self, raw_data, insecure=False, content_type="json"): + def executeRequest(self, raw_data, insecure=False, content_type="json", json_output=True): return super(TokenBasedRequest, self).executeRequest( raw_data, insecure=insecure, - content_type=content_type + content_type=content_type, + json_output=json_output ) class X509BasedRequest(BaseRequest): @@ -216,14 +221,15 @@ def __init__(self, url, caPath, certEnv, pilotUUID): ) self._hasExtraCredentials = True - def executeRequest(self, raw_data, insecure=False, content_type="json"): + def executeRequest(self, raw_data, insecure=False, content_type="json", json_output=True): # Adds a flag if the passed cert is a Directory if self._hasExtraCredentials: raw_data["extraCredentials"] = '"hosts"' return super(X509BasedRequest, self).executeRequest( raw_data, insecure=insecure, - content_type=content_type + content_type=content_type, + json_output=json_output )