-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy path_oidc_utils.py
More file actions
195 lines (152 loc) · 6.26 KB
/
_oidc_utils.py
File metadata and controls
195 lines (152 loc) · 6.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import base64
import hashlib
import json
import os
from urllib.parse import urlencode, urlparse
import httpx
from ..._utils._http_clients import get_httpx_client_kwargs
from .._utils._console import ConsoleLogger
from ._models import AuthConfig
from ._url_utils import build_service_url
def generate_code_verifier_and_challenge():
"""Generate PKCE code verifier and challenge."""
code_verifier = base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8").rstrip("=")
code_challenge_bytes = hashlib.sha256(code_verifier.encode("utf-8")).digest()
code_challenge = (
base64.urlsafe_b64encode(code_challenge_bytes).decode("utf-8").rstrip("=")
)
return code_verifier, code_challenge
def get_state_param() -> str:
return base64.urlsafe_b64encode(os.urandom(32)).decode("utf-8").rstrip("=")
def _get_version_from_api(domain: str) -> str | None:
"""Fetch the version from the UiPath orchestrator API.
Args:
domain: The UiPath domain (e.g., 'https://alpha.uipath.com')
Returns:
The version string (e.g., '25.10.0-beta.415') or None if unable to fetch
"""
try:
version_url = build_service_url(domain, "/orchestrator_/api/status/version")
client_kwargs = get_httpx_client_kwargs()
# Override timeout for version check
client_kwargs["timeout"] = 5.0
with httpx.Client(**client_kwargs) as client:
response = client.get(version_url)
response.raise_for_status()
data = response.json()
return data.get("version")
except Exception:
# Silently fail and return None if we can't fetch the version
return None
def _is_cloud_domain(domain: str) -> bool:
"""Check if the domain is a cloud domain (alpha, staging, or cloud.uipath.com).
Args:
domain: The domain string (e.g., 'https://alpha.uipath.com')
Returns:
True if it's a cloud domain, False otherwise
"""
parsed = urlparse(domain)
netloc = parsed.netloc.lower()
return netloc in [
"alpha.uipath.com",
"staging.uipath.com",
"cloud.uipath.com",
]
def _select_config_file(domain: str) -> str:
"""Select the appropriate auth config file based on domain and version.
Logic:
1. If domain is alpha/staging/cloud.uipath.com -> use auth_config_cloud.json
2. Otherwise, try to get version from API
3. If version starts with '25.10' -> use auth_config_25_10.json
4. If version can't be determined -> fallback to auth_config_cloud.json
5. Otherwise -> fallback to auth_config_cloud.json
Args:
domain: The UiPath domain
Returns:
The filename of the config to use
"""
# Check if it's a known cloud domain
if _is_cloud_domain(domain):
return "auth_config_cloud.json"
# Try to get version from API
version = _get_version_from_api(domain)
# If we can't determine version, fallback to cloud config
if version is None:
return "auth_config_cloud.json"
# Check if version is 25.10.*
if version.startswith("25.10"):
return "auth_config_25_10.json"
# Default fallback to cloud config
return "auth_config_cloud.json"
class OidcUtils:
_console = ConsoleLogger()
@classmethod
def _find_free_port(cls, candidates: list[int]):
from socket import AF_INET, SOCK_STREAM, error, socket
def is_free(port: int) -> bool:
with socket(AF_INET, SOCK_STREAM) as s:
try:
s.bind(("localhost", port))
return True
except error:
return False
return next((p for p in candidates if is_free(p)), None)
@classmethod
def get_auth_config(cls, domain: str | None = None) -> AuthConfig:
"""Get the appropriate auth configuration based on domain.
Args:
domain: The UiPath domain (e.g., 'https://cloud.uipath.com').
If None, uses default auth_config_cloud.json
Returns:
AuthConfig with the appropriate configuration
"""
# Select the appropriate config file based on domain
if domain:
config_file = _select_config_file(domain)
else:
config_file = "auth_config_cloud.json"
config_path = os.path.join(os.path.dirname(__file__), config_file)
with open(config_path, "r") as f:
auth_config = json.load(f)
custom_port = os.getenv("UIPATH_AUTH_PORT")
candidates = [int(custom_port)] if custom_port else [8104, 8055, 42042]
port = cls._find_free_port(candidates)
if port is None:
ports_str = ", ".join(str(p) for p in candidates)
cls._console.error(
f"All configured ports ({ports_str}) are in use. Please close applications using these ports or configure different ports."
)
redirect_uri = auth_config["redirect_uri"].replace(
"__PY_REPLACE_PORT__", str(port)
)
return AuthConfig(
client_id=auth_config["client_id"],
redirect_uri=redirect_uri,
scope=auth_config["scope"],
port=port,
)
@classmethod
def get_auth_url(cls, domain: str, auth_config: AuthConfig) -> tuple[str, str, str]:
"""Get the authorization URL for OAuth2 PKCE flow.
Args:
domain (str): The UiPath domain to authenticate against (e.g. 'alpha', 'cloud')
auth_config (AuthConfig): The authentication configuration to use
Returns:
tuple[str, str]: A tuple containing:
- The authorization URL with query parameters
- The code verifier for PKCE flow
"""
code_verifier, code_challenge = generate_code_verifier_and_challenge()
state = get_state_param()
query_params = {
"client_id": auth_config["client_id"],
"redirect_uri": auth_config["redirect_uri"],
"response_type": "code",
"scope": auth_config["scope"],
"state": state,
"code_challenge": code_challenge,
"code_challenge_method": "S256",
}
query_string = urlencode(query_params)
url = build_service_url(domain, f"/identity_/connect/authorize?{query_string}")
return url, code_verifier, state