diff --git a/everyrow-mcp/src/everyrow_mcp/redis_store.py b/everyrow-mcp/src/everyrow_mcp/redis_store.py index 3f2dc528..bb156c90 100644 --- a/everyrow-mcp/src/everyrow_mcp/redis_store.py +++ b/everyrow-mcp/src/everyrow_mcp/redis_store.py @@ -250,12 +250,20 @@ async def pop_task_token(task_id: str) -> None: # ── Poll tokens ─────────────────────────────────────────────── -async def store_poll_token(task_id: str, poll_token: str) -> None: - await get_redis_client().setex( +async def store_poll_token(task_id: str, poll_token: str, user_id: str = "") -> None: + """Store an encrypted poll token, optionally bound to a user identity.""" + client = get_redis_client() + await client.setex( name=build_key("poll_token", task_id), time=TOKEN_TTL, value=encrypt_value(poll_token), ) + if user_id: + await client.setex( + name=build_key("poll_owner", task_id), + time=TOKEN_TTL, + value=user_id, + ) async def get_poll_token(task_id: str) -> str | None: @@ -265,6 +273,11 @@ async def get_poll_token(task_id: str) -> str | None: return decrypt_value(encrypted) +async def get_poll_token_owner(task_id: str) -> str | None: + """Return the user_id bound to a poll token, or None if not set.""" + return await get_redis_client().get(build_key("poll_owner", task_id)) + + # ── Task ownership (user-scoped data isolation) ────────────── diff --git a/everyrow-mcp/src/everyrow_mcp/routes.py b/everyrow-mcp/src/everyrow_mcp/routes.py index d042c810..4f3fcfe0 100644 --- a/everyrow-mcp/src/everyrow_mcp/routes.py +++ b/everyrow-mcp/src/everyrow_mcp/routes.py @@ -73,7 +73,57 @@ async def _validate_poll_token(task_id: str, request: Request) -> JSONResponse | return None -async def api_progress(request: Request) -> Response: +async def _validate_task_owner(task_id: str) -> JSONResponse | None: + """Verify the task has a recorded owner and that the poll token was + issued for the same user. Returns a 403 response if ownership cannot + be verified, or ``None`` if the caller may proceed. + + Fail-closed: tasks without an ownership record are rejected. When a + poll-token owner is recorded, it must match the task owner — this + cross-check detects ownership-record tampering and ensures the poll + token was legitimately issued for this task/user pair. + """ + owner = await redis_store.get_task_owner(task_id) + if not owner: + logger.warning( + "REST access denied for task %s: no owner recorded (fail-closed)", + task_id, + ) + return JSONResponse( + {"error": "Task ownership could not be verified"}, + status_code=403, + headers=_cors_headers(), + ) + + poll_owner = await redis_store.get_poll_token_owner(task_id) + if not poll_owner: + logger.warning( + "REST access denied for task %s: no poll_owner recorded (fail-closed)", + task_id, + ) + return JSONResponse( + {"error": "Task ownership could not be verified"}, + status_code=403, + headers=_cors_headers(), + ) + if poll_owner != owner: + logger.warning( + "REST access denied for task %s: poll_owner=%s != task_owner=%s", + task_id, + poll_owner, + owner, + ) + return JSONResponse( + {"error": "Task ownership could not be verified"}, + status_code=403, + headers=_cors_headers(), + ) + + logger.debug("REST access granted for task %s (owner=%s)", task_id, owner) + return None + + +async def api_progress(request: Request) -> Response: # noqa: PLR0911 """REST endpoint for the session widget to poll task progress.""" cors = _cors_headers() if request.method == "OPTIONS": @@ -90,6 +140,9 @@ async def api_progress(request: Request) -> Response: if err := await _validate_poll_token(task_id, request): return err + if err := await _validate_task_owner(task_id): + return err + api_key = await redis_store.get_task_token(task_id) if not api_key: @@ -143,6 +196,9 @@ async def api_download(request: Request) -> Response: if err := await _validate_poll_token(task_id, request): return err + if err := await _validate_task_owner(task_id): + return err + csv_text = await redis_store.get_result_csv(task_id) if csv_text is None: return JSONResponse( diff --git a/everyrow-mcp/src/everyrow_mcp/tool_helpers.py b/everyrow-mcp/src/everyrow_mcp/tool_helpers.py index af355739..1f18515c 100644 --- a/everyrow-mcp/src/everyrow_mcp/tool_helpers.py +++ b/everyrow-mcp/src/everyrow_mcp/tool_helpers.py @@ -91,19 +91,24 @@ async def _submission_ui_json( """Build JSON for the session MCP App widget, and store the token for polling.""" poll_token = secrets.token_urlsafe(32) await redis_store.store_task_token(task_id, token) - await redis_store.store_poll_token(task_id, poll_token) # Record task owner for cross-user access checks (HTTP mode only). # This MUST succeed — downstream ownership checks deny access when no # owner is recorded, so a silent failure here would lock the user out # of their own task. + user_id = "" if settings.is_http: access_token = get_access_token() if not access_token or not access_token.client_id: raise RuntimeError( f"Cannot record task owner for {task_id}: no authenticated user" ) - await redis_store.store_task_owner(task_id, access_token.client_id) + user_id = access_token.client_id + await redis_store.store_task_owner(task_id, user_id) + + # Bind the poll token to the same user identity so the REST layer + # can cross-check poll_owner == task_owner. + await redis_store.store_poll_token(task_id, poll_token, user_id=user_id) data: dict[str, Any] = { "session_url": session_url, "task_id": task_id, diff --git a/everyrow-mcp/tests/test_http_integration.py b/everyrow-mcp/tests/test_http_integration.py index d72c859e..a4fb4c89 100644 --- a/everyrow-mcp/tests/test_http_integration.py +++ b/everyrow-mcp/tests/test_http_integration.py @@ -132,10 +132,25 @@ async def test_unauthorized_with_wrong_token(self, client: httpx.AsyncClient): assert resp.status_code == 403 @pytest.mark.asyncio - async def test_unknown_task_returns_404(self, client: httpx.AsyncClient): + async def test_denied_without_owner(self, client: httpx.AsyncClient): + """Poll token is valid but no task owner recorded → fail-closed 403.""" task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) await redis_store.store_poll_token(task_id, poll_token) + # No task owner stored — ownership check should reject + + resp = await client.get( + f"/api/progress/{task_id}", params={"token": poll_token} + ) + assert resp.status_code == 403 + assert resp.json()["error"] == "Task ownership could not be verified" + + @pytest.mark.asyncio + async def test_unknown_task_returns_404(self, client: httpx.AsyncClient): + task_id = str(uuid4()) + poll_token = secrets.token_urlsafe(16) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") + await redis_store.store_task_owner(task_id, "test-user") # No task_token stored resp = await client.get( @@ -148,8 +163,9 @@ async def test_unknown_task_returns_404(self, client: httpx.AsyncClient): async def test_running_task_returns_progress(self, client: httpx.AsyncClient): task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key-123") + await redis_store.store_task_owner(task_id, "test-user") status_resp = _make_status_response( status="running", completed=7, total=20, failed=1, running=4 @@ -180,8 +196,9 @@ async def test_running_task_returns_progress(self, client: httpx.AsyncClient): async def test_completed_task_cleans_up_tokens(self, client: httpx.AsyncClient): task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key") + await redis_store.store_task_owner(task_id, "test-user") status_resp = _make_status_response( status="completed", completed=10, total=10, failed=0, running=0 @@ -207,8 +224,9 @@ async def test_completed_task_cleans_up_tokens(self, client: httpx.AsyncClient): async def test_api_error_returns_500(self, client: httpx.AsyncClient): task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key") + await redis_store.store_task_owner(task_id, "test-user") with patch( "everyrow_mcp.routes.get_task_status_tasks_task_id_status_get.asyncio", @@ -237,8 +255,9 @@ async def test_progress_lifecycle(self, client: httpx.AsyncClient): poll_token = secrets.token_urlsafe(16) # 1. Store tokens (simulating what everyrow_agent does) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key-for-polling") + await redis_store.store_task_owner(task_id, "test-user") # 2. Poll progress — task is running running_resp = _make_status_response(status="running", completed=1, total=3) diff --git a/everyrow-mcp/tests/test_result_store.py b/everyrow-mcp/tests/test_result_store.py index 7c26af02..d2f66bea 100644 --- a/everyrow-mcp/tests/test_result_store.py +++ b/everyrow-mcp/tests/test_result_store.py @@ -382,8 +382,9 @@ async def test_valid_download(self, client: httpx.AsyncClient): poll_token = secrets.token_urlsafe(16) csv_text = "name,score\nAlice,95\nBob,87\n" - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_result_csv(task_id, csv_text) + await redis_store.store_task_owner(task_id, "test-user") resp = await client.get( f"/api/results/{task_id}/download", params={"token": poll_token} @@ -407,11 +408,28 @@ async def test_bad_token_returns_403(self, client: httpx.AsyncClient): assert resp.status_code == 403 @pytest.mark.asyncio - async def test_missing_csv_returns_404(self, client: httpx.AsyncClient): + async def test_denied_without_owner(self, client: httpx.AsyncClient): + """Valid poll token but no task owner → fail-closed 403.""" task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_result_csv(task_id, "data") + # No task owner stored + + resp = await client.get( + f"/api/results/{task_id}/download", params={"token": poll_token} + ) + assert resp.status_code == 403 + assert resp.json()["error"] == "Task ownership could not be verified" + + @pytest.mark.asyncio + async def test_missing_csv_returns_404(self, client: httpx.AsyncClient): + task_id = str(uuid4()) + poll_token = secrets.token_urlsafe(16) + + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") + await redis_store.store_task_owner(task_id, "test-user") # No CSV stored resp = await client.get( diff --git a/everyrow-mcp/tests/test_routes.py b/everyrow-mcp/tests/test_routes.py index 2dd25723..a31659e3 100644 --- a/everyrow-mcp/tests/test_routes.py +++ b/everyrow-mcp/tests/test_routes.py @@ -111,10 +111,62 @@ async def test_missing_poll_token_returns_403(self): assert resp.status_code == 403 @pytest.mark.asyncio - async def test_missing_task_token_returns_404(self): + async def test_denied_without_owner(self): + """Valid poll token but no task owner → fail-closed 403.""" task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) await redis_store.store_poll_token(task_id, poll_token) + # No task owner stored + + req = FakeRequest( + path_params={"task_id": task_id}, + headers={"authorization": f"Bearer {poll_token}"}, + ) + resp = await api_progress(req) # pyright: ignore[reportArgumentType] + assert resp.status_code == 403 + body = json.loads(bytes(resp.body).decode()) + assert body["error"] == "Task ownership could not be verified" + + @pytest.mark.asyncio + async def test_denied_without_poll_owner(self): + """Task owner exists but poll token has no bound user → fail-closed 403.""" + task_id = str(uuid4()) + poll_token = secrets.token_urlsafe(16) + await redis_store.store_poll_token(task_id, poll_token) # no user_id + await redis_store.store_task_owner(task_id, "test-user") + + req = FakeRequest( + path_params={"task_id": task_id}, + headers={"authorization": f"Bearer {poll_token}"}, + ) + resp = await api_progress(req) # pyright: ignore[reportArgumentType] + assert resp.status_code == 403 + body = json.loads(bytes(resp.body).decode()) + assert body["error"] == "Task ownership could not be verified" + + @pytest.mark.asyncio + async def test_denied_on_owner_mismatch(self): + """Poll token bound to user-A but task_owner tampered to user-B → 403.""" + task_id = str(uuid4()) + poll_token = secrets.token_urlsafe(16) + await redis_store.store_poll_token(task_id, poll_token, user_id="user-A") + await redis_store.store_task_owner(task_id, "user-B") + + req = FakeRequest( + path_params={"task_id": task_id}, + headers={"authorization": f"Bearer {poll_token}"}, + ) + resp = await api_progress(req) # pyright: ignore[reportArgumentType] + assert resp.status_code == 403 + body = json.loads(bytes(resp.body).decode()) + assert body["error"] == "Task ownership could not be verified" + + @pytest.mark.asyncio + async def test_missing_task_token_returns_404(self): + task_id = str(uuid4()) + poll_token = secrets.token_urlsafe(16) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") + await redis_store.store_task_owner(task_id, "test-user") # No task token stored req = FakeRequest( @@ -131,8 +183,9 @@ async def test_valid_progress_via_auth_header(self): """Poll token sent via Authorization: Bearer header works.""" task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key-123") + await redis_store.store_task_owner(task_id, "test-user") status_resp = _make_status_response( status="running", completed=3, total=10, failed=1, running=2 @@ -166,8 +219,9 @@ async def test_backward_compat_query_param_for_download(self): """Poll token via ?token= query param still works (for download links).""" task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key-123") + await redis_store.store_task_owner(task_id, "test-user") status_resp = _make_status_response( status="running", completed=3, total=10, failed=1, running=2 @@ -200,8 +254,9 @@ async def test_backward_compat_query_param_for_download(self): async def test_completed_task_pops_tokens(self): task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key") + await redis_store.store_task_owner(task_id, "test-user") status_resp = _make_status_response(status="completed", completed=10, total=10) @@ -229,8 +284,9 @@ async def test_completed_task_pops_tokens(self): async def test_api_error_returns_500(self): task_id = str(uuid4()) poll_token = secrets.token_urlsafe(16) - await redis_store.store_poll_token(task_id, poll_token) + await redis_store.store_poll_token(task_id, poll_token, user_id="test-user") await redis_store.store_task_token(task_id, "api-key") + await redis_store.store_task_owner(task_id, "test-user") req = FakeRequest( path_params={"task_id": task_id},