Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 111 additions & 39 deletions lib/philomena_media/objects.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ defmodule PhilomenaMedia.Objects do

"""
alias PhilomenaMedia.Mime
alias ExAws.S3
require Logger

@type key :: String.t()
@typep operation_fn :: (... -> ExAws.Operation.S3.t())

@doc """
Fetch a key from the storage backend and write it into the destination path.
Expand All @@ -61,11 +63,12 @@ defmodule PhilomenaMedia.Objects do
contents =
backends()
|> Enum.find_value(fn opts ->
ExAws.S3.get_object(opts[:bucket], key)
|> ExAws.request(opts[:config_overrides])
|> case do
{:ok, result} -> result
_ -> nil
case request(&S3.get_object/2, [opts[:bucket], key], opts) do
{:ok, contents} ->
contents

{:error, _} ->
nil
end
end)

Expand All @@ -87,10 +90,10 @@ defmodule PhilomenaMedia.Objects do
{_, mime} = Mime.file(file_path)
contents = File.read!(file_path)

run_all(fn opts ->
ExAws.S3.put_object(opts[:bucket], key, contents, content_type: mime)
|> ExAws.request!(opts[:config_overrides])
end)
replicate_request(
&S3.put_object/4,
&[&1[:bucket], key, {:log_byte_size, contents}, [content_type: mime]]
)
end

@doc """
Expand Down Expand Up @@ -127,11 +130,7 @@ defmodule PhilomenaMedia.Objects do
def copy(source_key, dest_key) do
# Potential workaround for inconsistent PutObjectCopy on R2
#
# run_all(fn opts->
# ExAws.S3.put_object_copy(opts[:bucket], dest_key, opts[:bucket], source_key)
# |> ExAws.request!(opts[:config_overrides])
# end)

# replicate_request(ExAws.S3.put_object_copy/4, &[&1[:bucket], dest_key, &1[:bucket], source_key])
try do
file_path = Briefly.create!()
download_file(source_key, file_path)
Expand All @@ -154,10 +153,7 @@ defmodule PhilomenaMedia.Objects do
"""
@spec delete(key()) :: :ok
def delete(key) do
run_all(fn opts ->
ExAws.S3.delete_object(opts[:bucket], key)
|> ExAws.request!(opts[:config_overrides])
end)
replicate_request(&S3.delete_object/2, &[&1[:bucket], key])
end

@doc """
Expand All @@ -176,32 +172,108 @@ defmodule PhilomenaMedia.Objects do
"""
@spec delete_multiple([key()]) :: :ok
def delete_multiple(keys) do
run_all(fn opts ->
ExAws.S3.delete_multiple_objects(opts[:bucket], keys)
|> ExAws.request!(opts[:config_overrides])
end)
replicate_request(&S3.delete_multiple_objects/2, &[&1[:bucket], keys])
end

defp run_all(wrapped) do
fun = fn opts ->
try do
wrapped.(opts)
:ok
catch
_kind, _value -> :error
end
end
# Run the S3 operation with the given list of arguments. The `opts` parameter
# is used to select the specific S3 backend to run the operation against. See
# the functions `primary_opts/0` and `replica_opts/0` responsible for reading
# the config for the primary and replica S3-compatible storages.
#
# This function serves as a thin wrapper over this call:
# ```ex
# operation_fn(...args) |> ExAws.request(opts[:config_overrides])
# ```
#
# Everything else in this function is just logging the request and the
# potential error.
#
# # Huge Payloads Logging
#
# There is a special case of the `s3:PutObject` request that accepts a binary
# payload to upload with the maximum size of 5GB (according to AWS limits).
# For this use case, this function specially handles arguments in the `args`
# list of shape `{:log_byte_size, binary}`. This is used to log the size of the
# binary payload in MB instead of logging the entire payload itself, which
# would be wasteful and useless.
@spec request(operation_fn(), [term()], keyword()) :: term()
defp request(operation, args, opts) do
{:name, operation_name} = Function.info(operation, :name)

Logger.debug(fn ->
args_debug =
args
|> Enum.map(fn
{:log_byte_size, arg} -> "#{(byte_size(arg) / 1_000_000) |> Float.round(2)} MB"
arg -> inspect(arg)
end)
|> Enum.join(", ")

"S3.#{operation_name}(#{args_debug})"
end)

backends()
|> Task.async_stream(fun, timeout: :infinity)
|> Enum.any?(fn {_, v} -> v == :error end)
|> if do
Logger.warning("Failed to operate on all backends")
else
:ok
args =
args
|> Enum.map(fn
{:log_byte_size, arg} -> arg
arg -> arg
end)

operation
|> apply(args)
|> ExAws.request(opts[:config_overrides])
|> case do
{:ok, output} ->
{:ok, output}

# Specially handle the `:http_error` case. This is the most frequent error
# that can happen when the S3 backend responds with an error like
# `BucketNotFound` or `InvalidRequest`. In this case we are most
# interested in the response status and body which fully describe the
# error. We do it this way to provide nicer formatting for such errors.
{:error, {:http_error, status_code, %{body: body}} = err} ->
Logger.warning(
"S3.#{operation_name} failed (HTTP #{inspect(status_code)})\nError: #{body}"
)

{:error, err}

# This is a less likely generic case of an error like connection timeout
{:error, err} ->
Logger.warning("S3.#{operation_name} failed\nError: #{inspect(err, pretty: true)}")
{:error, err}
end
end

:ok
# Run the S3 request across the primary and replica backends. This is only
# useful for mutating operations that need to write the new changes to both
# destinations. Any errors will be just logged and **not** propagated to the
# caller.
#
# Ideally, a pro-active alert could be triggered to notify the ops about the
# issue immediately so they fix the problem and retry the upload. We'll leave
# this improvement for another day.
@spec replicate_request(operation_fn(), (keyword() -> [term()])) :: :ok
defp replicate_request(operation, args) do
{:name, operation_name} = Function.info(operation, :name)
backends = backends()

total_err =
backends
|> Task.async_stream(&request(operation, args.(&1), &1), timeout: :infinity)
|> Enum.filter(&(not match?({:ok, {:ok, _}}, &1)))
|> Enum.count()

cond do
total_err > 0 and total_err == length(backends) ->
Logger.error("S3.#{operation_name} failed for all (#{total_err}) backends")

total_err > 0 ->
Logger.warning("S3.#{operation_name} failed for #{total_err} backends")

true ->
:ok
end
end

defp backends do
Expand Down