diff --git a/Dockerfile.plex_request b/Dockerfile.plex_request index 95ed726..d36838c 100644 --- a/Dockerfile.plex_request +++ b/Dockerfile.plex_request @@ -20,4 +20,4 @@ RUN grep -E "#.*($SERVICE_NAME|all)" requirements.txt | awk '{print $0}' > servi COPY . . # Run gunicorn using Unix socket -CMD ["gunicorn", "--bind", "unix:/app/sockets/plex_request.sock", "plex_request_wsgi:app"] \ No newline at end of file +CMD ["gunicorn", "--bind", "unix:/app/sockets/plex_request.sock", "--threads", "100", "plex_request_wsgi:app"] \ No newline at end of file diff --git a/plex_request.py b/plex_request.py index 407bbd6..9c02908 100644 --- a/plex_request.py +++ b/plex_request.py @@ -1,11 +1,18 @@ +import asyncio from datetime import datetime import os +from threading import Thread import traceback import re import requests +import json +import uuid +import websockets import declxml as xml from flask import Flask, jsonify, request, Response from flask_caching import Cache +from flask_sock import Sock +from websockets.asyncio.client import connect from shared.discord import discordError, discordUpdate from shared.shared import plex, plexHeaders, pathToScript from shared.overseerr import requestItem, getUserForPlexServerToken @@ -28,6 +35,67 @@ def to_python(self, value): def to_url(self, value): return value + +class PlexWebSocketMiddleware: + def __init__(self): + self.clients = set() + self.plexWs = None + + async def addClient(self, ws): + self.clients.add(ws) + if not self.plexWs: + # Connect to real Plex server WebSocket using same path and query params + path = ws.environ.get('RAW_URI') + wsUrl = re.sub(r'^http(s)?://', lambda m: f'ws{m.group(1) or ""}://', plex['serverHost']) + + print('connecting') + self.plexWs = await connect( + f"{wsUrl}{path}", + additional_headers={ + key[5:].replace('_', '-').lower(): value + for key, value in ws.environ.items() + if key.startswith('HTTP_') + } + ) + print('connected') + # Start forwarding Plex messages + await asyncio.create_task(self.forwardPlexMessages()) + + async def removeClient(self, ws): + self.clients.remove(ws) + if not self.clients and self.plexWs: + await self.plexWs.close() + self.plexWs = None + + async def forwardPlexMessages(self): + try: + print('forwarding all messages') + async for message in self.plexWs: + print('forwarding message') + print(message) + await self.broadcast(message) + except websockets.exceptions.ConnectionClosed: + print('connection closed') + self.plexWs = None + + async def broadcast(self, message): + if self.clients: + deadClients = set() + for client in self.clients: + try: + client.send(message) + except Exception as e: + print('dead client') + print(e) + deadClients.add(client) + # Clean up dead connections + for dead in deadClients: + await self.removeClient(dead) + + async def injectNotification(self, message): + print('injecting notification') + print(message) + await self.broadcast(json.dumps(message)) # Instantiate the app app = Flask(__name__) @@ -80,13 +148,96 @@ def traverse(key, value, processDict, processList, processElse): else: return processElse(key, value) -@app.route('/library/request///', methods=['GET']) -@app.route('/library/request////', methods=['GET']) -@app.route('/library/request////season/', methods=['GET']) -@app.route('/library/request////season//', methods=['GET']) +sock = Sock(app) + +# Create singleton middleware instance +wsMiddleware = PlexWebSocketMiddleware() + +async def checkRequestStatus(ratingKey, mediaType, mediaTypeNum, requestKey): + try: + # Generate a unique UUID for this refresh session + uuid_str = str(uuid.uuid4()) + + # Timeline notifications + await wsMiddleware.injectNotification({ + "NotificationContainer": { + "type": "timeline", + "size": 1, + "TimelineEntry": [ + { + "identifier": "com.plexapp.plugins.library", + "sectionID": "1", + "itemID": requestKey, + "type": 1, + "title": "Big Buck Bunny (2008)", + "state": 3, + "metadataState": "queued", + "updatedAt": 1736128693 + } + ] + } + }) + + # await asyncio.sleep(5) + + await wsMiddleware.injectNotification({ + "NotificationContainer": { + "type": "timeline", + "size": 1, + "TimelineEntry": [ + { + "identifier": "com.plexapp.plugins.library", + "sectionID": "1", + "itemID": requestKey, + "type": 1, + "title": "Big Buck Bunny (2008)", + "state": 5, + "updatedAt": 1736128693 + } + ] + } + }) + + except Exception as e: + print(f"Error checking request status: {e}") + +@sock.route('/:/websockets/notifications') +def websocketEndpoint(ws): + # Create event loop for this thread if it doesn't exist + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + async def handleWebsocket(): + try: + # Add client and set up connection + print('Starting websocket') + await wsMiddleware.addClient(ws) + + while True: + try: + # Use synchronous receive from Flask-Sock + message = ws.receive() + print('Receiving: ') + print(message) + except: + break + finally: + # Clean up when connection ends + print('Ending websocket') + await wsMiddleware.removeClient(ws) + + # Run the async handler in the event loop + loop.run_until_complete(handleWebsocket()) + +@app.route('/library/metadata/--', methods=['GET']) +@app.route('/library/metadata/--/', methods=['GET']) +@app.route('/library/metadata/---season-', methods=['GET']) +@app.route('/library/metadata/---season-/', methods=['GET']) def libraryRequest(mediaType, mediaTypeNum, ratingKey, season=None, children=None): + print('libraryRequest') token = request.headers.get('X-Plex-Token', None) or request.args.get('X-Plex-Token', None) originalRatingKey = ratingKey + requestKey = f"{mediaType}-{mediaTypeNum}-{originalRatingKey}{f'-season-{season}' if mediaTypeNum == mediaTypeNums['season'] else ''}" try: if not mediaTypeNum or mediaTypeNum not in mediaTypeNums.values(): @@ -129,7 +280,7 @@ def libraryRequest(mediaType, mediaTypeNum, ratingKey, season=None, children=Non if not children: if mediaTypeNum == mediaTypeNums['season']: - mediaContainer['Metadata'][0]['key'] = f"/library/request/{mediaType}/{mediaTypeNum}/{ratingKey}/season/{season}/children" + mediaContainer['Metadata'][0]['key'] = f"/library/metadata/{mediaType}-{mediaTypeNum}-{ratingKey}-season-{season}/children" response = jsonify(all) response.headers.add('Access-Control-Allow-Origin', 'https://app.plex.tv') @@ -182,9 +333,9 @@ def libraryRequest(mediaType, mediaTypeNum, ratingKey, season=None, children=Non item['title'] = f"{title} - Requesting..." if mediaTypeNum == mediaTypeNums['season']: - item['key'] = f"/library/request/{mediaType}/{mediaTypeNum}/{originalRatingKey}/season/{season}/children" + item['key'] = f"/library/metadata/{mediaType}-{mediaTypeNum}-{originalRatingKey}-season-{season}/children" else: - item['key'] = f"/library/request/{mediaType}/{mediaTypeNum}/{originalRatingKey}/children" + item['key'] = f"/library/metadata/{mediaType}-{mediaTypeNum}-{originalRatingKey}/children" response = jsonify(metadata) response.headers.add('Access-Control-Allow-Origin', 'https://app.plex.tv') @@ -201,10 +352,12 @@ def libraryRequest(mediaType, mediaTypeNum, ratingKey, season=None, children=Non finally: if not locals().get('skipRequest', False): title = locals().get('title', 'Untitled') - requestMedia(token, originalRatingKey, mediaType, season, title) + requestMedia(token, originalRatingKey, mediaType, season, title, mediaTypeNum, requestKey) -def requestMedia(token, ratingKey, mediaType, season, title): +def requestMedia(token, ratingKey, mediaType, season, title, mediaTypeNum, requestKey): + print('requestMedia') + try: cacheKey = ratingKey if mediaType == 'movie' else f"{ratingKey}_{season}" recentlyRequested = cache.get(cacheKey) or [] @@ -216,9 +369,27 @@ def requestMedia(token, ratingKey, mediaType, season, title): recentlyRequested.append(token) cache.set(cacheKey, recentlyRequested) + + # loop = asyncio.new_event_loop() + # asyncio.set_event_loop(loop) + + # Start status checking in background + async def runStatusCheck(): + attempts = 0 + while attempts < 30: # Run for 5 minutes (300 seconds) + attempts += 1 + await asyncio.sleep(10) + await checkRequestStatus(ratingKey, mediaType, mediaTypeNum, requestKey) + + # Run the async handler in the event loop + # loop.run_until_complete(runStatusCheck()) + + # await runStatusCheck() + Thread(target=lambda: asyncio.run(runStatusCheck())).start() print(f"{title} - Requested by {user['displayName']} via Plex Request") discordUpdate(f"{title} - Requested by {user['displayName']} via Plex Request", f"User Id: {user['id']}, Media Type: {mediaType}, {f'Season: {season},' if season else ''} Rating Key: {ratingKey}") + except: e = traceback.format_exc() print(f"Error in request") @@ -262,11 +433,12 @@ def all(): additionalMetadata = metadataAllRequest.json()['MediaContainer']['Metadata'][0] if mediaTypeNum == mediaTypeNums['season'] or mediaTypeNum == mediaTypeNums['episode']: - additionalMetadata['key'] = f"/library/request/{mediaType}/{mediaTypeNum}/{guid}/season/{season}" + additionalMetadata['key'] = f"/library/metadata/{mediaType}-{mediaTypeNum}-{guid}-season-{season}" + additionalMetadata['ratingKey'] = f"{mediaType}-{mediaTypeNum}-{guid}-season-{season}" else: - additionalMetadata['key'] = f"/library/request/{mediaType}/{mediaTypeNum}/{guid}" + additionalMetadata['key'] = f"/library/metadata/{mediaType}-{mediaTypeNum}-{guid}" + additionalMetadata['ratingKey'] = f"{mediaType}-{mediaTypeNum}-{guid}" - additionalMetadata['ratingKey'] = "12065" additionalMetadata['librarySectionTitle'] = "Request Season :" if mediaTypeNum == mediaTypeNums['episode'] else "Request :" additionalMetadata['librarySectionID'] = libraryId additionalMetadata['librarySectionKey'] = f"/library/sections/{libraryId}" @@ -389,8 +561,8 @@ def addRequestableSeasons(mediaContainer, seasons, ratingKey): for item in allSeasons: if item['index'] not in existingMetadataIndices: item['title'] = f"Request - {item.get('title', '')}" - item['key'] = f"/library/request/show/{mediaTypeNums['season']}/{ratingKey}/season/{item['index']}" - item['ratingKey'] = "12065" + item['key'] = f"/library/metadata/show-{mediaTypeNums['season']}-{ratingKey}-season-{item['index']}" + item['ratingKey'] = f"show-{mediaTypeNums['season']}-{ratingKey}-season-{item['index']}" item.pop('Guid', None) item.pop('Image', None) item.pop('Role', None) diff --git a/plex_request_nginx_default.conf b/plex_request_nginx_default.conf index 5a36fdb..2e4e4a7 100644 --- a/plex_request_nginx_default.conf +++ b/plex_request_nginx_default.conf @@ -191,9 +191,19 @@ server { # access_log logs/plex.access.log; } - location /library/request/ { + location ~ "^/library/metadata/(.*-.*){2,4}" { proxy_pass http://unix:/app/sockets/plex_request.sock; - + + # access_log logs/plex.access.log; + } + + location /:/websockets/notifications { + proxy_pass http://unix:/app/sockets/plex_request.sock; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "Upgrade"; + proxy_read_timeout 86400; + # access_log logs/plex.access.log; } } diff --git a/requirements.txt b/requirements.txt index d877fce..a330bbf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -7,6 +7,8 @@ watchdog==4.0.0 #blackhole Flask-Caching==2.1.0 #plex_request declxml==1.1.3 #plex_request +websockets==14.1 #plex_request +flask-sock==0.7.0 #plex_request Werkzeug==3.0.1 #plex_authentication, blackhole flask==3.0.2 #plex_authentication, plex_request