Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions lib/logflare/sources/source/bigquery/pipeline.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@

# Ziinc: temporarily pass in source token until PubSubRates is refactored
def ack({queue, source_token}, successful, _failed) do
# TODO: re-queue failed

Check warning on line 111 in lib/logflare/sources/source/bigquery/pipeline.ex

View workflow job for this annotation

GitHub Actions / Checks (Code Quality - Linting, mix lint)

Found a TODO tag in a comment: # TODO: re-queue failed
metrics = Sources.get_source_metrics_for_ingest(source_token)
{sid, bid, _tid} = queue

Expand Down Expand Up @@ -361,6 +361,14 @@
bigquery_processed_bytes_limit: 10_000_000_000
}

Logger.warning("user audit: BigQuery backend auto-disconnect triggered",
action: "user.bq_auto_disconnect",
user_id: user.id,
user_email: user.email,
source_token: source_id,
reason: message
)

case Users.update_user_allowed(user, defaults) do
{:ok, user} ->
Supervisor.reset_all_user_sources(user)
Expand All @@ -369,11 +377,22 @@
|> AccountEmail.backend_disconnected(message)
|> Mailer.deliver()

Logger.warning("Backend disconnected for: #{user.email}", tesla_response: message)
Logger.warning("user audit: BigQuery backend auto-disconnected",
action: "user.bq_auto_disconnected",
user_id: user.id,
user_email: user.email,
source_token: source_id,
reason: message
)

{:error, changeset} ->
Logger.error("Failed to reset backend for user: #{user.email}",
changeset: inspect(changeset)
Logger.error("user audit: BigQuery backend auto-disconnect failed",
action: "user.bq_auto_disconnect_failed",
user_id: user.id,
user_email: user.email,
source_token: source_id,
reason: message,
errors: inspect(changeset.errors)
)
end
end
Expand Down
144 changes: 141 additions & 3 deletions lib/logflare_web/controllers/user_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ defmodule LogflareWeb.UserController do

use PhoenixHTMLHelpers

require Logger

plug LogflareWeb.Plugs.AuthMustBeOwner

alias Logflare.Backends.Adaptor.BigQueryAdaptor
Expand All @@ -12,8 +14,15 @@ defmodule LogflareWeb.UserController do
alias Logflare.User
alias Logflare.Users

defp env_service_account,
do: Application.get_env(:logflare, Logflare.Google)[:service_account] || ""
@bq_fields ~w(
bigquery_project_id
bigquery_dataset_location
bigquery_dataset_id
bigquery_reservation_alerts
bigquery_reservation_search
bigquery_processed_bytes_limit
bigquery_enable_managed_service_accounts
)a

def api_show(%{assigns: %{user: user}} = conn, _params) do
conn
Expand All @@ -30,9 +39,15 @@ defmodule LogflareWeb.UserController do
)
end

def update(%{assigns: %{plan: %{name: "Free"}}} = conn, %{
def update(%{assigns: %{plan: %{name: "Free"}, user: user}} = conn, %{
"user" => %{"bigquery_project_id" => _id}
}) do
Logger.warning("user audit: BigQuery backend update blocked for free plan",
action: "user.bq_update_blocked",
user_id: user.id,
user_email: user.email
)

message = [
"Please ",
PhoenixHTMLHelpers.Link.link("upgrade to a paid plan",
Expand All @@ -49,6 +64,24 @@ defmodule LogflareWeb.UserController do
def update(%{assigns: %{user: user}} = conn, %{"user" => params}) do
case Users.update_user_allowed(user, params) do
{:ok, updated_user} ->
Logger.info("user audit: user account updated",
action: "user.update",
user_id: user.id,
user_email: user.email,
params: inspect(params)
)

bq_changes = bq_field_changes(user, updated_user)

if bq_changes != %{} do
Logger.info("user audit: BigQuery backend settings changed",
action: "user.bq_settings_updated",
user_id: user.id,
user_email: user.email,
changes: inspect(bq_changes)
)
end

if updated_user.bigquery_project_id != user.bigquery_project_id,
do: Supervisor.reset_all_user_sources(user)

Expand All @@ -62,6 +95,14 @@ defmodule LogflareWeb.UserController do
|> redirect(to: ~p"/account/edit")

{:error, changeset} ->
Logger.warning("user audit: user account update failed",
action: "user.update_failed",
user_id: user.id,
user_email: user.email,
params: inspect(params),
errors: inspect(changeset.errors)
)

conn
|> put_flash(:error, "Something went wrong! See below for errors.")
|> put_status(406)
Expand All @@ -78,13 +119,32 @@ defmodule LogflareWeb.UserController do
conn,
_params
) do
Logger.info("user audit: user account deletion requested",
action: "user.delete",
user_id: user.id,
user_email: user.email,
has_stripe_customer: true
)

with {:ok, _user} <- Users.delete_user(user),
{:ok, _response} <- Stripe.delete_customer(stripe_customer) do
Logger.info("user audit: user account deleted",
action: "user.deleted",
user_id: user.id,
user_email: user.email
)

conn
|> configure_session(drop: true)
|> redirect(to: ~p"/auth/login?#{%{user_deleted: true}}")
else
_err ->
Logger.error("user audit: user account deletion failed",
action: "user.delete_failed",
user_id: user.id,
user_email: user.email
)

conn
|> put_flash(
:error,
Expand All @@ -95,13 +155,32 @@ defmodule LogflareWeb.UserController do
end

def delete(%{assigns: %{user: user}} = conn, _params) do
Logger.info("user audit: user account deletion requested",
action: "user.delete",
user_id: user.id,
user_email: user.email,
has_stripe_customer: false
)

case Users.delete_user(user) do
{:ok, _user} ->
Logger.info("user audit: user account deleted",
action: "user.deleted",
user_id: user.id,
user_email: user.email
)

conn
|> configure_session(drop: true)
|> redirect(to: ~p"/auth/login?#{%{user_deleted: true}}")

_err ->
Logger.error("user audit: user account deletion failed",
action: "user.delete_failed",
user_id: user.id,
user_email: user.email
)

conn
|> put_flash(
:error,
Expand All @@ -121,6 +200,12 @@ defmodule LogflareWeb.UserController do

Users.update_user_all_fields(user, auth_params)

Logger.info("user audit: API key restored",
action: "user.api_key_restored",
user_id: user.id,
user_email: user.email
)

conn
|> put_flash(:info, "API key restored!")
|> redirect(to: ~p"/dashboard")
Expand All @@ -133,6 +218,12 @@ defmodule LogflareWeb.UserController do

Users.update_user_all_fields(user, auth_params)

Logger.info("user audit: API key reset",
action: "user.api_key_reset",
user_id: user.id,
user_email: user.email
)

conn
|> put_flash(:info, [
"API key reset! ",
Expand All @@ -153,6 +244,13 @@ defmodule LogflareWeb.UserController do
user = Users.preload_team(user)

if is_nil(team_user) or team_user.team_id != user.team.id do
Logger.warning("user audit: unauthorized ownership transfer attempt",
action: "user.change_owner_unauthorized",
user_id: user.id,
user_email: user.email,
target_team_user_id: id
)

conn
|> put_flash(:error, "Not authorized to transfer ownership to this team member")
|> redirect(to: Routes.user_path(conn, :edit) <> "#change-account-owner")
Expand All @@ -163,11 +261,28 @@ defmodule LogflareWeb.UserController do
Stripe.update_customer(user.billing_account.stripe_customer, %{email: new_owner.email})
end

Logger.info("user audit: account ownership transferred",
action: "user.change_owner",
user_id: user.id,
user_email: user.email,
new_owner_id: new_owner.id,
new_owner_email: new_owner.email,
team_user_id: id
)

conn
|> put_flash(:info, "Owner successfully changed!")
|> redirect(to: Routes.user_path(conn, :edit) <> "#team-members")
else
{:error, changeset} ->
Logger.error("user audit: ownership transfer failed",
action: "user.change_owner_failed",
user_id: user.id,
user_email: user.email,
target_team_user_id: id,
errors: inspect(changeset.errors)
)

case List.first(changeset.errors) do
{:email, {"has already been taken", _data}} ->
conn
Expand All @@ -187,10 +302,33 @@ defmodule LogflareWeb.UserController do
end

_err ->
Logger.error("user audit: ownership transfer failed",
action: "user.change_owner_failed",
user_id: user.id,
user_email: user.email,
target_team_user_id: id
)

conn
|> put_flash(:error, "Something went wrong. Please contact support if this continues.")
|> redirect(to: Routes.user_path(conn, :edit) <> "#change-account-owner")
end
end
end

defp env_service_account,
do: Application.get_env(:logflare, Logflare.Google)[:service_account] || ""

defp bq_field_changes(old_user, new_user) do
Enum.reduce(@bq_fields, %{}, fn field, acc ->
old_val = Map.get(old_user, field)
new_val = Map.get(new_user, field)

if old_val != new_val do
Map.put(acc, field, %{from: old_val, to: new_val})
else
acc
end
end)
end
end
Loading
Loading