From 883812744c7e7067ed252b9c4c8721ad56d20cfe Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Tue, 30 Sep 2025 17:08:46 -0700 Subject: [PATCH 01/10] Add private.key and public.key to .gitignore --- .gitignore | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.gitignore b/.gitignore index d55a5c2e2..636bd8c5f 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,8 @@ pyvenv.cfg MANIFEST venv/ .venv/ +private.key +public.key # path for the test lib. plex/ From c3f315eea2af401fa6dc75049c24ab768870f4cc Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Tue, 30 Sep 2025 17:09:38 -0700 Subject: [PATCH 02/10] Add `MyPlexJWTAuth` class --- plexapi/myplex.py | 300 +++++++++++++++++++++++++++++++++++++++++++++- plexapi/utils.py | 6 + 2 files changed, 305 insertions(+), 1 deletion(-) diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 1821a873a..49a90f64e 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -1,12 +1,26 @@ # -*- coding: utf-8 -*- import copy +import hashlib import html import threading import time +from datetime import datetime, timedelta from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit import requests +try: + import cryptography + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 +except ImportError: # pragma: no cover + cryptography = None + +try: + import jwt +except ImportError: # pragma: no cover + jwt = None + from plexapi import (BASE_HEADERS, CONFIG, TIMEOUT, X_PLEX_ENABLE_FAST_CONNECT, X_PLEX_IDENTIFIER, log, logfilter, utils) from plexapi.base import PlexObject, cached_data_property @@ -1686,7 +1700,7 @@ class MyPlexPinLogin: Parameters: session (requests.Session, optional): Use your own session object if you want to - cache the http responses from PMS + cache the http responses from Plex. requestTimeout (int): timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). headers (dict): A dict of X-Plex headers to send with requests. oauth (bool): True to use Plex OAuth instead of PIN login. @@ -1897,6 +1911,290 @@ def _query(self, url, method=None, headers=None, **kwargs): return utils.parseXMLString(response.text) +class MyPlexJWTAuth: + """ MyPlex JWT authentication class to obtain a Plex JWT (JSON Web Token) which can be used in place of a Plex token + when creating a :class:`~plexapi.myplex.MyPlexAccount` instance. + Requires the ``PyJWT`` with ``cryptography`` packages to be installed (``pyjwt[crypto]``). + See: https://developer.plex.tv/pms/#section/API-Info/Authenticating-with-Plex + + Parameters: + session (requests.Session, optional): Use your own session object if you want to + cache the http responses from Plex. + requestTimeout (int): timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). + headers (dict): A dict of X-Plex headers to send with requests. + token (str): Plex token only required to register the device initially. + jwtToken (str): Existing Plex JWT to verify or refresh. + keypair (tuple[str|bytes]): A tuple of the full file paths (str) to the ED25519 private and public key pair + or the raw keys themselves (bytes) to use for signing the JWT. + + Attributes: + AUTH (str): 'https://clients.plex.tv/api/v2/auth' + SCOPES (list): List of all available scopes to request for the JWT. + jwtToken (str): The Plex JWT received after refreshing. + + Example: + + .. code-block:: python + + from plexapi.myplex import MyPlexAccount, MyPlexJWTAuth + + # Generate a new Plex JWT + jwtauth = MyPlexJWTAuth(token='2ffLuB84dqLswk9skLos') + jwtauth.generateKeypair(keyfiles=('private.key', 'public.key')) + jwtauth.registerDevice() + jwtToken = jwtauth.refreshJWT(scope=['username', 'email', 'friendly_name']) + + account = MyPlexAccount(token=jwtToken) + + # Refresh an existing Plex JWT + jwtauth = MyPlexJWTAuth(jwtToken=jwtToken, keypair=('private.key', 'public.key')) + if not jwtauth.verifyJWT(): + jwtToken = jwtauth.refreshJWT(scope=['username', 'email', 'friendly_name']) + + account = MyPlexAccount(token=jwtToken) + + """ + AUTH = 'https://clients.plex.tv/api/v2/auth' + SCOPES = ['username', 'email', 'friendly_name', 'restricted', 'anonymous', 'joinedAt'] + + def __init__(self, session=None, requestTimeout=None, headers=None, token=None, jwtToken=None, keypair=(None, None)): + super(MyPlexJWTAuth, self).__init__() + self._session = session or requests.Session() + self._requestTimeout = requestTimeout or TIMEOUT + self.headers = headers + self._token = token + self.jwtToken = jwtToken + self._privateKey = utils.openOrRead(keypair[0]) if keypair[0] else None + self._publicKey = utils.openOrRead(keypair[1]) if keypair[1] else None + self._clientJWT = None + + if not jwt: + log.warning('PyJWT package is not installed, cannot use Plex JWT login') + return + + def generateKeypair(self, keyfiles=(None, None)): + """ Generates a new ED25519 private/public keypair for signing the JWT and saves them to files. + Requires the ``cryptography`` package to be installed. + + Parameters: + keyfiles (tuple[str]): A tuple of the full file paths to write the private and public keypair to. + """ + if not cryptography: + log.warning('Cryptography package is not installed, cannot generate ED25519 keypair') + return + + privateKey = ed25519.Ed25519PrivateKey.generate() + publicKey = privateKey.public_key() + self._privateKey = privateKey.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption() + ) + self._publicKey = publicKey.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw + ) + + if keyfiles[0] and keyfiles[1]: + with open(keyfiles[0], 'wb') as privateFile, open(keyfiles[1], 'wb') as publicFile: + privateFile.write(self._privateKey) + publicFile.write(self._publicKey) + + @cached_data_property + def _clientIdentifier(self): + """ Returns the client identifier from the headers. """ + headers = self._headers() + return headers['X-Plex-Client-Identifier'] + + @cached_data_property + def _keyID(self): + """ Returns the key ID (thumbprint) for the ED25519 keypair. """ + return hashlib.sha256(self._privateKey + self._publicKey).hexdigest() + + @cached_data_property + def _privateJWK(self): + """ Returns the private JWK (JSON Web Key) for the ED25519 keypair.""" + return jwt.PyJWK.from_dict({ + 'kty': 'OKP', + 'crv': 'Ed25519', + 'x': utils.base64urlEncode(self._publicKey), + 'd': utils.base64urlEncode(self._privateKey), + 'use': 'sig', + 'alg': 'EdDSA', + 'kid': self._keyID, + }) + + @cached_data_property + def _publicJWK(self): + """ Returns the public JWK (JSON Web Key) for the ED25519 keypair.""" + return jwt.PyJWK.from_dict({ + 'kty': 'OKP', + 'crv': 'Ed25519', + 'x': utils.base64urlEncode(self._publicKey), + 'use': 'sig', + 'alg': 'EdDSA', + 'kid': self._keyID, + }) + + def _encodeClientJWT(self, scope): + """ Encodes the client JWT using the private JWK. + + Parameters: + scope (list[str]): List of scopes to request in the token. + """ + payload = { + 'nonce': self._getPlexNonce(), + 'scope': ','.join(scope), + 'aud': 'plex.tv', + 'iss': self._clientIdentifier, + 'iat': int(datetime.now().timestamp()), + 'exp': int((datetime.now() + timedelta(minutes=5)).timestamp()), + } + headers = { + 'kid': self._keyID + } + self._clientJWT = jwt.encode( + payload, + key=self._privateJWK, + algorithm='EdDSA', + headers=headers + ) + + def _decodePlexJWT(self): + """ Decodes and verifies the Plex JWT using the Plex public JWK. """ + return jwt.decode( + self.jwtToken, + key=jwt.PyJWK.from_dict(self._getPlexPublicJWK()), + algorithms=['EdDSA'], + options={ + 'require': ['aud', 'iss', 'exp', 'iat', 'thumbprint'] + }, + audience=['plex.tv', self._clientIdentifier], + issuer='plex.tv', + ) + + def _registerPlexDevice(self): + """ Registers the public JWK with Plex. """ + url = f'{self.AUTH}/jwk' + headers = self._headers(**{'X-Plex-Token': self._token}) + body = {'jwk': self._publicJWK._jwk_data} + self._query(url, method=self._session.post, headers=headers, json=body) + + def _getPlexNonce(self): + """ Gets a nonce from Plex. """ + url = f'{self.AUTH}/nonce' + data = self._query(url, method=self._session.get) + return data['nonce'] + + def _exchangePlexJWT(self): + """ Exchanges the client JWT for a Plex JWT. """ + url = f'{self.AUTH}/token' + body = {'jwt': self._clientJWT} + data = self._query(url, method=self._session.post, json=body) + return data['auth_token'] + + def _getPlexPublicJWK(self): + """ Gets the Plex public JWK. """ + url = f'{self.AUTH}/keys' + data = self._query(url, method=self._session.get) + return data['keys'][0] + + def registerDevice(self): + """ Registers the device with Plex using the provided token and private/public keypair. + This must be done once before the Plex JWT can be refreshed. + + Raises: + :exc:`~plexapi.exceptions.BadRequest`: when token or keypair is missing. + """ + if not self._token: + raise BadRequest('Plex token is required to register device.') + + if not self._privateKey or not self._publicKey: + raise BadRequest('ED25519 private and public keys are required to register device. ' + 'Use generateKeypair() to generate a new keypair.') + + self._registerPlexDevice() + + def refreshJWT(self, scope=None): + """ Refreshes the Plex JWT using the existing private/public keypair. + + Parameters: + scope (list[str], optional): List of scopes to request in the new token. + Default is all available scopes. + + Returns: + str: The new Plex JWT. + + Raises: + :exc:`~plexapi.exceptions.BadRequest`: when keypair is missing. + :exc:`~plexapi.exceptions.BadRequest`: when the newly obtained JWT cannot be verified. + """ + if not self._privateKey or not self._publicKey: + raise BadRequest('ED25519 private and public keys are required to refresh JWT.') + + if scope is None: + scope = self.SCOPES + + self._encodeClientJWT(scope) + self.jwtToken = self._exchangePlexJWT() + if self.verifyJWT(): + return self.jwtToken + raise BadRequest('Failed to verify newly obtained JWT.') + + def verifyJWT(self, refreshWithinDays=1): + """ Verifies the existing Plex JWT is valid and not expiring within the specified number of days. + + Parameters: + refreshWithinDays (int): Number of days before expiration to consider + the JWT invalid and in need of refresh. Default is 1 day. + """ + try: + decoded_jwt = self._decodePlexJWT() + except jwt.ExpiredSignatureError: + log.warning('Existing JWT has expired') + return False + except jwt.InvalidSignatureError: + log.warning('Existing JWT has invalid signature') + return False + except jwt.InvalidTokenError as e: + log.warning(f'Existing JWT is invalid: {e}') + return False + else: + if decoded_jwt['thumbprint'] != self._keyID: + log.warning('Existing JWT was signed with a different key') + return False + elif decoded_jwt['exp'] < int((datetime.now() + timedelta(days=refreshWithinDays)).timestamp()): + log.warning(f'Existing JWT is expiring within {refreshWithinDays} day(s)') + return False + return True + + @property + def decodedJWT(self): + """ Returns the decoded Plex JWT. """ + return self._decodePlexJWT() + + def _headers(self, **kwargs): + """ Returns dict containing base headers for all requests for Plex JWT login. """ + headers = BASE_HEADERS.copy() + if self.headers: + headers.update(self.headers) + headers.update(kwargs) + headers['Accept'] = 'application/json' + return headers + + def _query(self, url, method=None, headers=None, **kwargs): + method = method or self._session.get + log.debug('%s %s', method.__name__.upper(), url) + headers = headers or self._headers() + response = method(url, headers=headers, timeout=self._requestTimeout, **kwargs) + if not response.ok: # pragma: no cover + codename = codes.get(response.status_code)[0] + errtext = response.text.replace('\n', ' ') + raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}') + if response.text: + return response.json() + + def _connect(cls, url, token, session, timeout, results, i, job_is_done_event=None): """ Connects to the specified cls with url and token. Stores the connection information to results[i] in a threadsafe way. diff --git a/plexapi/utils.py b/plexapi/utils.py index bbff6a8e0..e5bccf16e 100644 --- a/plexapi/utils.py +++ b/plexapi/utils.py @@ -625,6 +625,10 @@ def base64str(text): return base64.b64encode(text.encode('utf-8')).decode('utf-8') +def base64urlEncode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8') + + def deprecated(message, stacklevel=2): def decorator(func): """This is a decorator which can be used to mark functions @@ -667,6 +671,8 @@ def serialize(obj): def openOrRead(file): + if isinstance(file, bytes): + return file if hasattr(file, 'read'): return file.read() with open(file, 'rb') as f: From 3a9cc927c435cbb96ef3d8ef4358910ac69ff8f9 Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Tue, 30 Sep 2025 17:09:57 -0700 Subject: [PATCH 03/10] Add `utils.plexJWTAuth` helper function --- plexapi/utils.py | 71 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 4 deletions(-) diff --git a/plexapi/utils.py b/plexapi/utils.py index e5bccf16e..7a63010b9 100644 --- a/plexapi/utils.py +++ b/plexapi/utils.py @@ -9,6 +9,7 @@ import sys import time import unicodedata +import uuid import warnings import zipfile from collections import deque @@ -537,13 +538,17 @@ def createMyPlexDevice(headers, account, timeout=10): # pragma: no cover Parameters: headers (dict): Provide the X-Plex- headers for the new device. - A unique X-Plex-Client-Identifier is required. + A unique X-Plex-Client-Identifier is required or one will be generated if not provided. account (MyPlexAccount): The Plex account to create the device on. timeout (int): Timeout in seconds to wait for device login. """ from plexapi.myplex import MyPlexPinLogin - if 'X-Plex-Client-Identifier' not in headers: + if not headers: + client_identifier = generateUUID() + headers = {'X-Plex-Client-Identifier': client_identifier} + print(f'No X-Plex-Client-Identifier provided, generated: {client_identifier}') + elif 'X-Plex-Client-Identifier' not in headers: raise BadRequest('The X-Plex-Client-Identifier header is required.') clientIdentifier = headers['X-Plex-Client-Identifier'] @@ -561,13 +566,17 @@ def plexOAuth(headers, forwardUrl=None, timeout=120): # pragma: no cover Parameters: headers (dict): Provide the X-Plex- headers for the new device. - A unique X-Plex-Client-Identifier is required. + A unique X-Plex-Client-Identifier is required or one will be generated if not provided. forwardUrl (str, optional): The url to redirect the client to after login. timeout (int, optional): Timeout in seconds to wait for device login. Default 120 seconds. """ from plexapi.myplex import MyPlexAccount, MyPlexPinLogin - if 'X-Plex-Client-Identifier' not in headers: + if not headers: + client_identifier = generateUUID() + headers = {'X-Plex-Client-Identifier': client_identifier} + print(f'No X-Plex-Client-Identifier provided, generated: {client_identifier}') + elif 'X-Plex-Client-Identifier' not in headers: raise BadRequest('The X-Plex-Client-Identifier header is required.') pinlogin = MyPlexPinLogin(headers=headers, oauth=True) @@ -583,6 +592,56 @@ def plexOAuth(headers, forwardUrl=None, timeout=120): # pragma: no cover print('Login failed.') +def plexJWTAuth(headers=None, token=None, jwtToken=None, keypair=(None, None), scope=None): # pragma: no cover + """ Helper function for Plex JWT authentication using initial Plex OAuth. + + Parameters: + headers (dict, optional): Provide the X-Plex- headers for the new device. + A unique X-Plex-Client-Identifier is required or one will be generated if not provided. + token (str, optional): The Plex token to use for initial device registration. + If not provided, Plex OAuth will be used to obtain a token. + jwtToken (str, optional): The Plex JWT (JSON Web Token) to use for authentication. + If provided, the JWT will be validated and refreshed if necessary. + keypair (tuple, optional): A tuple of the ED25519 (privateKey, publicKey) to use for signing the JWT. + If not provided, a new keypair will be generated and saved to 'private.key' and 'public.key'. + scope (list[str], optional): List of scopes to request in the JWT. + """ + from plexapi.myplex import MyPlexAccount, MyPlexJWTAuth + + if not headers: + client_identifier = generateUUID() + headers = {'X-Plex-Client-Identifier': client_identifier} + print(f'No X-Plex-Client-Identifier provided, generated: {client_identifier}') + elif 'X-Plex-Client-Identifier' not in headers: + raise BadRequest('The X-Plex-Client-Identifier header is required.') + + if not token and not jwtToken: + account = plexOAuth(headers) + token = account.authenticationToken + + jwtauth = MyPlexJWTAuth(headers=headers, token=token, jwtToken=jwtToken, keypair=keypair) + if jwtToken and (not keypair[0] or not keypair[1]): + raise BadRequest('When providing a jwtToken, the corresponding ED25519 keypair is required.') + + if not keypair[0] or not keypair[1]: + jwtauth.generateKeypair(keyfiles=('private.key', 'public.key')) + print('Generated new ED25519 keypair and saved to "private.key" and "public.key".') + + if not jwtToken: + jwtauth.registerDevice() + jwtauth.refreshJWT(scope=scope) + print('Registered new device and obtained JWT token.') + elif not jwtauth.verifyJWT(): + jwtauth.refreshJWT(scope=scope) + print('Refreshed expired/invalid JWT token.') + + if jwtauth.jwtToken: + print('JWT authentication successful!') + return MyPlexAccount(token=jwtauth.jwtToken) + else: + print('JWT authentication failed.') + + def choose(msg, items, attr): # pragma: no cover """ Command line helper to display a list of choices, asking the user to choose one of the options. @@ -735,3 +794,7 @@ def parseXMLString(s: str): except ElementTree.ParseError: # If it fails, clean the string and try again cleaned_s = cleanXMLString(s).encode('utf-8') return ElementTree.fromstring(cleaned_s) if cleaned_s.strip() else None + + +def generateUUID() -> str: + return str(uuid.uuid4()) From 6cc3d23bb557806ec55073b465b4ac7161ed7af6 Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Tue, 30 Sep 2025 17:12:31 -0700 Subject: [PATCH 04/10] Update optional dependencies for Plex JWT authentication --- README.rst | 1 + pyproject.toml | 1 + requirements_dev.txt | 1 + 3 files changed, 3 insertions(+) diff --git a/README.rst b/README.rst index cc54942ad..32a004564 100644 --- a/README.rst +++ b/README.rst @@ -37,6 +37,7 @@ Installation & Documentation .. code-block:: python pip install plexapi[alert] # Install with dependencies required for plexapi.alert + pip install plexapi[jwt] # Install with dependencies required for Plex JWT authentication Documentation_ can be found at Read the Docs. diff --git a/pyproject.toml b/pyproject.toml index 212267f24..2f17c8300 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ dynamic = ["version"] [project.optional-dependencies] alert = ["websocket-client>=1.3.3"] +jwt = ["pyjwt[crypto]"] [project.urls] Homepage = "https://github.com/pushingkarmaorg/python-plexapi" diff --git a/requirements_dev.txt b/requirements_dev.txt index 74e555424..c3486bed8 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -4,6 +4,7 @@ #--------------------------------------------------------- flake8==7.3.0 pillow==11.3.0 +pyjwt[crypto]==2.10.1 pytest==8.4.1 pytest-cache==1.0 pytest-cov==6.2.1 From 2fd34091bfc6830cd07e0bbf349b213bd2bc754a Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Wed, 1 Oct 2025 21:21:41 -0700 Subject: [PATCH 05/10] Add method to retrieve Plex JWT without initial Plex token --- plexapi/myplex.py | 343 ++++++++++++++++++++++++++++++++++++---------- plexapi/utils.py | 45 +++--- 2 files changed, 284 insertions(+), 104 deletions(-) diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 49a90f64e..7e9cec847 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -2,6 +2,7 @@ import copy import hashlib import html +import os import threading import time from datetime import datetime, timedelta @@ -1693,10 +1694,10 @@ class MyPlexPinLogin: :func:`~plexapi.myplex.MyPlexPinLogin.run` and then at a later time call :func:`~plexapi.myplex.MyPlexPinLogin.waitForLogin` to wait for and check the result. - The callback approach is an extension of the threaded approach and expects the developer - to pass the `callback` parameter to the call to :func:`~plexapi.myplex.MyPlexPinLogin.run`. + to pass the ``callback`` parameter to the call to :func:`~plexapi.myplex.MyPlexPinLogin.run`. The callback will be called when the thread waiting for the PIN login to succeed either finishes or expires. The parameter passed to the callback is the received authentication - token or `None` if the login expired. + token or ``None`` if the login expired. Parameters: session (requests.Session, optional): Use your own session object if you want to @@ -1707,22 +1708,20 @@ class MyPlexPinLogin: Attributes: PINS (str): 'https://plex.tv/api/v2/pins' - CHECKPINS (str): 'https://plex.tv/api/v2/pins/{pinid}' POLLINTERVAL (int): 1 finished (bool): Whether the pin login has finished or not. expired (bool): Whether the pin login has expired or not. token (str): Token retrieved through the pin login. pin (str): Pin to use for the login on https://plex.tv/link. """ - PINS = 'https://plex.tv/api/v2/pins' # get - CHECKPINS = 'https://plex.tv/api/v2/pins/{pinid}' # get + PINS = 'https://plex.tv/api/v2/pins' POLLINTERVAL = 1 def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False): super(MyPlexPinLogin, self).__init__() self._session = session or requests.Session() self._requestTimeout = requestTimeout or TIMEOUT - self.headers = headers + self._customHeaders = headers self._oauth = oauth self._loginTimeout = None @@ -1731,7 +1730,6 @@ def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False) self._abort = False self._id = None self._code = None - self._getCode() self.finished = False self.expired = False @@ -1771,22 +1769,24 @@ def oauthUrl(self, forwardUrl=None): return f'https://app.plex.tv/auth/#!?{urlencode(params)}' - def run(self, callback=None, timeout=None): + def run(self, callback=None, timeout=120): """ Starts the thread which monitors the PIN login state. Parameters: - callback (Callable[str]): Callback called with the received authentication token (optional). - timeout (int): Timeout in seconds waiting for the PIN login to succeed (optional). + callback (Callable[str], optional): Callback called with the received authentication token. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. Raises: - :class:`RuntimeError`: If the thread is already running. - :class:`RuntimeError`: If the PIN login for the current PIN has expired. + :exc:`RuntimeError`: If the thread is already running. + :exc:`RuntimeError`: If the PIN login for the current PIN has expired. """ if self._thread and not self._abort: raise RuntimeError('MyPlexPinLogin thread is already running') if self.expired: raise RuntimeError('MyPlexPinLogin has expired') + self._getCode() + self._loginTimeout = timeout self._callback = callback self._abort = False @@ -1797,12 +1797,8 @@ def run(self, callback=None, timeout=None): def waitForLogin(self): """ Waits for the PIN login to succeed or expire. - Parameters: - callback (Callable[str]): Callback called with the received authentication token (optional). - timeout (int): Timeout in seconds waiting for the PIN login to succeed (optional). - Returns: - `True` if the PIN login succeeded or `False` otherwise. + bool: ``True`` if the PIN login succeeded or ``False`` otherwise. """ if not self._thread or self._abort: return False @@ -1822,7 +1818,7 @@ def stop(self): self._thread.join() def checkLogin(self): - """ Returns `True` if the PIN login has succeeded. """ + """ Returns ``True`` if the PIN login has succeeded. """ if self._thread: return False @@ -1858,7 +1854,7 @@ def _checkLogin(self): if self.token: return True - url = self.CHECKPINS.format(pinid=self._id) + url = f'{self.PINS}/{self._id}' response = self._query(url) if response is None: return False @@ -1894,8 +1890,8 @@ def _pollLogin(self): def _headers(self, **kwargs): """ Returns dict containing base headers for all requests for pin login. """ headers = BASE_HEADERS.copy() - if self.headers: - headers.update(self.headers) + if self._customHeaders: + headers.update(self._customHeaders) headers.update(kwargs) return headers @@ -1911,9 +1907,11 @@ def _query(self, url, method=None, headers=None, **kwargs): return utils.parseXMLString(response.text) -class MyPlexJWTAuth: - """ MyPlex JWT authentication class to obtain a Plex JWT (JSON Web Token) which can be used in place of a Plex token - when creating a :class:`~plexapi.myplex.MyPlexAccount` instance. +class MyPlexJWTLogin: + """ + MyPlex JWT login class which supports getting a JWT for authenticating the client and + providing an access token to create a :class:`~plexapi.myplex.MyPlexAccount` instance. + This helper class supports a polling, threaded and callback approach. Requires the ``PyJWT`` with ``cryptography`` packages to be installed (``pyjwt[crypto]``). See: https://developer.plex.tv/pms/#section/API-Info/Authenticating-with-Plex @@ -1922,62 +1920,106 @@ class MyPlexJWTAuth: cache the http responses from Plex. requestTimeout (int): timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). headers (dict): A dict of X-Plex headers to send with requests. - token (str): Plex token only required to register the device initially. + token (str): Plex token only required to register the device initially if not using OAuth. jwtToken (str): Existing Plex JWT to verify or refresh. keypair (tuple[str|bytes]): A tuple of the full file paths (str) to the ED25519 private and public key pair or the raw keys themselves (bytes) to use for signing the JWT. + scope (list[str]): List of scopes to request in the new token. + Default is all available scopes. Attributes: + PINS (str): 'https://plex.tv/api/v2/pins' AUTH (str): 'https://clients.plex.tv/api/v2/auth' + POLLINTERVAL (int): 1 SCOPES (list): List of all available scopes to request for the JWT. - jwtToken (str): The Plex JWT received after refreshing. + finished (bool): Whether the JWT login has finished or not. + expired (bool): Whether the JWT login has expired or not. + jwtToken (str): The Plex JWT received after login or refreshing. Example: - .. code-block:: python + .. code-block:: python + + from plexapi.myplex import MyPlexAccount, MyPlexJWTLogin - from plexapi.myplex import MyPlexAccount, MyPlexJWTAuth + # Method 1: Generate a new Plex JWT using Plex OAuth + jwtlogin = MyPlexJWTLogin( + scopes=['username', 'email', 'friendly_name'] + ) + jwtlogin.generateKeypair(keyfiles=('private.key', 'public.key')) + print(f'Login to Plex at the following url:\\n{jwtlogin.oauthUrl()}') + jwtlogin.run() + jwtlogin.waitForLogin() + jwtToken = jwtlogin.jwtToken - # Generate a new Plex JWT - jwtauth = MyPlexJWTAuth(token='2ffLuB84dqLswk9skLos') - jwtauth.generateKeypair(keyfiles=('private.key', 'public.key')) - jwtauth.registerDevice() - jwtToken = jwtauth.refreshJWT(scope=['username', 'email', 'friendly_name']) + account = MyPlexAccount(token=jwtToken) - account = MyPlexAccount(token=jwtToken) + # Method 2: Generate a new Plex JWT using an existing Plex token and keypair + jwtlogin = MyPlexJWTLogin( + token='2ffLuB84dqLswk9skLos', + keypair=('private.key', 'public.key'), + scopes=['username', 'email', 'friendly_name'] + ) + jwtlogin.registerDevice() + jwtToken = jwtlogin.refreshJWT() - # Refresh an existing Plex JWT - jwtauth = MyPlexJWTAuth(jwtToken=jwtToken, keypair=('private.key', 'public.key')) - if not jwtauth.verifyJWT(): - jwtToken = jwtauth.refreshJWT(scope=['username', 'email', 'friendly_name']) + account = MyPlexAccount(token=jwtToken) - account = MyPlexAccount(token=jwtToken) + # Refresh an existing Plex JWT + jwtlogin = MyPlexJWTLogin( + jwtToken=jwtToken, + keypair=('private.key', 'public.key'), + scopes=['username', 'email', 'friendly_name'] + ) + if not jwtlogin.verifyJWT(): + jwtToken = jwtlogin.refreshJWT() + + account = MyPlexAccount(token=jwtToken) """ + PINS = 'https://clients.plex.tv/api/v2/pins' AUTH = 'https://clients.plex.tv/api/v2/auth' + POLLINTERVAL = 1 SCOPES = ['username', 'email', 'friendly_name', 'restricted', 'anonymous', 'joinedAt'] - def __init__(self, session=None, requestTimeout=None, headers=None, token=None, jwtToken=None, keypair=(None, None)): - super(MyPlexJWTAuth, self).__init__() + def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False, + token=None, jwtToken=None, keypair=(None, None), scopes=None): + super(MyPlexJWTLogin, self).__init__() self._session = session or requests.Session() self._requestTimeout = requestTimeout or TIMEOUT - self.headers = headers + self._customHeaders = headers self._token = token - self.jwtToken = jwtToken self._privateKey = utils.openOrRead(keypair[0]) if keypair[0] else None self._publicKey = utils.openOrRead(keypair[1]) if keypair[1] else None + self._scopes = scopes or self.SCOPES self._clientJWT = None + self._oauth = oauth + self._loginTimeout = None + self._callback = None + self._thread = None + self._abort = False + self._id = None + self._code = None + + self.finished = False + self.expired = False + self.jwtToken = jwtToken + if not jwt: log.warning('PyJWT package is not installed, cannot use Plex JWT login') return - def generateKeypair(self, keyfiles=(None, None)): + def generateKeypair(self, keyfiles=(None, None), overwrite=False): """ Generates a new ED25519 private/public keypair for signing the JWT and saves them to files. Requires the ``cryptography`` package to be installed. Parameters: keyfiles (tuple[str]): A tuple of the full file paths to write the private and public keypair to. + overwrite (bool): Set to True to overwrite existing keypair files. Default is False. + + Raises: + :exc:`FileExistsError`: when keypair files already exist and overwrite is False. """ if not cryptography: log.warning('Cryptography package is not installed, cannot generate ED25519 keypair') @@ -1996,6 +2038,9 @@ def generateKeypair(self, keyfiles=(None, None)): ) if keyfiles[0] and keyfiles[1]: + if not overwrite and (os.path.exists(keyfiles[0]) or os.path.exists(keyfiles[1])): + raise FileExistsError('Keypair files already exist, set overwrite=True to overwrite them.') + with open(keyfiles[0], 'wb') as privateFile, open(keyfiles[1], 'wb') as publicFile: privateFile.write(self._privateKey) publicFile.write(self._publicKey) @@ -2036,24 +2081,21 @@ def _publicJWK(self): 'kid': self._keyID, }) - def _encodeClientJWT(self, scope): - """ Encodes the client JWT using the private JWK. - - Parameters: - scope (list[str]): List of scopes to request in the token. - """ + def _encodeClientJWT(self): + """ Returns the encoded client JWT using the private JWK. """ payload = { - 'nonce': self._getPlexNonce(), - 'scope': ','.join(scope), + 'scope': ','.join(self._scopes), 'aud': 'plex.tv', 'iss': self._clientIdentifier, 'iat': int(datetime.now().timestamp()), 'exp': int((datetime.now() + timedelta(minutes=5)).timestamp()), } + if self._token: + payload['nonce'] = self._getPlexNonce() headers = { 'kid': self._keyID } - self._clientJWT = jwt.encode( + return jwt.encode( payload, key=self._privateJWK, algorithm='EdDSA', @@ -2061,7 +2103,7 @@ def _encodeClientJWT(self, scope): ) def _decodePlexJWT(self): - """ Decodes and verifies the Plex JWT using the Plex public JWK. """ + """ Returns the decoded and verified Plex JWT using the Plex public JWK. """ return jwt.decode( self.jwtToken, key=jwt.PyJWK.from_dict(self._getPlexPublicJWK()), @@ -2073,6 +2115,11 @@ def _decodePlexJWT(self): issuer='plex.tv', ) + @property + def decodedJWT(self): + """ Returns the decoded Plex JWT. """ + return self._decodePlexJWT() + def _registerPlexDevice(self): """ Registers the public JWK with Plex. """ url = f'{self.AUTH}/jwk' @@ -2101,7 +2148,7 @@ def _getPlexPublicJWK(self): def registerDevice(self): """ Registers the device with Plex using the provided token and private/public keypair. - This must be done once before the Plex JWT can be refreshed. + This must be done once if OAuth was not used before the Plex JWT can be refreshed. Raises: :exc:`~plexapi.exceptions.BadRequest`: when token or keypair is missing. @@ -2115,13 +2162,9 @@ def registerDevice(self): self._registerPlexDevice() - def refreshJWT(self, scope=None): + def refreshJWT(self): """ Refreshes the Plex JWT using the existing private/public keypair. - Parameters: - scope (list[str], optional): List of scopes to request in the new token. - Default is all available scopes. - Returns: str: The new Plex JWT. @@ -2132,10 +2175,7 @@ def refreshJWT(self, scope=None): if not self._privateKey or not self._publicKey: raise BadRequest('ED25519 private and public keys are required to refresh JWT.') - if scope is None: - scope = self.SCOPES - - self._encodeClientJWT(scope) + self._clientJWT = self._encodeClientJWT() self.jwtToken = self._exchangePlexJWT() if self.verifyJWT(): return self.jwtToken @@ -2149,7 +2189,7 @@ def verifyJWT(self, refreshWithinDays=1): the JWT invalid and in need of refresh. Default is 1 day. """ try: - decoded_jwt = self._decodePlexJWT() + decodedJWT = self.decodedJWT except jwt.ExpiredSignatureError: log.warning('Existing JWT has expired') return False @@ -2160,26 +2200,180 @@ def verifyJWT(self, refreshWithinDays=1): log.warning(f'Existing JWT is invalid: {e}') return False else: - if decoded_jwt['thumbprint'] != self._keyID: + if decodedJWT['thumbprint'] != self._keyID: log.warning('Existing JWT was signed with a different key') return False - elif decoded_jwt['exp'] < int((datetime.now() + timedelta(days=refreshWithinDays)).timestamp()): + elif decodedJWT['exp'] < int((datetime.now() + timedelta(days=refreshWithinDays)).timestamp()): log.warning(f'Existing JWT is expiring within {refreshWithinDays} day(s)') return False return True @property - def decodedJWT(self): - """ Returns the decoded Plex JWT. """ - return self._decodePlexJWT() + def pin(self): + """ Return the 4 character PIN used for linking a device at + https://plex.tv/link. + """ + if self._oauth: + raise BadRequest('Cannot use PIN for Plex OAuth login') + return self._code + + def oauthUrl(self, forwardUrl=None): + """ Return the Plex OAuth url for login. + + Parameters: + forwardUrl (str, optional): The url to redirect the client to after login. + """ + if not self._oauth: + raise BadRequest('Must use "MyPlexJWTLogin(oauth=True)" for Plex OAuth login.') + + headers = self._headers() + params = { + 'clientID': headers['X-Plex-Client-Identifier'], + 'context[device][product]': headers['X-Plex-Product'], + 'context[device][version]': headers['X-Plex-Version'], + 'context[device][platform]': headers['X-Plex-Platform'], + 'context[device][platformVersion]': headers['X-Plex-Platform-Version'], + 'context[device][device]': headers['X-Plex-Device'], + 'context[device][deviceName]': headers['X-Plex-Device-Name'], + 'code': self._code + } + if forwardUrl: + params['forwardUrl'] = forwardUrl + + return f'https://app.plex.tv/auth/#!?{urlencode(params)}' + + def run(self, callback=None, timeout=120): + """ Starts the thread which monitors the PIN login state. + + Parameters: + callback (Callable[str], optional): Callback called with the received authentication token. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. + + Raises: + :exc:`RuntimeError`: If the thread is already running. + :exc:`RuntimeError`: If the PIN login for the current PIN has expired. + """ + if self._thread and not self._abort: + raise RuntimeError('MyPlexJWTLogin thread is already running') + if self.expired: + raise RuntimeError('MyPlexJWTLogin has expired') + + self._getCode() + self._clientJWT = self._encodeClientJWT() + + self._loginTimeout = timeout + self._callback = callback + self._abort = False + self.finished = False + self._thread = threading.Thread(target=self._pollLogin, name='plexapi.myplex.MyPlexJWTLogin') + self._thread.start() + + def waitForLogin(self): + """ Waits for the user login to succeed or expire. + + Returns: + bool: ``True`` if the user login succeeded or ``False`` otherwise. + """ + if not self._thread or self._abort: + return False + + self._thread.join() + if self.expired or not self.jwtToken: + return False + + return True + + def stop(self): + """ Stops the thread monitoring the user login state. """ + if not self._thread or self._abort: + return + + self._abort = True + self._thread.join() + + def checkLogin(self): + """ Returns ``True`` if the user login has succeeded. """ + if self._thread: + return False + + try: + return self._checkLogin() + except Exception: + self.expired = True + self.finished = True + + return False + + def _getCode(self): + url = self.PINS + + if self._oauth: + body = { + 'jwk': self._publicJWK._jwk_data, + 'strong': True, + } + else: + body = { + 'jwk': self._publicJWK._jwk_data, + } + + response = self._query(url, self._session.post, json=body) + if response is None: + return None + + self._id = response.attrib.get('id') + self._code = response.attrib.get('code') + + return self._code + + def _checkLogin(self): + if not self._id: + return False + + if self.jwtToken: + return True + + url = f'{self.PINS}/{self._id}' + params = {'deviceJWT': self._clientJWT} + response = self._query(url, params=params) + if response is None: + return False + + token = response.attrib.get('authToken') + if not token: + return False + + self.jwtToken = token + self.finished = True + return True + + def _pollLogin(self): + try: + start = time.time() + while not self._abort and (not self._loginTimeout or (time.time() - start) < self._loginTimeout): + try: + result = self._checkLogin() + except Exception: + self.expired = True + break + + if result: + break + + time.sleep(self.POLLINTERVAL) + + if self.jwtToken and self._callback: + self._callback(self.jwtToken) + finally: + self.finished = True def _headers(self, **kwargs): """ Returns dict containing base headers for all requests for Plex JWT login. """ headers = BASE_HEADERS.copy() - if self.headers: - headers.update(self.headers) + if self._customHeaders: + headers.update(self._customHeaders) headers.update(kwargs) - headers['Accept'] = 'application/json' + # headers['Accept'] = 'application/json' return headers def _query(self, url, method=None, headers=None, **kwargs): @@ -2191,8 +2385,7 @@ def _query(self, url, method=None, headers=None, **kwargs): codename = codes.get(response.status_code)[0] errtext = response.text.replace('\n', ' ') raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}') - if response.text: - return response.json() + return utils.parseXMLString(response.text) def _connect(cls, url, token, session, timeout, results, i, job_is_done_event=None): diff --git a/plexapi/utils.py b/plexapi/utils.py index 7a63010b9..f32654876 100644 --- a/plexapi/utils.py +++ b/plexapi/utils.py @@ -533,7 +533,7 @@ def getMyPlexAccount(opts=None): # pragma: no cover return MyPlexAccount(username, password) -def createMyPlexDevice(headers, account, timeout=10): # pragma: no cover +def createMyPlexDevice(headers=None, account=None, timeout=10): # pragma: no cover """ Helper function to create a new MyPlexDevice. Returns a new MyPlexDevice instance. Parameters: @@ -568,7 +568,7 @@ def plexOAuth(headers, forwardUrl=None, timeout=120): # pragma: no cover headers (dict): Provide the X-Plex- headers for the new device. A unique X-Plex-Client-Identifier is required or one will be generated if not provided. forwardUrl (str, optional): The url to redirect the client to after login. - timeout (int, optional): Timeout in seconds to wait for device login. Default 120 seconds. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. """ from plexapi.myplex import MyPlexAccount, MyPlexPinLogin @@ -580,9 +580,8 @@ def plexOAuth(headers, forwardUrl=None, timeout=120): # pragma: no cover raise BadRequest('The X-Plex-Client-Identifier header is required.') pinlogin = MyPlexPinLogin(headers=headers, oauth=True) - print('Login to Plex at the following url:') - print(pinlogin.oauthUrl(forwardUrl)) pinlogin.run(timeout=timeout) + print(f'Login to Plex at the following url:\n{pinlogin.oauthUrl(forwardUrl=forwardUrl)}') pinlogin.waitForLogin() if pinlogin.token: @@ -592,21 +591,19 @@ def plexOAuth(headers, forwardUrl=None, timeout=120): # pragma: no cover print('Login failed.') -def plexJWTAuth(headers=None, token=None, jwtToken=None, keypair=(None, None), scope=None): # pragma: no cover - """ Helper function for Plex JWT authentication using initial Plex OAuth. +def plexJWTAuth(headers=None, forwardUrl=None, timeout=120, keypair=(None, None), scopes=None): # pragma: no cover + """ Helper function for Plex JWT authentication using Plex OAuth. Returns a new MyPlexAccount instance. Parameters: headers (dict, optional): Provide the X-Plex- headers for the new device. A unique X-Plex-Client-Identifier is required or one will be generated if not provided. - token (str, optional): The Plex token to use for initial device registration. - If not provided, Plex OAuth will be used to obtain a token. - jwtToken (str, optional): The Plex JWT (JSON Web Token) to use for authentication. - If provided, the JWT will be validated and refreshed if necessary. + forwardUrl (str, optional): The url to redirect the client to after login. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. keypair (tuple, optional): A tuple of the ED25519 (privateKey, publicKey) to use for signing the JWT. If not provided, a new keypair will be generated and saved to 'private.key' and 'public.key'. - scope (list[str], optional): List of scopes to request in the JWT. + scopes (list[str], optional): List of scopes to request in the JWT. """ - from plexapi.myplex import MyPlexAccount, MyPlexJWTAuth + from plexapi.myplex import MyPlexAccount, MyPlexJWTLogin if not headers: client_identifier = generateUUID() @@ -615,29 +612,19 @@ def plexJWTAuth(headers=None, token=None, jwtToken=None, keypair=(None, None), s elif 'X-Plex-Client-Identifier' not in headers: raise BadRequest('The X-Plex-Client-Identifier header is required.') - if not token and not jwtToken: - account = plexOAuth(headers) - token = account.authenticationToken - - jwtauth = MyPlexJWTAuth(headers=headers, token=token, jwtToken=jwtToken, keypair=keypair) - if jwtToken and (not keypair[0] or not keypair[1]): - raise BadRequest('When providing a jwtToken, the corresponding ED25519 keypair is required.') + jwtlogin = MyPlexJWTLogin(headers=headers, oauth=True, keypair=keypair, scopes=scopes) if not keypair[0] or not keypair[1]: - jwtauth.generateKeypair(keyfiles=('private.key', 'public.key')) + jwtlogin.generateKeypair(keyfiles=('private.key', 'public.key')) print('Generated new ED25519 keypair and saved to "private.key" and "public.key".') - if not jwtToken: - jwtauth.registerDevice() - jwtauth.refreshJWT(scope=scope) - print('Registered new device and obtained JWT token.') - elif not jwtauth.verifyJWT(): - jwtauth.refreshJWT(scope=scope) - print('Refreshed expired/invalid JWT token.') + jwtlogin.run(timeout=timeout) + print(f'Login to Plex at the following url:\n{jwtlogin.oauthUrl(forwardUrl=forwardUrl)}') + jwtlogin.waitForLogin() - if jwtauth.jwtToken: + if jwtlogin.jwtToken: print('JWT authentication successful!') - return MyPlexAccount(token=jwtauth.jwtToken) + return MyPlexAccount(token=jwtlogin.jwtToken) else: print('JWT authentication failed.') From 736094162bdc7811378dd4f779d32c2058740551 Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Wed, 1 Oct 2025 23:20:56 -0700 Subject: [PATCH 06/10] Update doc strings --- plexapi/myplex.py | 79 +++++++++++++++++++++++++++++++++++------------ 1 file changed, 59 insertions(+), 20 deletions(-) diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 7e9cec847..a81e603ab 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -1683,9 +1683,11 @@ def syncItems(self): class MyPlexPinLogin: """ - MyPlex PIN login class which supports getting the four character PIN which the user must - enter on https://plex.tv/link to authenticate the client and provide an access token to - create a :class:`~plexapi.myplex.MyPlexAccount` instance. + MyPlex PIN login class which supports getting a token for authenticating the client and + providing an access token to create a :class:`~plexapi.myplex.MyPlexAccount` instance. + The login can be done using the four character PIN which the user must enter at + https://plex.tv/link or using Plex OAuth. + This helper class supports a polling, threaded and callback approach. - The polling approach expects the developer to periodically check if the PIN login was @@ -1702,17 +1704,32 @@ class MyPlexPinLogin: Parameters: session (requests.Session, optional): Use your own session object if you want to cache the http responses from Plex. - requestTimeout (int): timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). - headers (dict): A dict of X-Plex headers to send with requests. - oauth (bool): True to use Plex OAuth instead of PIN login. + requestTimeout (int, optional): Timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). + headers (dict, optional): A dict of X-Plex headers to send with requests. + oauth (bool, optional): True to use Plex OAuth instead of PIN login. Attributes: PINS (str): 'https://plex.tv/api/v2/pins' POLLINTERVAL (int): 1 + pin (str): Four character PIN to use for the login at https://plex.tv/link. finished (bool): Whether the pin login has finished or not. expired (bool): Whether the pin login has expired or not. - token (str): Token retrieved through the pin login. - pin (str): Pin to use for the login on https://plex.tv/link. + token (str): Token retrieved after login. + + Example: + + .. code-block:: python + + from plexapi.myplex import MyPlexAccount, MyPlexPinLogin + + pinlogin = MyPlexPinLogin() + pinlogin.run() + print(f'Login to Plex at the following url:\\n{pinlogin.oauthUrl()}') + pinlogin.waitForLogin() + token = pinlogin.token + + account = MyPlexAccount(token=token) + """ PINS = 'https://plex.tv/api/v2/pins' POLLINTERVAL = 1 @@ -1737,7 +1754,7 @@ def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False) @property def pin(self): - """ Return the 4 character PIN used for linking a device at + """ Return the four character PIN used for linking a device at https://plex.tv/link. """ if self._oauth: @@ -1911,20 +1928,40 @@ class MyPlexJWTLogin: """ MyPlex JWT login class which supports getting a JWT for authenticating the client and providing an access token to create a :class:`~plexapi.myplex.MyPlexAccount` instance. - This helper class supports a polling, threaded and callback approach. - Requires the ``PyJWT`` with ``cryptography`` packages to be installed (``pyjwt[crypto]``). + The login can be done using the four character PIN which the user must enter at + https://plex.tv/link or using Plex OAuth. + This class can also be used to refresh or verify an existing JWT. + See: https://developer.plex.tv/pms/#section/API-Info/Authenticating-with-Plex + Using this class requires the ``PyJWT`` with ``cryptography`` packages to be installed + (``pyjwt[crypto]``). + + This helper class supports a polling, threaded and callback approach. + + - The polling approach expects the developer to periodically check if the PIN login was + successful using :func:`~plexapi.myplex.MyPlexJWTLogin.checkLogin`. + - The threaded approach expects the developer to call + :func:`~plexapi.myplex.MyPlexJWTLogin.run` and then at a later time call + :func:`~plexapi.myplex.MyPlexJWTLogin.waitForLogin` to wait for and check the result. + - The callback approach is an extension of the threaded approach and expects the developer + to pass the ``callback`` parameter to the call to :func:`~plexapi.myplex.MyPlexJWTLogin.run`. + The callback will be called when the thread waiting for the PIN login to succeed either + finishes or expires. The parameter passed to the callback is the received authentication + token or ``None`` if the login expired. + Parameters: session (requests.Session, optional): Use your own session object if you want to cache the http responses from Plex. - requestTimeout (int): timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). - headers (dict): A dict of X-Plex headers to send with requests. - token (str): Plex token only required to register the device initially if not using OAuth. - jwtToken (str): Existing Plex JWT to verify or refresh. - keypair (tuple[str|bytes]): A tuple of the full file paths (str) to the ED25519 private and public key pair - or the raw keys themselves (bytes) to use for signing the JWT. - scope (list[str]): List of scopes to request in the new token. + requestTimeout (int, optional): Timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). + headers (dict, optional): A dict of X-Plex headers to send with requests. + oauth (bool, optional): True to use Plex OAuth instead of PIN login. + token (str, optional): Plex token only required to register the device initially if not using OAuth. + jwtToken (str, optional): Existing Plex JWT to verify or refresh. + keypair (tuple[str|bytes], optional): A tuple of the full file paths (str) to the ED25519 private and public + key pair or the raw keys themselves (bytes) to use for signing the JWT. + Use :func:`~plexapi.myplex.MyPlexJWTLogin.generateKeypair` to generate a new keypair if not provided. + scope (list[str], optional): List of scopes to request in the new token. Default is all available scopes. Attributes: @@ -1932,9 +1969,11 @@ class MyPlexJWTLogin: AUTH (str): 'https://clients.plex.tv/api/v2/auth' POLLINTERVAL (int): 1 SCOPES (list): List of all available scopes to request for the JWT. + pin (str): Four character PIN to use for the login at https://plex.tv/link. finished (bool): Whether the JWT login has finished or not. expired (bool): Whether the JWT login has expired or not. jwtToken (str): The Plex JWT received after login or refreshing. + decodedJWT (dict): The decoded Plex JWT payload. Example: @@ -1947,8 +1986,8 @@ class MyPlexJWTLogin: scopes=['username', 'email', 'friendly_name'] ) jwtlogin.generateKeypair(keyfiles=('private.key', 'public.key')) - print(f'Login to Plex at the following url:\\n{jwtlogin.oauthUrl()}') jwtlogin.run() + print(f'Login to Plex at the following url:\\n{jwtlogin.oauthUrl()}') jwtlogin.waitForLogin() jwtToken = jwtlogin.jwtToken @@ -2210,7 +2249,7 @@ def verifyJWT(self, refreshWithinDays=1): @property def pin(self): - """ Return the 4 character PIN used for linking a device at + """ Return the four character PIN used for linking a device at https://plex.tv/link. """ if self._oauth: From 077a5cfe205a5230de8cb672aebee55f3d5e336c Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Wed, 8 Oct 2025 14:42:51 -0700 Subject: [PATCH 07/10] Switch MyPlexJWTLogin to JSON response --- plexapi/myplex.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/plexapi/myplex.py b/plexapi/myplex.py index a81e603ab..69da72659 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -1983,6 +1983,7 @@ class MyPlexJWTLogin: # Method 1: Generate a new Plex JWT using Plex OAuth jwtlogin = MyPlexJWTLogin( + oauth=True, scopes=['username', 'email', 'friendly_name'] ) jwtlogin.generateKeypair(keyfiles=('private.key', 'public.key')) @@ -2123,14 +2124,13 @@ def _publicJWK(self): def _encodeClientJWT(self): """ Returns the encoded client JWT using the private JWK. """ payload = { + 'nonce': self._getPlexNonce(), 'scope': ','.join(self._scopes), 'aud': 'plex.tv', 'iss': self._clientIdentifier, 'iat': int(datetime.now().timestamp()), 'exp': int((datetime.now() + timedelta(minutes=5)).timestamp()), } - if self._token: - payload['nonce'] = self._getPlexNonce() headers = { 'kid': self._keyID } @@ -2360,8 +2360,8 @@ def _getCode(self): if response is None: return None - self._id = response.attrib.get('id') - self._code = response.attrib.get('code') + self._id = response.get('id') + self._code = response.get('code') return self._code @@ -2378,7 +2378,7 @@ def _checkLogin(self): if response is None: return False - token = response.attrib.get('authToken') + token = response.get('authToken') if not token: return False @@ -2412,7 +2412,7 @@ def _headers(self, **kwargs): if self._customHeaders: headers.update(self._customHeaders) headers.update(kwargs) - # headers['Accept'] = 'application/json' + headers['Accept'] = 'application/json' return headers def _query(self, url, method=None, headers=None, **kwargs): @@ -2424,6 +2424,8 @@ def _query(self, url, method=None, headers=None, **kwargs): codename = codes.get(response.status_code)[0] errtext = response.text.replace('\n', ' ') raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}') + if 'application/json' in response.headers.get('Content-Type', ''): + return response.json() return utils.parseXMLString(response.text) From a9cc935777b84361908ec914db7dcf990bb462f5 Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Sat, 1 Nov 2025 21:39:28 -0700 Subject: [PATCH 08/10] Remove cached data properties --- plexapi/myplex.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 69da72659..74565d2dd 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -2085,18 +2085,18 @@ def generateKeypair(self, keyfiles=(None, None), overwrite=False): privateFile.write(self._privateKey) publicFile.write(self._publicKey) - @cached_data_property + @property def _clientIdentifier(self): """ Returns the client identifier from the headers. """ headers = self._headers() return headers['X-Plex-Client-Identifier'] - @cached_data_property + @property def _keyID(self): """ Returns the key ID (thumbprint) for the ED25519 keypair. """ return hashlib.sha256(self._privateKey + self._publicKey).hexdigest() - @cached_data_property + @property def _privateJWK(self): """ Returns the private JWK (JSON Web Key) for the ED25519 keypair.""" return jwt.PyJWK.from_dict({ @@ -2109,7 +2109,7 @@ def _privateJWK(self): 'kid': self._keyID, }) - @cached_data_property + @property def _publicJWK(self): """ Returns the public JWK (JSON Web Key) for the ED25519 keypair.""" return jwt.PyJWK.from_dict({ From 8e31a24505157e409cae11acbf78c2cbbc5cb104 Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Sat, 1 Nov 2025 21:42:53 -0700 Subject: [PATCH 09/10] Guard against None keypair --- plexapi/myplex.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 74565d2dd..55f1fea1a 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -2094,6 +2094,8 @@ def _clientIdentifier(self): @property def _keyID(self): """ Returns the key ID (thumbprint) for the ED25519 keypair. """ + if not self._privateKey or not self._publicKey: + return None return hashlib.sha256(self._privateKey + self._publicKey).hexdigest() @property From 3d9267e45c88a63821100c4d23b7415d95d96d80 Mon Sep 17 00:00:00 2001 From: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com> Date: Sat, 1 Nov 2025 21:46:14 -0700 Subject: [PATCH 10/10] Use UTC timezone for jwt --- plexapi/myplex.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 55f1fea1a..14dc5486d 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -5,7 +5,7 @@ import os import threading import time -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit import requests @@ -2130,8 +2130,8 @@ def _encodeClientJWT(self): 'scope': ','.join(self._scopes), 'aud': 'plex.tv', 'iss': self._clientIdentifier, - 'iat': int(datetime.now().timestamp()), - 'exp': int((datetime.now() + timedelta(minutes=5)).timestamp()), + 'iat': int(datetime.now(timezone.utc).timestamp()), + 'exp': int((datetime.now(timezone.utc) + timedelta(minutes=5)).timestamp()), } headers = { 'kid': self._keyID @@ -2244,7 +2244,7 @@ def verifyJWT(self, refreshWithinDays=1): if decodedJWT['thumbprint'] != self._keyID: log.warning('Existing JWT was signed with a different key') return False - elif decodedJWT['exp'] < int((datetime.now() + timedelta(days=refreshWithinDays)).timestamp()): + elif decodedJWT['exp'] < int((datetime.now(timezone.utc) + timedelta(days=refreshWithinDays)).timestamp()): log.warning(f'Existing JWT is expiring within {refreshWithinDays} day(s)') return False return True