From f6fd070ae01fb15caa783d0090f37f05647687af Mon Sep 17 00:00:00 2001 From: CodeKeanu <85296798+CodeKeanu@users.noreply.github.com> Date: Fri, 12 Dec 2025 12:04:33 +0000 Subject: [PATCH 1/6] feat: add Steam Workshop integration (#5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add new steam_workshop.py endpoint module with 3 tools: - search_workshop_items: Search Workshop mods by game with text/tag filtering - get_workshop_item_details: Get detailed Workshop item information - get_workshop_collection: Get items from a Workshop collection Features: - Text search and tag filtering support - Sort by popular, trending, recent, or rating - JSON and text output formats - Graceful handling of games without Workshop support - Comprehensive error handling Also includes: - 20 unit tests with mocked API responses - README updated (32 → 35 tools) - CHANGELOG entry added 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- CHANGELOG.md | 14 +- README.md | 12 +- src/steam_mcp/endpoints/steam_workshop.py | 526 ++++++++++++++++++++++ tests/test_steam_workshop.py | 499 ++++++++++++++++++++ 4 files changed, 1048 insertions(+), 3 deletions(-) create mode 100644 src/steam_mcp/endpoints/steam_workshop.py create mode 100644 tests/test_steam_workshop.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 3cf3b50..6333137 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- **Steam Workshop Integration** (`steam_workshop.py`) + - `search_workshop_items` - Search Workshop mods by game + - Text search and tag filtering support + - Sort by popular, trending, recent, or rating + - Returns subscriber counts, ratings, file sizes + - `get_workshop_item_details` - Get detailed Workshop item information + - Full description, author info, dependencies + - Subscriber/favorite counts, vote breakdown + - Creation and update timestamps + - `get_workshop_collection` - Get items from a Workshop collection + - Collection metadata and item list + - Batch fetches item details - **Family Sharing Integration** (`family_groups.py`) - `get_family_group` - Get family group membership information - Returns family members, roles (Adult/Child/Member), and cooldown status @@ -18,7 +30,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Optional `include_own` parameter to include owned apps ### Changed -- Tool count increased from 30 to 32 +- Tool count increased from 30 to 35 ## [v0.8.0] - 2025-12-11 diff --git a/README.md b/README.md index 1bbaab0..84895eb 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Once set up, you can ask Claude things like: - "Show my pending trade offers" - "What's the current price for an AK-47 Redline?" -The server includes 32 tools covering player profiles, game libraries, achievements, stats, reviews, wishlists, news, community guides, trading/market data, and family sharing. +The server includes 35 tools covering player profiles, game libraries, achievements, stats, reviews, wishlists, news, community guides, trading/market data, family sharing, and Steam Workshop. --- @@ -179,7 +179,7 @@ No registration needed - just drop in the file and restart. --- -## Available Tools (32 total) +## Available Tools (35 total) ### Player Profiles (ISteamUser) - 6 tools @@ -260,6 +260,14 @@ No registration needed - just drop in the file and restart. | `get_family_group` | Get family group membership, members, and roles | | `get_shared_library_apps` | Get games available through family sharing | +### Steam Workshop (IPublishedFileService) - 3 tools + +| Tool | What it does | +|------|--------------| +| `search_workshop_items` | Search Workshop mods by game, with text/tag filters and sorting | +| `get_workshop_item_details` | Get full details on a Workshop item (description, subscribers, dependencies) | +| `get_workshop_collection` | Get items from a Workshop collection | + --- ## Configuration Reference diff --git a/src/steam_mcp/endpoints/steam_workshop.py b/src/steam_mcp/endpoints/steam_workshop.py new file mode 100644 index 0000000..eb434b1 --- /dev/null +++ b/src/steam_mcp/endpoints/steam_workshop.py @@ -0,0 +1,526 @@ +"""Steam Workshop API endpoints. + +This module provides MCP tools for the IPublishedFileService Steam API interface, +which handles Steam Workshop content discovery, search, and details. + +Reference: https://partner.steamgames.com/doc/webapi/IPublishedFileService +""" + +import json +from datetime import datetime +from typing import Any + +from steam_mcp.endpoints.base import BaseEndpoint, endpoint + + +# Query type mappings for sorting +QUERY_TYPES = { + "popular": 0, # RankedByVote + "trend": 1, # RankedByTrend + "recent": 2, # RankedByPublicationDate + "rating": 3, # RankedByVoteScore (same as popular but different algorithm) +} + + +def _format_file_size(size_bytes: int) -> str: + """Format file size in human readable form.""" + if size_bytes < 1024: + return f"{size_bytes} B" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + elif size_bytes < 1024 * 1024 * 1024: + return f"{size_bytes / (1024 * 1024):.1f} MB" + else: + return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" + + +def _format_timestamp(ts: int) -> str: + """Format Unix timestamp as readable date.""" + if ts == 0: + return "Unknown" + try: + return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") + except (ValueError, OSError): + return "Unknown" + + +class IPublishedFileService(BaseEndpoint): + """Steam Workshop (IPublishedFileService) API endpoints.""" + + @endpoint( + name="search_workshop_items", + description=( + "Search Steam Workshop for mods and community content by game. " + "Returns items with title, author, subscriber count, and rating. " + "Note: Workshop availability varies by game." + ), + supports_json=True, + params={ + "app_id": { + "type": "integer", + "description": "Steam App ID of the game to search Workshop for", + "required": True, + }, + "search_query": { + "type": "string", + "description": "Text search filter (optional)", + "required": False, + }, + "tags": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by Workshop tags (e.g., 'Maps', 'Weapons', 'Characters')", + "required": False, + }, + "sort_by": { + "type": "string", + "description": "Sort order: 'popular' (most voted), 'trend' (trending), 'recent' (newest), 'rating' (highest rated)", + "required": False, + "default": "popular", + "enum": ["popular", "trend", "recent", "rating"], + }, + "max_results": { + "type": "integer", + "description": "Maximum number of results to return (default: 20, max: 50)", + "required": False, + "default": 20, + "minimum": 1, + "maximum": 50, + }, + }, + ) + async def search_workshop_items( + self, + app_id: int, + search_query: str = "", + tags: list[str] | None = None, + sort_by: str = "popular", + max_results: int = 20, + format: str = "text", + ) -> str: + """Search Steam Workshop for items.""" + # Build query parameters + query_type = QUERY_TYPES.get(sort_by, 0) + + params: dict[str, Any] = { + "appid": app_id, + "query_type": query_type, + "numperpage": min(max_results, 50), + "return_metadata": True, + "return_tags": True, + "return_vote_data": True, + } + + if search_query: + params["search_text"] = search_query + + if tags: + # Steam API expects tags as separate requiredtags[n] params + for i, tag in enumerate(tags): + params[f"requiredtags[{i}]"] = tag + + try: + result = await self.client.get( + "IPublishedFileService", + "QueryFiles", + version=1, + params=params, + ) + except Exception as e: + error_msg = f"Error searching Workshop: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = result.get("response", {}) + items = response.get("publishedfiledetails", []) + total = response.get("total", len(items)) + + if not items: + msg = f"No Workshop items found for app {app_id}." + if search_query: + msg += f" Search query: '{search_query}'" + if tags: + msg += f" Tags: {', '.join(tags)}" + msg += "\n\nThis game may not have Workshop support, or no items match your filters." + if format == "json": + return json.dumps({"error": msg, "total": 0, "items": []}) + return msg + + if format == "json": + data = { + "app_id": app_id, + "total_results": total, + "returned": len(items), + "sort_by": sort_by, + "items": [ + { + "workshop_id": item.get("publishedfileid"), + "title": item.get("title", "Untitled"), + "description": (item.get("short_description") or item.get("file_description", ""))[:500], + "author_steam_id": item.get("creator"), + "preview_url": item.get("preview_url"), + "subscriber_count": item.get("subscriptions", 0), + "favorited_count": item.get("favorited", 0), + "views": item.get("views", 0), + "vote_data": { + "score": item.get("vote_data", {}).get("score", 0), + "votes_up": item.get("vote_data", {}).get("votes_up", 0), + "votes_down": item.get("vote_data", {}).get("votes_down", 0), + }, + "file_size_bytes": item.get("file_size", 0), + "tags": [t.get("tag") for t in item.get("tags", [])], + "time_created": item.get("time_created"), + "time_updated": item.get("time_updated"), + } + for item in items + ], + } + return json.dumps(data, indent=2) + + # Text format + output = [ + f"Steam Workshop Results for App {app_id}", + f"Total: {total} items | Showing: {len(items)} | Sort: {sort_by}", + ] + + if search_query: + output.append(f"Search: '{search_query}'") + if tags: + output.append(f"Tags: {', '.join(tags)}") + output.append("") + + for item in items: + workshop_id = item.get("publishedfileid", "?") + title = item.get("title", "Untitled") + subs = item.get("subscriptions", 0) + favorites = item.get("favorited", 0) + file_size = item.get("file_size", 0) + + # Vote data + vote_data = item.get("vote_data", {}) + score = vote_data.get("score", 0) + rating_pct = f"{score * 100:.0f}%" if score else "N/A" + + # Tags + item_tags = [t.get("tag") for t in item.get("tags", [])] + tags_str = ", ".join(item_tags[:3]) if item_tags else "No tags" + if len(item_tags) > 3: + tags_str += f" (+{len(item_tags) - 3})" + + output.append( + f"[{workshop_id}] {title}\n" + f" Subscribers: {subs:,} | Favorites: {favorites:,} | Rating: {rating_pct}\n" + f" Size: {_format_file_size(file_size)} | Tags: {tags_str}" + ) + output.append("") + + return "\n".join(output) + + @endpoint( + name="get_workshop_item_details", + description=( + "Get detailed information about a specific Steam Workshop item. " + "Returns full description, author info, dependencies, changelog, and more." + ), + supports_json=True, + params={ + "workshop_id": { + "type": "string", + "description": "The Workshop item ID (publishedfileid)", + "required": True, + }, + }, + ) + async def get_workshop_item_details( + self, + workshop_id: str, + format: str = "text", + ) -> str: + """Get detailed information about a Workshop item.""" + try: + result = await self.client.get( + "IPublishedFileService", + "GetDetails", + version=1, + params={ + "publishedfileids[0]": workshop_id, + "includetags": True, + "includeadditionalpreviews": True, + "includechildren": True, # Dependencies + "includevotes": True, + "short_description": False, # Get full description + }, + ) + except Exception as e: + error_msg = f"Error fetching Workshop item details: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = result.get("response", {}) + items = response.get("publishedfiledetails", []) + + if not items: + error_msg = f"Workshop item {workshop_id} not found." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + item = items[0] + + # Check for error result (item not found or private) + if item.get("result") != 1: + error_msg = f"Workshop item {workshop_id} not found or is private." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + if format == "json": + data = { + "workshop_id": item.get("publishedfileid"), + "title": item.get("title", "Untitled"), + "description": item.get("file_description", ""), + "app_id": item.get("consumer_appid"), + "creator_app_id": item.get("creator_appid"), + "author": { + "steam_id": item.get("creator"), + "name": item.get("creator_name"), + }, + "file_url": item.get("file_url"), + "preview_url": item.get("preview_url"), + "file_size_bytes": item.get("file_size", 0), + "subscriber_count": item.get("subscriptions", 0), + "favorited_count": item.get("favorited", 0), + "lifetime_subscriptions": item.get("lifetime_subscriptions", 0), + "lifetime_favorited": item.get("lifetime_favorited", 0), + "views": item.get("views", 0), + "vote_data": { + "score": item.get("vote_data", {}).get("score", 0), + "votes_up": item.get("vote_data", {}).get("votes_up", 0), + "votes_down": item.get("vote_data", {}).get("votes_down", 0), + }, + "tags": [t.get("tag") for t in item.get("tags", [])], + "time_created": item.get("time_created"), + "time_updated": item.get("time_updated"), + "visibility": item.get("visibility"), + "banned": item.get("banned", False), + "language": item.get("language"), + "dependencies": item.get("children", []), + } + return json.dumps(data, indent=2) + + # Text format + title = item.get("title", "Untitled") + description = item.get("file_description", "No description") + app_id = item.get("consumer_appid", "Unknown") + author_name = item.get("creator_name", item.get("creator", "Unknown")) + + subs = item.get("subscriptions", 0) + favorites = item.get("favorited", 0) + views = item.get("views", 0) + file_size = item.get("file_size", 0) + + vote_data = item.get("vote_data", {}) + score = vote_data.get("score", 0) + votes_up = vote_data.get("votes_up", 0) + votes_down = vote_data.get("votes_down", 0) + rating_pct = f"{score * 100:.0f}%" if score else "N/A" + + time_created = item.get("time_created", 0) + time_updated = item.get("time_updated", 0) + + item_tags = [t.get("tag") for t in item.get("tags", [])] + dependencies = item.get("children", []) + + output = [ + f"Workshop Item: {title}", + f"ID: {workshop_id} | App: {app_id}", + f"Author: {author_name}", + "", + f"Subscribers: {subs:,} | Favorites: {favorites:,} | Views: {views:,}", + f"Rating: {rating_pct} ({votes_up:,} up / {votes_down:,} down)", + f"File Size: {_format_file_size(file_size)}", + "", + f"Created: {_format_timestamp(time_created)}", + f"Updated: {_format_timestamp(time_updated)}", + ] + + if item_tags: + output.append(f"Tags: {', '.join(item_tags)}") + + if dependencies: + output.append(f"\nDependencies ({len(dependencies)}):") + for dep in dependencies[:10]: + dep_id = dep.get("publishedfileid", "Unknown") + output.append(f" - {dep_id}") + if len(dependencies) > 10: + output.append(f" ... and {len(dependencies) - 10} more") + + # Truncate description if too long + if len(description) > 1000: + description = description[:1000] + "...\n[Description truncated]" + + output.append(f"\nDescription:\n{description}") + + return "\n".join(output) + + @endpoint( + name="get_workshop_collection", + description=( + "Get items from a Steam Workshop collection. " + "Returns collection name, description, and list of contained items." + ), + supports_json=True, + params={ + "collection_id": { + "type": "string", + "description": "The Workshop collection ID", + "required": True, + }, + }, + ) + async def get_workshop_collection( + self, + collection_id: str, + format: str = "text", + ) -> str: + """Get items from a Workshop collection.""" + # First, get the collection details + try: + collection_result = await self.client.get( + "IPublishedFileService", + "GetDetails", + version=1, + params={ + "publishedfileids[0]": collection_id, + "includetags": True, + "includechildren": True, # This gives us collection items + }, + ) + except Exception as e: + error_msg = f"Error fetching collection: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = collection_result.get("response", {}) + items = response.get("publishedfiledetails", []) + + if not items: + error_msg = f"Collection {collection_id} not found." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + collection = items[0] + + # Check if this is actually a collection (file_type 2 = collection) + file_type = collection.get("file_type", 0) + if file_type != 2: + error_msg = f"Item {collection_id} is not a collection (file_type={file_type})." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + # Check result status + if collection.get("result") != 1: + error_msg = f"Collection {collection_id} not found or is private." + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + collection_name = collection.get("title", "Untitled Collection") + collection_desc = collection.get("file_description", "") + children = collection.get("children", []) + + # Get details for all child items (up to 50) + child_ids = [c.get("publishedfileid") for c in children[:50] if c.get("publishedfileid")] + + child_items = [] + if child_ids: + try: + # Build params for batch request + params: dict[str, Any] = { + "includetags": True, + "includevotes": True, + } + for i, cid in enumerate(child_ids): + params[f"publishedfileids[{i}]"] = cid + + child_result = await self.client.get( + "IPublishedFileService", + "GetDetails", + version=1, + params=params, + ) + child_items = child_result.get("response", {}).get("publishedfiledetails", []) + except Exception: + # If batch fetch fails, continue with just collection info + pass + + if format == "json": + data = { + "collection_id": collection_id, + "name": collection_name, + "description": collection_desc, + "app_id": collection.get("consumer_appid"), + "author": { + "steam_id": collection.get("creator"), + "name": collection.get("creator_name"), + }, + "subscriber_count": collection.get("subscriptions", 0), + "item_count": len(children), + "items": [ + { + "workshop_id": item.get("publishedfileid"), + "title": item.get("title", "Untitled"), + "subscriber_count": item.get("subscriptions", 0), + "vote_score": item.get("vote_data", {}).get("score", 0), + "file_size_bytes": item.get("file_size", 0), + } + for item in child_items + if item.get("result") == 1 # Only include successful results + ], + } + if len(children) > 50: + data["truncated"] = True + data["total_items"] = len(children) + return json.dumps(data, indent=2) + + # Text format + output = [ + f"Workshop Collection: {collection_name}", + f"ID: {collection_id} | App: {collection.get('consumer_appid', 'Unknown')}", + f"Author: {collection.get('creator_name', collection.get('creator', 'Unknown'))}", + f"Subscribers: {collection.get('subscriptions', 0):,}", + f"Items in collection: {len(children)}", + ] + + if collection_desc: + desc_preview = collection_desc[:300] + if len(collection_desc) > 300: + desc_preview += "..." + output.append(f"\nDescription: {desc_preview}") + + output.append(f"\nCollection Items ({len(child_items)} loaded):") + output.append("") + + for item in child_items: + if item.get("result") != 1: + continue + + item_id = item.get("publishedfileid", "?") + item_title = item.get("title", "Untitled") + item_subs = item.get("subscriptions", 0) + item_size = item.get("file_size", 0) + + output.append( + f" [{item_id}] {item_title}\n" + f" Subscribers: {item_subs:,} | Size: {_format_file_size(item_size)}" + ) + + if len(children) > 50: + output.append(f"\n ... and {len(children) - 50} more items") + + return "\n".join(output) diff --git a/tests/test_steam_workshop.py b/tests/test_steam_workshop.py new file mode 100644 index 0000000..b353394 --- /dev/null +++ b/tests/test_steam_workshop.py @@ -0,0 +1,499 @@ +"""Tests for Steam Workshop endpoint (IPublishedFileService).""" + +import json +import pytest +from unittest.mock import AsyncMock, MagicMock + +from steam_mcp.endpoints.steam_workshop import IPublishedFileService + + +@pytest.fixture +def mock_client(): + """Create mock Steam client.""" + client = MagicMock() + client.owner_steam_id = None + client.get = AsyncMock() + return client + + +@pytest.fixture +def workshop_service(mock_client): + """Create IPublishedFileService instance with mock client.""" + return IPublishedFileService(mock_client) + + +# --- search_workshop_items tests --- + + +class TestSearchWorkshopItems: + """Tests for search_workshop_items endpoint.""" + + @pytest.mark.asyncio + async def test_search_returns_items(self, workshop_service, mock_client): + """Should return Workshop items for a game.""" + mock_client.get.return_value = { + "response": { + "total": 2, + "publishedfiledetails": [ + { + "publishedfileid": "123456", + "title": "Test Mod", + "creator": "76561198000000001", + "subscriptions": 1000, + "favorited": 500, + "views": 5000, + "file_size": 1048576, # 1 MB + "vote_data": {"score": 0.95, "votes_up": 100, "votes_down": 5}, + "tags": [{"tag": "Maps"}, {"tag": "Multiplayer"}], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + { + "publishedfileid": "789012", + "title": "Another Mod", + "creator": "76561198000000002", + "subscriptions": 500, + "favorited": 200, + "views": 2000, + "file_size": 524288, # 512 KB + "vote_data": {"score": 0.80, "votes_up": 80, "votes_down": 20}, + "tags": [{"tag": "Weapons"}], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + ], + } + } + + result = await workshop_service.search_workshop_items(app_id=730) + + assert "Test Mod" in result + assert "Another Mod" in result + assert "1,000" in result # Subscriber count + assert "Maps" in result + + @pytest.mark.asyncio + async def test_search_with_query(self, workshop_service, mock_client): + """Should pass search query to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_items(app_id=730, search_query="awp skin") + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["search_text"] == "awp skin" + + @pytest.mark.asyncio + async def test_search_with_tags(self, workshop_service, mock_client): + """Should pass tags to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_items(app_id=730, tags=["Maps", "Competitive"]) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["requiredtags[0]"] == "Maps" + assert call_args[1]["params"]["requiredtags[1]"] == "Competitive" + + @pytest.mark.asyncio + async def test_search_empty_results(self, workshop_service, mock_client): + """Should handle no results gracefully.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + result = await workshop_service.search_workshop_items(app_id=99999) + + assert "No Workshop items found" in result + assert "may not have Workshop support" in result + + @pytest.mark.asyncio + async def test_search_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "Test", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {"score": 0.9}, + "tags": [], + } + ], + } + } + + result = await workshop_service.search_workshop_items(app_id=730, format="json") + + data = json.loads(result) + assert data["app_id"] == 730 + assert data["total_results"] == 1 + assert len(data["items"]) == 1 + assert data["items"][0]["workshop_id"] == "123" + + @pytest.mark.asyncio + async def test_search_sort_by_options(self, workshop_service, mock_client): + """Should map sort_by to correct query_type.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + # Test different sort options + for sort_by, expected_type in [("popular", 0), ("trend", 1), ("recent", 2), ("rating", 3)]: + await workshop_service.search_workshop_items(app_id=730, sort_by=sort_by) + call_args = mock_client.get.call_args + assert call_args[1]["params"]["query_type"] == expected_type + + @pytest.mark.asyncio + async def test_search_max_results_limit(self, workshop_service, mock_client): + """Should cap max_results at 50.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_items(app_id=730, max_results=100) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["numperpage"] == 50 + + @pytest.mark.asyncio + async def test_search_api_error(self, workshop_service, mock_client): + """Should handle API errors gracefully.""" + mock_client.get.side_effect = Exception("API connection failed") + + result = await workshop_service.search_workshop_items(app_id=730) + + assert "Error" in result + assert "API connection failed" in result + + +# --- get_workshop_item_details tests --- + + +class TestGetWorkshopItemDetails: + """Tests for get_workshop_item_details endpoint.""" + + @pytest.mark.asyncio + async def test_get_details_success(self, workshop_service, mock_client): + """Should return item details.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123456", + "title": "Awesome Mod", + "file_description": "This is a great mod for the game.", + "consumer_appid": 730, + "creator": "76561198000000001", + "creator_name": "ModAuthor", + "subscriptions": 10000, + "favorited": 5000, + "views": 50000, + "file_size": 52428800, # 50 MB + "vote_data": {"score": 0.95, "votes_up": 950, "votes_down": 50}, + "tags": [{"tag": "Gameplay"}, {"tag": "Realism"}], + "time_created": 1609459200, + "time_updated": 1704067200, + "children": [{"publishedfileid": "111"}, {"publishedfileid": "222"}], + } + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="123456") + + assert "Awesome Mod" in result + assert "ModAuthor" in result + assert "10,000" in result # Subscribers + assert "95%" in result # Rating + assert "50.0 MB" in result + assert "Dependencies (2)" in result + + @pytest.mark.asyncio + async def test_get_details_not_found(self, workshop_service, mock_client): + """Should handle item not found.""" + mock_client.get.return_value = {"response": {"publishedfiledetails": []}} + + result = await workshop_service.get_workshop_item_details(workshop_id="999999") + + assert "not found" in result.lower() + + @pytest.mark.asyncio + async def test_get_details_private_item(self, workshop_service, mock_client): + """Should handle private/invalid items.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + {"result": 9, "publishedfileid": "123456"} # Result 9 = private/deleted + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="123456") + + assert "not found or is private" in result.lower() + + @pytest.mark.asyncio + async def test_get_details_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123", + "title": "Test", + "file_description": "Description", + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {"score": 0.9, "votes_up": 90, "votes_down": 10}, + "tags": [], + "time_created": 1609459200, + "time_updated": 1704067200, + } + ] + } + } + + result = await workshop_service.get_workshop_item_details( + workshop_id="123", format="json" + ) + + data = json.loads(result) + assert data["workshop_id"] == "123" + assert data["title"] == "Test" + assert data["subscriber_count"] == 100 + assert data["vote_data"]["score"] == 0.9 + + @pytest.mark.asyncio + async def test_get_details_truncates_long_description(self, workshop_service, mock_client): + """Should truncate very long descriptions.""" + long_desc = "A" * 2000 + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123", + "title": "Test", + "file_description": long_desc, + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {}, + "tags": [], + } + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="123") + + assert "[Description truncated]" in result + assert len(result) < len(long_desc) + 500 # Should be much shorter + + +# --- get_workshop_collection tests --- + + +class TestGetWorkshopCollection: + """Tests for get_workshop_collection endpoint.""" + + @pytest.mark.asyncio + async def test_get_collection_success(self, workshop_service, mock_client): + """Should return collection with items.""" + # First call: get collection details + # Second call: get child item details + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "999999", + "title": "My Mod Collection", + "file_description": "A collection of great mods.", + "file_type": 2, # Collection type + "consumer_appid": 730, + "creator": "76561198000000001", + "creator_name": "Collector", + "subscriptions": 5000, + "children": [ + {"publishedfileid": "111"}, + {"publishedfileid": "222"}, + ], + } + ] + } + }, + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "111", + "title": "Mod One", + "subscriptions": 1000, + "file_size": 1024, + "vote_data": {"score": 0.9}, + }, + { + "result": 1, + "publishedfileid": "222", + "title": "Mod Two", + "subscriptions": 2000, + "file_size": 2048, + "vote_data": {"score": 0.8}, + }, + ] + } + }, + ] + + result = await workshop_service.get_workshop_collection(collection_id="999999") + + assert "My Mod Collection" in result + assert "Collector" in result + assert "Items in collection: 2" in result + assert "Mod One" in result + assert "Mod Two" in result + + @pytest.mark.asyncio + async def test_get_collection_not_found(self, workshop_service, mock_client): + """Should handle collection not found.""" + mock_client.get.return_value = {"response": {"publishedfiledetails": []}} + + result = await workshop_service.get_workshop_collection(collection_id="999999") + + assert "not found" in result.lower() + + @pytest.mark.asyncio + async def test_get_collection_not_a_collection(self, workshop_service, mock_client): + """Should reject non-collection items.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "123", + "title": "Regular Mod", + "file_type": 0, # Not a collection + } + ] + } + } + + result = await workshop_service.get_workshop_collection(collection_id="123") + + assert "not a collection" in result.lower() + + @pytest.mark.asyncio + async def test_get_collection_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "999", + "title": "Collection", + "file_description": "Desc", + "file_type": 2, + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "children": [{"publishedfileid": "111"}], + } + ] + } + }, + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "111", + "title": "Item", + "subscriptions": 50, + "file_size": 1024, + "vote_data": {"score": 0.9}, + } + ] + } + }, + ] + + result = await workshop_service.get_workshop_collection( + collection_id="999", format="json" + ) + + data = json.loads(result) + assert data["collection_id"] == "999" + assert data["name"] == "Collection" + assert data["item_count"] == 1 + assert len(data["items"]) == 1 + + @pytest.mark.asyncio + async def test_get_collection_handles_batch_fetch_failure( + self, workshop_service, mock_client + ): + """Should still return collection info if batch item fetch fails.""" + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": 1, + "publishedfileid": "999", + "title": "Collection", + "file_type": 2, + "consumer_appid": 730, + "creator": "76561198000000001", + "subscriptions": 100, + "children": [{"publishedfileid": "111"}], + } + ] + } + }, + Exception("Batch fetch failed"), # Second call fails + ] + + result = await workshop_service.get_workshop_collection(collection_id="999") + + # Should still return collection info even if item fetch failed + assert "Collection" in result + assert "Items in collection: 1" in result + + +# --- Helper function tests --- + + +class TestHelperFunctions: + """Tests for module helper functions.""" + + def test_format_file_size(self): + """Test file size formatting.""" + from steam_mcp.endpoints.steam_workshop import _format_file_size + + assert _format_file_size(500) == "500 B" + assert _format_file_size(1024) == "1.0 KB" + assert _format_file_size(1048576) == "1.0 MB" + assert _format_file_size(1073741824) == "1.0 GB" + assert _format_file_size(52428800) == "50.0 MB" + + def test_format_timestamp(self): + """Test timestamp formatting.""" + from steam_mcp.endpoints.steam_workshop import _format_timestamp + + assert _format_timestamp(0) == "Unknown" + assert "2021-01-01" in _format_timestamp(1609459200) From 64c8f223cfc0ef484bc5441148833935e6a11b7a Mon Sep 17 00:00:00 2001 From: CodeKeanu <85296798+CodeKeanu@users.noreply.github.com> Date: Fri, 12 Dec 2025 12:22:56 +0000 Subject: [PATCH 2/6] fix: address code review issues in Workshop integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fixes based on ml-python-mcp-expert review: Must-fix: - Filter None values from tag list comprehensions - Add logging import and log failed batch fetches instead of silent pass - Fix timezone-naive datetime to use UTC Should-fix: - Add input validation for workshop_id and collection_id - Standardize JSON error responses (remove extra fields) - Update endpoint description with 50-item limit note Tests: - Add tests for timestamp UTC formatting - Add tests for negative timestamps - Add input validation tests for empty/whitespace IDs - Add test for tag filtering with missing keys 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/steam_mcp/endpoints/steam_workshop.py | 43 ++++++++++--- tests/test_steam_workshop.py | 75 +++++++++++++++++++++++ 2 files changed, 108 insertions(+), 10 deletions(-) diff --git a/src/steam_mcp/endpoints/steam_workshop.py b/src/steam_mcp/endpoints/steam_workshop.py index eb434b1..fff0bb0 100644 --- a/src/steam_mcp/endpoints/steam_workshop.py +++ b/src/steam_mcp/endpoints/steam_workshop.py @@ -7,12 +7,16 @@ """ import json -from datetime import datetime +import logging +from datetime import datetime, timezone from typing import Any from steam_mcp.endpoints.base import BaseEndpoint, endpoint +logger = logging.getLogger(__name__) + + # Query type mappings for sorting QUERY_TYPES = { "popular": 0, # RankedByVote @@ -35,11 +39,11 @@ def _format_file_size(size_bytes: int) -> str: def _format_timestamp(ts: int) -> str: - """Format Unix timestamp as readable date.""" - if ts == 0: + """Format Unix timestamp as readable date (UTC).""" + if ts <= 0: return "Unknown" try: - return datetime.fromtimestamp(ts).strftime("%Y-%m-%d %H:%M") + return datetime.fromtimestamp(ts, tz=timezone.utc).strftime("%Y-%m-%d %H:%M UTC") except (ValueError, OSError): return "Unknown" @@ -144,7 +148,7 @@ async def search_workshop_items( msg += f" Tags: {', '.join(tags)}" msg += "\n\nThis game may not have Workshop support, or no items match your filters." if format == "json": - return json.dumps({"error": msg, "total": 0, "items": []}) + return json.dumps({"error": msg}) return msg if format == "json": @@ -169,7 +173,7 @@ async def search_workshop_items( "votes_down": item.get("vote_data", {}).get("votes_down", 0), }, "file_size_bytes": item.get("file_size", 0), - "tags": [t.get("tag") for t in item.get("tags", [])], + "tags": [t.get("tag") for t in item.get("tags", []) if t.get("tag")], "time_created": item.get("time_created"), "time_updated": item.get("time_updated"), } @@ -203,7 +207,7 @@ async def search_workshop_items( rating_pct = f"{score * 100:.0f}%" if score else "N/A" # Tags - item_tags = [t.get("tag") for t in item.get("tags", [])] + item_tags = [t.get("tag") for t in item.get("tags", []) if t.get("tag")] tags_str = ", ".join(item_tags[:3]) if item_tags else "No tags" if len(item_tags) > 3: tags_str += f" (+{len(item_tags) - 3})" @@ -238,6 +242,15 @@ async def get_workshop_item_details( format: str = "text", ) -> str: """Get detailed information about a Workshop item.""" + # Validate input + if not workshop_id or not workshop_id.strip(): + error_msg = "Workshop ID is required" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + workshop_id = workshop_id.strip() + try: result = await self.client.get( "IPublishedFileService", @@ -369,7 +382,8 @@ async def get_workshop_item_details( name="get_workshop_collection", description=( "Get items from a Steam Workshop collection. " - "Returns collection name, description, and list of contained items." + "Returns collection name, description, and list of contained items. " + "Note: Item details are fetched for up to 50 items." ), supports_json=True, params={ @@ -386,6 +400,15 @@ async def get_workshop_collection( format: str = "text", ) -> str: """Get items from a Workshop collection.""" + # Validate input + if not collection_id or not collection_id.strip(): + error_msg = "Collection ID is required" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + collection_id = collection_id.strip() + # First, get the collection details try: collection_result = await self.client.get( @@ -455,9 +478,9 @@ async def get_workshop_collection( params=params, ) child_items = child_result.get("response", {}).get("publishedfiledetails", []) - except Exception: + except Exception as e: # If batch fetch fails, continue with just collection info - pass + logger.warning(f"Failed to fetch collection child items: {e}") if format == "json": data = { diff --git a/tests/test_steam_workshop.py b/tests/test_steam_workshop.py index b353394..2190e12 100644 --- a/tests/test_steam_workshop.py +++ b/tests/test_steam_workshop.py @@ -497,3 +497,78 @@ def test_format_timestamp(self): assert _format_timestamp(0) == "Unknown" assert "2021-01-01" in _format_timestamp(1609459200) + assert "UTC" in _format_timestamp(1609459200) + + def test_format_timestamp_negative(self): + """Test timestamp formatting with negative value.""" + from steam_mcp.endpoints.steam_workshop import _format_timestamp + + assert _format_timestamp(-1) == "Unknown" + + +class TestInputValidation: + """Tests for input validation.""" + + @pytest.mark.asyncio + async def test_empty_workshop_id_returns_error(self, workshop_service): + """Empty workshop_id should return error.""" + result = await workshop_service.get_workshop_item_details(workshop_id="") + assert "Workshop ID is required" in result + + @pytest.mark.asyncio + async def test_whitespace_workshop_id_returns_error(self, workshop_service): + """Whitespace-only workshop_id should return error.""" + result = await workshop_service.get_workshop_item_details(workshop_id=" ") + assert "Workshop ID is required" in result + + @pytest.mark.asyncio + async def test_empty_workshop_id_json_format(self, workshop_service): + """Empty workshop_id should return JSON error when format='json'.""" + result = await workshop_service.get_workshop_item_details( + workshop_id="", format="json" + ) + data = json.loads(result) + assert "error" in data + assert "Workshop ID is required" in data["error"] + + @pytest.mark.asyncio + async def test_empty_collection_id_returns_error(self, workshop_service): + """Empty collection_id should return error.""" + result = await workshop_service.get_workshop_collection(collection_id="") + assert "Collection ID is required" in result + + @pytest.mark.asyncio + async def test_whitespace_collection_id_returns_error(self, workshop_service): + """Whitespace-only collection_id should return error.""" + result = await workshop_service.get_workshop_collection(collection_id=" ") + assert "Collection ID is required" in result + + +class TestTagFiltering: + """Tests for tag filtering edge cases.""" + + @pytest.mark.asyncio + async def test_tags_with_missing_key_filtered(self, workshop_service, mock_client): + """Tags missing the 'tag' key should be filtered out.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "Test", + "subscriptions": 100, + "file_size": 1024, + "vote_data": {"score": 0.9}, + # Tags with missing 'tag' key + "tags": [{"tag": "Valid"}, {"other": "NoTag"}, {"tag": None}], + } + ], + } + } + + result = await workshop_service.search_workshop_items(app_id=730, format="json") + data = json.loads(result) + + # Only "Valid" should be in tags, others should be filtered + assert data["items"][0]["tags"] == ["Valid"] From 6dd1aebc655fa143a6d34bf84b59703c95a1cf58 Mon Sep 17 00:00:00 2001 From: CodeKeanu <85296798+CodeKeanu@users.noreply.github.com> Date: Fri, 12 Dec 2025 12:34:22 +0000 Subject: [PATCH 3/6] fix: add type coercion for Steam API string responses MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Steam API can return numeric fields as strings (e.g., "1000" instead of 1000). This caused TypeError: '<' not supported between 'str' and 'int'. Fixes: - Add _safe_int() helper for int coercion with default fallback - Add _safe_float() helper for float coercion with default fallback - Update _format_file_size() to handle string/None inputs - Update _format_timestamp() to handle string/None inputs - Apply _safe_int/_safe_float to all numeric API fields: - subscriptions, favorited, views, file_size - vote_data.score, votes_up, votes_down - result, file_type comparisons - time_created, time_updated Tests: - Add TestStringTypeCoercion class with 7 tests - Tests cover string values in search, details, and collection endpoints - Tests for _safe_int, _safe_float, and format functions with string input 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/steam_mcp/endpoints/steam_workshop.py | 71 +++++++--- tests/test_steam_workshop.py | 154 ++++++++++++++++++++++ 2 files changed, 204 insertions(+), 21 deletions(-) diff --git a/src/steam_mcp/endpoints/steam_workshop.py b/src/steam_mcp/endpoints/steam_workshop.py index fff0bb0..c3adcc7 100644 --- a/src/steam_mcp/endpoints/steam_workshop.py +++ b/src/steam_mcp/endpoints/steam_workshop.py @@ -26,8 +26,32 @@ } -def _format_file_size(size_bytes: int) -> str: +def _safe_int(value: Any, default: int = 0) -> int: + """Safely convert value to int, handling strings from API.""" + if value is None: + return default + try: + return int(value) + except (ValueError, TypeError): + return default + + +def _safe_float(value: Any, default: float = 0.0) -> float: + """Safely convert value to float, handling strings from API.""" + if value is None: + return default + try: + return float(value) + except (ValueError, TypeError): + return default + + +def _format_file_size(size_bytes: int | str | None) -> str: """Format file size in human readable form.""" + try: + size_bytes = int(size_bytes) if size_bytes else 0 + except (ValueError, TypeError): + return "Unknown size" if size_bytes < 1024: return f"{size_bytes} B" elif size_bytes < 1024 * 1024: @@ -38,8 +62,12 @@ def _format_file_size(size_bytes: int) -> str: return f"{size_bytes / (1024 * 1024 * 1024):.1f} GB" -def _format_timestamp(ts: int) -> str: +def _format_timestamp(ts: int | str | None) -> str: """Format Unix timestamp as readable date (UTC).""" + try: + ts = int(ts) if ts else 0 + except (ValueError, TypeError): + return "Unknown" if ts <= 0: return "Unknown" try: @@ -197,13 +225,13 @@ async def search_workshop_items( for item in items: workshop_id = item.get("publishedfileid", "?") title = item.get("title", "Untitled") - subs = item.get("subscriptions", 0) - favorites = item.get("favorited", 0) + subs = _safe_int(item.get("subscriptions")) + favorites = _safe_int(item.get("favorited")) file_size = item.get("file_size", 0) # Vote data vote_data = item.get("vote_data", {}) - score = vote_data.get("score", 0) + score = _safe_float(vote_data.get("score")) rating_pct = f"{score * 100:.0f}%" if score else "N/A" # Tags @@ -283,7 +311,7 @@ async def get_workshop_item_details( item = items[0] # Check for error result (item not found or private) - if item.get("result") != 1: + if _safe_int(item.get("result")) != 1: error_msg = f"Workshop item {workshop_id} not found or is private." if format == "json": return json.dumps({"error": error_msg}) @@ -329,15 +357,15 @@ async def get_workshop_item_details( app_id = item.get("consumer_appid", "Unknown") author_name = item.get("creator_name", item.get("creator", "Unknown")) - subs = item.get("subscriptions", 0) - favorites = item.get("favorited", 0) - views = item.get("views", 0) + subs = _safe_int(item.get("subscriptions")) + favorites = _safe_int(item.get("favorited")) + views = _safe_int(item.get("views")) file_size = item.get("file_size", 0) vote_data = item.get("vote_data", {}) - score = vote_data.get("score", 0) - votes_up = vote_data.get("votes_up", 0) - votes_down = vote_data.get("votes_down", 0) + score = _safe_float(vote_data.get("score")) + votes_up = _safe_int(vote_data.get("votes_up")) + votes_down = _safe_int(vote_data.get("votes_down")) rating_pct = f"{score * 100:.0f}%" if score else "N/A" time_created = item.get("time_created", 0) @@ -439,7 +467,7 @@ async def get_workshop_collection( collection = items[0] # Check if this is actually a collection (file_type 2 = collection) - file_type = collection.get("file_type", 0) + file_type = _safe_int(collection.get("file_type")) if file_type != 2: error_msg = f"Item {collection_id} is not a collection (file_type={file_type})." if format == "json": @@ -447,7 +475,7 @@ async def get_workshop_collection( return error_msg # Check result status - if collection.get("result") != 1: + if _safe_int(collection.get("result")) != 1: error_msg = f"Collection {collection_id} not found or is private." if format == "json": return json.dumps({"error": error_msg}) @@ -498,12 +526,12 @@ async def get_workshop_collection( { "workshop_id": item.get("publishedfileid"), "title": item.get("title", "Untitled"), - "subscriber_count": item.get("subscriptions", 0), - "vote_score": item.get("vote_data", {}).get("score", 0), - "file_size_bytes": item.get("file_size", 0), + "subscriber_count": _safe_int(item.get("subscriptions")), + "vote_score": _safe_float(item.get("vote_data", {}).get("score")), + "file_size_bytes": _safe_int(item.get("file_size")), } for item in child_items - if item.get("result") == 1 # Only include successful results + if _safe_int(item.get("result")) == 1 # Only include successful results ], } if len(children) > 50: @@ -512,11 +540,12 @@ async def get_workshop_collection( return json.dumps(data, indent=2) # Text format + collection_subs = _safe_int(collection.get("subscriptions")) output = [ f"Workshop Collection: {collection_name}", f"ID: {collection_id} | App: {collection.get('consumer_appid', 'Unknown')}", f"Author: {collection.get('creator_name', collection.get('creator', 'Unknown'))}", - f"Subscribers: {collection.get('subscriptions', 0):,}", + f"Subscribers: {collection_subs:,}", f"Items in collection: {len(children)}", ] @@ -530,12 +559,12 @@ async def get_workshop_collection( output.append("") for item in child_items: - if item.get("result") != 1: + if _safe_int(item.get("result")) != 1: continue item_id = item.get("publishedfileid", "?") item_title = item.get("title", "Untitled") - item_subs = item.get("subscriptions", 0) + item_subs = _safe_int(item.get("subscriptions")) item_size = item.get("file_size", 0) output.append( diff --git a/tests/test_steam_workshop.py b/tests/test_steam_workshop.py index 2190e12..79c7096 100644 --- a/tests/test_steam_workshop.py +++ b/tests/test_steam_workshop.py @@ -572,3 +572,157 @@ async def test_tags_with_missing_key_filtered(self, workshop_service, mock_clien # Only "Valid" should be in tags, others should be filtered assert data["items"][0]["tags"] == ["Valid"] + + +class TestStringTypeCoercion: + """Tests for handling string values from Steam API (real API returns strings for some numeric fields).""" + + @pytest.mark.asyncio + async def test_search_handles_string_numeric_values(self, workshop_service, mock_client): + """API may return numeric fields as strings - should not raise TypeError.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123456", + "title": "String Values Mod", + "subscriptions": "5000", # String instead of int + "favorited": "250", # String instead of int + "file_size": "1048576", # String instead of int + "vote_data": {"score": "0.95"}, # String instead of float + "tags": [{"tag": "Maps"}], + "time_created": "1609459200", # String instead of int + } + ], + } + } + + # Should not raise "'<' not supported between instances of 'str' and 'int'" + result = await workshop_service.search_workshop_items(app_id=730) + + assert "String Values Mod" in result + assert "5,000" in result # Formatted with comma + assert "1.0 MB" in result # File size formatted + + @pytest.mark.asyncio + async def test_item_details_handles_string_values(self, workshop_service, mock_client): + """get_workshop_item_details should handle string numeric values.""" + mock_client.get.return_value = { + "response": { + "publishedfiledetails": [ + { + "result": "1", # String instead of int + "publishedfileid": "999", + "title": "Test Item", + "file_description": "A test item", + "consumer_appid": "730", + "creator": "76561198000000001", + "subscriptions": "10000", + "favorited": "500", + "views": "50000", + "file_size": "2097152", + "vote_data": { + "score": "0.88", + "votes_up": "100", + "votes_down": "10", + }, + "time_created": "1609459200", + "time_updated": "1704067200", + "tags": [], + } + ] + } + } + + result = await workshop_service.get_workshop_item_details(workshop_id="999") + + assert "Test Item" in result + assert "10,000" in result # subscribers formatted + assert "2.0 MB" in result # file size + assert "88%" in result # rating + + @pytest.mark.asyncio + async def test_collection_handles_string_values(self, workshop_service, mock_client): + """get_workshop_collection should handle string numeric values.""" + mock_client.get.side_effect = [ + { + "response": { + "publishedfiledetails": [ + { + "result": "1", + "publishedfileid": "888", + "title": "Test Collection", + "file_description": "Collection desc", + "file_type": "2", # String instead of int + "consumer_appid": "730", + "creator": "76561198000000001", + "subscriptions": "3000", + "children": [{"publishedfileid": "111"}], + } + ] + } + }, + { + "response": { + "publishedfiledetails": [ + { + "result": "1", + "publishedfileid": "111", + "title": "Child Item", + "subscriptions": "1500", + "file_size": "512000", + "vote_data": {"score": "0.75"}, + } + ] + } + }, + ] + + result = await workshop_service.get_workshop_collection(collection_id="888") + + assert "Test Collection" in result + assert "3,000" in result # collection subscribers + assert "Child Item" in result + assert "1,500" in result # child item subscribers + + def test_safe_int_function(self): + """Test _safe_int helper function.""" + from steam_mcp.endpoints.steam_workshop import _safe_int + + assert _safe_int("123") == 123 + assert _safe_int(456) == 456 + assert _safe_int(None) == 0 + assert _safe_int("") == 0 + assert _safe_int("invalid") == 0 + assert _safe_int(None, default=99) == 99 + + def test_safe_float_function(self): + """Test _safe_float helper function.""" + from steam_mcp.endpoints.steam_workshop import _safe_float + + assert _safe_float("0.95") == 0.95 + assert _safe_float(0.88) == 0.88 + assert _safe_float(None) == 0.0 + assert _safe_float("") == 0.0 + assert _safe_float("invalid") == 0.0 + assert _safe_float(None, default=1.0) == 1.0 + + def test_format_file_size_string_input(self): + """_format_file_size should handle string input.""" + from steam_mcp.endpoints.steam_workshop import _format_file_size + + assert _format_file_size("1048576") == "1.0 MB" + assert _format_file_size("") == "0 B" + assert _format_file_size(None) == "0 B" + assert _format_file_size("invalid") == "Unknown size" + + def test_format_timestamp_string_input(self): + """_format_timestamp should handle string input.""" + from steam_mcp.endpoints.steam_workshop import _format_timestamp + + assert "2021-01-01" in _format_timestamp("1609459200") + assert _format_timestamp("0") == "Unknown" + assert _format_timestamp("") == "Unknown" + assert _format_timestamp(None) == "Unknown" + assert _format_timestamp("invalid") == "Unknown" From 401ead82f1a35ee2225104f21eb5080358c8cca3 Mon Sep 17 00:00:00 2001 From: CodeKeanu <85296798+CodeKeanu@users.noreply.github.com> Date: Fri, 12 Dec 2025 20:30:55 +0000 Subject: [PATCH 4/6] feat: add search_workshop_collections tool for collection discovery MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Users previously needed to know collection IDs upfront. The new search_workshop_collections tool lets users browse and discover collections with the same sorting/filtering options as search_workshop_items: - Sort by popular, trending, recent, or rating - Text search support - Returns collection name, item count, subscriber count Also updated get_workshop_collection description to reference the new discovery tool. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- CHANGELOG.md | 7 +- README.md | 7 +- src/steam_mcp/endpoints/steam_workshop.py | 155 ++++++++++++++++- tests/test_steam_workshop.py | 194 ++++++++++++++++++++++ 4 files changed, 358 insertions(+), 5 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6333137..7ac58e4 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,11 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Text search and tag filtering support - Sort by popular, trending, recent, or rating - Returns subscriber counts, ratings, file sizes + - `search_workshop_collections` - Search Workshop collections by game + - Discover popular modpacks and curated item lists + - Sort by popular, trending, recent, or rating + - Text search support + - Returns collection name, item count, subscriber count - `get_workshop_item_details` - Get detailed Workshop item information - Full description, author info, dependencies - Subscriber/favorite counts, vote breakdown @@ -30,7 +35,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Optional `include_own` parameter to include owned apps ### Changed -- Tool count increased from 30 to 35 +- Tool count increased from 30 to 36 ## [v0.8.0] - 2025-12-11 diff --git a/README.md b/README.md index 84895eb..163b30e 100644 --- a/README.md +++ b/README.md @@ -18,7 +18,7 @@ Once set up, you can ask Claude things like: - "Show my pending trade offers" - "What's the current price for an AK-47 Redline?" -The server includes 35 tools covering player profiles, game libraries, achievements, stats, reviews, wishlists, news, community guides, trading/market data, family sharing, and Steam Workshop. +The server includes 36 tools covering player profiles, game libraries, achievements, stats, reviews, wishlists, news, community guides, trading/market data, family sharing, and Steam Workshop. --- @@ -179,7 +179,7 @@ No registration needed - just drop in the file and restart. --- -## Available Tools (35 total) +## Available Tools (36 total) ### Player Profiles (ISteamUser) - 6 tools @@ -260,11 +260,12 @@ No registration needed - just drop in the file and restart. | `get_family_group` | Get family group membership, members, and roles | | `get_shared_library_apps` | Get games available through family sharing | -### Steam Workshop (IPublishedFileService) - 3 tools +### Steam Workshop (IPublishedFileService) - 4 tools | Tool | What it does | |------|--------------| | `search_workshop_items` | Search Workshop mods by game, with text/tag filters and sorting | +| `search_workshop_collections` | Search Workshop collections (curated item lists) by game with sorting | | `get_workshop_item_details` | Get full details on a Workshop item (description, subscribers, dependencies) | | `get_workshop_collection` | Get items from a Workshop collection | diff --git a/src/steam_mcp/endpoints/steam_workshop.py b/src/steam_mcp/endpoints/steam_workshop.py index c3adcc7..e635a67 100644 --- a/src/steam_mcp/endpoints/steam_workshop.py +++ b/src/steam_mcp/endpoints/steam_workshop.py @@ -406,18 +406,171 @@ async def get_workshop_item_details( return "\n".join(output) + @endpoint( + name="search_workshop_collections", + description=( + "Search Steam Workshop for collections (curated item lists) by game. " + "Returns collections with name, subscriber count, and item count. " + "Use this to discover popular modpacks or curated content lists." + ), + supports_json=True, + params={ + "app_id": { + "type": "integer", + "description": "Steam App ID of the game to search collections for", + "required": True, + }, + "search_query": { + "type": "string", + "description": "Text search filter (optional)", + "required": False, + }, + "sort_by": { + "type": "string", + "description": "Sort order: 'popular' (most voted), 'trend' (trending), 'recent' (newest), 'rating' (highest rated)", + "required": False, + "default": "popular", + "enum": ["popular", "trend", "recent", "rating"], + }, + "max_results": { + "type": "integer", + "description": "Maximum number of results to return (default: 10, max: 50)", + "required": False, + "default": 10, + "minimum": 1, + "maximum": 50, + }, + }, + ) + async def search_workshop_collections( + self, + app_id: int, + search_query: str = "", + sort_by: str = "popular", + max_results: int = 10, + format: str = "text", + ) -> str: + """Search Steam Workshop for collections.""" + query_type = QUERY_TYPES.get(sort_by, 0) + + params: dict[str, Any] = { + "appid": app_id, + "query_type": query_type, + "filetype": 2, # Collections only + "numperpage": min(max_results, 50), + "return_metadata": True, + "return_tags": True, + "return_vote_data": True, + "return_children": True, # Get item count in collections + } + + if search_query: + params["search_text"] = search_query + + try: + result = await self.client.get( + "IPublishedFileService", + "QueryFiles", + version=1, + params=params, + ) + except Exception as e: + error_msg = f"Error searching Workshop collections: {e}" + if format == "json": + return json.dumps({"error": error_msg}) + return error_msg + + response = result.get("response", {}) + collections = response.get("publishedfiledetails", []) + total = _safe_int(response.get("total", len(collections))) + + if not collections: + msg = f"No Workshop collections found for app {app_id}." + if search_query: + msg += f" Search query: '{search_query}'" + msg += "\n\nThis game may not have Workshop collections, or no collections match your search." + if format == "json": + return json.dumps({"error": msg}) + return msg + + if format == "json": + data = { + "app_id": app_id, + "total_results": total, + "returned": len(collections), + "sort_by": sort_by, + "collections": [ + { + "collection_id": coll.get("publishedfileid"), + "name": coll.get("title", "Untitled"), + "description": (coll.get("short_description") or coll.get("file_description", ""))[:300], + "author_steam_id": coll.get("creator"), + "author_name": coll.get("creator_name"), + "preview_url": coll.get("preview_url"), + "subscriber_count": _safe_int(coll.get("subscriptions")), + "favorited_count": _safe_int(coll.get("favorited")), + "item_count": len(coll.get("children", [])), + "vote_data": { + "score": _safe_float(coll.get("vote_data", {}).get("score")), + "votes_up": _safe_int(coll.get("vote_data", {}).get("votes_up")), + "votes_down": _safe_int(coll.get("vote_data", {}).get("votes_down")), + }, + "time_created": _safe_int(coll.get("time_created")), + "time_updated": _safe_int(coll.get("time_updated")), + } + for coll in collections + ], + } + return json.dumps(data, indent=2) + + # Text format + output = [ + f"Steam Workshop Collections for App {app_id}", + f"Total: {total} collections | Showing: {len(collections)} | Sort: {sort_by}", + ] + + if search_query: + output.append(f"Search: '{search_query}'") + output.append("") + + for coll in collections: + coll_id = coll.get("publishedfileid", "?") + name = coll.get("title", "Untitled") + author = coll.get("creator_name", "Unknown") + subs = _safe_int(coll.get("subscriptions")) + item_count = len(coll.get("children", [])) + + vote_data = coll.get("vote_data", {}) + score = _safe_float(vote_data.get("score")) + rating_pct = f"{score * 100:.0f}%" if score else "N/A" + + desc = (coll.get("short_description") or coll.get("file_description", ""))[:100] + if len(desc) == 100: + desc += "..." + + output.append( + f"[{coll_id}] {name}\n" + f" By: {author} | Items: {item_count} | Subscribers: {subs:,} | Rating: {rating_pct}" + ) + if desc: + output.append(f" {desc}") + output.append("") + + return "\n".join(output) + @endpoint( name="get_workshop_collection", description=( "Get items from a Steam Workshop collection. " "Returns collection name, description, and list of contained items. " + "Use 'search_workshop_collections' first to find collection IDs. " "Note: Item details are fetched for up to 50 items." ), supports_json=True, params={ "collection_id": { "type": "string", - "description": "The Workshop collection ID", + "description": "The Workshop collection ID (use search_workshop_collections to find IDs)", "required": True, }, }, diff --git a/tests/test_steam_workshop.py b/tests/test_steam_workshop.py index 79c7096..b9f0b9e 100644 --- a/tests/test_steam_workshop.py +++ b/tests/test_steam_workshop.py @@ -172,6 +172,200 @@ async def test_search_api_error(self, workshop_service, mock_client): assert "API connection failed" in result +# --- search_workshop_collections tests --- + + +class TestSearchWorkshopCollections: + """Tests for search_workshop_collections endpoint.""" + + @pytest.mark.asyncio + async def test_search_returns_collections(self, workshop_service, mock_client): + """Should return Workshop collections for a game.""" + mock_client.get.return_value = { + "response": { + "total": 2, + "publishedfiledetails": [ + { + "publishedfileid": "111111", + "title": "Ultimate Modpack", + "file_description": "Collection of essential mods", + "creator": "76561198000000001", + "creator_name": "CollectorOne", + "subscriptions": 10000, + "favorited": 5000, + "file_type": 2, # Collection + "vote_data": {"score": 0.95, "votes_up": 950, "votes_down": 50}, + "children": [ + {"publishedfileid": "1"}, + {"publishedfileid": "2"}, + {"publishedfileid": "3"}, + ], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + { + "publishedfileid": "222222", + "title": "Starter Pack", + "file_description": "Beginner collection", + "creator": "76561198000000002", + "creator_name": "CollectorTwo", + "subscriptions": 5000, + "favorited": 2000, + "file_type": 2, + "vote_data": {"score": 0.80, "votes_up": 80, "votes_down": 20}, + "children": [{"publishedfileid": "4"}], + "time_created": 1609459200, + "time_updated": 1704067200, + }, + ], + } + } + + result = await workshop_service.search_workshop_collections(app_id=730) + + assert "Ultimate Modpack" in result + assert "Starter Pack" in result + assert "10,000" in result # Subscriber count + assert "Items: 3" in result # Item count + assert "CollectorOne" in result + + @pytest.mark.asyncio + async def test_search_collections_with_query(self, workshop_service, mock_client): + """Should pass search query to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_collections( + app_id=730, search_query="weapon skins" + ) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["search_text"] == "weapon skins" + assert call_args[1]["params"]["filetype"] == 2 # Collections only + + @pytest.mark.asyncio + async def test_search_collections_empty_results(self, workshop_service, mock_client): + """Should handle no results gracefully.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + result = await workshop_service.search_workshop_collections(app_id=99999) + + assert "No Workshop collections found" in result + assert "may not have Workshop collections" in result + + @pytest.mark.asyncio + async def test_search_collections_json_format(self, workshop_service, mock_client): + """Should return JSON when format='json'.""" + mock_client.get.return_value = { + "response": { + "total": 1, + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "Test Collection", + "file_description": "A test", + "creator": "76561198000000001", + "creator_name": "Tester", + "subscriptions": 100, + "favorited": 50, + "file_type": 2, + "vote_data": {"score": 0.9, "votes_up": 90, "votes_down": 10}, + "children": [{"publishedfileid": "1"}, {"publishedfileid": "2"}], + "time_created": 1609459200, + "time_updated": 1704067200, + } + ], + } + } + + result = await workshop_service.search_workshop_collections( + app_id=730, format="json" + ) + + data = json.loads(result) + assert data["app_id"] == 730 + assert data["total_results"] == 1 + assert len(data["collections"]) == 1 + assert data["collections"][0]["collection_id"] == "123" + assert data["collections"][0]["name"] == "Test Collection" + assert data["collections"][0]["item_count"] == 2 + + @pytest.mark.asyncio + async def test_search_collections_sort_options(self, workshop_service, mock_client): + """Should map sort_by to correct query_type.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + for sort_by, expected_type in [ + ("popular", 0), + ("trend", 1), + ("recent", 2), + ("rating", 3), + ]: + await workshop_service.search_workshop_collections( + app_id=730, sort_by=sort_by + ) + call_args = mock_client.get.call_args + assert call_args[1]["params"]["query_type"] == expected_type + + @pytest.mark.asyncio + async def test_search_collections_max_results_limit( + self, workshop_service, mock_client + ): + """Should cap max_results at 50.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_collections(app_id=730, max_results=100) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["numperpage"] == 50 + + @pytest.mark.asyncio + async def test_search_collections_api_error(self, workshop_service, mock_client): + """Should handle API errors gracefully.""" + mock_client.get.side_effect = Exception("API connection failed") + + result = await workshop_service.search_workshop_collections(app_id=730) + + assert "Error" in result + assert "API connection failed" in result + + @pytest.mark.asyncio + async def test_search_collections_handles_string_values( + self, workshop_service, mock_client + ): + """Should handle string numeric values from API.""" + mock_client.get.return_value = { + "response": { + "total": "1", # String + "publishedfiledetails": [ + { + "publishedfileid": "123", + "title": "String Collection", + "subscriptions": "5000", # String + "favorited": "250", # String + "file_type": "2", # String + "vote_data": {"score": "0.95", "votes_up": "95", "votes_down": "5"}, + "children": [{"publishedfileid": "1"}], + "time_created": "1609459200", + "time_updated": "1704067200", + } + ], + } + } + + result = await workshop_service.search_workshop_collections(app_id=730) + + assert "String Collection" in result + assert "5,000" in result # Formatted with comma + + # --- get_workshop_item_details tests --- From 755c69ab31c2221372d646d9ec45d2cf4cd4d7f1 Mon Sep 17 00:00:00 2001 From: CodeKeanu <85296798+CodeKeanu@users.noreply.github.com> Date: Fri, 12 Dec 2025 20:33:27 +0000 Subject: [PATCH 5/6] fix: add tags filter to search_workshop_collections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Brings search_workshop_collections to feature parity with search_workshop_items - both now support: - app_id (required) - search_query (text search) - tags (array of Workshop tags to filter by) - sort_by (popular/trend/recent/rating) - max_results 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/steam_mcp/endpoints/steam_workshop.py | 17 ++++++++++++++++- tests/test_steam_workshop.py | 16 ++++++++++++++++ 2 files changed, 32 insertions(+), 1 deletion(-) diff --git a/src/steam_mcp/endpoints/steam_workshop.py b/src/steam_mcp/endpoints/steam_workshop.py index e635a67..ab1aa08 100644 --- a/src/steam_mcp/endpoints/steam_workshop.py +++ b/src/steam_mcp/endpoints/steam_workshop.py @@ -425,6 +425,12 @@ async def get_workshop_item_details( "description": "Text search filter (optional)", "required": False, }, + "tags": { + "type": "array", + "items": {"type": "string"}, + "description": "Filter by Workshop tags (e.g., 'Maps', 'Weapons', 'Characters')", + "required": False, + }, "sort_by": { "type": "string", "description": "Sort order: 'popular' (most voted), 'trend' (trending), 'recent' (newest), 'rating' (highest rated)", @@ -446,6 +452,7 @@ async def search_workshop_collections( self, app_id: int, search_query: str = "", + tags: list[str] | None = None, sort_by: str = "popular", max_results: int = 10, format: str = "text", @@ -467,6 +474,10 @@ async def search_workshop_collections( if search_query: params["search_text"] = search_query + if tags: + for i, tag in enumerate(tags): + params[f"requiredtags[{i}]"] = tag + try: result = await self.client.get( "IPublishedFileService", @@ -488,7 +499,9 @@ async def search_workshop_collections( msg = f"No Workshop collections found for app {app_id}." if search_query: msg += f" Search query: '{search_query}'" - msg += "\n\nThis game may not have Workshop collections, or no collections match your search." + if tags: + msg += f" Tags: {', '.join(tags)}" + msg += "\n\nThis game may not have Workshop collections, or no collections match your filters." if format == "json": return json.dumps({"error": msg}) return msg @@ -531,6 +544,8 @@ async def search_workshop_collections( if search_query: output.append(f"Search: '{search_query}'") + if tags: + output.append(f"Tags: {', '.join(tags)}") output.append("") for coll in collections: diff --git a/tests/test_steam_workshop.py b/tests/test_steam_workshop.py index b9f0b9e..623b250 100644 --- a/tests/test_steam_workshop.py +++ b/tests/test_steam_workshop.py @@ -244,6 +244,22 @@ async def test_search_collections_with_query(self, workshop_service, mock_client assert call_args[1]["params"]["search_text"] == "weapon skins" assert call_args[1]["params"]["filetype"] == 2 # Collections only + @pytest.mark.asyncio + async def test_search_collections_with_tags(self, workshop_service, mock_client): + """Should pass tags to API.""" + mock_client.get.return_value = { + "response": {"total": 0, "publishedfiledetails": []} + } + + await workshop_service.search_workshop_collections( + app_id=730, tags=["Maps", "Competitive"] + ) + + call_args = mock_client.get.call_args + assert call_args[1]["params"]["requiredtags[0]"] == "Maps" + assert call_args[1]["params"]["requiredtags[1]"] == "Competitive" + assert call_args[1]["params"]["filetype"] == 2 + @pytest.mark.asyncio async def test_search_collections_empty_results(self, workshop_service, mock_client): """Should handle no results gracefully.""" From 2a0506cc43258328571591ae7045d074fdccca5a Mon Sep 17 00:00:00 2001 From: CodeKeanu <85296798+CodeKeanu@users.noreply.github.com> Date: Fri, 12 Dec 2025 20:46:48 +0000 Subject: [PATCH 6/6] fix: use correct filetype for Workshop collections MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Was using filetype=2 (Artwork) instead of filetype=1 (Collections). Per Steamworks API docs, EPublishedFileInfoMatchingFileType values: - 0 = Items (regular mods) - 1 = Collections - 2 = Art - 3 = Videos 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/steam_mcp/endpoints/steam_workshop.py | 2 +- tests/test_steam_workshop.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/steam_mcp/endpoints/steam_workshop.py b/src/steam_mcp/endpoints/steam_workshop.py index ab1aa08..b046eca 100644 --- a/src/steam_mcp/endpoints/steam_workshop.py +++ b/src/steam_mcp/endpoints/steam_workshop.py @@ -463,7 +463,7 @@ async def search_workshop_collections( params: dict[str, Any] = { "appid": app_id, "query_type": query_type, - "filetype": 2, # Collections only + "filetype": 1, # Collections (1=Collections, 2=Art, 0=Items) "numperpage": min(max_results, 50), "return_metadata": True, "return_tags": True, diff --git a/tests/test_steam_workshop.py b/tests/test_steam_workshop.py index 623b250..340e2ed 100644 --- a/tests/test_steam_workshop.py +++ b/tests/test_steam_workshop.py @@ -242,7 +242,7 @@ async def test_search_collections_with_query(self, workshop_service, mock_client call_args = mock_client.get.call_args assert call_args[1]["params"]["search_text"] == "weapon skins" - assert call_args[1]["params"]["filetype"] == 2 # Collections only + assert call_args[1]["params"]["filetype"] == 1 # Collections only @pytest.mark.asyncio async def test_search_collections_with_tags(self, workshop_service, mock_client): @@ -258,7 +258,7 @@ async def test_search_collections_with_tags(self, workshop_service, mock_client) call_args = mock_client.get.call_args assert call_args[1]["params"]["requiredtags[0]"] == "Maps" assert call_args[1]["params"]["requiredtags[1]"] == "Competitive" - assert call_args[1]["params"]["filetype"] == 2 + assert call_args[1]["params"]["filetype"] == 1 # Collections only @pytest.mark.asyncio async def test_search_collections_empty_results(self, workshop_service, mock_client):