-
Notifications
You must be signed in to change notification settings - Fork 0
Optimize S3 fetcher with conditional GETs and new download_if_changed callback #20
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
48b4930
f681a3d
dba0bae
405cc87
3df3517
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,35 @@ | ||
| # Repository Guidelines | ||
|
|
||
| ## Project Structure & Module Organization | ||
| - `lib/remote_persistent_term.ex` defines the core behaviour and `use RemotePersistentTerm` macro. | ||
| - `lib/remote_persistent_term/fetcher/` holds fetcher implementations (HTTP, S3, Static) and helpers like HTTP cache logic. | ||
| - Tests live in `test/` with paths mirroring modules (e.g., `test/remote_persistent_term/fetcher/http_test.exs`). | ||
| - Generated artifacts (`_build/`, `deps/`, `doc/`, `cover/`) are outputs of Mix tasks and should not be edited by hand. | ||
|
|
||
| ## Build, Test, and Development Commands | ||
| - `mix deps.get` installs dependencies. | ||
| - `mix compile` builds the library. | ||
| - `mix test` runs the full ExUnit suite. | ||
| - `mix test test/remote_persistent_term/fetcher/http_test.exs:30` runs a focused test by file and line. | ||
| - `mix format` applies the project formatter (see `.formatter.exs`). | ||
| - `mix docs` generates ExDoc output into `doc/`. | ||
|
|
||
| ## Coding Style & Naming Conventions | ||
| - Follow `mix format` output (Elixir defaults to 2-space indentation). | ||
| - Modules use `CamelCase`; files and functions use `snake_case` (predicates end in `?`). | ||
| - Keep option keys consistent with `RemotePersistentTerm` options and fetcher configuration keys. | ||
|
|
||
| ## Testing Guidelines | ||
| - Tests use ExUnit; Mox mocks the ExAws client and Bypass is used for HTTP fetcher tests. | ||
| - Prefer deterministic tests and keep external network access mocked or bypassed. | ||
| - Name tests with clear behaviour statements; add coverage for new fetcher logic and option validation. | ||
|
|
||
| ## Commit & Pull Request Guidelines | ||
| - Commit messages are short, imperative, and capitalized (e.g., “Improve logs”, “Fix tests”, “Increment version”). | ||
| - Keep commits scoped to one change; avoid unrelated refactors in the same commit. | ||
| - PRs should include a concise summary, motivation, and the tests you ran (or note if none). | ||
| - If a change affects public behaviour or configuration, update documentation and mention it in the PR. | ||
|
|
||
| ## Notes for Contributors | ||
| - CI runs `mix test` on recent OTP/Elixir versions; ensure your changes pass locally before pushing. | ||
| - When touching S3 or HTTP fetchers, prefer tests that use Mox/Bypass rather than real services. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,6 +1,26 @@ | ||
| defmodule RemotePersistentTerm.Fetcher.S3 do | ||
| @moduledoc """ | ||
| A Fetcher implementation for AWS S3. | ||
|
|
||
| ## Versioned vs. non-versioned buckets | ||
|
|
||
| This fetcher works with both versioned and non-versioned buckets. It uses the object's | ||
| `ETag` as a change token and performs conditional GETs with `If-None-Match` to avoid | ||
| re-downloading unchanged data. | ||
|
|
||
| - **Versioned buckets**: `HEAD`/`GET` responses include `ETag`; the fetcher uses it for | ||
| change detection. The latest object is always whatever S3 returns for the key (no explicit | ||
| version ID required). | ||
| - **Non-versioned buckets**: only `ETag` is available, which is sufficient to detect | ||
| content changes. Overwriting an object with identical bytes may keep the same `ETag`, | ||
| which is fine because the content is unchanged. | ||
|
|
||
| ## S3-compatible services | ||
|
|
||
| S3-compatible providers (e.g., DigitalOcean Spaces, Linode Object Storage) should work | ||
| as long as they support standard S3 headers: `ETag`, `If-None-Match`, and `304 Not Modified`. | ||
| If a provider ignores conditional requests, the fetcher will still function but will | ||
| download on every refresh. | ||
| """ | ||
| require Logger | ||
|
|
||
|
|
@@ -69,6 +89,8 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
| """ | ||
| @impl true | ||
| def init(opts) do | ||
| ensure_http_client() | ||
|
|
||
| with {:ok, valid_opts} <- NimbleOptions.validate(opts, @opts_schema) do | ||
| {:ok, | ||
| %__MODULE__{ | ||
|
|
@@ -82,17 +104,20 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
|
|
||
| @impl true | ||
| def current_version(state) do | ||
| with {:ok, versions} <- list_object_versions(state), | ||
| {:ok, %{etag: etag, version_id: version}} <- find_latest(versions) do | ||
| with {:ok, %{headers: headers}} <- head_object(state), | ||
| {:ok, version} <- extract_version(headers) do | ||
| Logger.info( | ||
| bucket: state.bucket, | ||
| key: state.key, | ||
| version: version, | ||
| message: "Found latest version of object" | ||
| ) | ||
|
|
||
| {:ok, etag} | ||
| {:ok, version} | ||
| else | ||
| {:error, {:http_error, 404, _}} -> | ||
| {:error, "could not find s3://#{state.bucket}/#{state.key}"} | ||
|
|
||
| {:error, {:unexpected_response, %{body: reason}}} -> | ||
| {:error, reason} | ||
|
|
||
|
|
@@ -133,60 +158,134 @@ defmodule RemotePersistentTerm.Fetcher.S3 do | |
| end | ||
| end | ||
|
|
||
| defp list_object_versions(state) do | ||
| @impl true | ||
| def download_if_changed(state, current_version) do | ||
| res = | ||
| aws_client_request( | ||
| &ExAws.S3.get_bucket_object_versions/2, | ||
| get_object_request( | ||
| state, | ||
| prefix: state.key | ||
| if_none_match_opts(current_version), | ||
| &failover_on_error?/1 | ||
| ) | ||
|
|
||
| with {:ok, %{body: %{versions: versions}}} <- res do | ||
| {:ok, versions} | ||
| case res do | ||
| {:ok, %{status_code: 304}} -> | ||
| {:not_modified, current_version} | ||
|
|
||
| {:error, {:http_error, 304, _}} -> | ||
| {:not_modified, current_version} | ||
|
|
||
| {:ok, %{body: body, headers: headers}} -> | ||
| with {:ok, version} <- extract_version(headers) do | ||
| {:ok, body, version} | ||
| end | ||
|
|
||
| {:error, reason} -> | ||
| {:error, inspect(reason)} | ||
| end | ||
| end | ||
|
|
||
| defp get_object(state) do | ||
| aws_client_request(&ExAws.S3.get_object/2, state, state.key) | ||
| get_object_request(state, []) | ||
| end | ||
|
|
||
| defp get_object_request(state, opts, failover_on_error? \\ fn _ -> true end) do | ||
| aws_client_request( | ||
| fn bucket, request_opts -> ExAws.S3.get_object(bucket, state.key, request_opts) end, | ||
| state, | ||
| opts, | ||
| failover_on_error? | ||
| ) | ||
| end | ||
|
|
||
| defp head_object(state) do | ||
| aws_client_request(&ExAws.S3.head_object/2, state, state.key) | ||
| end | ||
|
|
||
| defp find_latest([_ | _] = contents) do | ||
| Enum.find(contents, fn | ||
| %{is_latest: "true"} -> | ||
| true | ||
| defp extract_version(headers) do | ||
| case header_value(headers, "etag") do | ||
| nil -> {:error, :not_found} | ||
| value -> {:ok, normalize_etag(value)} | ||
| end | ||
| end | ||
|
|
||
| defp header_value(headers, name) do | ||
| downcased = String.downcase(name) | ||
|
|
||
| Enum.find_value(headers, fn | ||
| {key, value} when is_binary(key) and is_binary(value) -> | ||
| if String.downcase(key) == downcased, do: value, else: nil | ||
|
|
||
| _ -> | ||
| false | ||
| nil | ||
| end) | ||
| |> case do | ||
| res when is_map(res) -> {:ok, res} | ||
| _ -> {:error, :not_found} | ||
| end | ||
|
|
||
| defp normalize_etag(value) when is_binary(value) do | ||
| value | ||
| |> String.trim() | ||
| |> String.trim("\"") | ||
| end | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Weak ETags are incorrectly normalized causing match failuresLow Severity The Additional Locations (1) |
||
|
|
||
| defp if_none_match_opts(nil), do: [] | ||
| defp if_none_match_opts(etag), do: [if_none_match: quote_etag(etag)] | ||
|
|
||
| defp quote_etag(etag) do | ||
| etag = String.trim(etag) | ||
|
|
||
| if String.starts_with?(etag, "\"") and String.ends_with?(etag, "\"") do | ||
| etag | ||
| else | ||
| "\"#{etag}\"" | ||
| end | ||
| end | ||
|
|
||
| defp failover_on_error?({:http_error, 304, _}), do: false | ||
| defp failover_on_error?(_reason), do: true | ||
|
|
||
| defp ensure_http_client do | ||
| case Application.get_env(:ex_aws, :http_client) do | ||
| nil -> | ||
| Application.put_env(:ex_aws, :http_client, RemotePersistentTerm.Fetcher.S3.HttpClient) | ||
|
|
||
| _ -> | ||
| :ok | ||
| end | ||
| end | ||
|
|
||
| defp find_latest(_), do: {:error, :not_found} | ||
| defp aws_client_request(op, state, opts) do | ||
| aws_client_request(op, state, opts, fn _ -> true end) | ||
| end | ||
|
|
||
| defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do | ||
| defp aws_client_request(op, %{failover_buckets: nil} = state, opts, _failover_on_error?) do | ||
| perform_request(op, state.bucket, state.region, opts) | ||
| end | ||
|
|
||
| defp aws_client_request( | ||
| op, | ||
| %{ | ||
| failover_buckets: [_|_] = failover_buckets | ||
| failover_buckets: [_ | _] = failover_buckets | ||
| } = state, | ||
| opts | ||
| opts, | ||
| failover_on_error? | ||
| ) do | ||
| with {:error, reason} <- perform_request(op, state.bucket, state.region, opts) do | ||
| Logger.error(%{ | ||
| bucket: state.bucket, | ||
| key: state.key, | ||
| region: state.region, | ||
| reason: inspect(reason), | ||
| message: "Failed to fetch from primary bucket, attempting failover buckets" | ||
| }) | ||
|
|
||
| try_failover_buckets(op, failover_buckets, opts, state) | ||
| case perform_request(op, state.bucket, state.region, opts) do | ||
| {:error, reason} = error -> | ||
| if failover_on_error?.(reason) do | ||
| Logger.error(%{ | ||
| bucket: state.bucket, | ||
| key: state.key, | ||
| region: state.region, | ||
| reason: inspect(reason), | ||
| message: "Failed to fetch from primary bucket, attempting failover buckets" | ||
| }) | ||
|
|
||
| try_failover_buckets(op, failover_buckets, opts, state) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Failover buckets ignore 304 handling during conditional GETsMedium Severity The Additional Locations (1)
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. planning to work on the failover logic in a separate PR |
||
| else | ||
| error | ||
| end | ||
|
|
||
| result -> | ||
| result | ||
| end | ||
| end | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,26 @@ | ||
| defmodule RemotePersistentTerm.Fetcher.S3.HttpClient do | ||
| @moduledoc """ | ||
| ExAws HTTP client implementation for Req. | ||
| """ | ||
|
|
||
| @behaviour ExAws.Request.HttpClient | ||
|
|
||
| @impl ExAws.Request.HttpClient | ||
| def request(method, url, body, headers, _http_opts) do | ||
| request = Req.new(decode_body: false, retry: false) | ||
|
|
||
| case Req.request(request, method: method, url: url, body: body, headers: headers) do | ||
| {:ok, response} -> | ||
| response = %{ | ||
| status_code: response.status, | ||
| headers: Req.get_headers_list(response), | ||
| body: response.body | ||
| } | ||
|
|
||
| {:ok, response} | ||
|
|
||
| {:error, reason} -> | ||
| {:error, %{reason: reason}} | ||
| end | ||
| end | ||
| end |
Uh oh!
There was an error while loading. Please reload this page.