Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
169 changes: 169 additions & 0 deletions backend/endpoints/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,169 @@
from datetime import datetime, timezone

Check failure on line 1 in backend/endpoints/activity.py

View workflow job for this annotation

GitHub Actions / Trunk Check

black

Incorrect formatting, autoformat by running 'trunk fmt'

import socketio # type: ignore
from fastapi import HTTPException, Request, status
from pydantic import BaseModel, Field

from config import REDIS_URL
from decorators.auth import protected_route
from endpoints.responses.activity import ActivityClearSchema, ActivityEntrySchema
from handler.activity_handler import ActivityEntry, activity_handler
from handler.auth.constants import Scope
from handler.database import db_device_handler, db_rom_handler
from logger.logger import log
from utils.router import APIRouter

router = APIRouter(
prefix="/activity",
tags=["activity"],
)


class DeviceHeartbeatPayload(BaseModel):
rom_id: int = Field(ge=1)
device_id: str = Field(min_length=1, max_length=255)


def _get_socket_manager() -> socketio.AsyncRedisManager:
"""Create a write-only Redis manager for emitting from REST endpoints."""
return socketio.AsyncRedisManager(REDIS_URL, write_only=True)


def _build_activity_entry(
*,
user_id: int,
username: str,
avatar_path: str,
rom_id: int,
rom_name: str,
rom_cover_path: str,
platform_slug: str,
platform_name: str,
device_id: str,
device_type: str,
started_at: str,
) -> ActivityEntry:
return ActivityEntry(
user_id=user_id,
username=username,
avatar_path=avatar_path,
rom_id=rom_id,
rom_name=rom_name,
rom_cover_path=rom_cover_path,
platform_slug=platform_slug,
platform_name=platform_name,
device_id=device_id,
device_type=device_type,
started_at=started_at,
)


@protected_route(router.get, "", [Scope.ROMS_USER_READ])
async def get_all_activity(request: Request) -> list[ActivityEntrySchema]:
"""Return every currently active play session across all users."""
entries = await activity_handler.get_all_active()
return [ActivityEntrySchema(**e) for e in entries]


@protected_route(router.get, "/rom/{rom_id}", [Scope.ROMS_USER_READ])
async def get_rom_activity(
request: Request, rom_id: int
) -> list[ActivityEntrySchema]:
"""Return all active play sessions for a specific ROM."""
entries = await activity_handler.get_active_for_rom(rom_id)
return [ActivityEntrySchema(**e) for e in entries]


@protected_route(router.post, "/heartbeat", [Scope.ROMS_USER_WRITE])
async def device_heartbeat(
request: Request, payload: DeviceHeartbeatPayload
) -> ActivityEntrySchema:
"""Heartbeat endpoint for external devices (muOS, Android, etc.).

Called periodically by devices while the user is playing a game. Writes
activity state to Redis and broadcasts an ``activity:update`` event over
the main Socket.IO namespace.
"""
rom = db_rom_handler.get_rom(payload.rom_id)
if rom is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"ROM {payload.rom_id} not found",
)

device = db_device_handler.get_device(
device_id=payload.device_id, user_id=request.user.id
)
if device is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Device {payload.device_id} not found for this user",
)

# Preserve the started_at from the existing entry if we are refreshing.
existing = await activity_handler.get_active(request.user.id, device.id)
started_at = (
existing["started_at"]
if existing
else datetime.now(timezone.utc).isoformat()
)

platform = rom.platform
entry = _build_activity_entry(
user_id=request.user.id,
username=request.user.username,
avatar_path=request.user.avatar_path or "",
rom_id=rom.id,
rom_name=rom.name or rom.fs_name,
rom_cover_path=rom.path_cover_s or "",
platform_slug=platform.slug if platform else "",
platform_name=(platform.custom_name or platform.name) if platform else "",
device_id=device.id,
device_type=device.client or "unknown",
started_at=started_at,
)

await activity_handler.set_active(entry)

# Update the device last_seen as a side-effect (mirrors play session ingest).
db_device_handler.update_last_seen(device_id=device.id, user_id=request.user.id)

# Broadcast to all connected sockets.
try:
sm = _get_socket_manager()
await sm.emit("activity:update", dict(entry))
except Exception as e: # noqa: BLE001
log.warning(f"Failed to broadcast activity:update for user {request.user.id}: {e}")

return ActivityEntrySchema(**entry)


@protected_route(
router.delete,
"/heartbeat",
[Scope.ROMS_USER_WRITE],
status_code=status.HTTP_204_NO_CONTENT,
)
async def clear_device_activity(
request: Request, device_id: str
) -> None:
"""Immediately clear an active session for a device (e.g. on graceful exit)."""
rom_id = await activity_handler.clear_active(request.user.id, device_id)
if rom_id is None:
return None

try:
sm = _get_socket_manager()
await sm.emit(
"activity:clear",
ActivityClearSchema(
user_id=request.user.id,
device_id=device_id,
rom_id=rom_id,
).model_dump(),
)
except Exception as e: # noqa: BLE001
log.warning(
f"Failed to broadcast activity:clear for user {request.user.id}: {e}"
)
return None
21 changes: 21 additions & 0 deletions backend/endpoints/responses/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
from .base import BaseModel


class ActivityEntrySchema(BaseModel):
user_id: int
username: str
avatar_path: str
rom_id: int
rom_name: str
rom_cover_path: str = ""
platform_slug: str
platform_name: str
device_id: str
device_type: str
started_at: str


class ActivityClearSchema(BaseModel):
user_id: int
device_id: str
rom_id: int
206 changes: 206 additions & 0 deletions backend/endpoints/sockets/activity.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,206 @@
"""Socket.IO events for real-time user game activity.

Handles:
- activity:start - client reports starting a game (emits activity:update)
- activity:heartbeat - client refreshes TTL while playing (emits activity:update)
- activity:stop - client reports stopping (emits activity:clear)
- disconnect - safety net: clears any activity registered for the socket

All events broadcast to every connected client on the main `/ws` namespace.
"""

from __future__ import annotations

from datetime import datetime, timezone
from typing import TypedDict

from endpoints.responses.activity import ActivityClearSchema
from handler.activity_handler import ActivityEntry, activity_handler
from handler.database import db_rom_handler, db_user_handler
from handler.socket_handler import socket_handler
from logger.logger import log


class ActivityEventPayload(TypedDict, total=False):
rom_id: int
user_id: int
device_id: str


def _empty_string(value: object) -> str:
if value is None:
return ""
return str(value)


async def _store_session(sid: str, user_id: int, device_id: str) -> None:
"""Remember the user/device associated with a socket for disconnect cleanup."""
try:
existing = await socket_handler.socket_server.get_session(sid) or {}
except KeyError:
existing = {}
existing["activity_user_id"] = user_id
existing["activity_device_id"] = device_id
await socket_handler.socket_server.save_session(sid, existing)


async def _build_entry(
*, user_id: int, device_id: str, rom_id: int, preserve_started_at: bool
) -> ActivityEntry | None:
"""Look up DB info and assemble an ActivityEntry. Returns None if invalid."""
user = db_user_handler.get_user(user_id)
if user is None:
log.debug(f"activity: unknown user_id {user_id}")
return None

rom = db_rom_handler.get_rom(rom_id)
if rom is None:
log.debug(f"activity: unknown rom_id {rom_id}")
return None

platform = rom.platform
started_at = datetime.now(timezone.utc).isoformat()

if preserve_started_at:
existing = await activity_handler.get_active(user_id, device_id)
if existing:
started_at = existing["started_at"]

# Infer device_type: web is the default for browser-emitted events.
device_type = "web"
if device_id != "web":
# The browser may pass its device_id (a UUID) - we still treat it as "web"
# because Socket.IO events are only emitted from browser clients.
device_type = "web"

return ActivityEntry(
user_id=user.id,
username=user.username,
avatar_path=_empty_string(user.avatar_path),
rom_id=rom.id,
rom_name=rom.name or rom.fs_name,
rom_cover_path=_empty_string(rom.path_cover_s),
platform_slug=_empty_string(platform.slug) if platform else "",
platform_name=_empty_string(
(platform.custom_name or platform.name) if platform else ""
),
device_id=device_id,
device_type=device_type,
started_at=started_at,
)


def _extract_payload(data: object) -> tuple[int | None, str | None, int | None]:
"""Return ``(user_id, device_id, rom_id)`` parsed from an event payload."""
if not isinstance(data, dict):
return None, None, None
try:
user_id = int(data.get("user_id")) if data.get("user_id") is not None else None

Check failure on line 98 in backend/endpoints/sockets/activity.py

View workflow job for this annotation

GitHub Actions / Trunk Check

mypy(arg-type)

[new] Argument 1 to "int" has incompatible type "Any | None"; expected "str | Buffer | SupportsInt | SupportsIndex | SupportsTrunc"
except (TypeError, ValueError):
user_id = None
device_id = data.get("device_id")
if not isinstance(device_id, str) or not device_id:
device_id = None
try:
rom_id = int(data.get("rom_id")) if data.get("rom_id") is not None else None

Check failure on line 105 in backend/endpoints/sockets/activity.py

View workflow job for this annotation

GitHub Actions / Trunk Check

mypy(arg-type)

[new] Argument 1 to "int" has incompatible type "Any | None"; expected "str | Buffer | SupportsInt | SupportsIndex | SupportsTrunc"
except (TypeError, ValueError):
rom_id = None
return user_id, device_id, rom_id


@socket_handler.socket_server.on("activity:start") # type: ignore
async def activity_start(sid: str, data: ActivityEventPayload) -> None:
user_id, device_id, rom_id = _extract_payload(data)
if user_id is None or device_id is None or rom_id is None:
log.debug(f"activity:start ignored (invalid payload): {data}")
return

entry = await _build_entry(
user_id=user_id,
device_id=device_id,
rom_id=rom_id,
preserve_started_at=False,
)
if entry is None:
return

await activity_handler.set_active(entry)
await _store_session(sid, user_id, device_id)
await socket_handler.socket_server.emit("activity:update", dict(entry))


@socket_handler.socket_server.on("activity:heartbeat") # type: ignore
async def activity_heartbeat(sid: str, data: ActivityEventPayload) -> None:
user_id, device_id, rom_id = _extract_payload(data)
if user_id is None or device_id is None or rom_id is None:
return

entry = await _build_entry(
user_id=user_id,
device_id=device_id,
rom_id=rom_id,
preserve_started_at=True,
)
if entry is None:
return

await activity_handler.set_active(entry)
await _store_session(sid, user_id, device_id)
await socket_handler.socket_server.emit("activity:update", dict(entry))


@socket_handler.socket_server.on("activity:stop") # type: ignore
async def activity_stop(sid: str, data: ActivityEventPayload | None = None) -> None:
user_id: int | None = None
device_id: str | None = None

if data:
user_id, device_id, _ = _extract_payload(data)

# Fall back to the stored session if the payload is missing fields.
if user_id is None or device_id is None:
try:
session = await socket_handler.socket_server.get_session(sid) or {}
except KeyError:
session = {}
user_id = user_id if user_id is not None else session.get("activity_user_id")
device_id = device_id if device_id else session.get("activity_device_id")

if user_id is None or not device_id:
return

rom_id = await activity_handler.clear_active(int(user_id), device_id)
if rom_id is None:
return

await socket_handler.socket_server.emit(
"activity:clear",
ActivityClearSchema(
user_id=int(user_id), device_id=device_id, rom_id=rom_id
).model_dump(),
)


@socket_handler.socket_server.on("disconnect") # type: ignore
async def activity_on_disconnect(sid: str) -> None:
"""Safety net: clear any activity tied to a disconnecting socket."""
try:
session = await socket_handler.socket_server.get_session(sid) or {}
except KeyError:
return

user_id = session.get("activity_user_id")
device_id = session.get("activity_device_id")
if user_id is None or not device_id:
return

rom_id = await activity_handler.clear_active(int(user_id), device_id)
if rom_id is None:
return

await socket_handler.socket_server.emit(
"activity:clear",
ActivityClearSchema(
user_id=int(user_id), device_id=device_id, rom_id=rom_id
).model_dump(),
)
Loading
Loading