Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 37 additions & 3 deletions app/models/solid_cache/entry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,21 @@ def clear_delete
end
end

def lock_and_write(key, &block)
def lock_and_write(key, placeholder_payload: nil, &block)
transaction do
without_query_cache do
ensure_lockable_row_exists(key, placeholder_payload) if placeholder_payload

result = lock.where(key_hash: key_hash_for(key)).pick(:key, :value)
new_value = block.call(result&.first == key ? result[1] : nil)
write(key, new_value) if new_value
current_value = result&.first == key ? result[1] : nil
new_value = block.call(current_value)

if new_value
write(key, new_value)
elsif placeholder_payload && current_value == placeholder_payload
delete_by_key(key)
end

new_value
end
end
Expand All @@ -85,6 +94,31 @@ def id_range
end

private
def ensure_lockable_row_exists(key, payload)
retries = 0

begin
insert_all(
add_key_hash_and_byte_size([
{ key: key, value: payload, created_at: Time.current }
]),
unique_by: upsert_unique_by
)
rescue ActiveRecord::RecordNotUnique
# Another concurrent writer created the row first.
rescue ActiveRecord::StatementInvalid => error
raise unless sqlite_busy_exception?(error) && retries < 3

retries += 1
sleep(0.01 * retries)
retry
end
end

def sqlite_busy_exception?(error)
connection.adapter_name == "SQLite" && error.message.include?("database is locked")
end

def add_key_hash_and_byte_size(payloads)
payloads.map do |payload|
payload.dup.tap do |payload|
Expand Down
18 changes: 16 additions & 2 deletions lib/solid_cache/store/api.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,10 @@ def write_entry(key, entry, raw: false, unless_exist: false, **options)

if unless_exist
written = false
entry_lock_and_write(key) do |value|
entry_lock_and_write(
key,
placeholder_payload: expired_placeholder_payload(raw: raw, **options)
) do |value|
if value.nil? || deserialize_entry(value, **options).expired?
written = true
payload
Expand Down Expand Up @@ -155,7 +158,10 @@ def adjust(name, amount, options)
options = merged_options(options)
key = normalize_key(name, options)

new_value = entry_lock_and_write(key) do |value|
new_value = entry_lock_and_write(
key,
placeholder_payload: expired_placeholder_payload(**options)
) do |value|
serialize_entry(adjusted_entry(value, amount, options))
end
deserialize_entry(new_value, **options).value if new_value
Expand All @@ -174,6 +180,14 @@ def adjusted_entry(value, amount, options)
ActiveSupport::Cache::Entry.new(amount, **options)
end
end

def expired_placeholder_payload(raw: false, **options)
serialize_entry(
ActiveSupport::Cache::Entry.new(nil, expires_in: -1),
raw: raw,
**options
)
end
end
end
end
4 changes: 2 additions & 2 deletions lib/solid_cache/store/entries.rb
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,9 @@ def entry_clear
end
end

def entry_lock_and_write(key, &block)
def entry_lock_and_write(key, placeholder_payload: nil, &block)
writing_key(key, failsafe: :increment) do
Entry.lock_and_write(key) do |value|
Entry.lock_and_write(key, placeholder_payload: placeholder_payload) do |value|
block.call(value).tap { |result| track_writes(1) if result }
end
end
Expand Down
80 changes: 80 additions & 0 deletions test/unit/solid_cache_test.rb
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# frozen_string_literal: true

require "test_helper"
require "concurrent"
require_relative "behaviors"
require "active_support/testing/method_call_assertions"

Expand Down Expand Up @@ -65,6 +66,85 @@ def test_write_expired_value_with_unless_exist
travel 2.seconds
assert_equal true, @cache.write(1, "bbbb", expires_in: 1.second, unless_exist: true)
end

def test_expired_placeholder_payload_deserializes_as_expired_entry
payload = @cache.send(
:serialize_entry,
ActiveSupport::Cache::Entry.new(nil, expires_in: -1)
)

entry = @cache.send(:deserialize_entry, payload)

assert entry
assert_predicate entry, :expired?
end

def test_concurrent_increment_on_missing_key_serializes_initial_creation
key = SecureRandom.uuid
barrier = Concurrent::CyclicBarrier.new(2)
results = Queue.new
errors = Queue.new

threads = 2.times.map do
Thread.new do
cache = lookup_store(namespace: @namespace, expires_in: 60)

barrier.wait
results << cache.increment(key)
rescue => error
errors << error
end
end

threads.each(&:join)

thread_errors = []
thread_errors << errors.pop until errors.empty?

assert_predicate thread_errors, :empty?, <<~MSG
expected no thread errors, got #{thread_errors.size}:
#{thread_errors.map { |e| "#{e.class}: #{e.message}\n#{Array(e.backtrace).first(10).join("\n")}" }.join("\n\n")}
MSG

values = 2.times.map { results.pop }.sort
assert_equal [1, 2], values
assert_equal 2, @cache.read(key, raw: true).to_i
end

def test_concurrent_write_with_unless_exist_only_writes_once_for_missing_key
key = SecureRandom.uuid
barrier = Concurrent::CyclicBarrier.new(2)
results = Queue.new
errors = Queue.new

payloads = [ "first", "second" ]

threads = payloads.map do |payload|
Thread.new(payload) do |value|
cache = lookup_store(namespace: @namespace, expires_in: 60)

barrier.wait
results << cache.write(key, value, unless_exist: true)
rescue => error
errors << error
end
end

threads.each(&:join)

thread_errors = []
thread_errors << errors.pop until errors.empty?

assert_predicate thread_errors, :empty?, <<~MSG
expected no thread errors, got #{thread_errors.size}:
#{thread_errors.map { |e| "#{e.class}: #{e.message}\n#{Array(e.backtrace).first(10).join("\n")}" }.join("\n\n")}
MSG

values = 2.times.map { results.pop }
assert_equal 1, values.count(true)
assert_equal 1, values.count(false)
assert_includes [ "first", "second" ], @cache.read(key)
end
end

class SolidCacheFailsafeTest < ActiveSupport::TestCase
Expand Down
Loading