From ee653fa93ac2f29a1eb4218c432ab9df1346ee62 Mon Sep 17 00:00:00 2001 From: Eric Fleming Date: Tue, 28 Apr 2026 11:52:56 -0400 Subject: [PATCH] fix: avoid AlreadySentError on duplicate SSE GET When a second `GET /mcp` arrives for an existing session, `maybe_track_session_stream/1` correctly responds with HTTP 409 and a JSON-RPC `-32000` error. However, the conn was already sent and halted at that point, so the surrounding `dispatch/2` clause then tried to call `put_resp_header` and `send_chunked` on the same conn, raising `Plug.Conn.AlreadySentError`. Branch on `conn.halted` after `maybe_track_session_stream/1` so the 409 response is returned as-is and SSE setup is skipped. Adds a regression test that performs two consecutive GETs on the same session id and asserts the second returns a clean 409 with the expected JSON-RPC error envelope. --- CHANGELOG.md | 6 ++++++ lib/phantom/plug.ex | 25 +++++++++++++++---------- test/phantom/plug_test.exs | 22 ++++++++++++++++++++++ 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6ca26ab..bb96624 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +## Unreleased + +- Fix `Plug.Conn.AlreadySentError` when a second SSE GET arrives for an + existing session. The conflict response (`409 -32000`) is now returned + cleanly without attempting to write streaming headers on the sent conn. + ## 0.4.3 (2026-04-03) - Fix invalid response when client request an invalid resource_uri diff --git a/lib/phantom/plug.ex b/lib/phantom/plug.ex index be0c934..e0ba866 100644 --- a/lib/phantom/plug.ex +++ b/lib/phantom/plug.ex @@ -278,17 +278,22 @@ defmodule Phantom.Plug do defp dispatch(%Plug.Conn{method: "GET"} = conn, opts) do if opts.pubsub do - conn = maybe_track_session_stream(conn) - session = conn.private.phantom.session + case maybe_track_session_stream(conn) do + %Plug.Conn{halted: true} = conn -> + conn - conn - |> put_resp_header("mcp-session-id", session.id) - |> put_resp_header("cache-control", "no-cache, no-transform") - |> put_resp_content_type("text/event-stream") - |> put_resp_header("connection", "keep-alive") - |> put_resp_header("x-accel-buffering", "no") - |> send_chunked(202) - |> stream_loop(opts) + conn -> + session = conn.private.phantom.session + + conn + |> put_resp_header("mcp-session-id", session.id) + |> put_resp_header("cache-control", "no-cache, no-transform") + |> put_resp_content_type("text/event-stream") + |> put_resp_header("connection", "keep-alive") + |> put_resp_header("x-accel-buffering", "no") + |> send_chunked(202) + |> stream_loop(opts) + end else conn |> put_status(405) diff --git a/test/phantom/plug_test.exs b/test/phantom/plug_test.exs index fcaca1a..8b4417c 100644 --- a/test/phantom/plug_test.exs +++ b/test/phantom/plug_test.exs @@ -299,6 +299,28 @@ defmodule Phantom.PlugTest do assert_sse_connected() assert Phantom.Tracker.list_sessions() != [] end + + test "second GET on same session returns 409 without raising AlreadySentError" do + session_id = "019dd3d8-0000-0000-0000-000000000001" + + :get + |> conn("/mcp") + |> put_req_header("accept", "text/event-stream") + |> call(session_id: session_id) + + assert_sse_connected() + + :get + |> conn("/mcp") + |> put_req_header("accept", "text/event-stream") + |> call(session_id: session_id) + + assert_receive {:conn, conn} + assert conn.status == 409 + error = JSON.decode!(conn.resp_body) + assert error["error"]["code"] == -32000 + assert error["error"]["message"] == "Only one SSE stream is allowed per session" + end end test "handles prompt responses" do