From d2e8611c09265ee5e0ee1de6997380e75624869b Mon Sep 17 00:00:00 2001 From: Anthony Accomazzo Date: Sun, 29 Jun 2025 20:15:08 -0700 Subject: [PATCH] Fix FIFO ordering for queued sync_info messages --- .gitignore | 1 + lib/gen_stage/buffer.ex | 2 +- test/gen_stage/buffer_test.exs | 136 +++++++++++++++++++++++++++++++++ test/gen_stage_test.exs | 33 ++++++++ 4 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 test/gen_stage/buffer_test.exs diff --git a/.gitignore b/.gitignore index 008b35d..d4a9d99 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,4 @@ /doc erl_crash.dump *.ez +.DS_Store diff --git a/lib/gen_stage/buffer.ex b/lib/gen_stage/buffer.ex index 9aa0012..9852d85 100644 --- a/lib/gen_stage/buffer.ex +++ b/lib/gen_stage/buffer.ex @@ -178,7 +178,7 @@ defmodule GenStage.Buffer do case :maps.take(pos, wheel) do {perms, wheel} -> maybe_triplet = if wheel == %{}, do: max, else: {new_pos, max, wheel} - {:ok, perms, maybe_triplet} + {:ok, :lists.reverse(perms), maybe_triplet} :error -> {:error, {new_pos, max, wheel}} diff --git a/test/gen_stage/buffer_test.exs b/test/gen_stage/buffer_test.exs new file mode 100644 index 0000000..cae390d --- /dev/null +++ b/test/gen_stage/buffer_test.exs @@ -0,0 +1,136 @@ +defmodule GenStage.BufferTest do + use ExUnit.Case, async: true + + alias GenStage.Buffer + + describe "estimate_size/1" do + test "does not count permanent events in size estimate" do + buffer = Buffer.new(10) + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp1, :temp2], :first) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm3) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm4) + + # Size should still be 2 (only temporary events) + assert Buffer.estimate_size(buffer) == 2 + end + end + + describe "store_temporary/3 with :first keep strategy" do + test "discards excess events when buffer is full" do + buffer = Buffer.new(3) + assert {buffer, 0, _perms} = Buffer.store_temporary(buffer, [:a, :b, :c], :first) + assert {buffer, 3, _perms} = Buffer.store_temporary(buffer, [:d, :e, :f], :first) + + assert Buffer.estimate_size(buffer) == 3 + + assert {:ok, _buffer, _counter, [:a, :b, :c], []} = + Buffer.take_count_or_until_permanent(buffer, 3) + end + + test "handles infinity buffer size" do + buffer = Buffer.new(:infinity) + events = Enum.to_list(1..1000) + assert {buffer, 0, []} = Buffer.store_temporary(buffer, events, :first) + + assert Buffer.estimate_size(buffer) == 1000 + end + end + + describe "store_temporary/3 with :last keep strategy" do + test "keeps last events when buffer overflows" do + buffer = Buffer.new(3) + assert {buffer, 0, _perms} = Buffer.store_temporary(buffer, [:a, :b, :c], :last) + assert {buffer, 3, _perms} = Buffer.store_temporary(buffer, [:d, :e, :f], :last) + + assert {:ok, _buffer, _counter, [:d, :e, :f], []} = + Buffer.take_count_or_until_permanent(buffer, 3) + end + + test "emits permanent events when they are displaced by new temporaries" do + buffer = Buffer.new(3) + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp1, :temp2], :last) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm3) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm4) + + # Now overflow with :last strategy, which should displace permanents + {_buffer, excess, perms} = Buffer.store_temporary(buffer, [:temp5, :temp6, :temp7], :last) + + assert excess == 2 + assert length(perms) > 0 + end + end + + describe "store_permanent_unless_empty/2" do + test "returns :empty when buffer is empty" do + buffer = Buffer.new(10) + result = Buffer.store_permanent_unless_empty(buffer, :perm1) + assert result == :empty + end + end + + describe "take_count_or_until_permanent/2" do + test "returns :empty when buffer is empty" do + buffer = Buffer.new(10) + result = Buffer.take_count_or_until_permanent(buffer, 5) + assert result == :empty + end + + test "takes temporary events in FIFO order, stopping at permanent events and returning them" do + buffer = Buffer.new(10) + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp1, :temp2], :first) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm3) + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp4], :first) + + {:ok, _buffer, remaining_count, temps, perms} = + Buffer.take_count_or_until_permanent(buffer, 5) + + assert temps == [:temp1, :temp2] + assert perms == [:perm3] + # We wanted 5, got 2 temps, stopped at perm + assert remaining_count == 3 + end + + test "maintains FIFO order for permanent events stored at same position" do + buffer = Buffer.new(10) + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp1, :temp2], :first) + + # Store multiple permanent events at the same wheel position + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm3) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm4) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm5) + + {:ok, _buffer, _remaining_count, temps, perms} = + Buffer.take_count_or_until_permanent(buffer, 5) + + assert temps == [:temp1, :temp2] + assert perms == [:perm3, :perm4, :perm5] + end + + test "interleaves temporary and permanent events correctly across multiple wheel positions" do + buffer = Buffer.new(10) + + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp1, :temp2], :first) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm3) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm4) + + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp5, :temp6, :temp7], :first) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm8) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm9) + + {buffer, _excess, _perms} = Buffer.store_temporary(buffer, [:temp10], :first) + {:ok, buffer} = Buffer.store_permanent_unless_empty(buffer, :perm11) + + {:ok, buffer, _remaining, temps1, perms1} = Buffer.take_count_or_until_permanent(buffer, 3) + assert temps1 == [:temp1, :temp2] + assert perms1 == [:perm3, :perm4] + + {:ok, buffer, _remaining, temps2, perms2} = Buffer.take_count_or_until_permanent(buffer, 4) + assert temps2 == [:temp5, :temp6, :temp7] + assert perms2 == [:perm8, :perm9] + + {:ok, _buffer, _remaining, temps3, perms3} = Buffer.take_count_or_until_permanent(buffer, 2) + assert temps3 == [:temp10] + assert perms3 == [:perm11] + end + end +end diff --git a/test/gen_stage_test.exs b/test/gen_stage_test.exs index 6d318bf..dce3e4d 100644 --- a/test/gen_stage_test.exs +++ b/test/gen_stage_test.exs @@ -1060,6 +1060,39 @@ defmodule GenStageTest do Counter.sync_queue(producer, [:c, :d, :e]) assert_receive :sync end + + test "delivers multiple info to producer in FIFO order" do + {:ok, producer} = Counter.start_link({:producer, self(), buffer_size: 10}) + {:ok, consumer} = Forwarder.start_link({:consumer, self()}) + GenStage.sync_subscribe(consumer, to: producer, consumer_demand: :manual) + assert_receive {:producer_subscribed, _} + assert_receive {:consumer_subscribed, sub} + + Counter.sync_queue(producer, [:a, :b, :c]) + GenStage.sync_info(producer, :d) + GenStage.sync_info(producer, :e) + GenStage.sync_info(producer, :f) + Counter.sync_queue(producer, [:g, :h]) + GenStage.sync_info(producer, :i) + + refute_received :d + + Forwarder.ask(consumer, sub, 5) + + assert_receive {:consumed, [:a, :b, :c]} + assert_receive {:consumed, [:g, :h]} + + messages = + for n <- 1..4 do + receive do + msg -> msg + after + 100 -> flunk("Did not receive expected number of messages (got #{n - 1})") + end + end + + assert messages == [:d, :e, :f, :i] + end end describe "sync_subscribe/2" do