From a4690cf428b4287f8b55cb086d3aa9df6c44e14c Mon Sep 17 00:00:00 2001 From: Rik van Duijn Date: Thu, 22 Jan 2026 10:14:59 +0100 Subject: [PATCH 1/2] Refactoring headers (fixed history) --- roadlib/roadtools/roadlib/auth.py | 39 +++++++++++++++++++++++++++++++ roadtx/roadtools/roadtx/main.py | 20 ++++++++++++++++ 2 files changed, 59 insertions(+) diff --git a/roadlib/roadtools/roadlib/auth.py b/roadlib/roadtools/roadlib/auth.py index bfa017ba..1bc18bde 100755 --- a/roadlib/roadtools/roadlib/auth.py +++ b/roadlib/roadtools/roadlib/auth.py @@ -66,6 +66,7 @@ def __init__(self, username=None, password=None, tenant=None, client_id='1b73095 self.claims = None self.use_pkce = False self.pkce_secret = None + self.custom_headers = {} # For cert based app auth self.appprivkey = None @@ -116,6 +117,7 @@ def set_user_agent(self, useragent): """ self.user_agent = self.lookup_user_agent(useragent) +<<<<<<< Updated upstream def get_redirect_for_client(self, client_id, interactive=False, broker=False): """ Try to resolve a valid redirect URI for a client, using roadtx lookup if available. @@ -126,6 +128,14 @@ def get_redirect_for_client(self, client_id, interactive=False, broker=False): return utils.find_redirurl_for_client(client_id, interactive=interactive, broker=broker) except Exception: return 'https://login.microsoftonline.com/common/oauth2/nativeclient' +======= + def set_custom_headers(self, headers_dict): + """ + Sets custom headers to include in all HTTP requests + """ + if headers_dict: + self.custom_headers = headers_dict +>>>>>>> Stashed changes def user_discovery_v1(self, username): """ @@ -1595,6 +1605,9 @@ def get_sub_argparse(auth_parser, for_rr=False): auth_parser.add_argument('--tokens-stdout', action='store_true', help='Do not store tokens on disk, pipe to stdout instead') + auth_parser.add_argument('--headers', + action='store', + help='Custom headers in JSON format, e.g., \'{"X-AnchorMailbox": "Oid:objectid@tenantid"}\'') auth_parser.add_argument('--debug', action='store_true', help='Enable debug logging to disk') @@ -1801,6 +1814,10 @@ def requests_get(self, *args, **kwargs): headers = kwargs.get('headers',{}) headers['User-Agent'] = self.user_agent kwargs['headers'] = headers + if self.custom_headers: + headers = kwargs.get('headers',{}) + headers.update(self.custom_headers) + kwargs['headers'] = headers return requests.get(*args, timeout=30.0, **kwargs) def requests_post(self, *args, **kwargs): @@ -1817,6 +1834,10 @@ def requests_post(self, *args, **kwargs): headers = kwargs.get('headers',{}) headers['Origin'] = self.origin kwargs['headers'] = headers + if self.custom_headers: + headers = kwargs.get('headers',{}) + headers.update(self.custom_headers) + kwargs['headers'] = headers return requests.post(*args, timeout=30.0, **kwargs) def requests_put(self, *args, **kwargs): @@ -1833,6 +1854,10 @@ def requests_put(self, *args, **kwargs): headers = kwargs.get('headers',{}) headers['Origin'] = self.origin kwargs['headers'] = headers + if self.custom_headers: + headers = kwargs.get('headers',{}) + headers.update(self.custom_headers) + kwargs['headers'] = headers return requests.put(*args, timeout=30.0, **kwargs) def requests_patch(self, *args, **kwargs): @@ -1849,6 +1874,10 @@ def requests_patch(self, *args, **kwargs): headers = kwargs.get('headers',{}) headers['Origin'] = self.origin kwargs['headers'] = headers + if self.custom_headers: + headers = kwargs.get('headers',{}) + headers.update(self.custom_headers) + kwargs['headers'] = headers return requests.patch(*args, timeout=30.0, **kwargs) def requests_delete(self, *args, **kwargs): @@ -1861,6 +1890,10 @@ def requests_delete(self, *args, **kwargs): headers = kwargs.get('headers',{}) headers['User-Agent'] = self.user_agent kwargs['headers'] = headers + if self.custom_headers: + headers = kwargs.get('headers',{}) + headers.update(self.custom_headers) + kwargs['headers'] = headers return requests.delete(*args, timeout=30.0, **kwargs) def parse_args(self, args): @@ -1877,6 +1910,12 @@ def parse_args(self, args): self.set_resource_uri(args.resource) self.set_scope(args.scope) self.set_user_agent(args.user_agent) + if hasattr(args, 'headers') and args.headers: + try: + headers_dict = json.loads(args.headers) + self.set_custom_headers(headers_dict) + except json.JSONDecodeError: + print(f'Error: Invalid JSON format for headers: {args.headers}') if args.cae: self.set_cae() if args.force_mfa: diff --git a/roadtx/roadtools/roadtx/main.py b/roadtx/roadtools/roadtx/main.py index 79c225e8..13df8515 100644 --- a/roadtx/roadtools/roadtx/main.py +++ b/roadtx/roadtools/roadtx/main.py @@ -84,6 +84,9 @@ def main(): '--broker-redirect-url', action='store', help='Broker redirect URL (for Nested App Auth)') + rttsauth_parser.add_argument('--headers', + action='store', + help='Custom headers in JSON format, e.g., \'{"X-AnchorMailbox": "Oid:objectid@tenantid"}\'') # Construct device module device_parser = subparsers.add_parser('device', help='Register or join devices to Azure AD') @@ -708,6 +711,9 @@ def main(): intauth_parser.add_argument('--otpseed', action='store', help='TOTP seed to calculate MFA code when prompted') + intauth_parser.add_argument('--headers', + action='store', + help='Custom headers in JSON format, e.g., \'{"X-AnchorMailbox": "Oid:objectid@tenantid"}\'') # Interactive auth using Selenium - creds from keepass kdbauth_parser = subparsers.add_parser('keepassauth', help='Selenium based authentication with credentials from a KeePass database') @@ -1091,6 +1097,13 @@ def main(): auth.set_user_agent(args.user_agent) auth.set_scope(args.scope) auth.outfile = args.tokenfile + if args.headers: + try: + headers_dict = json.loads(args.headers) + auth.set_custom_headers(headers_dict) + except json.JSONDecodeError: + print(f'Error: Invalid JSON format for headers: {args.headers}') + return if args.origin: auth.set_origin_value(args.origin) elif 'originheader' in tokenobject: @@ -1553,6 +1566,13 @@ def main(): auth.set_user_agent(args.user_agent) auth.tenant = args.tenant auth.use_pkce = args.pkce + if args.headers: + try: + headers_dict = json.loads(args.headers) + auth.set_custom_headers(headers_dict) + except json.JSONDecodeError: + print(f'Error: Invalid JSON format for headers: {args.headers}') + return if args.origin: auth.set_origin_value(args.origin, args.redirect_url) if args.cae: From 9d7f4a659b484938610c2e9cee9457d58ef11a23 Mon Sep 17 00:00:00 2001 From: Rik van Duijn Date: Sun, 8 Feb 2026 22:37:30 +0100 Subject: [PATCH 2/2] Remove merge conflict artifacts from auth.py --- roadlib/roadtools/roadlib/auth.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/roadlib/roadtools/roadlib/auth.py b/roadlib/roadtools/roadlib/auth.py index 1bc18bde..04066667 100755 --- a/roadlib/roadtools/roadlib/auth.py +++ b/roadlib/roadtools/roadlib/auth.py @@ -117,7 +117,6 @@ def set_user_agent(self, useragent): """ self.user_agent = self.lookup_user_agent(useragent) -<<<<<<< Updated upstream def get_redirect_for_client(self, client_id, interactive=False, broker=False): """ Try to resolve a valid redirect URI for a client, using roadtx lookup if available. @@ -128,14 +127,13 @@ def get_redirect_for_client(self, client_id, interactive=False, broker=False): return utils.find_redirurl_for_client(client_id, interactive=interactive, broker=broker) except Exception: return 'https://login.microsoftonline.com/common/oauth2/nativeclient' -======= + def set_custom_headers(self, headers_dict): """ Sets custom headers to include in all HTTP requests """ if headers_dict: self.custom_headers = headers_dict ->>>>>>> Stashed changes def user_discovery_v1(self, username): """