Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 59 additions & 33 deletions pygazpar/api_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import http.cookiejar
import json
import logging
import re
import time
import traceback
from datetime import date
Expand All @@ -9,21 +8,20 @@

from requests import Response, Session

SESSION_TOKEN_URL = "https://connexion.grdf.fr/api/v1/authn"
SESSION_TOKEN_PAYLOAD = """{{
"username": "{0}",
"password": "{1}",
"options": {{
"multiOptionalFactorEnroll": "false",
"warnBeforePasswordExpired": "false"
}}
START_URL = "https://monespace.grdf.fr/"

MAIL_SESSION_TOKEN_URL = "https://connexion.grdf.fr/idp/idx/identify"
MAIL_SESSION_TOKEN_PAYLOAD = """{{
"identifier": "{0}",
"stateHandle": "{1}"
}}"""

AUTH_TOKEN_URL = "https://connexion.grdf.fr/login/sessionCookieRedirect"
AUTH_TOKEN_PARAMS = """{{
"checkAccountSetupComplete": "true",
"token": "{0}",
"redirectUrl": "https://monespace.grdf.fr"
PASSWORD_SESSION_TOKEN_URL = "https://connexion.grdf.fr/idp/idx/challenge/answer"
PASSWORD_SESSION_TOKEN_PAYLOAD = """{{
"credentials": {{
"passcode": "{0}"
}},
"stateHandle": "{1}"
}}"""

API_BASE_URL = "https://monespace.grdf.fr/api"
Expand Down Expand Up @@ -79,38 +77,66 @@ def login(self):
return

session = Session()
session.headers.update({"domain": "grdf.fr"})
session.headers.update({"Content-Type": "application/json"})
session.headers.update({"X-Requested-With": "XMLHttpRequest"})
session.headers.update({"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64)"})

payload = SESSION_TOKEN_PAYLOAD.format(self._username, self._password)
start_response = session.get(START_URL)
if start_response.status_code != 200:
raise ServerError(
f"An error occurred while logging in start. Status code: {start_response.status_code} - {start_response.url}",
start_response.status_code,
)

response = session.post(SESSION_TOKEN_URL, data=payload)
pattern = r'"stateToken"\s*:\s*"([^"]+)"'
match = re.search(pattern, start_response.text)
if match:
state_token_html = match.group(1)
state_token = state_token_html.replace("\\x2D", "-")
else:
raise ValueError("Cannot retrieve stateToken inside HTML response")

payload = MAIL_SESSION_TOKEN_PAYLOAD.format(self._username, state_token)
session.cookies.set("ln", self._username)

mail_response = session.post(
MAIL_SESSION_TOKEN_URL,
data=payload,
headers={"Accept": "application/json; okta-version=1.0.0", "Content-Type": "application/json"},
)

if response.status_code != 200:
if mail_response.status_code != 200:
raise ServerError(
f"An error occurred while logging in. Status code: {response.status_code} - {response.text}",
response.status_code,
f"An error occurred while logging in mail. Status code: {mail_response.status_code} - {mail_response.text}",
mail_response.status_code,
)

session_token = response.json().get("sessionToken")
state_handle = mail_response.json().get("stateHandle")

jar = http.cookiejar.CookieJar()
payload = PASSWORD_SESSION_TOKEN_PAYLOAD.format(self._password, state_handle)

self._session = Session() # pylint: disable=attribute-defined-outside-init
self._session.headers.update({"Content-Type": "application/json"})
self._session.headers.update({"X-Requested-With": "XMLHttpRequest"})
password_response = session.post(
PASSWORD_SESSION_TOKEN_URL,
data=payload,
headers={"Accept": "application/json; okta-version=1.0.0", "Content-Type": "application/json"},
)

params = json.loads(AUTH_TOKEN_PARAMS.format(session_token))
if password_response.status_code != 200:
raise ServerError(
f"An error occurred while logging in password. Status code: {password_response.status_code} - {password_response.text}",
password_response.status_code,
)

response = self._session.get(AUTH_TOKEN_URL, params=params, allow_redirects=True, cookies=jar) # type: ignore
success_url = password_response.json()["success"]["href"]

if response.status_code != 200:
response_redirect = session.get(success_url)

if response_redirect.status_code != 200:
raise ServerError(
f"An error occurred while getting the auth token. Status code: {response.status_code} - {response.text}",
response.status_code,
f"An error occurred while logging in response_redirect. Status code: {response_redirect.status_code} - {response_redirect.url}",
response_redirect.status_code,
)

self._session = session

# ------------------------------------------------------
def is_logged_in(self) -> bool:
return self._session is not None
Expand Down
Loading