From 183a89ec6eb3db3209be72142b2f972482aab0a6 Mon Sep 17 00:00:00 2001 From: Toru KAWAMURA Date: Mon, 9 Mar 2026 17:11:05 +0900 Subject: [PATCH] Fix lock_and_write races for missing cache entries When lock_and_write is called for a key that has no backing row yet, concurrent callers can observe and write as if they were first. Create a lockable expired placeholder row before locking so the read-modify-write path is serialized even for missing entries. Also clean up unused placeholders and retry transient SQLite "database is locked" errors when creating the placeholder row. Add tests covering: - expired placeholder payload deserialization - concurrent increment on a missing key - concurrent write(unless_exist: true) on a missing key --- app/models/solid_cache/entry.rb | 40 ++++++++++++++-- lib/solid_cache/store/api.rb | 18 ++++++- lib/solid_cache/store/entries.rb | 4 +- test/unit/solid_cache_test.rb | 80 ++++++++++++++++++++++++++++++++ 4 files changed, 135 insertions(+), 7 deletions(-) diff --git a/app/models/solid_cache/entry.rb b/app/models/solid_cache/entry.rb index 3203aa1a..110ecc21 100644 --- a/app/models/solid_cache/entry.rb +++ b/app/models/solid_cache/entry.rb @@ -67,12 +67,21 @@ def clear_delete end end - def lock_and_write(key, &block) + def lock_and_write(key, placeholder_payload: nil, &block) transaction do without_query_cache do + ensure_lockable_row_exists(key, placeholder_payload) if placeholder_payload + result = lock.where(key_hash: key_hash_for(key)).pick(:key, :value) - new_value = block.call(result&.first == key ? result[1] : nil) - write(key, new_value) if new_value + current_value = result&.first == key ? result[1] : nil + new_value = block.call(current_value) + + if new_value + write(key, new_value) + elsif placeholder_payload && current_value == placeholder_payload + delete_by_key(key) + end + new_value end end @@ -85,6 +94,31 @@ def id_range end private + def ensure_lockable_row_exists(key, payload) + retries = 0 + + begin + insert_all( + add_key_hash_and_byte_size([ + { key: key, value: payload, created_at: Time.current } + ]), + unique_by: upsert_unique_by + ) + rescue ActiveRecord::RecordNotUnique + # Another concurrent writer created the row first. + rescue ActiveRecord::StatementInvalid => error + raise unless sqlite_busy_exception?(error) && retries < 3 + + retries += 1 + sleep(0.01 * retries) + retry + end + end + + def sqlite_busy_exception?(error) + connection.adapter_name == "SQLite" && error.message.include?("database is locked") + end + def add_key_hash_and_byte_size(payloads) payloads.map do |payload| payload.dup.tap do |payload| diff --git a/lib/solid_cache/store/api.rb b/lib/solid_cache/store/api.rb index b7859ab0..6439b9c8 100644 --- a/lib/solid_cache/store/api.rb +++ b/lib/solid_cache/store/api.rb @@ -54,7 +54,10 @@ def write_entry(key, entry, raw: false, unless_exist: false, **options) if unless_exist written = false - entry_lock_and_write(key) do |value| + entry_lock_and_write( + key, + placeholder_payload: expired_placeholder_payload(raw: raw, **options) + ) do |value| if value.nil? || deserialize_entry(value, **options).expired? written = true payload @@ -155,7 +158,10 @@ def adjust(name, amount, options) options = merged_options(options) key = normalize_key(name, options) - new_value = entry_lock_and_write(key) do |value| + new_value = entry_lock_and_write( + key, + placeholder_payload: expired_placeholder_payload(**options) + ) do |value| serialize_entry(adjusted_entry(value, amount, options)) end deserialize_entry(new_value, **options).value if new_value @@ -174,6 +180,14 @@ def adjusted_entry(value, amount, options) ActiveSupport::Cache::Entry.new(amount, **options) end end + + def expired_placeholder_payload(raw: false, **options) + serialize_entry( + ActiveSupport::Cache::Entry.new(nil, expires_in: -1), + raw: raw, + **options + ) + end end end end diff --git a/lib/solid_cache/store/entries.rb b/lib/solid_cache/store/entries.rb index 4da1240e..e3ad2073 100644 --- a/lib/solid_cache/store/entries.rb +++ b/lib/solid_cache/store/entries.rb @@ -27,9 +27,9 @@ def entry_clear end end - def entry_lock_and_write(key, &block) + def entry_lock_and_write(key, placeholder_payload: nil, &block) writing_key(key, failsafe: :increment) do - Entry.lock_and_write(key) do |value| + Entry.lock_and_write(key, placeholder_payload: placeholder_payload) do |value| block.call(value).tap { |result| track_writes(1) if result } end end diff --git a/test/unit/solid_cache_test.rb b/test/unit/solid_cache_test.rb index 2376345c..c2589274 100644 --- a/test/unit/solid_cache_test.rb +++ b/test/unit/solid_cache_test.rb @@ -1,6 +1,7 @@ # frozen_string_literal: true require "test_helper" +require "concurrent" require_relative "behaviors" require "active_support/testing/method_call_assertions" @@ -65,6 +66,85 @@ def test_write_expired_value_with_unless_exist travel 2.seconds assert_equal true, @cache.write(1, "bbbb", expires_in: 1.second, unless_exist: true) end + + def test_expired_placeholder_payload_deserializes_as_expired_entry + payload = @cache.send( + :serialize_entry, + ActiveSupport::Cache::Entry.new(nil, expires_in: -1) + ) + + entry = @cache.send(:deserialize_entry, payload) + + assert entry + assert_predicate entry, :expired? + end + + def test_concurrent_increment_on_missing_key_serializes_initial_creation + key = SecureRandom.uuid + barrier = Concurrent::CyclicBarrier.new(2) + results = Queue.new + errors = Queue.new + + threads = 2.times.map do + Thread.new do + cache = lookup_store(namespace: @namespace, expires_in: 60) + + barrier.wait + results << cache.increment(key) + rescue => error + errors << error + end + end + + threads.each(&:join) + + thread_errors = [] + thread_errors << errors.pop until errors.empty? + + assert_predicate thread_errors, :empty?, <<~MSG + expected no thread errors, got #{thread_errors.size}: + #{thread_errors.map { |e| "#{e.class}: #{e.message}\n#{Array(e.backtrace).first(10).join("\n")}" }.join("\n\n")} + MSG + + values = 2.times.map { results.pop }.sort + assert_equal [1, 2], values + assert_equal 2, @cache.read(key, raw: true).to_i + end + + def test_concurrent_write_with_unless_exist_only_writes_once_for_missing_key + key = SecureRandom.uuid + barrier = Concurrent::CyclicBarrier.new(2) + results = Queue.new + errors = Queue.new + + payloads = [ "first", "second" ] + + threads = payloads.map do |payload| + Thread.new(payload) do |value| + cache = lookup_store(namespace: @namespace, expires_in: 60) + + barrier.wait + results << cache.write(key, value, unless_exist: true) + rescue => error + errors << error + end + end + + threads.each(&:join) + + thread_errors = [] + thread_errors << errors.pop until errors.empty? + + assert_predicate thread_errors, :empty?, <<~MSG + expected no thread errors, got #{thread_errors.size}: + #{thread_errors.map { |e| "#{e.class}: #{e.message}\n#{Array(e.backtrace).first(10).join("\n")}" }.join("\n\n")} + MSG + + values = 2.times.map { results.pop } + assert_equal 1, values.count(true) + assert_equal 1, values.count(false) + assert_includes [ "first", "second" ], @cache.read(key) + end end class SolidCacheFailsafeTest < ActiveSupport::TestCase