diff --git a/lib/lightning/accounts/user.ex b/lib/lightning/accounts/user.ex index 98a964f69db..f7670cc2947 100644 --- a/lib/lightning/accounts/user.ex +++ b/lib/lightning/accounts/user.ex @@ -53,6 +53,29 @@ defmodule Lightning.Accounts.User do timestamps() end + @doc """ + Returns true when the user's email is on the @openfn.org domain. + + Used to derive Langfuse tracking flags (metrics_opt_in / persona) when + making AI chat calls to Apollo. + """ + @spec core_contributor?(t()) :: boolean() + def core_contributor?(%__MODULE__{email: email}) when is_binary(email) do + email |> String.downcase() |> String.ends_with?("@openfn.org") + end + + def core_contributor?(_), do: false + + @doc """ + Returns the Langfuse persona string for a user. + + `"core-contributor"` for @openfn.org users, `"user"` otherwise. + """ + @spec langfuse_persona(t()) :: String.t() + def langfuse_persona(%__MODULE__{} = user) do + if core_contributor?(user), do: "core-contributor", else: "user" + end + def changeset(user, attrs) do user |> cast(attrs, [ diff --git a/lib/lightning/ai_assistant/ai_assistant.ex b/lib/lightning/ai_assistant/ai_assistant.ex index 9dc22b41d19..b95886e1a4f 100644 --- a/lib/lightning/ai_assistant/ai_assistant.ex +++ b/lib/lightning/ai_assistant/ai_assistant.ex @@ -919,13 +919,14 @@ defmodule Lightning.AiAssistant do context = build_context(initial_context, opts) history = build_history(session) - meta = session.meta || %{} + {meta, metrics_opt_in} = apollo_meta(session) ApolloClient.job_chat( content, context: context, history: history, - meta: meta + meta: meta, + metrics_opt_in: metrics_opt_in ) |> handle_ai_response(session, &build_job_message/1) end @@ -950,12 +951,13 @@ defmodule Lightning.AiAssistant do context = build_context(initial_context, opts) history = build_history(session) - meta = session.meta || %{} + {meta, metrics_opt_in} = apollo_meta(session) case ApolloClient.job_chat_stream(content, context: context, history: history, - meta: meta + meta: meta, + metrics_opt_in: metrics_opt_in ) do {:ok, %Tesla.Env{status: status, body: body}} when status in @success_status_range -> @@ -990,6 +992,21 @@ defmodule Lightning.AiAssistant do end) end + defp apollo_meta(session) do + session = Repo.preload(session, :user) + user = session.user + + meta = + (session.meta || %{}) + |> Map.put("session_id", session.id) + |> Map.put("user", %{ + "id" => user.id, + "persona" => User.langfuse_persona(user) + }) + + {meta, User.core_contributor?(user)} + end + @doc """ Queries the AI service for workflow template generation. @@ -1003,7 +1020,6 @@ defmodule Lightning.AiAssistant do - `opts` - Keyword list of options: - `:code` - Current YAML to modify (default: uses latest from session) - `:errors` - Validation errors from previous workflow attempts - - `:meta` - Additional metadata to pass to the AI service (default: session.meta) ## Returns @@ -1015,16 +1031,18 @@ defmodule Lightning.AiAssistant do def query_workflow(session, content, opts \\ []) do code = Keyword.get(opts, :code) errors = Keyword.get(opts, :errors) - meta = Keyword.get(opts, :meta, session.meta || %{}) Logger.metadata(prompt_size: byte_size(content), session_id: session.id) + {meta, metrics_opt_in} = apollo_meta(session) + ApolloClient.workflow_chat( content, code: code, errors: errors, history: build_history(session), - meta: meta + meta: meta, + metrics_opt_in: metrics_opt_in ) |> handle_ai_response(session, &build_workflow_message/1) end @@ -1039,15 +1057,17 @@ defmodule Lightning.AiAssistant do def query_workflow_stream(session, content, opts \\ []) do code = Keyword.get(opts, :code) errors = Keyword.get(opts, :errors) - meta = Keyword.get(opts, :meta, session.meta || %{}) Logger.metadata(prompt_size: byte_size(content), session_id: session.id) + {meta, metrics_opt_in} = apollo_meta(session) + case ApolloClient.workflow_chat_stream(content, code: code, errors: errors, history: build_history(session), - meta: meta + meta: meta, + metrics_opt_in: metrics_opt_in ) do {:ok, %Tesla.Env{status: status, body: body}} when status in @success_status_range -> @@ -1073,10 +1093,14 @@ defmodule Lightning.AiAssistant do Logger.metadata(prompt_size: byte_size(content), session_id: session.id) + {meta, metrics_opt_in} = apollo_meta(session) + case ApolloClient.global_chat_stream(content, workflow_yaml: workflow_yaml, page: page, - history: history + history: history, + meta: meta, + metrics_opt_in: metrics_opt_in ) do {:ok, %Tesla.Env{status: status, body: body}} when status in @success_status_range -> diff --git a/lib/lightning/apollo_client.ex b/lib/lightning/apollo_client.ex index e5be654a9b6..1465862f220 100644 --- a/lib/lightning/apollo_client.ex +++ b/lib/lightning/apollo_client.ex @@ -69,6 +69,8 @@ defmodule Lightning.ApolloClient do - `:context` - Job context including expression code and adaptor info (default: %{}) - `:history` - Previous conversation messages for context (default: []) - `:meta` - Additional metadata like session IDs or user preferences (default: %{}) + - `:metrics_opt_in` - Optional boolean enabling Langfuse metrics tracking + on the Apollo side. Omitted from the wire payload when not supplied. ## Returns @@ -82,15 +84,20 @@ defmodule Lightning.ApolloClient do context = Keyword.get(opts, :context, %{}) history = Keyword.get(opts, :history, []) meta = Keyword.get(opts, :meta, %{}) + metrics_opt_in = Keyword.get(opts, :metrics_opt_in) - payload = %{ - "api_key" => Lightning.Config.apollo(:ai_assistant_api_key), - "content" => content, - "context" => context, - "history" => history, - "meta" => meta, - "suggest_code" => true - } + payload = + %{ + "api_key" => Lightning.Config.apollo(:ai_assistant_api_key), + "content" => content, + "context" => context, + "history" => history, + "meta" => meta, + "suggest_code" => true, + "metrics_opt_in" => metrics_opt_in + } + |> Enum.reject(fn {_, v} -> is_nil(v) end) + |> Enum.into(%{}) client() |> Tesla.post("/services/job_chat", payload) @@ -111,6 +118,8 @@ defmodule Lightning.ApolloClient do - `:errors` - Optional validation errors from previous workflow attempts - `:history` - Previous conversation messages for context (default: []) - `:meta` - Additional metadata (default: %{}) + - `:metrics_opt_in` - Optional boolean enabling Langfuse metrics tracking + on the Apollo side. Omitted from the wire payload when not supplied. ## Returns @@ -125,6 +134,7 @@ defmodule Lightning.ApolloClient do errors = Keyword.get(opts, :errors) history = Keyword.get(opts, :history, []) meta = Keyword.get(opts, :meta, %{}) + metrics_opt_in = Keyword.get(opts, :metrics_opt_in) payload = %{ @@ -133,7 +143,8 @@ defmodule Lightning.ApolloClient do "existing_yaml" => code, "errors" => errors, "history" => history, - "meta" => meta + "meta" => meta, + "metrics_opt_in" => metrics_opt_in } |> Enum.reject(fn {_, v} -> is_nil(v) end) |> Enum.into(%{}) @@ -151,23 +162,34 @@ defmodule Lightning.ApolloClient do The stream emits Anthropic-formatted events (`content_block_delta`, etc.) followed by a final `complete` event containing the full response payload (same shape as the synchronous `job_chat/2` response). + + ## Options + + Accepts the same options as `job_chat/2`, including `:metrics_opt_in` which, + when supplied, enables Langfuse metrics tracking on the Apollo side. Omitted + from the wire payload when not supplied. """ @spec job_chat_stream(String.t(), opts()) :: Tesla.Env.result() def job_chat_stream(content, opts \\ []) do context = Keyword.get(opts, :context, %{}) history = Keyword.get(opts, :history, []) meta = Keyword.get(opts, :meta, %{}) + metrics_opt_in = Keyword.get(opts, :metrics_opt_in) payload = - Jason.encode!(%{ + %{ "api_key" => Lightning.Config.apollo(:ai_assistant_api_key), "content" => content, "context" => context, "history" => history, "meta" => meta, "suggest_code" => true, - "stream" => true - }) + "stream" => true, + "metrics_opt_in" => metrics_opt_in + } + |> Enum.reject(fn {_, v} -> is_nil(v) end) + |> Enum.into(%{}) + |> Jason.encode!() stream_client() |> Tesla.post("/services/job_chat/stream", payload, @@ -181,6 +203,12 @@ defmodule Lightning.ApolloClient do Same as `workflow_chat/2` but connects to Apollo's streaming endpoint. See `job_chat_stream/2` for details on the stream format. + + ## Options + + Accepts the same options as `workflow_chat/2`, including `:metrics_opt_in` + which, when supplied, enables Langfuse metrics tracking on the Apollo side. + Omitted from the wire payload when not supplied. """ @spec workflow_chat_stream(String.t(), opts()) :: Tesla.Env.result() def workflow_chat_stream(content, opts \\ []) do @@ -188,6 +216,7 @@ defmodule Lightning.ApolloClient do errors = Keyword.get(opts, :errors) history = Keyword.get(opts, :history, []) meta = Keyword.get(opts, :meta, %{}) + metrics_opt_in = Keyword.get(opts, :metrics_opt_in) payload = %{ @@ -197,7 +226,8 @@ defmodule Lightning.ApolloClient do "errors" => errors, "history" => history, "meta" => meta, - "stream" => true + "stream" => true, + "metrics_opt_in" => metrics_opt_in } |> Enum.reject(fn {_, v} -> is_nil(v) end) |> Enum.into(%{}) @@ -225,12 +255,18 @@ defmodule Lightning.ApolloClient do - `:workflow_yaml` - Full workflow YAML including job bodies (optional) - `:page` - Current page URL for routing (optional) - `:history` - Previous conversation messages (default: []) + - `:meta` - Optional metadata map (e.g. session IDs, Langfuse keys). + Omitted from the wire payload when not supplied. + - `:metrics_opt_in` - Optional boolean enabling Langfuse metrics tracking + on the Apollo side. Omitted from the wire payload when not supplied. """ @spec global_chat_stream(String.t(), opts()) :: Tesla.Env.result() def global_chat_stream(content, opts \\ []) do workflow_yaml = Keyword.get(opts, :workflow_yaml) page = Keyword.get(opts, :page) history = Keyword.get(opts, :history, []) + meta = Keyword.get(opts, :meta) + metrics_opt_in = Keyword.get(opts, :metrics_opt_in) payload = %{ @@ -239,6 +275,8 @@ defmodule Lightning.ApolloClient do "workflow_yaml" => workflow_yaml, "page" => page, "history" => history, + "meta" => meta, + "metrics_opt_in" => metrics_opt_in, "options" => %{"stream" => true} } |> Enum.reject(fn {_, v} -> is_nil(v) end) diff --git a/test/lightning/accounts/user_test.exs b/test/lightning/accounts/user_test.exs index 2329b0acd6c..3ad72b74a00 100644 --- a/test/lightning/accounts/user_test.exs +++ b/test/lightning/accounts/user_test.exs @@ -483,4 +483,34 @@ defmodule Lightning.Accounts.UserTest do :superuser end end + + describe "core_contributor?/1" do + test "true for @openfn.org email" do + assert User.core_contributor?(%User{email: "alice@openfn.org"}) + end + + test "case-insensitive" do + assert User.core_contributor?(%User{email: "Bob@OpenFN.ORG"}) + end + + test "false for other domains" do + refute User.core_contributor?(%User{email: "alice@example.com"}) + end + + test "false for nil/empty email" do + refute User.core_contributor?(%User{email: nil}) + refute User.core_contributor?(%User{email: ""}) + end + end + + describe "langfuse_persona/1" do + test "core-contributor for @openfn user" do + assert User.langfuse_persona(%User{email: "x@openfn.org"}) == + "core-contributor" + end + + test "user for everyone else" do + assert User.langfuse_persona(%User{email: "x@example.com"}) == "user" + end + end end diff --git a/test/lightning/ai_assistant/ai_assistant_test.exs b/test/lightning/ai_assistant/ai_assistant_test.exs index 46ecf5b64e7..d57b1e3f158 100644 --- a/test/lightning/ai_assistant/ai_assistant_test.exs +++ b/test/lightning/ai_assistant/ai_assistant_test.exs @@ -479,6 +479,187 @@ defmodule Lightning.AiAssistantTest do end end + describe "query/3 — langfuse metadata" do + setup do + Mox.stub(Lightning.MockConfig, :apollo, fn key -> + case key do + :endpoint -> "http://localhost:3000" + :ai_assistant_api_key -> "api_key" + :timeout -> 5_000 + :streaming_timeout -> 120_000 + end + end) + + :ok + end + + test "core-contributor user sends metrics_opt_in true and persona core-contributor", + %{workflow: %{jobs: [job_1 | _]}} do + user = insert(:user, email: "alice@openfn.org") + session = insert(:chat_session, user: user, job: job_1, meta: %{}) + + expect(Lightning.Tesla.Mock, :call, fn %{method: :post, body: body}, + _opts -> + decoded = Jason.decode!(body) + + assert decoded["metrics_opt_in"] == true + assert decoded["meta"]["session_id"] == session.id + + assert decoded["meta"]["user"] == %{ + "id" => user.id, + "persona" => "core-contributor" + } + + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "history" => [ + %{"role" => "user", "content" => "hi"}, + %{"role" => "assistant", "content" => "hello"} + ] + } + }} + end) + + assert {:ok, _updated_session} = AiAssistant.query(session, "hi") + end + + test "non-openfn user sends metrics_opt_in false and persona user", + %{workflow: %{jobs: [job_1 | _]}} do + user = insert(:user, email: "ext@example.com") + session = insert(:chat_session, user: user, job: job_1, meta: %{}) + + expect(Lightning.Tesla.Mock, :call, fn %{method: :post, body: body}, + _opts -> + decoded = Jason.decode!(body) + + assert decoded["metrics_opt_in"] == false + assert decoded["meta"]["user"]["persona"] == "user" + assert decoded["meta"]["user"]["id"] == user.id + + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "history" => [ + %{"role" => "user", "content" => "hi"}, + %{"role" => "assistant", "content" => "hello"} + ] + } + }} + end) + + assert {:ok, _updated_session} = AiAssistant.query(session, "hi") + end + + test "preserves existing session.meta keys (rag echo regression guard)", + %{workflow: %{jobs: [job_1 | _]}} do + user = insert(:user, email: "alice@openfn.org") + + session = + insert(:chat_session, + user: user, + job: job_1, + meta: %{"rag" => %{"search_results" => ["x"]}} + ) + + expect(Lightning.Tesla.Mock, :call, fn %{method: :post, body: body}, + _opts -> + decoded = Jason.decode!(body) + + assert decoded["meta"]["rag"] == %{"search_results" => ["x"]} + assert decoded["meta"]["session_id"] == session.id + + {:ok, + %Tesla.Env{ + status: 200, + body: %{ + "history" => [ + %{"role" => "user", "content" => "hi"}, + %{"role" => "assistant", "content" => "hello"} + ] + } + }} + end) + + assert {:ok, _updated_session} = AiAssistant.query(session, "hi") + end + end + + describe "query_global_stream/3 — langfuse metadata" do + setup do + Mox.stub(Lightning.MockConfig, :apollo, fn key -> + case key do + :endpoint -> "http://localhost:3000" + :ai_assistant_api_key -> "api_key" + :timeout -> 5_000 + :streaming_timeout -> 120_000 + end + end) + + :ok + end + + test "forwards meta and metrics_opt_in to global_chat_stream", %{ + project: project, + workflow: workflow + } do + user = insert(:user, email: "alice@openfn.org") + + session = + insert(:chat_session, + user: user, + project: project, + workflow: workflow, + session_type: "workflow_template", + meta: %{"rag" => %{"search_results" => ["y"]}}, + messages: [ + %{ + role: :user, + content: "help", + user: user, + status: :pending, + inserted_at: DateTime.utc_now() |> DateTime.add(-1) + } + ] + ) + + complete_payload = + Jason.encode!(%{ + "response" => "ok", + "attachments" => [], + "usage" => %{} + }) + + sse_stream = [%{event: "complete", data: complete_payload}] + + expect(Lightning.Tesla.Mock, :call, fn %{ + method: :post, + url: url, + body: body + }, + _opts -> + assert url =~ "/services/global_chat/stream" + decoded = Jason.decode!(body) + + assert decoded["metrics_opt_in"] == true + assert decoded["meta"]["session_id"] == session.id + assert decoded["meta"]["rag"] == %{"search_results" => ["y"]} + + assert decoded["meta"]["user"] == %{ + "id" => user.id, + "persona" => "core-contributor" + } + + {:ok, %Tesla.Env{status: 200, body: sse_stream}} + end) + + assert {:ok, _updated_session} = + AiAssistant.query_global_stream(session, "help") + end + end + describe "create_session/4" do test "creates a new session", %{ user: user, diff --git a/test/lightning/apollo_client_test.exs b/test/lightning/apollo_client_test.exs index 8fc45c8cf60..c53c889e2fc 100644 --- a/test/lightning/apollo_client_test.exs +++ b/test/lightning/apollo_client_test.exs @@ -243,6 +243,35 @@ defmodule Lightning.ApolloClientTest do {:error, :timeout} = ApolloClient.job_chat("test") end + + test "sends metrics_opt_in and meta with langfuse keys" do + stub_apollo_config() + + meta = %{ + "session_id" => "sess-1", + "user" => %{"id" => "u-1", "persona" => "core-contributor"} + } + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + decoded = Jason.decode!(env.body) + assert decoded["metrics_opt_in"] == true + assert decoded["meta"] == meta + {:ok, %Tesla.Env{status: 200, body: %{}}} + end) + + {:ok, _} = ApolloClient.job_chat("hi", meta: meta, metrics_opt_in: true) + end + + test "omits metrics_opt_in when not given" do + stub_apollo_config() + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + refute Map.has_key?(Jason.decode!(env.body), "metrics_opt_in") + {:ok, %Tesla.Env{status: 200, body: %{}}} + end) + + {:ok, _} = ApolloClient.job_chat("hi") + end end describe "workflow_chat/5" do @@ -437,6 +466,36 @@ defmodule Lightning.ApolloClientTest do {:ok, response} = ApolloClient.workflow_chat("test") assert response.status == 503 end + + test "sends metrics_opt_in and meta with langfuse keys" do + stub_apollo_config() + + meta = %{ + "session_id" => "sess-2", + "user" => %{"id" => "u-2", "persona" => "core-contributor"} + } + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + decoded = Jason.decode!(env.body) + assert decoded["metrics_opt_in"] == true + assert decoded["meta"] == meta + {:ok, %Tesla.Env{status: 200, body: %{}}} + end) + + {:ok, _} = + ApolloClient.workflow_chat("hi", meta: meta, metrics_opt_in: true) + end + + test "omits metrics_opt_in when not given" do + stub_apollo_config() + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + refute Map.has_key?(Jason.decode!(env.body), "metrics_opt_in") + {:ok, %Tesla.Env{status: 200, body: %{}}} + end) + + {:ok, _} = ApolloClient.workflow_chat("hi") + end end describe "test/0" do @@ -711,6 +770,36 @@ defmodule Lightning.ApolloClientTest do assert {:error, :timeout} = ApolloClient.job_chat_stream("test") end + + test "sends metrics_opt_in and meta with langfuse keys" do + stub_apollo_config() + + meta = %{ + "session_id" => "sess-3", + "user" => %{"id" => "u-3", "persona" => "core-contributor"} + } + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + decoded = Jason.decode!(env.body) + assert decoded["metrics_opt_in"] == true + assert decoded["meta"] == meta + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + + {:ok, _} = + ApolloClient.job_chat_stream("hi", meta: meta, metrics_opt_in: true) + end + + test "omits metrics_opt_in when not given" do + stub_apollo_config() + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + refute Map.has_key?(Jason.decode!(env.body), "metrics_opt_in") + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + + {:ok, _} = ApolloClient.job_chat_stream("hi") + end end describe "workflow_chat_stream/2" do @@ -765,6 +854,39 @@ defmodule Lightning.ApolloClientTest do assert {:error, :econnrefused} = ApolloClient.workflow_chat_stream("test") end + + test "sends metrics_opt_in and meta with langfuse keys" do + stub_apollo_config() + + meta = %{ + "session_id" => "sess-4", + "user" => %{"id" => "u-4", "persona" => "core-contributor"} + } + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + decoded = Jason.decode!(env.body) + assert decoded["metrics_opt_in"] == true + assert decoded["meta"] == meta + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + + {:ok, _} = + ApolloClient.workflow_chat_stream("hi", + meta: meta, + metrics_opt_in: true + ) + end + + test "omits metrics_opt_in when not given" do + stub_apollo_config() + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + refute Map.has_key?(Jason.decode!(env.body), "metrics_opt_in") + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + + {:ok, _} = ApolloClient.workflow_chat_stream("hi") + end end describe "global_chat_stream/2" do @@ -824,6 +946,52 @@ defmodule Lightning.ApolloClientTest do assert {:error, :timeout} = ApolloClient.global_chat_stream("test") end + + test "sends metrics_opt_in and meta with langfuse keys" do + stub_apollo_config() + + meta = %{ + "session_id" => "sess-5", + "user" => %{"id" => "u-5", "persona" => "core-contributor"} + } + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + decoded = Jason.decode!(env.body) + assert decoded["metrics_opt_in"] == true + assert decoded["meta"] == meta + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + + {:ok, _} = + ApolloClient.global_chat_stream("hi", + meta: meta, + metrics_opt_in: true + ) + end + + test "forwards meta when supplied" do + stub_apollo_config() + + meta = %{"session_id" => "sess-2"} + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + assert Jason.decode!(env.body)["meta"] == meta + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + + {:ok, _} = ApolloClient.global_chat_stream("hi", meta: meta) + end + + test "omits metrics_opt_in when not given" do + stub_apollo_config() + + expect(Lightning.Tesla.Mock, :call, fn env, _opts -> + refute Map.has_key?(Jason.decode!(env.body), "metrics_opt_in") + {:ok, %Tesla.Env{status: 200, body: ""}} + end) + + {:ok, _} = ApolloClient.global_chat_stream("hi") + end end # Private helper function to stub Apollo configuration