|
1 | 1 | defmodule RemotePersistentTerm.Fetcher.S3 do |
2 | 2 | @moduledoc """ |
3 | 3 | A Fetcher implementation for AWS S3. |
| 4 | +
|
| 5 | + ## Versioned vs. non-versioned buckets |
| 6 | +
|
| 7 | + This fetcher works with both versioned and non-versioned buckets. It uses the object's |
| 8 | + `ETag` as a change token and performs conditional GETs with `If-None-Match` to avoid |
| 9 | + re-downloading unchanged data. |
| 10 | +
|
| 11 | + - **Versioned buckets**: `HEAD`/`GET` responses include `ETag`; the fetcher uses it for |
| 12 | + change detection. The latest object is always whatever S3 returns for the key (no explicit |
| 13 | + version ID required). |
| 14 | + - **Non-versioned buckets**: only `ETag` is available, which is sufficient to detect |
| 15 | + content changes. Overwriting an object with identical bytes may keep the same `ETag`, |
| 16 | + which is fine because the content is unchanged. |
| 17 | +
|
| 18 | + ## S3-compatible services |
| 19 | +
|
| 20 | + S3-compatible providers (e.g., DigitalOcean Spaces, Linode Object Storage) should work |
| 21 | + as long as they support the standard `ETag` and `If-None-Match` headers and `304 Not Modified` responses. |
| 22 | + If a provider ignores conditional requests, the fetcher will still function but will |
| 23 | + download on every refresh. |
4 | 24 | """ |
5 | 25 | require Logger |
6 | 26 |
|
@@ -82,17 +102,20 @@ defmodule RemotePersistentTerm.Fetcher.S3 do |
82 | 102 |
|
83 | 103 | @impl true |
84 | 104 | def current_version(state) do |
85 | | - with {:ok, versions} <- list_object_versions(state), |
86 | | - {:ok, %{etag: etag, version_id: version}} <- find_latest(versions) do |
| 105 | + with {:ok, %{headers: headers}} <- head_object(state), |
| 106 | + {:ok, version} <- extract_version(headers) do |
87 | 107 | Logger.info( |
88 | 108 | bucket: state.bucket, |
89 | 109 | key: state.key, |
90 | 110 | version: version, |
91 | 111 | message: "Found latest version of object" |
92 | 112 | ) |
93 | 113 |
|
94 | | - {:ok, etag} |
| 114 | + {:ok, version} |
95 | 115 | else |
| 116 | + {:error, {:http_error, 404, _}} -> |
| 117 | + {:error, "could not find s3://#{state.bucket}/#{state.key}"} |
| 118 | + |
96 | 119 | {:error, {:unexpected_response, %{body: reason}}} -> |
97 | 120 | {:error, reason} |
98 | 121 |
|
@@ -133,60 +156,127 @@ defmodule RemotePersistentTerm.Fetcher.S3 do |
133 | 156 | end |
134 | 157 | end |
135 | 158 |
|
136 | | - defp list_object_versions(state) do |
| 159 | + @impl true |
| 160 | + def download_if_changed(state, current_version) do |
137 | 161 | res = |
138 | | - aws_client_request( |
139 | | - &ExAws.S3.get_bucket_object_versions/2, |
| 162 | + get_object_request( |
140 | 163 | state, |
141 | | - prefix: state.key |
| 164 | + if_none_match_opts(current_version), |
| 165 | + &failover_on_error?/1 |
142 | 166 | ) |
143 | 167 |
|
144 | | - with {:ok, %{body: %{versions: versions}}} <- res do |
145 | | - {:ok, versions} |
| 168 | + case res do |
| 169 | + {:ok, %{status_code: 304}} -> |
| 170 | + {:not_modified, current_version} |
| 171 | + |
| 172 | + {:error, {:http_error, 304, _}} -> |
| 173 | + {:not_modified, current_version} |
| 174 | + |
| 175 | + {:ok, %{body: body, headers: headers}} -> |
| 176 | + with {:ok, version} <- extract_version(headers) do |
| 177 | + {:ok, body, version} |
| 178 | + end |
| 179 | + |
| 180 | + {:error, reason} -> |
| 181 | + {:error, inspect(reason)} |
146 | 182 | end |
147 | 183 | end |
148 | 184 |
|
149 | 185 | defp get_object(state) do |
150 | | - aws_client_request(&ExAws.S3.get_object/2, state, state.key) |
| 186 | + get_object_request(state, []) |
| 187 | + end |
| 188 | + |
| 189 | + defp get_object_request(state, opts, failover_on_error? \\ fn _ -> true end) do |
| 190 | + aws_client_request( |
| 191 | + fn bucket, request_opts -> ExAws.S3.get_object(bucket, state.key, request_opts) end, |
| 192 | + state, |
| 193 | + opts, |
| 194 | + failover_on_error? |
| 195 | + ) |
| 196 | + end |
| 197 | + |
| 198 | + defp head_object(state) do |
| 199 | + aws_client_request(&ExAws.S3.head_object/2, state, state.key) |
151 | 200 | end |
152 | 201 |
|
153 | | - defp find_latest([_ | _] = contents) do |
154 | | - Enum.find(contents, fn |
155 | | - %{is_latest: "true"} -> |
156 | | - true |
| 202 | + defp extract_version(headers) do |
| 203 | + case header_value(headers, "etag") do |
| 204 | + nil -> {:error, :not_found} |
| 205 | + value -> {:ok, normalize_etag(value)} |
| 206 | + end |
| 207 | + end |
| 208 | + |
| 209 | + defp header_value(headers, name) do |
| 210 | + downcased = String.downcase(name) |
| 211 | + |
| 212 | + Enum.find_value(headers, fn |
| 213 | + {key, value} when is_binary(key) and is_binary(value) -> |
| 214 | + if String.downcase(key) == downcased, do: value, else: nil |
| 215 | + |
| 216 | + {key, value} when is_atom(key) and is_binary(value) -> |
| 217 | + if String.downcase(Atom.to_string(key)) == downcased, do: value, else: nil |
157 | 218 |
|
158 | 219 | _ -> |
159 | | - false |
| 220 | + nil |
160 | 221 | end) |
161 | | - |> case do |
162 | | - res when is_map(res) -> {:ok, res} |
163 | | - _ -> {:error, :not_found} |
| 222 | + end |
| 223 | + |
| 224 | + defp normalize_etag(value) when is_binary(value) do |
| 225 | + value |
| 226 | + |> String.trim() |
| 227 | + |> String.trim("\"") |
| 228 | + end |
| 229 | + |
| 230 | + defp if_none_match_opts(nil), do: [] |
| 231 | + defp if_none_match_opts(etag), do: [if_none_match: quote_etag(etag)] |
| 232 | + |
| 233 | + defp quote_etag(etag) do |
| 234 | + etag = String.trim(etag) |
| 235 | + |
| 236 | + if String.starts_with?(etag, "\"") and String.ends_with?(etag, "\"") do |
| 237 | + etag |
| 238 | + else |
| 239 | + "\"#{etag}\"" |
164 | 240 | end |
165 | 241 | end |
166 | 242 |
|
167 | | - defp find_latest(_), do: {:error, :not_found} |
| 243 | + defp failover_on_error?({:http_error, 304, _}), do: false |
| 244 | + defp failover_on_error?(_reason), do: true |
| 245 | + |
| 246 | + defp aws_client_request(op, state, opts) do |
| 247 | + aws_client_request(op, state, opts, fn _ -> true end) |
| 248 | + end |
168 | 249 |
|
169 | | - defp aws_client_request(op, %{failover_buckets: nil} = state, opts) do |
| 250 | + defp aws_client_request(op, %{failover_buckets: nil} = state, opts, _failover_on_error?) do |
170 | 251 | perform_request(op, state.bucket, state.region, opts) |
171 | 252 | end |
172 | 253 |
|
173 | 254 | defp aws_client_request( |
174 | 255 | op, |
175 | 256 | %{ |
176 | | - failover_buckets: [_|_] = failover_buckets |
| 257 | + failover_buckets: [_ | _] = failover_buckets |
177 | 258 | } = state, |
178 | | - opts |
| 259 | + opts, |
| 260 | + failover_on_error? |
179 | 261 | ) do |
180 | | - with {:error, reason} <- perform_request(op, state.bucket, state.region, opts) do |
181 | | - Logger.error(%{ |
182 | | - bucket: state.bucket, |
183 | | - key: state.key, |
184 | | - region: state.region, |
185 | | - reason: inspect(reason), |
186 | | - message: "Failed to fetch from primary bucket, attempting failover buckets" |
187 | | - }) |
188 | | - |
189 | | - try_failover_buckets(op, failover_buckets, opts, state) |
| 262 | + case perform_request(op, state.bucket, state.region, opts) do |
| 263 | + {:error, reason} = error -> |
| 264 | + if failover_on_error?.(reason) do |
| 265 | + Logger.error(%{ |
| 266 | + bucket: state.bucket, |
| 267 | + key: state.key, |
| 268 | + region: state.region, |
| 269 | + reason: inspect(reason), |
| 270 | + message: "Failed to fetch from primary bucket, attempting failover buckets" |
| 271 | + }) |
| 272 | + |
| 273 | + try_failover_buckets(op, failover_buckets, opts, state) |
| 274 | + else |
| 275 | + error |
| 276 | + end |
| 277 | + |
| 278 | + result -> |
| 279 | + result |
190 | 280 | end |
191 | 281 | end |
192 | 282 |
|
|
0 commit comments