1+ from __future__ import absolute_import
2+
3+ import os
4+ from typing import Literal , Tuple
5+
6+ from six .moves .urllib .parse import urlparse
7+
8+ import logging
9+
10+ from requests .auth import AuthBase
11+ import requests
12+
13+ from solvebio import SolveError
14+ from solvebio .cli .credentials import get_credentials
15+
16+ logger = logging .getLogger ("solvebio" )
17+
18+
19+ class SolveBioTokenAuth (AuthBase ):
20+ """Custom auth handler for SolveBio API token authentication"""
21+
22+ def __init__ (self , token = None , token_type = "Token" ):
23+ self .token = token
24+ self .token_type = token_type
25+
26+ def __call__ (self , r ):
27+ if self .token :
28+ r .headers ["Authorization" ] = "{0} {1}" .format (self .token_type , self .token )
29+ return r
30+
31+ def __repr__ (self ):
32+ if self .token :
33+ return self .token_type
34+ else :
35+ return "Anonymous"
36+
37+
38+ def authenticate (
39+ host : str ,
40+ token : str ,
41+ token_type : Literal ["Bearer" , "Token" ],
42+ * ,
43+ raise_on_missing : bool = True ,
44+ debug : bool = False
45+ ) -> Tuple [str , SolveBioTokenAuth ]:
46+ """
47+ Sets login credentials for SolveBio API authentication.
48+
49+ :param str host: API host url
50+ :param str token: API access token or API key
51+ :param str token_type: API token type. `Bearer` is used for access tokens, while `Token` is used for API Keys.
52+ :param bool raise_on_missing: Raise an exception if no credentials are available.
53+ """
54+ # used for debugging
55+ source_host = None
56+ source_token = None
57+
58+ # Find credentials from environment variables
59+ if not host :
60+ host = (
61+ os .environ .get ("QUARTZBIO_API_HOST" , None )
62+ or os .environ .get ("EDP_API_HOST" , None )
63+ or os .environ .get ("SOLVEBIO_API_HOST" , None )
64+ )
65+
66+ if not token :
67+ api_key = (
68+ os .environ .get ("QUARTZBIO_API_KEY" , None )
69+ or os .environ .get ("EDP_API_KEY" , None )
70+ or os .environ .get ("SOLVEBIO_API_KEY" , None )
71+ )
72+
73+ access_token = (
74+ os .environ .get ("QUARTZ_ACCESS_TOKEN" , None )
75+ or os .environ .get ("EDP_ACCESS_TOKEN" , None )
76+ or os .environ .get ("SOLVEBIO_ACCESS_TOKEN" , None )
77+ )
78+
79+ if access_token :
80+ token = access_token
81+ token_type = "Bearer"
82+ elif api_key :
83+ token = api_key
84+ token_type = "Token"
85+
86+ if token :
87+ source_token = 'envvars'
88+ else :
89+ source_token = 'params'
90+
91+ # Find credentials from local credentials file
92+ if not token :
93+ if creds := get_credentials (host ):
94+ token_type = creds .token_type
95+ token = creds .token
96+
97+ if host is None :
98+ # this happens when user/ennvars provided no API host for the login command
99+ # but the credentials file still contains login credentials
100+ host = creds .api_host
101+
102+ if host :
103+ source_host = 'creds'
104+ if token :
105+ source_token = 'creds'
106+
107+ if (not host and raise_on_missing ) or debug :
108+ # this will tell the user where QB Client found the credentials from
109+ creds_path = netrc .path ()
110+ print ('\n ' .join ([
111+ "Login Debug:" ,
112+ f"--> Host: { host } \n (source: { source_host } )" ,
113+ f"--> Token Type: { token_type } \n (source: { source_token } )" ,
114+ "\n 1) source: params" ,
115+ " Means that you've passed this through the login CLI command:" ,
116+ " quartzbio login --host <EDP_HOST> --access_token <EDP_TOKEN>" ,
117+ "\n or the quartzbio.login function:" ,
118+ " import quartzbio" ,
119+ " quartzbio.login(debug=True)" ,
120+ "\n 2) source: creds" ,
121+ " Means that the QB client has saved your credentials in:" ,
122+ f" { creds_path } " ,
123+ "\n 3) source: envvars" ,
124+ " Means that you've set your credentials through environment variables:" ,
125+ " QUARTZBIO_API_HOST" ,
126+ " QUARTZBIO_ACCESS_TOKEN" ,
127+ " QUARTZBIO_API_KEY" ,
128+ ]))
129+
130+ if not debug :
131+ raise SolveError ("No SolveBio API host is set" )
132+
133+ host = validate_api_host_url (host )
134+
135+ # If the domain ends with .solvebio.com, determine if
136+ # we are being redirected. If so, update the url with the new host
137+ # and log a warning.
138+ if host and host .rstrip ("/" ).endswith (".api.solvebio.com" ):
139+ old_host = host .rstrip ("/" )
140+ response = requests .head (old_host , allow_redirects = True )
141+ # Strip the port number from the host for comparison
142+ new_host = validate_api_host_url (response .url ).rstrip ("/" ).replace (":443" , "" )
143+
144+ if old_host != new_host :
145+ logger .warning (
146+ 'API host redirected from "{}" to "{}", '
147+ "please update your local credentials file" .format (old_host , new_host )
148+ )
149+ host = new_host
150+
151+ if token is not None :
152+ auth = SolveBioTokenAuth (token , token_type )
153+ else :
154+ auth = None
155+
156+ # TODO: warn user if WWW url is provided in edp_login!
157+
158+ # @TODO: remove references to solvebio.api_host, etc...
159+
160+ return host , auth
161+
162+
163+ def validate_api_host_url (url ):
164+ """
165+ Validate SolveBio API host url.
166+
167+ Valid urls must not be empty and
168+ must contain either HTTP or HTTPS scheme.
169+ """
170+
171+ # Default to https if no scheme is set
172+ if "://" not in url :
173+ url = "https://" + url
174+
175+ parsed = urlparse (url )
176+ if parsed .scheme not in ["http" , "https" ]:
177+ raise SolveError (
178+ "Invalid API host: %s. " "Missing url scheme (HTTP or HTTPS)." % url
179+ )
180+ elif not parsed .netloc :
181+ raise SolveError ("Invalid API host: %s." % url )
182+
183+ return parsed .geturl ()
0 commit comments