diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cf3b50..7ac58e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,23 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **Steam Workshop Integration** (`steam_workshop.py`) + - `search_workshop_items` - Search Workshop mods by game + - Text search and tag filtering support + - Sort by popular, trending, recent, or rating + - Returns subscriber counts, ratings, file sizes + - `search_workshop_collections` - Search Workshop collections by game + - Discover popular modpacks and curated item lists + - Sort by popular, trending, recent, or rating + - Text search support + - Returns collection name, item count, subscriber count + - `get_workshop_item_details` - Get detailed Workshop item information + - Full description, author info, dependencies + - Subscriber/favorite counts, vote breakdown + - Creation and update timestamps + - `get_workshop_collection` - Get items from a Workshop collection + - Collection metadata and item list + - Batch fetches item details - **Family Sharing Integration** (`family_groups.py`) - `get_family_group` - Get family group membership information - Returns family members, roles (Adult/Child/Member), and cooldown status @@ -18,7 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Optional `include_own` parameter to include owned apps ### Changed -- Tool count increased from 30 to 32 +- Tool count increased from 30 to 36 ## [v0.8.0] - 2025-12-11 diff --git a/README.md b/README.md index 1bbaab0..163b30e 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Once set up, you can ask Claude things like: - "Show my pending trade offers" - "What's the current price for an AK-47 Redline?" -The server includes 32 tools covering player profiles, game libraries, achievements, stats, reviews, wishlists, news, community guides, trading/market data, and family sharing. +The server includes 36 tools covering player profiles, game libraries, achievements, stats, reviews, wishlists, news, community guides, trading/market data, family sharing, and Steam Workshop. --- @@ -179,7 +179,7 @@ No registration needed - just drop in the file and restart. --- -## Available Tools (32 total) +## Available Tools (36 total) ### Player Profiles (ISteamUser) - 6 tools @@ -260,6 +260,15 @@ No registration needed - just drop in the file and restart. | `get_family_group` | Get family group membership, members, and roles | | `get_shared_library_apps` | Get games available through family sharing | +### Steam Workshop (IPublishedFileService) - 4 tools + +| Tool | What it does | +|------|--------------| +| `search_workshop_items` | Search Workshop mods by game, with text/tag filters and sorting | +| `search_workshop_collections` | Search Workshop collections (curated item lists) by game with sorting | +| `get_workshop_item_details` | Get full details on a Workshop item (description, subscribers, dependencies) | +| `get_workshop_collection` | Get items from a Workshop collection | + --- ## Configuration Reference diff --git a/src/steam_mcp/endpoints/steam_workshop.py b/src/steam_mcp/endpoints/steam_workshop.py new file mode 100644 index 0000000..b046eca --- /dev/null +++ b/src/steam_mcp/endpoints/steam_workshop.py @@ -0,0 +1,746 @@ +"""Steam Workshop API endpoints. + +This module provides MCP tools for the IPublishedFileService Steam API interface, +which handles Steam Workshop content discovery, search, and details. + +Reference: https://partner.steamgames.com/doc/webapi/IPublishedFileService +""" + +import json +import logging +from datetime import datetime, timezone +from typing import Any + +from steam_mcp.endpoints.base import BaseEndpoint, endpoint + + +logger = logging.getLogger(__name__) + + +# Query type mappings for sorting +QUERY_TYPES = { + "popular": 0, # RankedByVote + "trend": 1, # RankedByTrend + "recent": 2, # RankedByPublicationDate + "rating": 3, # RankedByVoteScore (same as popular but different algorithm) +} + + +def _safe_int(value: Any, default: int = 0) -> int: + """Safely convert value to int, handling strings from API.""" + if value is None: + return default + try: + return int(value) + except (ValueError, TypeError): + return default + + +def _safe_float(value: Any, default: float = 0.0) -> float: + """Safely convert value to float, handling strings from API.""" + if value is None: + return default + try: + return float(value) + except (ValueError, TypeError): + return default + + +def _format_file_size(size_bytes: int | str | None) -> str: + """Format file size in human readable form.""" + try: + size_bytes = int(size_bytes) if size_bytes else 0 + except (ValueError, TypeError): + return "Unknown size" + if size_bytes < 1024: + return f"{size_bytes} B" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.1f} MB" + else: + return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" + + +def _format_timestamp(ts: int | str | None) -> str: + """Format Unix timestamp as readable date (UTC).""" + try: + ts = int(ts) if ts else 0 + except (ValueError, TypeError): + return "Unknown" + if ts <= 0: + return "Unknown" + try: + return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M UTC") + except (ValueError, OSError): + return "Unknown" + + +class IPublishedFileService(BaseEndpoint): + """Steam Workshop (IPublishedFileService) API endpoints.""" + + @endpoint( + name="search_workshop_items", + description=( + "Search Steam Workshop for mods and community content by game. " + "Returns items with title, author, subscriber count, and rating. " + "Note: Workshop availability varies by game." + ), + supports_json=True, + params={ + "app_id": { + "type": "integer", + "description": "Steam App ID of the game to search Workshop for", + "required": True, + }, + "search_query": { + "type": "string", + "description": "Text search filter (optional)", + "required": False, + }, + "tags": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by Workshop tags (e.g., 'Maps', 'Weapons', 'Characters')", + "required": False, + }, + "sort_by": { + "type": "string", + "description": "Sort order: 'popular' (most voted), 'trend' (trending), 'recent' (newest), 'rating' (highest rated)", + "required": False, + "default": "popular", + "enum": ["popular", "trend", "recent", "rating"], + }, + "max_results": { + "type": "integer", + "description": "Maximum number of results to return (default: 20, max: 50)", + "required": False, + "default": 20, + "minimum": 1, + "maximum": 50, + }, + }, + ) + async def search_workshop_items( + self, + app_id: int, + search_query: str = "", + tags: list[str] | None = None, + sort_by: str = "popular", + max_results: int = 20, + format: str = "text", + ) -> str: + """Search Steam Workshop for items.""" + # Build query parameters + query_type = QUERY_TYPES.get(sort_by, 0) + + params: dict[str, Any] = { + "appid": app_id, + "query_type": query_type, + "numperpage": min(max_results, 50), + "return_metadata": True, + "return_tags": True, + "return_vote_data": True, + } + + if search_query: + params["search_text"] = search_query + + if tags: + # Steam API expects tags as separate requiredtags[n] params + for i, tag in enumerate(tags): + params[f"requiredtags[{i}]"] = tag + + try: + result = await self.client.get( + "IPublishedFileService", + "QueryFiles", + version=1, + params=params, + ) + except Exception as e: + error_msg = f"Error searching Workshop: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = result.get("response", {}) + items = response.get("publishedfiledetails", []) + total = response.get("total", len(items)) + + if not items: + msg = f"No Workshop items found for app {app_id}." + if search_query: + msg += f" Search query: '{search_query}'" + if tags: + msg += f" Tags: {', '.join(tags)}" + msg += "\n\nThis game may not have Workshop support, or no items match your filters." + if format == "json": + return json.dumps({"error": msg}) + return msg + + if format == "json": + data = { + "app_id": app_id, + "total_results": total, + "returned": len(items), + "sort_by": sort_by, + "items": [ + { + "workshop_id": item.get("publishedfileid"), + "title": item.get("title", "Untitled"), + "description": (item.get("short_description") or item.get("file_description", ""))[:500], + "author_steam_id": item.get("creator"), + "preview_url": item.get("preview_url"), + "subscriber_count": item.get("subscriptions", 0), + "favorited_count": item.get("favorited", 0), + "views": item.get("views", 0), + "vote_data": { + "score": item.get("vote_data", {}).get("score", 0), + "votes_up": item.get("vote_data", {}).get("votes_up", 0), + "votes_down": item.get("vote_data", {}).get("votes_down", 0), + }, + "file_size_bytes": item.get("file_size", 0), + "tags": [t.get("tag") for t in item.get("tags", []) if t.get("tag")], + "time_created": item.get("time_created"), + "time_updated": item.get("time_updated"), + } + for item in items + ], + } + return json.dumps(data, indent=2) + + # Text format + output = [ + f"Steam Workshop Results for App {app_id}", + f"Total: {total} items | Showing: {len(items)} | Sort: {sort_by}", + ] + + if search_query: + output.append(f"Search: '{search_query}'") + if tags: + output.append(f"Tags: {', '.join(tags)}") + output.append("") + + for item in items: + workshop_id = item.get("publishedfileid", "?") + title = item.get("title", "Untitled") + subs = _safe_int(item.get("subscriptions")) + favorites = _safe_int(item.get("favorited")) + file_size = item.get("file_size", 0) + + # Vote data + vote_data = item.get("vote_data", {}) + score = _safe_float(vote_data.get("score")) + rating_pct = f"{score * 100:.0f}%" if score else "N/A" + + # Tags + item_tags = [t.get("tag") for t in item.get("tags", []) if t.get("tag")] + tags_str = ", ".join(item_tags[:3]) if item_tags else "No tags" + if len(item_tags) > 3: + tags_str += f" (+{len(item_tags) - 3})" + + output.append( + f"[{workshop_id}] {title}\n" + f" Subscribers: {subs:,} | Favorites: {favorites:,} | Rating: {rating_pct}\n" + f" Size: {_format_file_size(file_size)} | Tags: {tags_str}" + ) + output.append("") + + return "\n".join(output) + + @endpoint( + name="get_workshop_item_details", + description=( + "Get detailed information about a specific Steam Workshop item. " + "Returns full description, author info, dependencies, changelog, and more." + ), + supports_json=True, + params={ + "workshop_id": { + "type": "string", + "description": "The Workshop item ID (publishedfileid)", + "required": True, + }, + }, + ) + async def get_workshop_item_details( + self, + workshop_id: str, + format: str = "text", + ) -> str: + """Get detailed information about a Workshop item.""" + # Validate input + if not workshop_id or not workshop_id.strip(): + error_msg = "Workshop ID is required" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + workshop_id = workshop_id.strip() + + try: + result = await self.client.get( + "IPublishedFileService", + "GetDetails", + version=1, + params={ + "publishedfileids[0]": workshop_id, + "includetags": True, + "includeadditionalpreviews": True, + "includechildren": True, # Dependencies + "includevotes": True, + "short_description": False, # Get full description + }, + ) + except Exception as e: + error_msg = f"Error fetching Workshop item details: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = result.get("response", {}) + items = response.get("publishedfiledetails", []) + + if not items: + error_msg = f"Workshop item {workshop_id} not found." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + item = items[0] + + # Check for error result (item not found or private) + if _safe_int(item.get("result")) != 1: + error_msg = f"Workshop item {workshop_id} not found or is private." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + if format == "json": + data = { + "workshop_id": item.get("publishedfileid"), + "title": item.get("title", "Untitled"), + "description": item.get("file_description", ""), + "app_id": item.get("consumer_appid"), + "creator_app_id": item.get("creator_appid"), + "author": { + "steam_id": item.get("creator"), + "name": item.get("creator_name"), + }, + "file_url": item.get("file_url"), + "preview_url": item.get("preview_url"), + "file_size_bytes": item.get("file_size", 0), + "subscriber_count": item.get("subscriptions", 0), + "favorited_count": item.get("favorited", 0), + "lifetime_subscriptions": item.get("lifetime_subscriptions", 0), + "lifetime_favorited": item.get("lifetime_favorited", 0), + "views": item.get("views", 0), + "vote_data": { + "score": item.get("vote_data", {}).get("score", 0), + "votes_up": item.get("vote_data", {}).get("votes_up", 0), + "votes_down": item.get("vote_data", {}).get("votes_down", 0), + }, + "tags": [t.get("tag") for t in item.get("tags", [])], + "time_created": item.get("time_created"), + "time_updated": item.get("time_updated"), + "visibility": item.get("visibility"), + "banned": item.get("banned", False), + "language": item.get("language"), + "dependencies": item.get("children", []), + } + return json.dumps(data, indent=2) + + # Text format + title = item.get("title", "Untitled") + description = item.get("file_description", "No description") + app_id = item.get("consumer_appid", "Unknown") + author_name = item.get("creator_name", item.get("creator", "Unknown")) + + subs = _safe_int(item.get("subscriptions")) + favorites = _safe_int(item.get("favorited")) + views = _safe_int(item.get("views")) + file_size = item.get("file_size", 0) + + vote_data = item.get("vote_data", {}) + score = _safe_float(vote_data.get("score")) + votes_up = _safe_int(vote_data.get("votes_up")) + votes_down = _safe_int(vote_data.get("votes_down")) + rating_pct = f"{score * 100:.0f}%" if score else "N/A" + + time_created = item.get("time_created", 0) + time_updated = item.get("time_updated", 0) + + item_tags = [t.get("tag") for t in item.get("tags", [])] + dependencies = item.get("children", []) + + output = [ + f"Workshop Item: {title}", + f"ID: {workshop_id} | App: {app_id}", + f"Author: {author_name}", + "", + f"Subscribers: {subs:,} | Favorites: {favorites:,} | Views: {views:,}", + f"Rating: {rating_pct} ({votes_up:,} up / {votes_down:,} down)", + f"File Size: {_format_file_size(file_size)}", + "", + f"Created: {_format_timestamp(time_created)}", + f"Updated: {_format_timestamp(time_updated)}", + ] + + if item_tags: + output.append(f"Tags: {', '.join(item_tags)}") + + if dependencies: + output.append(f"\nDependencies ({len(dependencies)}):") + for dep in dependencies[:10]: + dep_id = dep.get("publishedfileid", "Unknown") + output.append(f" - {dep_id}") + if len(dependencies) > 10: + output.append(f" ... and {len(dependencies) - 10} more") + + # Truncate description if too long + if len(description) > 1000: + description = description[:1000] + "...\n[Description truncated]" + + output.append(f"\nDescription:\n{description}") + + return "\n".join(output) + + @endpoint( + name="search_workshop_collections", + description=( + "Search Steam Workshop for collections (curated item lists) by game. " + "Returns collections with name, subscriber count, and item count. " + "Use this to discover popular modpacks or curated content lists." + ), + supports_json=True, + params={ + "app_id": { + "type": "integer", + "description": "Steam App ID of the game to search collections for", + "required": True, + }, + "search_query": { + "type": "string", + "description": "Text search filter (optional)", + "required": False, + }, + "tags": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by Workshop tags (e.g., 'Maps', 'Weapons', 'Characters')", + "required": False, + }, + "sort_by": { + "type": "string", + "description": "Sort order: 'popular' (most voted), 'trend' (trending), 'recent' (newest), 'rating' (highest rated)", + "required": False, + "default": "popular", + "enum": ["popular", "trend", "recent", "rating"], + }, + "max_results": { + "type": "integer", + "description": "Maximum number of results to return (default: 10, max: 50)", + "required": False, + "default": 10, + "minimum": 1, + "maximum": 50, + }, + }, + ) + async def search_workshop_collections( + self, + app_id: int, + search_query: str = "", + tags: list[str] | None = None, + sort_by: str = "popular", + max_results: int = 10, + format: str = "text", + ) -> str: + """Search Steam Workshop for collections.""" + query_type = QUERY_TYPES.get(sort_by, 0) + + params: dict[str, Any] = { + "appid": app_id, + "query_type": query_type, + "filetype": 1, # Collections (1=Collections, 2=Art, 0=Items) + "numperpage": min(max_results, 50), + "return_metadata": True, + "return_tags": True, + "return_vote_data": True, + "return_children": True, # Get item count in collections + } + + if search_query: + params["search_text"] = search_query + + if tags: + for i, tag in enumerate(tags): + params[f"requiredtags[{i}]"] = tag + + try: + result = await self.client.get( + "IPublishedFileService", + "QueryFiles", + version=1, + params=params, + ) + except Exception as e: + error_msg = f"Error searching Workshop collections: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = result.get("response", {}) + collections = response.get("publishedfiledetails", []) + total = _safe_int(response.get("total", len(collections))) + + if not collections: + msg = f"No Workshop collections found for app {app_id}." + if search_query: + msg += f" Search query: '{search_query}'" + if tags: + msg += f" Tags: {', '.join(tags)}" + msg += "\n\nThis game may not have Workshop collections, or no collections match your filters." + if format == "json": + return json.dumps({"error": msg}) + return msg + + if format == "json": + data = { + "app_id": app_id, + "total_results": total, + "returned": len(collections), + "sort_by": sort_by, + "collections": [ + { + "collection_id": coll.get("publishedfileid"), + "name": coll.get("title", "Untitled"), + "description": (coll.get("short_description") or coll.get("file_description", ""))[:300], + "author_steam_id": coll.get("creator"), + "author_name": coll.get("creator_name"), + "preview_url": coll.get("preview_url"), + "subscriber_count": _safe_int(coll.get("subscriptions")), + "favorited_count": _safe_int(coll.get("favorited")), + "item_count": len(coll.get("children", [])), + "vote_data": { + "score": _safe_float(coll.get("vote_data", {}).get("score")), + "votes_up": _safe_int(coll.get("vote_data", {}).get("votes_up")), + "votes_down": _safe_int(coll.get("vote_data", {}).get("votes_down")), + }, + "time_created": _safe_int(coll.get("time_created")), + "time_updated": _safe_int(coll.get("time_updated")), + } + for coll in collections + ], + } + return json.dumps(data, indent=2) + + # Text format + output = [ + f"Steam Workshop Collections for App {app_id}", + f"Total: {total} collections | Showing: {len(collections)} | Sort: {sort_by}", + ] + + if search_query: + output.append(f"Search: '{search_query}'") + if tags: + output.append(f"Tags: {', '.join(tags)}") + output.append("") + + for coll in collections: + coll_id = coll.get("publishedfileid", "?") + name = coll.get("title", "Untitled") + author = coll.get("creator_name", "Unknown") + subs = _safe_int(coll.get("subscriptions")) + item_count = len(coll.get("children", [])) + + vote_data = coll.get("vote_data", {}) + score = _safe_float(vote_data.get("score")) + rating_pct = f"{score * 100:.0f}%" if score else "N/A" + + desc = (coll.get("short_description") or coll.get("file_description", ""))[:100] + if len(desc) == 100: + desc += "..." + + output.append( + f"[{coll_id}] {name}\n" + f" By: {author} | Items: {item_count} | Subscribers: {subs:,} | Rating: {rating_pct}" + ) + if desc: + output.append(f" {desc}") + output.append("") + + return "\n".join(output) + + @endpoint( + name="get_workshop_collection", + description=( + "Get items from a Steam Workshop collection. " + "Returns collection name, description, and list of contained items. " + "Use 'search_workshop_collections' first to find collection IDs. " + "Note: Item details are fetched for up to 50 items." + ), + supports_json=True, + params={ + "collection_id": { + "type": "string", + "description": "The Workshop collection ID (use search_workshop_collections to find IDs)", + "required": True, + }, + }, + ) + async def get_workshop_collection( + self, + collection_id: str, + format: str = "text", + ) -> str: + """Get items from a Workshop collection.""" + # Validate input + if not collection_id or not collection_id.strip(): + error_msg = "Collection ID is required" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + collection_id = collection_id.strip() + + # First, get the collection details + try: + collection_result = await self.client.get( + "IPublishedFileService", + "GetDetails", + version=1, + params={ + "publishedfileids[0]": collection_id, + "includetags": True, + "includechildren": True, # This gives us collection items + }, + ) + except Exception as e: + error_msg = f"Error fetching collection: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = collection_result.get("response", {}) + items = response.get("publishedfiledetails", []) + + if not items: + error_msg = f"Collection {collection_id} not found." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + collection = items[0] + + # Check if this is actually a collection (file_type 2 = collection) + file_type = _safe_int(collection.get("file_type")) + if file_type != 2: + error_msg = f"Item {collection_id} is not a collection (file_type={file_type})." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + # Check result status + if _safe_int(collection.get("result")) != 1: + error_msg = f"Collection {collection_id} not found or is private." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + collection_name = collection.get("title", "Untitled Collection") + collection_desc = collection.get("file_description", "") + children = collection.get("children", []) + + # Get details for all child items (up to 50) + child_ids = [c.get("publishedfileid") for c in children[:50] if c.get("publishedfileid")] + + child_items = [] + if child_ids: + try: + # Build params for batch request + params: dict[str, Any] = { + "includetags": True, + "includevotes": True, + } + for i, cid in enumerate(child_ids): + params[f"publishedfileids[{i}]"] = cid + + child_result = await self.client.get( + "IPublishedFileService", + "GetDetails", + version=1, + params=params, + ) + child_items = child_result.get("response", {}).get("publishedfiledetails", []) + except Exception as e: + # If batch fetch fails, continue with just collection info + logger.warning(f"Failed to fetch collection child items: {e}") + + if format == "json": + data = { + "collection_id": collection_id, + "name": collection_name, + "description": collection_desc, + "app_id": collection.get("consumer_appid"), + "author": { + "steam_id": collection.get("creator"), + "name": collection.get("creator_name"), + }, + "subscriber_count": collection.get("subscriptions", 0), + "item_count": len(children), + "items": [ + { + "workshop_id": item.get("publishedfileid"), + "title": item.get("title", "Untitled"), + "subscriber_count": _safe_int(item.get("subscriptions")), + "vote_score": _safe_float(item.get("vote_data", {}).get("score")), + "file_size_bytes": _safe_int(item.get("file_size")), + } + for item in child_items + if _safe_int(item.get("result")) == 1 # Only include successful results + ], + } + if len(children) > 50: + data["truncated"] = True + data["total_items"] = len(children) + return json.dumps(data, indent=2) + + # Text format + collection_subs = _safe_int(collection.get("subscriptions")) + output = [ + f"Workshop Collection: {collection_name}", + f"ID: {collection_id} | App: {collection.get('consumer_appid', 'Unknown')}", + f"Author: {collection.get('creator_name', collection.get('creator', 'Unknown'))}", + f"Subscribers: {collection_subs:,}", + f"Items in collection: {len(children)}", + ] + + if collection_desc: + desc_preview = collection_desc[:300] + if len(collection_desc) > 300: + desc_preview += "..." + output.append(f"\nDescription: {desc_preview}") + + output.append(f"\nCollection Items ({len(child_items)} loaded):") + output.append("") + + for item in child_items: + if _safe_int(item.get("result")) != 1: + continue + + item_id = item.get("publishedfileid", "?") + item_title = item.get("title", "Untitled") + item_subs = _safe_int(item.get("subscriptions")) + item_size = item.get("file_size", 0) + + output.append( + f" [{item_id}] {item_title}\n" + f" Subscribers: {item_subs:,} | Size: {_format_file_size(item_size)}" + ) + + if len(children) > 50: + output.append(f"\n ... and {len(children) - 50} more items") + + return "\n".join(output) diff --git a/tests/test_steam_workshop.py b/tests/test_steam_workshop.py new file mode 100644 index 0000000..340e2ed --- /dev/null +++ b/tests/test_steam_workshop.py @@ -0,0 +1,938 @@ +"""Tests for Steam Workshop endpoint (IPublishedFileService).""" + +import json +import pytest +from unittest.mock import AsyncMock, MagicMock + +from steam_mcp.endpoints.steam_workshop import IPublishedFileService + + +@pytest.fixture +def mock_client(): + """Create mock Steam client.""" + client = MagicMock() + client.owner_steam_id = None + client.get = AsyncMock() + return client + + +@pytest.fixture +def workshop_service(mock_client): + """Create IPublishedFileService instance with mock client.""" + return IPublishedFileService(mock_client) + + +# --- search_workshop_items tests --- + + +class TestSearchWorkshopItems: + """Tests for search_workshop_items endpoint.""" + + @pytest.mark.asyncio + async def test_search_returns_items(self, workshop_service, mock_client): + """Should return Workshop items for a game.""" + mock_client.get.return_value = { + "response": { + "total": 2, + "publishedfiledetails": [ + { + "publishedfileid": "123456", + "title": "Test Mod", + "creator": "76561198000000001", + "subscriptions": 1000, + "favorited": 500, + "views": 5000, + "file_size": 1048576, # 1 MB + "vote_data": {"score": 0.95, "votes_up": 100, "votes_down": 5}, + "tags": [{"tag": "Maps"}, {"tag": "Multiplayer"}], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + { + "publishedfileid": "789012", + "title": "Another Mod", + "creator": "76561198000000002", + "subscriptions": 500, + "favorited": 200, + "views": 2000, + "file_size": 524288, # 512 KB + "vote_data": {"score": 0.80, "votes_up": 80, "votes_down": 20}, + "tags": [{"tag": "Weapons"}], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + ], + } + } + + result = await workshop_service.search_workshop_items(app_id=730) + + assert "Test Mod" in result + assert "Another Mod" in result + assert "1,000" in result # Subscriber count + assert "Maps" in result + + @pytest.mark.asyncio + async def test_search_with_query(self, workshop_service, mock_client): + """Should pass search query to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_items(app_id=730, search_query="awp skin") + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["search_text"] == "awp skin" + + @pytest.mark.asyncio + async def test_search_with_tags(self, workshop_service, mock_client): + """Should pass tags to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_items(app_id=730, tags=["Maps", "Competitive"]) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["requiredtags[0]"] == "Maps" + assert call_args[1]["params"]["requiredtags[1]"] == "Competitive" + + @pytest.mark.asyncio + async def test_search_empty_results(self, workshop_service, mock_client): + """Should handle no results gracefully.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + result = await workshop_service.search_workshop_items(app_id=99999) + + assert "No Workshop items found" in result + assert "may not have Workshop support" in result + + @pytest.mark.asyncio + async def test_search_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "Test", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {"score": 0.9}, + "tags": [], + } + ], + } + } + + result = await workshop_service.search_workshop_items(app_id=730, format="json") + + data = json.loads(result) + assert data["app_id"] == 730 + assert data["total_results"] == 1 + assert len(data["items"]) == 1 + assert data["items"][0]["workshop_id"] == "123" + + @pytest.mark.asyncio + async def test_search_sort_by_options(self, workshop_service, mock_client): + """Should map sort_by to correct query_type.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + # Test different sort options + for sort_by, expected_type in [("popular", 0), ("trend", 1), ("recent", 2), ("rating", 3)]: + await workshop_service.search_workshop_items(app_id=730, sort_by=sort_by) + call_args = mock_client.get.call_args + assert call_args[1]["params"]["query_type"] == expected_type + + @pytest.mark.asyncio + async def test_search_max_results_limit(self, workshop_service, mock_client): + """Should cap max_results at 50.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_items(app_id=730, max_results=100) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["numperpage"] == 50 + + @pytest.mark.asyncio + async def test_search_api_error(self, workshop_service, mock_client): + """Should handle API errors gracefully.""" + mock_client.get.side_effect = Exception("API connection failed") + + result = await workshop_service.search_workshop_items(app_id=730) + + assert "Error" in result + assert "API connection failed" in result + + +# --- search_workshop_collections tests --- + + +class TestSearchWorkshopCollections: + """Tests for search_workshop_collections endpoint.""" + + @pytest.mark.asyncio + async def test_search_returns_collections(self, workshop_service, mock_client): + """Should return Workshop collections for a game.""" + mock_client.get.return_value = { + "response": { + "total": 2, + "publishedfiledetails": [ + { + "publishedfileid": "111111", + "title": "Ultimate Modpack", + "file_description": "Collection of essential mods", + "creator": "76561198000000001", + "creator_name": "CollectorOne", + "subscriptions": 10000, + "favorited": 5000, + "file_type": 2, # Collection + "vote_data": {"score": 0.95, "votes_up": 950, "votes_down": 50}, + "children": [ + {"publishedfileid": "1"}, + {"publishedfileid": "2"}, + {"publishedfileid": "3"}, + ], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + { + "publishedfileid": "222222", + "title": "Starter Pack", + "file_description": "Beginner collection", + "creator": "76561198000000002", + "creator_name": "CollectorTwo", + "subscriptions": 5000, + "favorited": 2000, + "file_type": 2, + "vote_data": {"score": 0.80, "votes_up": 80, "votes_down": 20}, + "children": [{"publishedfileid": "4"}], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + ], + } + } + + result = await workshop_service.search_workshop_collections(app_id=730) + + assert "Ultimate Modpack" in result + assert "Starter Pack" in result + assert "10,000" in result # Subscriber count + assert "Items: 3" in result # Item count + assert "CollectorOne" in result + + @pytest.mark.asyncio + async def test_search_collections_with_query(self, workshop_service, mock_client): + """Should pass search query to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_collections( + app_id=730, search_query="weapon skins" + ) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["search_text"] == "weapon skins" + assert call_args[1]["params"]["filetype"] == 1 # Collections only + + @pytest.mark.asyncio + async def test_search_collections_with_tags(self, workshop_service, mock_client): + """Should pass tags to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_collections( + app_id=730, tags=["Maps", "Competitive"] + ) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["requiredtags[0]"] == "Maps" + assert call_args[1]["params"]["requiredtags[1]"] == "Competitive" + assert call_args[1]["params"]["filetype"] == 1 # Collections only + + @pytest.mark.asyncio + async def test_search_collections_empty_results(self, workshop_service, mock_client): + """Should handle no results gracefully.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + result = await workshop_service.search_workshop_collections(app_id=99999) + + assert "No Workshop collections found" in result + assert "may not have Workshop collections" in result + + @pytest.mark.asyncio + async def test_search_collections_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "Test Collection", + "file_description": "A test", + "creator": "76561198000000001", + "creator_name": "Tester", + "subscriptions": 100, + "favorited": 50, + "file_type": 2, + "vote_data": {"score": 0.9, "votes_up": 90, "votes_down": 10}, + "children": [{"publishedfileid": "1"}, {"publishedfileid": "2"}], + "time_created": 1609459200, + "time_updated": 1704067200, + } + ], + } + } + + result = await workshop_service.search_workshop_collections( + app_id=730, format="json" + ) + + data = json.loads(result) + assert data["app_id"] == 730 + assert data["total_results"] == 1 + assert len(data["collections"]) == 1 + assert data["collections"][0]["collection_id"] == "123" + assert data["collections"][0]["name"] == "Test Collection" + assert data["collections"][0]["item_count"] == 2 + + @pytest.mark.asyncio + async def test_search_collections_sort_options(self, workshop_service, mock_client): + """Should map sort_by to correct query_type.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + for sort_by, expected_type in [ + ("popular", 0), + ("trend", 1), + ("recent", 2), + ("rating", 3), + ]: + await workshop_service.search_workshop_collections( + app_id=730, sort_by=sort_by + ) + call_args = mock_client.get.call_args + assert call_args[1]["params"]["query_type"] == expected_type + + @pytest.mark.asyncio + async def test_search_collections_max_results_limit( + self, workshop_service, mock_client + ): + """Should cap max_results at 50.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_collections(app_id=730, max_results=100) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["numperpage"] == 50 + + @pytest.mark.asyncio + async def test_search_collections_api_error(self, workshop_service, mock_client): + """Should handle API errors gracefully.""" + mock_client.get.side_effect = Exception("API connection failed") + + result = await workshop_service.search_workshop_collections(app_id=730) + + assert "Error" in result + assert "API connection failed" in result + + @pytest.mark.asyncio + async def test_search_collections_handles_string_values( + self, workshop_service, mock_client + ): + """Should handle string numeric values from API.""" + mock_client.get.return_value = { + "response": { + "total": "1", # String + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "String Collection", + "subscriptions": "5000", # String + "favorited": "250", # String + "file_type": "2", # String + "vote_data": {"score": "0.95", "votes_up": "95", "votes_down": "5"}, + "children": [{"publishedfileid": "1"}], + "time_created": "1609459200", + "time_updated": "1704067200", + } + ], + } + } + + result = await workshop_service.search_workshop_collections(app_id=730) + + assert "String Collection" in result + assert "5,000" in result # Formatted with comma + + +# --- get_workshop_item_details tests --- + + +class TestGetWorkshopItemDetails: + """Tests for get_workshop_item_details endpoint.""" + + @pytest.mark.asyncio + async def test_get_details_success(self, workshop_service, mock_client): + """Should return item details.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123456", + "title": "Awesome Mod", + "file_description": "This is a great mod for the game.", + "consumer_appid": 730, + "creator": "76561198000000001", + "creator_name": "ModAuthor", + "subscriptions": 10000, + "favorited": 5000, + "views": 50000, + "file_size": 52428800, # 50 MB + "vote_data": {"score": 0.95, "votes_up": 950, "votes_down": 50}, + "tags": [{"tag": "Gameplay"}, {"tag": "Realism"}], + "time_created": 1609459200, + "time_updated": 1704067200, + "children": [{"publishedfileid": "111"}, {"publishedfileid": "222"}], + } + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="123456") + + assert "Awesome Mod" in result + assert "ModAuthor" in result + assert "10,000" in result # Subscribers + assert "95%" in result # Rating + assert "50.0 MB" in result + assert "Dependencies (2)" in result + + @pytest.mark.asyncio + async def test_get_details_not_found(self, workshop_service, mock_client): + """Should handle item not found.""" + mock_client.get.return_value = {"response": {"publishedfiledetails": []}} + + result = await workshop_service.get_workshop_item_details(workshop_id="999999") + + assert "not found" in result.lower() + + @pytest.mark.asyncio + async def test_get_details_private_item(self, workshop_service, mock_client): + """Should handle private/invalid items.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + {"result": 9, "publishedfileid": "123456"} # Result 9 = private/deleted + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="123456") + + assert "not found or is private" in result.lower() + + @pytest.mark.asyncio + async def test_get_details_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123", + "title": "Test", + "file_description": "Description", + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {"score": 0.9, "votes_up": 90, "votes_down": 10}, + "tags": [], + "time_created": 1609459200, + "time_updated": 1704067200, + } + ] + } + } + + result = await workshop_service.get_workshop_item_details( + workshop_id="123", format="json" + ) + + data = json.loads(result) + assert data["workshop_id"] == "123" + assert data["title"] == "Test" + assert data["subscriber_count"] == 100 + assert data["vote_data"]["score"] == 0.9 + + @pytest.mark.asyncio + async def test_get_details_truncates_long_description(self, workshop_service, mock_client): + """Should truncate very long descriptions.""" + long_desc = "A" * 2000 + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123", + "title": "Test", + "file_description": long_desc, + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {}, + "tags": [], + } + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="123") + + assert "[Description truncated]" in result + assert len(result) < len(long_desc) + 500 # Should be much shorter + + +# --- get_workshop_collection tests --- + + +class TestGetWorkshopCollection: + """Tests for get_workshop_collection endpoint.""" + + @pytest.mark.asyncio + async def test_get_collection_success(self, workshop_service, mock_client): + """Should return collection with items.""" + # First call: get collection details + # Second call: get child item details + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "999999", + "title": "My Mod Collection", + "file_description": "A collection of great mods.", + "file_type": 2, # Collection type + "consumer_appid": 730, + "creator": "76561198000000001", + "creator_name": "Collector", + "subscriptions": 5000, + "children": [ + {"publishedfileid": "111"}, + {"publishedfileid": "222"}, + ], + } + ] + } + }, + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "111", + "title": "Mod One", + "subscriptions": 1000, + "file_size": 1024, + "vote_data": {"score": 0.9}, + }, + { + "result": 1, + "publishedfileid": "222", + "title": "Mod Two", + "subscriptions": 2000, + "file_size": 2048, + "vote_data": {"score": 0.8}, + }, + ] + } + }, + ] + + result = await workshop_service.get_workshop_collection(collection_id="999999") + + assert "My Mod Collection" in result + assert "Collector" in result + assert "Items in collection: 2" in result + assert "Mod One" in result + assert "Mod Two" in result + + @pytest.mark.asyncio + async def test_get_collection_not_found(self, workshop_service, mock_client): + """Should handle collection not found.""" + mock_client.get.return_value = {"response": {"publishedfiledetails": []}} + + result = await workshop_service.get_workshop_collection(collection_id="999999") + + assert "not found" in result.lower() + + @pytest.mark.asyncio + async def test_get_collection_not_a_collection(self, workshop_service, mock_client): + """Should reject non-collection items.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123", + "title": "Regular Mod", + "file_type": 0, # Not a collection + } + ] + } + } + + result = await workshop_service.get_workshop_collection(collection_id="123") + + assert "not a collection" in result.lower() + + @pytest.mark.asyncio + async def test_get_collection_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "999", + "title": "Collection", + "file_description": "Desc", + "file_type": 2, + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "children": [{"publishedfileid": "111"}], + } + ] + } + }, + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "111", + "title": "Item", + "subscriptions": 50, + "file_size": 1024, + "vote_data": {"score": 0.9}, + } + ] + } + }, + ] + + result = await workshop_service.get_workshop_collection( + collection_id="999", format="json" + ) + + data = json.loads(result) + assert data["collection_id"] == "999" + assert data["name"] == "Collection" + assert data["item_count"] == 1 + assert len(data["items"]) == 1 + + @pytest.mark.asyncio + async def test_get_collection_handles_batch_fetch_failure( + self, workshop_service, mock_client + ): + """Should still return collection info if batch item fetch fails.""" + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "999", + "title": "Collection", + "file_type": 2, + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "children": [{"publishedfileid": "111"}], + } + ] + } + }, + Exception("Batch fetch failed"), # Second call fails + ] + + result = await workshop_service.get_workshop_collection(collection_id="999") + + # Should still return collection info even if item fetch failed + assert "Collection" in result + assert "Items in collection: 1" in result + + +# --- Helper function tests --- + + +class TestHelperFunctions: + """Tests for module helper functions.""" + + def test_format_file_size(self): + """Test file size formatting.""" + from steam_mcp.endpoints.steam_workshop import _format_file_size + + assert _format_file_size(500) == "500 B" + assert _format_file_size(1024) == "1.0 KB" + assert _format_file_size(1048576) == "1.0 MB" + assert _format_file_size(1073741824) == "1.0 GB" + assert _format_file_size(52428800) == "50.0 MB" + + def test_format_timestamp(self): + """Test timestamp formatting.""" + from steam_mcp.endpoints.steam_workshop import _format_timestamp + + assert _format_timestamp(0) == "Unknown" + assert "2021-01-01" in _format_timestamp(1609459200) + assert "UTC" in _format_timestamp(1609459200) + + def test_format_timestamp_negative(self): + """Test timestamp formatting with negative value.""" + from steam_mcp.endpoints.steam_workshop import _format_timestamp + + assert _format_timestamp(-1) == "Unknown" + + +class TestInputValidation: + """Tests for input validation.""" + + @pytest.mark.asyncio + async def test_empty_workshop_id_returns_error(self, workshop_service): + """Empty workshop_id should return error.""" + result = await workshop_service.get_workshop_item_details(workshop_id="") + assert "Workshop ID is required" in result + + @pytest.mark.asyncio + async def test_whitespace_workshop_id_returns_error(self, workshop_service): + """Whitespace-only workshop_id should return error.""" + result = await workshop_service.get_workshop_item_details(workshop_id=" ") + assert "Workshop ID is required" in result + + @pytest.mark.asyncio + async def test_empty_workshop_id_json_format(self, workshop_service): + """Empty workshop_id should return JSON error when format='json'.""" + result = await workshop_service.get_workshop_item_details( + workshop_id="", format="json" + ) + data = json.loads(result) + assert "error" in data + assert "Workshop ID is required" in data["error"] + + @pytest.mark.asyncio + async def test_empty_collection_id_returns_error(self, workshop_service): + """Empty collection_id should return error.""" + result = await workshop_service.get_workshop_collection(collection_id="") + assert "Collection ID is required" in result + + @pytest.mark.asyncio + async def test_whitespace_collection_id_returns_error(self, workshop_service): + """Whitespace-only collection_id should return error.""" + result = await workshop_service.get_workshop_collection(collection_id=" ") + assert "Collection ID is required" in result + + +class TestTagFiltering: + """Tests for tag filtering edge cases.""" + + @pytest.mark.asyncio + async def test_tags_with_missing_key_filtered(self, workshop_service, mock_client): + """Tags missing the 'tag' key should be filtered out.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "Test", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {"score": 0.9}, + # Tags with missing 'tag' key + "tags": [{"tag": "Valid"}, {"other": "NoTag"}, {"tag": None}], + } + ], + } + } + + result = await workshop_service.search_workshop_items(app_id=730, format="json") + data = json.loads(result) + + # Only "Valid" should be in tags, others should be filtered + assert data["items"][0]["tags"] == ["Valid"] + + +class TestStringTypeCoercion: + """Tests for handling string values from Steam API (real API returns strings for some numeric fields).""" + + @pytest.mark.asyncio + async def test_search_handles_string_numeric_values(self, workshop_service, mock_client): + """API may return numeric fields as strings - should not raise TypeError.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123456", + "title": "String Values Mod", + "subscriptions": "5000", # String instead of int + "favorited": "250", # String instead of int + "file_size": "1048576", # String instead of int + "vote_data": {"score": "0.95"}, # String instead of float + "tags": [{"tag": "Maps"}], + "time_created": "1609459200", # String instead of int + } + ], + } + } + + # Should not raise "'<' not supported between instances of 'str' and 'int'" + result = await workshop_service.search_workshop_items(app_id=730) + + assert "String Values Mod" in result + assert "5,000" in result # Formatted with comma + assert "1.0 MB" in result # File size formatted + + @pytest.mark.asyncio + async def test_item_details_handles_string_values(self, workshop_service, mock_client): + """get_workshop_item_details should handle string numeric values.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": "1", # String instead of int + "publishedfileid": "999", + "title": "Test Item", + "file_description": "A test item", + "consumer_appid": "730", + "creator": "76561198000000001", + "subscriptions": "10000", + "favorited": "500", + "views": "50000", + "file_size": "2097152", + "vote_data": { + "score": "0.88", + "votes_up": "100", + "votes_down": "10", + }, + "time_created": "1609459200", + "time_updated": "1704067200", + "tags": [], + } + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="999") + + assert "Test Item" in result + assert "10,000" in result # subscribers formatted + assert "2.0 MB" in result # file size + assert "88%" in result # rating + + @pytest.mark.asyncio + async def test_collection_handles_string_values(self, workshop_service, mock_client): + """get_workshop_collection should handle string numeric values.""" + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": "1", + "publishedfileid": "888", + "title": "Test Collection", + "file_description": "Collection desc", + "file_type": "2", # String instead of int + "consumer_appid": "730", + "creator": "76561198000000001", + "subscriptions": "3000", + "children": [{"publishedfileid": "111"}], + } + ] + } + }, + { + "response": { + "publishedfiledetails": [ + { + "result": "1", + "publishedfileid": "111", + "title": "Child Item", + "subscriptions": "1500", + "file_size": "512000", + "vote_data": {"score": "0.75"}, + } + ] + } + }, + ] + + result = await workshop_service.get_workshop_collection(collection_id="888") + + assert "Test Collection" in result + assert "3,000" in result # collection subscribers + assert "Child Item" in result + assert "1,500" in result # child item subscribers + + def test_safe_int_function(self): + """Test _safe_int helper function.""" + from steam_mcp.endpoints.steam_workshop import _safe_int + + assert _safe_int("123") == 123 + assert _safe_int(456) == 456 + assert _safe_int(None) == 0 + assert _safe_int("") == 0 + assert _safe_int("invalid") == 0 + assert _safe_int(None, default=99) == 99 + + def test_safe_float_function(self): + """Test _safe_float helper function.""" + from steam_mcp.endpoints.steam_workshop import _safe_float + + assert _safe_float("0.95") == 0.95 + assert _safe_float(0.88) == 0.88 + assert _safe_float(None) == 0.0 + assert _safe_float("") == 0.0 + assert _safe_float("invalid") == 0.0 + assert _safe_float(None, default=1.0) == 1.0 + + def test_format_file_size_string_input(self): + """_format_file_size should handle string input.""" + from steam_mcp.endpoints.steam_workshop import _format_file_size + + assert _format_file_size("1048576") == "1.0 MB" + assert _format_file_size("") == "0 B" + assert _format_file_size(None) == "0 B" + assert _format_file_size("invalid") == "Unknown size" + + def test_format_timestamp_string_input(self): + """_format_timestamp should handle string input.""" + from steam_mcp.endpoints.steam_workshop import _format_timestamp + + assert "2021-01-01" in _format_timestamp("1609459200") + assert _format_timestamp("0") == "Unknown" + assert _format_timestamp("") == "Unknown" + assert _format_timestamp(None) == "Unknown" + assert _format_timestamp("invalid") == "Unknown"