Skip to content

Commit 9331ad4

Browse files
authored
Fix tag updates atomicity and deadlocks (#483)
* Fix the tag updates atomicity and deadlocks * Replace `Map.drop()` with `Map.take()` as per Liam's feedback in DM * Fix error handling since there will be a tuple of more than 2 items returned as per docs * Replace `+1` with `1` for update_images_counts * Rename `update_images_counts` to `update_image_counts` and replace `case` with a function-level match * Replace the capture operator with an explicit `fn tag_name ->` * Fix remove repo nil match autocompleted by Github Copilot (bruh)
1 parent 5bb80fe commit 9331ad4

2 files changed

Lines changed: 133 additions & 42 deletions

File tree

lib/philomena/images.ex

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -96,22 +96,25 @@ defmodule Philomena.Images do
9696
|> Multi.insert(:image, image)
9797
|> Multi.run(:added_tag_count, fn repo, %{image: image} ->
9898
tag_ids = image.added_tags |> Enum.map(& &1.id)
99-
tags = Tag |> where([t], t.id in ^tag_ids)
10099

101-
{count, nil} = repo.update_all(tags, inc: [images_count: 1])
100+
count = Tags.update_image_counts(repo, 1, tag_ids)
102101

103102
{:ok, count}
104103
end)
105104
|> maybe_subscribe_on(:image, attribution[:user], :watch_on_upload)
106105
|> Repo.transaction()
107106
|> case do
108-
{:ok, %{image: image}} = result ->
109-
async_upload(image, attrs["image"])
107+
{:ok, %{image: image}} ->
108+
upload_pid = async_upload(image, attrs["image"])
110109
reindex_image(image)
111110
Tags.reindex_tags(image.added_tags)
112111
maybe_approve_image(image, attribution[:user])
113112

114-
result
113+
# Return the upload PID along with the created image so that the caller
114+
# can control the lifecycle of the upload if needed. It's useful, for
115+
# example, for the seeding process to know when to delete the temp file
116+
# used for uploading.
117+
{:ok, %{image: image, upload_pid: upload_pid}}
115118

116119
result ->
117120
result
@@ -138,6 +141,8 @@ defmodule Philomena.Images do
138141

139142
# Free up the linked process
140143
send(linked_pid, :ready)
144+
145+
linked_pid
141146
end
142147

143148
defp try_upload(image, retry_count) when retry_count < 100 do
@@ -665,9 +670,8 @@ defmodule Philomena.Images do
665670

666671
repo, %{image: {_image, _added, removed_tags}} ->
667672
tag_ids = removed_tags |> Enum.map(& &1.id)
668-
tags = Tag |> where([t], t.id in ^tag_ids)
669673

670-
{count, nil} = repo.update_all(tags, inc: [images_count: -1])
674+
count = Tags.update_image_counts(repo, -1, tag_ids)
671675

672676
{:ok, count}
673677
end)
@@ -954,9 +958,8 @@ defmodule Philomena.Images do
954958
# to way too much drift, and the index has to be
955959
# maintained.
956960
tag_ids = Enum.map(image.tags, & &1.id)
957-
query = where(Tag, [t], t.id in ^tag_ids)
958961

959-
repo.update_all(query, inc: [images_count: -1])
962+
Tags.update_image_counts(repo, -1, tag_ids)
960963

961964
{:ok, image.tags}
962965
end)

lib/philomena/tags.ex

Lines changed: 121 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ defmodule Philomena.Tags do
44
"""
55

66
import Ecto.Query, warn: false
7+
alias Ecto.Multi
78
alias Philomena.Repo
89

910
alias PhilomenaQuery.Search
@@ -24,6 +25,47 @@ defmodule Philomena.Tags do
2425
alias Philomena.DnpEntries.DnpEntry
2526
alias Philomena.Channels.Channel
2627

28+
# There is a really delicate nuance that must be known to avoid deadlocks in
29+
# vectorized mutation queries such as `INSERT ON CONFLICT UPDATE`, `UPDATE`,
30+
# `DELETE`, `SELECT FOR [NO KEY] UPDATE` that touch multiple records. Note that
31+
# `INSERT ON CONFLICT DO NOTHING` doesn't lock the conflicting records, so this
32+
# nuance doesn't apply in that case (https://dba.stackexchange.com/questions/322912/will-insert-on-conflict-do-nothing-lock-the-row-in-case-of-conflict)
33+
#
34+
# If a vectorized mutation is run without a consistent locking order of the records,
35+
# it can end up with a deadlock where one transaction locks a set of records
36+
# that overlap with the other transaction while the other transaction locks
37+
# the other set that overlaps with the first transaction. Thus, both transactions
38+
# wait for each other to release the locks on records they locked resulting in
39+
# a deadlock.
40+
#
41+
# For raw `UPDATE/DELETE ... WHERE ... IN (...)` queries, the items inside `IN (...)`
42+
# don't influence the order of locking. These queries also don't have an `ORDER BY`
43+
# clause. Thus, this function returns a `SELECT [lock_type]` query that establishes a
44+
# consistent order of records by primary keys that must be used with all vectorized
45+
# mutation queries to avoid deadlocks. This query can be used as a subquery in
46+
# the `WHERE` clause for the vectorized mutation.
47+
#
48+
# If no locking order is set, the deadlock can appear randomly and its probability
49+
# increases with the number of items in the vectorized mutation query and with
50+
# the number of overlapping records in concurrent transactions.
51+
#
52+
# This phenomenon was discovered when @MareStare was trying to parallelize
53+
# the image creation process for seeding the images during development, where
54+
# tons of image uploads are issued in parallel with many overlapping tags
55+
# (https://github.com/philomena-dev/philomena/pull/481).
56+
#
57+
# Big thanks to this StackOverflow post for explanations:
58+
# https://stackoverflow.com/questions/27262900/postgres-update-and-lock-ordering/27263824#27263824
59+
defmacro vectorized_mutation_lock(lock_type, tag_ids) do
60+
quote do
61+
Tag
62+
|> select([t], t.id)
63+
|> lock(unquote(lock_type))
64+
|> where([t], t.id in ^unquote(tag_ids))
65+
|> order_by([t], t.id)
66+
end
67+
end
68+
2769
@doc """
2870
Gets existing tags or creates new ones from a tag list string.
2971
@@ -39,43 +81,72 @@ defmodule Philomena.Tags do
3981
"""
4082
@spec get_or_create_tags(String.t()) :: list()
4183
def get_or_create_tags(tag_list) do
42-
tag_names = Tag.parse_tag_list(tag_list)
43-
44-
existent_tags =
45-
Tag
46-
|> where([t], t.name in ^tag_names)
47-
|> preload([:implied_tags, aliased_tag: :implied_tags])
48-
|> Repo.all()
49-
|> Enum.uniq_by(& &1.name)
50-
51-
existent_tag_names =
52-
existent_tags
53-
|> Map.new(&{&1.name, true})
84+
case Tag.parse_tag_list(tag_list) do
85+
[] -> []
86+
tag_names -> get_or_create_non_empty_tags_list(tag_names)
87+
end
88+
end
5489

55-
nonexistent_tag_names =
90+
@spec get_or_create_non_empty_tags_list(list(String.t())) :: list()
91+
defp get_or_create_non_empty_tags_list(tag_names) do
92+
tags =
5693
tag_names
57-
|> Enum.reject(&existent_tag_names[&1])
58-
59-
# Now get rid of the aliases
60-
existent_tags =
61-
existent_tags
62-
|> Enum.map(&(&1.aliased_tag || &1))
63-
64-
new_tags =
65-
nonexistent_tag_names
66-
|> Enum.map(fn name ->
67-
{:ok, tag} =
68-
%Tag{}
69-
|> Tag.creation_changeset(%{name: name})
70-
|> Repo.insert()
71-
72-
%{tag | implied_tags: []}
94+
|> Enum.sort()
95+
|> Enum.dedup()
96+
|> Enum.map(fn tag_name ->
97+
%Tag{}
98+
|> Tag.creation_changeset(%{name: tag_name})
99+
|> Ecto.Changeset.apply_changes()
100+
|> Map.take([
101+
:slug,
102+
:name,
103+
:category,
104+
:images_count,
105+
:description,
106+
:short_description,
107+
:namespace,
108+
:name_in_namespace,
109+
:image,
110+
:image_format,
111+
:image_mime_type,
112+
:mod_notes
113+
])
114+
|> Map.merge(%{
115+
created_at: {:placeholder, :timestamp},
116+
updated_at: {:placeholder, :timestamp}
117+
})
73118
end)
74119

120+
%{new_tags: {_rows_affected, new_tags}, all_tags: all_tags} =
121+
Multi.new()
122+
|> Multi.insert_all(
123+
:new_tags,
124+
Tag,
125+
tags,
126+
placeholders: %{timestamp: DateTime.utc_now() |> DateTime.truncate(:second)},
127+
on_conflict: :nothing,
128+
returning: [:id]
129+
)
130+
|> Multi.all(
131+
:all_tags,
132+
Tag
133+
|> where([t], t.name in ^tag_names)
134+
|> distinct([t], t.name)
135+
|> preload([:implied_tags, aliased_tag: :implied_tags])
136+
)
137+
|> Repo.transaction()
138+
|> case do
139+
{:ok, ok} ->
140+
ok
141+
142+
result ->
143+
raise "get_or_create_tags failed: #{inspect(result)}\ntag_names: #{inspect(tag_names)}"
144+
end
145+
75146
new_tags
76147
|> reindex_tags()
77148

78-
existent_tags ++ new_tags
149+
all_tags
79150
end
80151

81152
@doc """
@@ -548,9 +619,7 @@ defmodule Philomena.Tags do
548619

549620
tag_ids = Enum.map(taggings, & &1.tag_id)
550621

551-
Tag
552-
|> where([t], t.id in ^tag_ids)
553-
|> Repo.update_all(inc: [images_count: 1])
622+
update_image_counts(Repo, 1, tag_ids)
554623

555624
tag_ids
556625
end)
@@ -560,6 +629,25 @@ defmodule Philomena.Tags do
560629
|> Repo.all()
561630
end
562631

632+
@doc """
633+
Accepts IDs of tags and adds `diff` (may be negative) to their `images_count`; returns the number of rows updated.
634+
"""
635+
@spec update_image_counts(term(), integer(), [integer()]) :: integer()
636+
def update_image_counts(repo, diff, tag_ids)
637+
638+
def update_image_counts(_repo, _diff, []), do: 0
639+
640+
def update_image_counts(repo, diff, tag_ids) do
641+
locked_tags = vectorized_mutation_lock("FOR NO KEY UPDATE", tag_ids)
642+
643+
{rows_affected, _} =
644+
Tag
645+
|> where([t], t.id in subquery(locked_tags))
646+
|> repo.update_all(inc: [images_count: diff])
647+
648+
rows_affected
649+
end
650+
563651
@doc """
564652
Returns an `%Ecto.Changeset{}` for tracking tag changes.
565653

0 commit comments

Comments
 (0)