diff --git a/.gitignore b/.gitignore index d55a5c2e2..636bd8c5f 100644 --- a/.gitignore +++ b/.gitignore @@ -27,6 +27,8 @@ pyvenv.cfg MANIFEST venv/ .venv/ +private.key +public.key # path for the test lib. plex/ diff --git a/README.rst b/README.rst index cc54942ad..32a004564 100644 --- a/README.rst +++ b/README.rst @@ -37,6 +37,7 @@ Installation & Documentation .. code-block:: python pip install plexapi[alert] # Install with dependencies required for plexapi.alert + pip install plexapi[jwt] # Install with dependencies required for Plex JWT authentication Documentation_ can be found at Read the Docs. diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 1821a873a..14dc5486d 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -1,12 +1,27 @@ # -*- coding: utf-8 -*- import copy +import hashlib import html +import os import threading import time +from datetime import datetime, timedelta, timezone from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit import requests +try: + import cryptography + from cryptography.hazmat.primitives import serialization + from cryptography.hazmat.primitives.asymmetric import ed25519 +except ImportError: # pragma: no cover + cryptography = None + +try: + import jwt +except ImportError: # pragma: no cover + jwt = None + from plexapi import (BASE_HEADERS, CONFIG, TIMEOUT, X_PLEX_ENABLE_FAST_CONNECT, X_PLEX_IDENTIFIER, log, logfilter, utils) from plexapi.base import PlexObject, cached_data_property @@ -1668,9 +1683,11 @@ def syncItems(self): class MyPlexPinLogin: """ - MyPlex PIN login class which supports getting the four character PIN which the user must - enter on https://plex.tv/link to authenticate the client and provide an access token to - create a :class:`~plexapi.myplex.MyPlexAccount` instance. + MyPlex PIN login class which supports getting a token for authenticating the client and + providing an access token to create a :class:`~plexapi.myplex.MyPlexAccount` instance. + The login can be done using the four character PIN which the user must enter at + https://plex.tv/link or using Plex OAuth. + This helper class supports a polling, threaded and callback approach. - The polling approach expects the developer to periodically check if the PIN login was @@ -1679,36 +1696,49 @@ class MyPlexPinLogin: :func:`~plexapi.myplex.MyPlexPinLogin.run` and then at a later time call :func:`~plexapi.myplex.MyPlexPinLogin.waitForLogin` to wait for and check the result. - The callback approach is an extension of the threaded approach and expects the developer - to pass the `callback` parameter to the call to :func:`~plexapi.myplex.MyPlexPinLogin.run`. + to pass the ``callback`` parameter to the call to :func:`~plexapi.myplex.MyPlexPinLogin.run`. The callback will be called when the thread waiting for the PIN login to succeed either finishes or expires. The parameter passed to the callback is the received authentication - token or `None` if the login expired. + token or ``None`` if the login expired. Parameters: session (requests.Session, optional): Use your own session object if you want to - cache the http responses from PMS - requestTimeout (int): timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). - headers (dict): A dict of X-Plex headers to send with requests. - oauth (bool): True to use Plex OAuth instead of PIN login. + cache the http responses from Plex. + requestTimeout (int, optional): Timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). + headers (dict, optional): A dict of X-Plex headers to send with requests. + oauth (bool, optional): True to use Plex OAuth instead of PIN login. Attributes: PINS (str): 'https://plex.tv/api/v2/pins' - CHECKPINS (str): 'https://plex.tv/api/v2/pins/{pinid}' POLLINTERVAL (int): 1 + pin (str): Four character PIN to use for the login at https://plex.tv/link. finished (bool): Whether the pin login has finished or not. expired (bool): Whether the pin login has expired or not. - token (str): Token retrieved through the pin login. - pin (str): Pin to use for the login on https://plex.tv/link. + token (str): Token retrieved after login. + + Example: + + .. code-block:: python + + from plexapi.myplex import MyPlexAccount, MyPlexPinLogin + + pinlogin = MyPlexPinLogin() + pinlogin.run() + print(f'Login to Plex at the following url:\\n{pinlogin.oauthUrl()}') + pinlogin.waitForLogin() + token = pinlogin.token + + account = MyPlexAccount(token=token) + """ - PINS = 'https://plex.tv/api/v2/pins' # get - CHECKPINS = 'https://plex.tv/api/v2/pins/{pinid}' # get + PINS = 'https://plex.tv/api/v2/pins' POLLINTERVAL = 1 def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False): super(MyPlexPinLogin, self).__init__() self._session = session or requests.Session() self._requestTimeout = requestTimeout or TIMEOUT - self.headers = headers + self._customHeaders = headers self._oauth = oauth self._loginTimeout = None @@ -1717,7 +1747,6 @@ def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False) self._abort = False self._id = None self._code = None - self._getCode() self.finished = False self.expired = False @@ -1725,7 +1754,7 @@ def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False) @property def pin(self): - """ Return the 4 character PIN used for linking a device at + """ Return the four character PIN used for linking a device at https://plex.tv/link. """ if self._oauth: @@ -1757,22 +1786,24 @@ def oauthUrl(self, forwardUrl=None): return f'https://app.plex.tv/auth/#!?{urlencode(params)}' - def run(self, callback=None, timeout=None): + def run(self, callback=None, timeout=120): """ Starts the thread which monitors the PIN login state. Parameters: - callback (Callable[str]): Callback called with the received authentication token (optional). - timeout (int): Timeout in seconds waiting for the PIN login to succeed (optional). + callback (Callable[str], optional): Callback called with the received authentication token. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. Raises: - :class:`RuntimeError`: If the thread is already running. - :class:`RuntimeError`: If the PIN login for the current PIN has expired. + :exc:`RuntimeError`: If the thread is already running. + :exc:`RuntimeError`: If the PIN login for the current PIN has expired. """ if self._thread and not self._abort: raise RuntimeError('MyPlexPinLogin thread is already running') if self.expired: raise RuntimeError('MyPlexPinLogin has expired') + self._getCode() + self._loginTimeout = timeout self._callback = callback self._abort = False @@ -1783,12 +1814,8 @@ def run(self, callback=None, timeout=None): def waitForLogin(self): """ Waits for the PIN login to succeed or expire. - Parameters: - callback (Callable[str]): Callback called with the received authentication token (optional). - timeout (int): Timeout in seconds waiting for the PIN login to succeed (optional). - Returns: - `True` if the PIN login succeeded or `False` otherwise. + bool: ``True`` if the PIN login succeeded or ``False`` otherwise. """ if not self._thread or self._abort: return False @@ -1808,7 +1835,7 @@ def stop(self): self._thread.join() def checkLogin(self): - """ Returns `True` if the PIN login has succeeded. """ + """ Returns ``True`` if the PIN login has succeeded. """ if self._thread: return False @@ -1844,7 +1871,7 @@ def _checkLogin(self): if self.token: return True - url = self.CHECKPINS.format(pinid=self._id) + url = f'{self.PINS}/{self._id}' response = self._query(url) if response is None: return False @@ -1880,9 +1907,514 @@ def _pollLogin(self): def _headers(self, **kwargs): """ Returns dict containing base headers for all requests for pin login. """ headers = BASE_HEADERS.copy() - if self.headers: - headers.update(self.headers) + if self._customHeaders: + headers.update(self._customHeaders) + headers.update(kwargs) + return headers + + def _query(self, url, method=None, headers=None, **kwargs): + method = method or self._session.get + log.debug('%s %s', method.__name__.upper(), url) + headers = headers or self._headers() + response = method(url, headers=headers, timeout=self._requestTimeout, **kwargs) + if not response.ok: # pragma: no cover + codename = codes.get(response.status_code)[0] + errtext = response.text.replace('\n', ' ') + raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}') + return utils.parseXMLString(response.text) + + +class MyPlexJWTLogin: + """ + MyPlex JWT login class which supports getting a JWT for authenticating the client and + providing an access token to create a :class:`~plexapi.myplex.MyPlexAccount` instance. + The login can be done using the four character PIN which the user must enter at + https://plex.tv/link or using Plex OAuth. + This class can also be used to refresh or verify an existing JWT. + + See: https://developer.plex.tv/pms/#section/API-Info/Authenticating-with-Plex + + Using this class requires the ``PyJWT`` with ``cryptography`` packages to be installed + (``pyjwt[crypto]``). + + This helper class supports a polling, threaded and callback approach. + + - The polling approach expects the developer to periodically check if the PIN login was + successful using :func:`~plexapi.myplex.MyPlexJWTLogin.checkLogin`. + - The threaded approach expects the developer to call + :func:`~plexapi.myplex.MyPlexJWTLogin.run` and then at a later time call + :func:`~plexapi.myplex.MyPlexJWTLogin.waitForLogin` to wait for and check the result. + - The callback approach is an extension of the threaded approach and expects the developer + to pass the ``callback`` parameter to the call to :func:`~plexapi.myplex.MyPlexJWTLogin.run`. + The callback will be called when the thread waiting for the PIN login to succeed either + finishes or expires. The parameter passed to the callback is the received authentication + token or ``None`` if the login expired. + + Parameters: + session (requests.Session, optional): Use your own session object if you want to + cache the http responses from Plex. + requestTimeout (int, optional): Timeout in seconds on initial connect to plex.tv (default config.TIMEOUT). + headers (dict, optional): A dict of X-Plex headers to send with requests. + oauth (bool, optional): True to use Plex OAuth instead of PIN login. + token (str, optional): Plex token only required to register the device initially if not using OAuth. + jwtToken (str, optional): Existing Plex JWT to verify or refresh. + keypair (tuple[str|bytes], optional): A tuple of the full file paths (str) to the ED25519 private and public + key pair or the raw keys themselves (bytes) to use for signing the JWT. + Use :func:`~plexapi.myplex.MyPlexJWTLogin.generateKeypair` to generate a new keypair if not provided. + scope (list[str], optional): List of scopes to request in the new token. + Default is all available scopes. + + Attributes: + PINS (str): 'https://plex.tv/api/v2/pins' + AUTH (str): 'https://clients.plex.tv/api/v2/auth' + POLLINTERVAL (int): 1 + SCOPES (list): List of all available scopes to request for the JWT. + pin (str): Four character PIN to use for the login at https://plex.tv/link. + finished (bool): Whether the JWT login has finished or not. + expired (bool): Whether the JWT login has expired or not. + jwtToken (str): The Plex JWT received after login or refreshing. + decodedJWT (dict): The decoded Plex JWT payload. + + Example: + + .. code-block:: python + + from plexapi.myplex import MyPlexAccount, MyPlexJWTLogin + + # Method 1: Generate a new Plex JWT using Plex OAuth + jwtlogin = MyPlexJWTLogin( + oauth=True, + scopes=['username', 'email', 'friendly_name'] + ) + jwtlogin.generateKeypair(keyfiles=('private.key', 'public.key')) + jwtlogin.run() + print(f'Login to Plex at the following url:\\n{jwtlogin.oauthUrl()}') + jwtlogin.waitForLogin() + jwtToken = jwtlogin.jwtToken + + account = MyPlexAccount(token=jwtToken) + + # Method 2: Generate a new Plex JWT using an existing Plex token and keypair + jwtlogin = MyPlexJWTLogin( + token='2ffLuB84dqLswk9skLos', + keypair=('private.key', 'public.key'), + scopes=['username', 'email', 'friendly_name'] + ) + jwtlogin.registerDevice() + jwtToken = jwtlogin.refreshJWT() + + account = MyPlexAccount(token=jwtToken) + + # Refresh an existing Plex JWT + jwtlogin = MyPlexJWTLogin( + jwtToken=jwtToken, + keypair=('private.key', 'public.key'), + scopes=['username', 'email', 'friendly_name'] + ) + if not jwtlogin.verifyJWT(): + jwtToken = jwtlogin.refreshJWT() + + account = MyPlexAccount(token=jwtToken) + + """ + PINS = 'https://clients.plex.tv/api/v2/pins' + AUTH = 'https://clients.plex.tv/api/v2/auth' + POLLINTERVAL = 1 + SCOPES = ['username', 'email', 'friendly_name', 'restricted', 'anonymous', 'joinedAt'] + + def __init__(self, session=None, requestTimeout=None, headers=None, oauth=False, + token=None, jwtToken=None, keypair=(None, None), scopes=None): + super(MyPlexJWTLogin, self).__init__() + self._session = session or requests.Session() + self._requestTimeout = requestTimeout or TIMEOUT + self._customHeaders = headers + self._token = token + self._privateKey = utils.openOrRead(keypair[0]) if keypair[0] else None + self._publicKey = utils.openOrRead(keypair[1]) if keypair[1] else None + self._scopes = scopes or self.SCOPES + self._clientJWT = None + + self._oauth = oauth + self._loginTimeout = None + self._callback = None + self._thread = None + self._abort = False + self._id = None + self._code = None + + self.finished = False + self.expired = False + self.jwtToken = jwtToken + + if not jwt: + log.warning('PyJWT package is not installed, cannot use Plex JWT login') + return + + def generateKeypair(self, keyfiles=(None, None), overwrite=False): + """ Generates a new ED25519 private/public keypair for signing the JWT and saves them to files. + Requires the ``cryptography`` package to be installed. + + Parameters: + keyfiles (tuple[str]): A tuple of the full file paths to write the private and public keypair to. + overwrite (bool): Set to True to overwrite existing keypair files. Default is False. + + Raises: + :exc:`FileExistsError`: when keypair files already exist and overwrite is False. + """ + if not cryptography: + log.warning('Cryptography package is not installed, cannot generate ED25519 keypair') + return + + privateKey = ed25519.Ed25519PrivateKey.generate() + publicKey = privateKey.public_key() + self._privateKey = privateKey.private_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PrivateFormat.Raw, + encryption_algorithm=serialization.NoEncryption() + ) + self._publicKey = publicKey.public_bytes( + encoding=serialization.Encoding.Raw, + format=serialization.PublicFormat.Raw + ) + + if keyfiles[0] and keyfiles[1]: + if not overwrite and (os.path.exists(keyfiles[0]) or os.path.exists(keyfiles[1])): + raise FileExistsError('Keypair files already exist, set overwrite=True to overwrite them.') + + with open(keyfiles[0], 'wb') as privateFile, open(keyfiles[1], 'wb') as publicFile: + privateFile.write(self._privateKey) + publicFile.write(self._publicKey) + + @property + def _clientIdentifier(self): + """ Returns the client identifier from the headers. """ + headers = self._headers() + return headers['X-Plex-Client-Identifier'] + + @property + def _keyID(self): + """ Returns the key ID (thumbprint) for the ED25519 keypair. """ + if not self._privateKey or not self._publicKey: + return None + return hashlib.sha256(self._privateKey + self._publicKey).hexdigest() + + @property + def _privateJWK(self): + """ Returns the private JWK (JSON Web Key) for the ED25519 keypair.""" + return jwt.PyJWK.from_dict({ + 'kty': 'OKP', + 'crv': 'Ed25519', + 'x': utils.base64urlEncode(self._publicKey), + 'd': utils.base64urlEncode(self._privateKey), + 'use': 'sig', + 'alg': 'EdDSA', + 'kid': self._keyID, + }) + + @property + def _publicJWK(self): + """ Returns the public JWK (JSON Web Key) for the ED25519 keypair.""" + return jwt.PyJWK.from_dict({ + 'kty': 'OKP', + 'crv': 'Ed25519', + 'x': utils.base64urlEncode(self._publicKey), + 'use': 'sig', + 'alg': 'EdDSA', + 'kid': self._keyID, + }) + + def _encodeClientJWT(self): + """ Returns the encoded client JWT using the private JWK. """ + payload = { + 'nonce': self._getPlexNonce(), + 'scope': ','.join(self._scopes), + 'aud': 'plex.tv', + 'iss': self._clientIdentifier, + 'iat': int(datetime.now(timezone.utc).timestamp()), + 'exp': int((datetime.now(timezone.utc) + timedelta(minutes=5)).timestamp()), + } + headers = { + 'kid': self._keyID + } + return jwt.encode( + payload, + key=self._privateJWK, + algorithm='EdDSA', + headers=headers + ) + + def _decodePlexJWT(self): + """ Returns the decoded and verified Plex JWT using the Plex public JWK. """ + return jwt.decode( + self.jwtToken, + key=jwt.PyJWK.from_dict(self._getPlexPublicJWK()), + algorithms=['EdDSA'], + options={ + 'require': ['aud', 'iss', 'exp', 'iat', 'thumbprint'] + }, + audience=['plex.tv', self._clientIdentifier], + issuer='plex.tv', + ) + + @property + def decodedJWT(self): + """ Returns the decoded Plex JWT. """ + return self._decodePlexJWT() + + def _registerPlexDevice(self): + """ Registers the public JWK with Plex. """ + url = f'{self.AUTH}/jwk' + headers = self._headers(**{'X-Plex-Token': self._token}) + body = {'jwk': self._publicJWK._jwk_data} + self._query(url, method=self._session.post, headers=headers, json=body) + + def _getPlexNonce(self): + """ Gets a nonce from Plex. """ + url = f'{self.AUTH}/nonce' + data = self._query(url, method=self._session.get) + return data['nonce'] + + def _exchangePlexJWT(self): + """ Exchanges the client JWT for a Plex JWT. """ + url = f'{self.AUTH}/token' + body = {'jwt': self._clientJWT} + data = self._query(url, method=self._session.post, json=body) + return data['auth_token'] + + def _getPlexPublicJWK(self): + """ Gets the Plex public JWK. """ + url = f'{self.AUTH}/keys' + data = self._query(url, method=self._session.get) + return data['keys'][0] + + def registerDevice(self): + """ Registers the device with Plex using the provided token and private/public keypair. + This must be done once if OAuth was not used before the Plex JWT can be refreshed. + + Raises: + :exc:`~plexapi.exceptions.BadRequest`: when token or keypair is missing. + """ + if not self._token: + raise BadRequest('Plex token is required to register device.') + + if not self._privateKey or not self._publicKey: + raise BadRequest('ED25519 private and public keys are required to register device. ' + 'Use generateKeypair() to generate a new keypair.') + + self._registerPlexDevice() + + def refreshJWT(self): + """ Refreshes the Plex JWT using the existing private/public keypair. + + Returns: + str: The new Plex JWT. + + Raises: + :exc:`~plexapi.exceptions.BadRequest`: when keypair is missing. + :exc:`~plexapi.exceptions.BadRequest`: when the newly obtained JWT cannot be verified. + """ + if not self._privateKey or not self._publicKey: + raise BadRequest('ED25519 private and public keys are required to refresh JWT.') + + self._clientJWT = self._encodeClientJWT() + self.jwtToken = self._exchangePlexJWT() + if self.verifyJWT(): + return self.jwtToken + raise BadRequest('Failed to verify newly obtained JWT.') + + def verifyJWT(self, refreshWithinDays=1): + """ Verifies the existing Plex JWT is valid and not expiring within the specified number of days. + + Parameters: + refreshWithinDays (int): Number of days before expiration to consider + the JWT invalid and in need of refresh. Default is 1 day. + """ + try: + decodedJWT = self.decodedJWT + except jwt.ExpiredSignatureError: + log.warning('Existing JWT has expired') + return False + except jwt.InvalidSignatureError: + log.warning('Existing JWT has invalid signature') + return False + except jwt.InvalidTokenError as e: + log.warning(f'Existing JWT is invalid: {e}') + return False + else: + if decodedJWT['thumbprint'] != self._keyID: + log.warning('Existing JWT was signed with a different key') + return False + elif decodedJWT['exp'] < int((datetime.now(timezone.utc) + timedelta(days=refreshWithinDays)).timestamp()): + log.warning(f'Existing JWT is expiring within {refreshWithinDays} day(s)') + return False + return True + + @property + def pin(self): + """ Return the four character PIN used for linking a device at + https://plex.tv/link. + """ + if self._oauth: + raise BadRequest('Cannot use PIN for Plex OAuth login') + return self._code + + def oauthUrl(self, forwardUrl=None): + """ Return the Plex OAuth url for login. + + Parameters: + forwardUrl (str, optional): The url to redirect the client to after login. + """ + if not self._oauth: + raise BadRequest('Must use "MyPlexJWTLogin(oauth=True)" for Plex OAuth login.') + + headers = self._headers() + params = { + 'clientID': headers['X-Plex-Client-Identifier'], + 'context[device][product]': headers['X-Plex-Product'], + 'context[device][version]': headers['X-Plex-Version'], + 'context[device][platform]': headers['X-Plex-Platform'], + 'context[device][platformVersion]': headers['X-Plex-Platform-Version'], + 'context[device][device]': headers['X-Plex-Device'], + 'context[device][deviceName]': headers['X-Plex-Device-Name'], + 'code': self._code + } + if forwardUrl: + params['forwardUrl'] = forwardUrl + + return f'https://app.plex.tv/auth/#!?{urlencode(params)}' + + def run(self, callback=None, timeout=120): + """ Starts the thread which monitors the PIN login state. + + Parameters: + callback (Callable[str], optional): Callback called with the received authentication token. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. + + Raises: + :exc:`RuntimeError`: If the thread is already running. + :exc:`RuntimeError`: If the PIN login for the current PIN has expired. + """ + if self._thread and not self._abort: + raise RuntimeError('MyPlexJWTLogin thread is already running') + if self.expired: + raise RuntimeError('MyPlexJWTLogin has expired') + + self._getCode() + self._clientJWT = self._encodeClientJWT() + + self._loginTimeout = timeout + self._callback = callback + self._abort = False + self.finished = False + self._thread = threading.Thread(target=self._pollLogin, name='plexapi.myplex.MyPlexJWTLogin') + self._thread.start() + + def waitForLogin(self): + """ Waits for the user login to succeed or expire. + + Returns: + bool: ``True`` if the user login succeeded or ``False`` otherwise. + """ + if not self._thread or self._abort: + return False + + self._thread.join() + if self.expired or not self.jwtToken: + return False + + return True + + def stop(self): + """ Stops the thread monitoring the user login state. """ + if not self._thread or self._abort: + return + + self._abort = True + self._thread.join() + + def checkLogin(self): + """ Returns ``True`` if the user login has succeeded. """ + if self._thread: + return False + + try: + return self._checkLogin() + except Exception: + self.expired = True + self.finished = True + + return False + + def _getCode(self): + url = self.PINS + + if self._oauth: + body = { + 'jwk': self._publicJWK._jwk_data, + 'strong': True, + } + else: + body = { + 'jwk': self._publicJWK._jwk_data, + } + + response = self._query(url, self._session.post, json=body) + if response is None: + return None + + self._id = response.get('id') + self._code = response.get('code') + + return self._code + + def _checkLogin(self): + if not self._id: + return False + + if self.jwtToken: + return True + + url = f'{self.PINS}/{self._id}' + params = {'deviceJWT': self._clientJWT} + response = self._query(url, params=params) + if response is None: + return False + + token = response.get('authToken') + if not token: + return False + + self.jwtToken = token + self.finished = True + return True + + def _pollLogin(self): + try: + start = time.time() + while not self._abort and (not self._loginTimeout or (time.time() - start) < self._loginTimeout): + try: + result = self._checkLogin() + except Exception: + self.expired = True + break + + if result: + break + + time.sleep(self.POLLINTERVAL) + + if self.jwtToken and self._callback: + self._callback(self.jwtToken) + finally: + self.finished = True + + def _headers(self, **kwargs): + """ Returns dict containing base headers for all requests for Plex JWT login. """ + headers = BASE_HEADERS.copy() + if self._customHeaders: + headers.update(self._customHeaders) headers.update(kwargs) + headers['Accept'] = 'application/json' return headers def _query(self, url, method=None, headers=None, **kwargs): @@ -1894,6 +2426,8 @@ def _query(self, url, method=None, headers=None, **kwargs): codename = codes.get(response.status_code)[0] errtext = response.text.replace('\n', ' ') raise BadRequest(f'({response.status_code}) {codename} {response.url}; {errtext}') + if 'application/json' in response.headers.get('Content-Type', ''): + return response.json() return utils.parseXMLString(response.text) diff --git a/plexapi/utils.py b/plexapi/utils.py index bbff6a8e0..f32654876 100644 --- a/plexapi/utils.py +++ b/plexapi/utils.py @@ -9,6 +9,7 @@ import sys import time import unicodedata +import uuid import warnings import zipfile from collections import deque @@ -532,18 +533,22 @@ def getMyPlexAccount(opts=None): # pragma: no cover return MyPlexAccount(username, password) -def createMyPlexDevice(headers, account, timeout=10): # pragma: no cover +def createMyPlexDevice(headers=None, account=None, timeout=10): # pragma: no cover """ Helper function to create a new MyPlexDevice. Returns a new MyPlexDevice instance. Parameters: headers (dict): Provide the X-Plex- headers for the new device. - A unique X-Plex-Client-Identifier is required. + A unique X-Plex-Client-Identifier is required or one will be generated if not provided. account (MyPlexAccount): The Plex account to create the device on. timeout (int): Timeout in seconds to wait for device login. """ from plexapi.myplex import MyPlexPinLogin - if 'X-Plex-Client-Identifier' not in headers: + if not headers: + client_identifier = generateUUID() + headers = {'X-Plex-Client-Identifier': client_identifier} + print(f'No X-Plex-Client-Identifier provided, generated: {client_identifier}') + elif 'X-Plex-Client-Identifier' not in headers: raise BadRequest('The X-Plex-Client-Identifier header is required.') clientIdentifier = headers['X-Plex-Client-Identifier'] @@ -561,19 +566,22 @@ def plexOAuth(headers, forwardUrl=None, timeout=120): # pragma: no cover Parameters: headers (dict): Provide the X-Plex- headers for the new device. - A unique X-Plex-Client-Identifier is required. + A unique X-Plex-Client-Identifier is required or one will be generated if not provided. forwardUrl (str, optional): The url to redirect the client to after login. - timeout (int, optional): Timeout in seconds to wait for device login. Default 120 seconds. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. """ from plexapi.myplex import MyPlexAccount, MyPlexPinLogin - if 'X-Plex-Client-Identifier' not in headers: + if not headers: + client_identifier = generateUUID() + headers = {'X-Plex-Client-Identifier': client_identifier} + print(f'No X-Plex-Client-Identifier provided, generated: {client_identifier}') + elif 'X-Plex-Client-Identifier' not in headers: raise BadRequest('The X-Plex-Client-Identifier header is required.') pinlogin = MyPlexPinLogin(headers=headers, oauth=True) - print('Login to Plex at the following url:') - print(pinlogin.oauthUrl(forwardUrl)) pinlogin.run(timeout=timeout) + print(f'Login to Plex at the following url:\n{pinlogin.oauthUrl(forwardUrl=forwardUrl)}') pinlogin.waitForLogin() if pinlogin.token: @@ -583,6 +591,44 @@ def plexOAuth(headers, forwardUrl=None, timeout=120): # pragma: no cover print('Login failed.') +def plexJWTAuth(headers=None, forwardUrl=None, timeout=120, keypair=(None, None), scopes=None): # pragma: no cover + """ Helper function for Plex JWT authentication using Plex OAuth. Returns a new MyPlexAccount instance. + + Parameters: + headers (dict, optional): Provide the X-Plex- headers for the new device. + A unique X-Plex-Client-Identifier is required or one will be generated if not provided. + forwardUrl (str, optional): The url to redirect the client to after login. + timeout (int, optional): Timeout in seconds to wait for user login. Default 120 seconds. + keypair (tuple, optional): A tuple of the ED25519 (privateKey, publicKey) to use for signing the JWT. + If not provided, a new keypair will be generated and saved to 'private.key' and 'public.key'. + scopes (list[str], optional): List of scopes to request in the JWT. + """ + from plexapi.myplex import MyPlexAccount, MyPlexJWTLogin + + if not headers: + client_identifier = generateUUID() + headers = {'X-Plex-Client-Identifier': client_identifier} + print(f'No X-Plex-Client-Identifier provided, generated: {client_identifier}') + elif 'X-Plex-Client-Identifier' not in headers: + raise BadRequest('The X-Plex-Client-Identifier header is required.') + + jwtlogin = MyPlexJWTLogin(headers=headers, oauth=True, keypair=keypair, scopes=scopes) + + if not keypair[0] or not keypair[1]: + jwtlogin.generateKeypair(keyfiles=('private.key', 'public.key')) + print('Generated new ED25519 keypair and saved to "private.key" and "public.key".') + + jwtlogin.run(timeout=timeout) + print(f'Login to Plex at the following url:\n{jwtlogin.oauthUrl(forwardUrl=forwardUrl)}') + jwtlogin.waitForLogin() + + if jwtlogin.jwtToken: + print('JWT authentication successful!') + return MyPlexAccount(token=jwtlogin.jwtToken) + else: + print('JWT authentication failed.') + + def choose(msg, items, attr): # pragma: no cover """ Command line helper to display a list of choices, asking the user to choose one of the options. @@ -625,6 +671,10 @@ def base64str(text): return base64.b64encode(text.encode('utf-8')).decode('utf-8') +def base64urlEncode(data: bytes) -> str: + return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8') + + def deprecated(message, stacklevel=2): def decorator(func): """This is a decorator which can be used to mark functions @@ -667,6 +717,8 @@ def serialize(obj): def openOrRead(file): + if isinstance(file, bytes): + return file if hasattr(file, 'read'): return file.read() with open(file, 'rb') as f: @@ -729,3 +781,7 @@ def parseXMLString(s: str): except ElementTree.ParseError: # If it fails, clean the string and try again cleaned_s = cleanXMLString(s).encode('utf-8') return ElementTree.fromstring(cleaned_s) if cleaned_s.strip() else None + + +def generateUUID() -> str: + return str(uuid.uuid4()) diff --git a/pyproject.toml b/pyproject.toml index 212267f24..2f17c8300 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,6 +17,7 @@ dynamic = ["version"] [project.optional-dependencies] alert = ["websocket-client>=1.3.3"] +jwt = ["pyjwt[crypto]"] [project.urls] Homepage = "https://github.com/pushingkarmaorg/python-plexapi" diff --git a/requirements_dev.txt b/requirements_dev.txt index 74e555424..c3486bed8 100644 --- a/requirements_dev.txt +++ b/requirements_dev.txt @@ -4,6 +4,7 @@ #--------------------------------------------------------- flake8==7.3.0 pillow==11.3.0 +pyjwt[crypto]==2.10.1 pytest==8.4.1 pytest-cache==1.0 pytest-cov==6.2.1