Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions lib/lightning/accounts/user.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ defmodule Lightning.Accounts.User do
timestamps()
end

@doc """
Returns true when the user's email is on the @openfn.org domain.

Used to derive Langfuse tracking flags (metrics_opt_in / persona) when
making AI chat calls to Apollo.
"""
@spec core_contributor?(t()) :: boolean()
def core_contributor?(%__MODULE__{email: email}) when is_binary(email) do
email |> String.downcase() |> String.ends_with?("@openfn.org")
end

def core_contributor?(_), do: false

@doc """
Returns the Langfuse persona string for a user.

`"core-contributor"` for @openfn.org users, `"user"` otherwise.
"""
@spec langfuse_persona(t()) :: String.t()
def langfuse_persona(%__MODULE__{} = user) do
if core_contributor?(user), do: "core-contributor", else: "user"
end

def changeset(user, attrs) do
user
|> cast(attrs, [
Expand Down
38 changes: 28 additions & 10 deletions lib/lightning/ai_assistant/ai_assistant.ex
Original file line number Diff line number Diff line change
Expand Up @@ -919,13 +919,13 @@ defmodule Lightning.AiAssistant do
context = build_context(initial_context, opts)

history = build_history(session)
meta = session.meta || %{}

ApolloClient.job_chat(
content,
context: context,
history: history,
meta: meta
meta: apollo_meta(session),
metrics_opt_in: metrics_opt_in?(session)
)
|> handle_ai_response(session, &build_job_message/1)
end
Expand All @@ -950,12 +950,12 @@ defmodule Lightning.AiAssistant do

context = build_context(initial_context, opts)
history = build_history(session)
meta = session.meta || %{}

case ApolloClient.job_chat_stream(content,
context: context,
history: history,
meta: meta
meta: apollo_meta(session),
metrics_opt_in: metrics_opt_in?(session)
) do
{:ok, %Tesla.Env{status: status, body: body}}
when status in @success_status_range ->
Expand Down Expand Up @@ -990,6 +990,23 @@ defmodule Lightning.AiAssistant do
end)
end

defp apollo_meta(session) do
session = Repo.preload(session, :user)
user = session.user

(session.meta || %{})
|> Map.put("session_id", session.id)
|> Map.put("user", %{
"id" => user.id,
"persona" => User.langfuse_persona(user)
})
end

defp metrics_opt_in?(session) do
session = Repo.preload(session, :user)
User.core_contributor?(session.user)
end

@doc """
Queries the AI service for workflow template generation.

Expand All @@ -1003,7 +1020,6 @@ defmodule Lightning.AiAssistant do
- `opts` - Keyword list of options:
- `:code` - Current YAML to modify (default: uses latest from session)
- `:errors` - Validation errors from previous workflow attempts
- `:meta` - Additional metadata to pass to the AI service (default: session.meta)

## Returns

Expand All @@ -1015,7 +1031,6 @@ defmodule Lightning.AiAssistant do
def query_workflow(session, content, opts \\ []) do
code = Keyword.get(opts, :code)
errors = Keyword.get(opts, :errors)
meta = Keyword.get(opts, :meta, session.meta || %{})

Logger.metadata(prompt_size: byte_size(content), session_id: session.id)

Expand All @@ -1024,7 +1039,8 @@ defmodule Lightning.AiAssistant do
code: code,
errors: errors,
history: build_history(session),
meta: meta
meta: apollo_meta(session),
metrics_opt_in: metrics_opt_in?(session)
)
|> handle_ai_response(session, &build_workflow_message/1)
end
Expand All @@ -1039,15 +1055,15 @@ defmodule Lightning.AiAssistant do
def query_workflow_stream(session, content, opts \\ []) do
code = Keyword.get(opts, :code)
errors = Keyword.get(opts, :errors)
meta = Keyword.get(opts, :meta, session.meta || %{})

Logger.metadata(prompt_size: byte_size(content), session_id: session.id)

case ApolloClient.workflow_chat_stream(content,
code: code,
errors: errors,
history: build_history(session),
meta: meta
meta: apollo_meta(session),
metrics_opt_in: metrics_opt_in?(session)
) do
{:ok, %Tesla.Env{status: status, body: body}}
when status in @success_status_range ->
Expand Down Expand Up @@ -1076,7 +1092,9 @@ defmodule Lightning.AiAssistant do
case ApolloClient.global_chat_stream(content,
workflow_yaml: workflow_yaml,
page: page,
history: history
history: history,
meta: apollo_meta(session),
metrics_opt_in: metrics_opt_in?(session)
) do
{:ok, %Tesla.Env{status: status, body: body}}
when status in @success_status_range ->
Expand Down
64 changes: 51 additions & 13 deletions lib/lightning/apollo_client.ex
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,8 @@ defmodule Lightning.ApolloClient do
- `:context` - Job context including expression code and adaptor info (default: %{})
- `:history` - Previous conversation messages for context (default: [])
- `:meta` - Additional metadata like session IDs or user preferences (default: %{})
- `:metrics_opt_in` - Optional boolean enabling Langfuse metrics tracking
on the Apollo side. Omitted from the wire payload when not supplied.

## Returns

Expand All @@ -82,15 +84,20 @@ defmodule Lightning.ApolloClient do
context = Keyword.get(opts, :context, %{})
history = Keyword.get(opts, :history, [])
meta = Keyword.get(opts, :meta, %{})
metrics_opt_in = Keyword.get(opts, :metrics_opt_in)

payload = %{
"api_key" => Lightning.Config.apollo(:ai_assistant_api_key),
"content" => content,
"context" => context,
"history" => history,
"meta" => meta,
"suggest_code" => true
}
payload =
%{
"api_key" => Lightning.Config.apollo(:ai_assistant_api_key),
"content" => content,
"context" => context,
"history" => history,
"meta" => meta,
"suggest_code" => true,
"metrics_opt_in" => metrics_opt_in
}
|> Enum.reject(fn {_, v} -> is_nil(v) end)
|> Enum.into(%{})

client()
|> Tesla.post("/services/job_chat", payload)
Expand All @@ -111,6 +118,8 @@ defmodule Lightning.ApolloClient do
- `:errors` - Optional validation errors from previous workflow attempts
- `:history` - Previous conversation messages for context (default: [])
- `:meta` - Additional metadata (default: %{})
- `:metrics_opt_in` - Optional boolean enabling Langfuse metrics tracking
on the Apollo side. Omitted from the wire payload when not supplied.

## Returns

Expand All @@ -125,6 +134,7 @@ defmodule Lightning.ApolloClient do
errors = Keyword.get(opts, :errors)
history = Keyword.get(opts, :history, [])
meta = Keyword.get(opts, :meta, %{})
metrics_opt_in = Keyword.get(opts, :metrics_opt_in)

payload =
%{
Expand All @@ -133,7 +143,8 @@ defmodule Lightning.ApolloClient do
"existing_yaml" => code,
"errors" => errors,
"history" => history,
"meta" => meta
"meta" => meta,
"metrics_opt_in" => metrics_opt_in
}
|> Enum.reject(fn {_, v} -> is_nil(v) end)
|> Enum.into(%{})
Expand All @@ -151,23 +162,34 @@ defmodule Lightning.ApolloClient do
The stream emits Anthropic-formatted events (`content_block_delta`, etc.)
followed by a final `complete` event containing the full response payload
(same shape as the synchronous `job_chat/2` response).

## Options

Accepts the same options as `job_chat/2`, including `:metrics_opt_in` which,
when supplied, enables Langfuse metrics tracking on the Apollo side. Omitted
from the wire payload when not supplied.
"""
@spec job_chat_stream(String.t(), opts()) :: Tesla.Env.result()
def job_chat_stream(content, opts \\ []) do
context = Keyword.get(opts, :context, %{})
history = Keyword.get(opts, :history, [])
meta = Keyword.get(opts, :meta, %{})
metrics_opt_in = Keyword.get(opts, :metrics_opt_in)

payload =
Jason.encode!(%{
%{
"api_key" => Lightning.Config.apollo(:ai_assistant_api_key),
"content" => content,
"context" => context,
"history" => history,
"meta" => meta,
"suggest_code" => true,
"stream" => true
})
"stream" => true,
"metrics_opt_in" => metrics_opt_in
}
|> Enum.reject(fn {_, v} -> is_nil(v) end)
|> Enum.into(%{})
|> Jason.encode!()

stream_client()
|> Tesla.post("/services/job_chat/stream", payload,
Expand All @@ -181,13 +203,20 @@ defmodule Lightning.ApolloClient do

Same as `workflow_chat/2` but connects to Apollo's streaming endpoint.
See `job_chat_stream/2` for details on the stream format.

## Options

Accepts the same options as `workflow_chat/2`, including `:metrics_opt_in`
which, when supplied, enables Langfuse metrics tracking on the Apollo side.
Omitted from the wire payload when not supplied.
"""
@spec workflow_chat_stream(String.t(), opts()) :: Tesla.Env.result()
def workflow_chat_stream(content, opts \\ []) do
code = Keyword.get(opts, :code)
errors = Keyword.get(opts, :errors)
history = Keyword.get(opts, :history, [])
meta = Keyword.get(opts, :meta, %{})
metrics_opt_in = Keyword.get(opts, :metrics_opt_in)

payload =
%{
Expand All @@ -197,7 +226,8 @@ defmodule Lightning.ApolloClient do
"errors" => errors,
"history" => history,
"meta" => meta,
"stream" => true
"stream" => true,
"metrics_opt_in" => metrics_opt_in
}
|> Enum.reject(fn {_, v} -> is_nil(v) end)
|> Enum.into(%{})
Expand Down Expand Up @@ -225,12 +255,18 @@ defmodule Lightning.ApolloClient do
- `:workflow_yaml` - Full workflow YAML including job bodies (optional)
- `:page` - Current page URL for routing (optional)
- `:history` - Previous conversation messages (default: [])
- `:meta` - Optional metadata map (e.g. session IDs, Langfuse keys).
Omitted from the wire payload when not supplied.
- `:metrics_opt_in` - Optional boolean enabling Langfuse metrics tracking
on the Apollo side. Omitted from the wire payload when not supplied.
"""
@spec global_chat_stream(String.t(), opts()) :: Tesla.Env.result()
def global_chat_stream(content, opts \\ []) do
workflow_yaml = Keyword.get(opts, :workflow_yaml)
page = Keyword.get(opts, :page)
history = Keyword.get(opts, :history, [])
meta = Keyword.get(opts, :meta)
metrics_opt_in = Keyword.get(opts, :metrics_opt_in)

payload =
%{
Expand All @@ -239,6 +275,8 @@ defmodule Lightning.ApolloClient do
"workflow_yaml" => workflow_yaml,
"page" => page,
"history" => history,
"meta" => meta,
"metrics_opt_in" => metrics_opt_in,
"options" => %{"stream" => true}
}
|> Enum.reject(fn {_, v} -> is_nil(v) end)
Expand Down
30 changes: 30 additions & 0 deletions test/lightning/accounts/user_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -483,4 +483,34 @@ defmodule Lightning.Accounts.UserTest do
:superuser
end
end

describe "core_contributor?/1" do
test "true for @openfn.org email" do
assert User.core_contributor?(%User{email: "alice@openfn.org"})
end

test "case-insensitive" do
assert User.core_contributor?(%User{email: "Bob@OpenFN.ORG"})
end

test "false for other domains" do
refute User.core_contributor?(%User{email: "alice@example.com"})
end

test "false for nil/empty email" do
refute User.core_contributor?(%User{email: nil})
refute User.core_contributor?(%User{email: ""})
end
end

describe "langfuse_persona/1" do
test "core-contributor for @openfn user" do
assert User.langfuse_persona(%User{email: "x@openfn.org"}) ==
"core-contributor"
end

test "user for everyone else" do
assert User.langfuse_persona(%User{email: "x@example.com"}) == "user"
end
end
end
Loading