From 6420075b56d78130280cb4b689080756739bcbe1 Mon Sep 17 00:00:00 2001 From: Claudio Alvarado Date: Mon, 18 May 2026 22:33:00 -0400 Subject: [PATCH 1/3] add stream implementation --- lib/tds.ex | 6 + lib/tds/messages.ex | 25 +++- lib/tds/protocol.ex | 160 +++++++++++++++++++++++-- lib/tds/result.ex | 6 +- lib/tds/tokens.ex | 17 ++- test/stream_test.exs | 272 +++++++++++++++++++++++++++++++++++++++++++ 6 files changed, 473 insertions(+), 13 deletions(-) create mode 100644 test/stream_test.exs diff --git a/lib/tds.ex b/lib/tds.ex index caa0cc1..ad0f118 100644 --- a/lib/tds.ex +++ b/lib/tds.ex @@ -174,6 +174,12 @@ defmodule Tds do DBConnection.transaction(conn, fun, opts) end + @spec stream(conn, iodata, list, [execute_option]) :: DBConnection.Stream.t() + def stream(conn, statement, params \\ [], opts \\ []) do + query = %Query{statement: statement} + DBConnection.stream(conn, query, params, opts) + end + @spec rollback(DBConnection.t(), reason :: any) :: no_return defdelegate rollback(conn, any), to: DBConnection diff --git a/lib/tds/messages.ex b/lib/tds/messages.ex index 5294e80..f5ff2b1 100644 --- a/lib/tds/messages.ex +++ b/lib/tds/messages.ex @@ -32,14 +32,14 @@ defmodule Tds.Messages do ## Microsoft Stored Procedures # @tds_sp_cursor 1 - # @tds_sp_cursoropen 2 + @tds_sp_cursoropen 2 # @tds_sp_cursorprepare 3 # @tds_sp_cursorexecute 4 # @tds_sp_cursorprepexec 5 # @tds_sp_cursorunprepare 6 - # @tds_sp_cursorfetch 7 + @tds_sp_cursorfetch 7 # @tds_sp_cursoroption 8 - # @tds_sp_cursorclose 9 + @tds_sp_cursorclose 9 @tds_sp_executesql 10 @tds_sp_prepare 11 @tds_sp_execute 12 @@ -186,6 +186,10 @@ defmodule Tds.Messages do m = msg_result(m, params: [param | params]) {m, c, s} + {:returnvalue, param}, {msg_result(params: params) = m, c, s} -> + m = msg_result(m, params: [param | params]) + {m, c, s} + {:returnstatus, status}, {msg_result() = m, c, s} -> m = msg_result(m, status: status) {m, c, s} @@ -401,6 +405,21 @@ defmodule Tds.Messages do encode_rpc_params(params, "") end + defp encode_rpc(:sp_cursoropen, params) do + <<0xFF, 0xFF, @tds_sp_cursoropen::little-size(2)-unit(8), 0x00, 0x00>> <> + encode_rpc_params(params, "") + end + + defp encode_rpc(:sp_cursorfetch, params) do + <<0xFF, 0xFF, @tds_sp_cursorfetch::little-size(2)-unit(8), 0x00, 0x00>> <> + encode_rpc_params(params, "") + end + + defp encode_rpc(:sp_cursorclose, params) do + <<0xFF, 0xFF, @tds_sp_cursorclose::little-size(2)-unit(8), 0x00, 0x00>> <> + encode_rpc_params(params, "") + end + # Finished processing params defp encode_rpc_params([], ret), do: ret diff --git a/lib/tds/protocol.ex b/lib/tds/protocol.ex index 2fc918e..d441893 100644 --- a/lib/tds/protocol.ex +++ b/lib/tds/protocol.ex @@ -310,8 +310,41 @@ defmodule Tds.Protocol do ) :: {:cont | :halt, Tds.Result.t(), new_state :: t()} | {:error | :disconnect, Exception.t(), new_state :: t()} - def handle_fetch(_query, _cursor, _opts, state) do - {:error, Tds.Error.exception("Cursor is not supported by TDS"), state} + def handle_fetch(_query, cursor, opts, %{sock: _sock} = s) do + fetch_type = Keyword.get(opts, :fetch_type, 2) + max_rows = Keyword.get(opts, :max_rows, 500) + + params = [ + %Tds.Parameter{name: "@cursor", type: :integer, direction: :input, value: cursor}, + %Tds.Parameter{name: "@fetchtype", type: :integer, direction: :input, value: fetch_type}, + %Tds.Parameter{name: "@rownum", type: :integer, direction: :input, value: 0}, + %Tds.Parameter{name: "@nrows", type: :integer, direction: :input, value: max_rows} + ] + + msg = msg_rpc(proc: :sp_cursorfetch, params: params) + + Process.put(:resultset, false) + + s = %{s | state: :executing} + + case msg_send(msg, s) do + {:ok, %{result: %Tds.Result{num_rows: 0}} = s} -> + {:halt, %Tds.Result{columns: [], rows: [], num_rows: 0}, s} + + {:ok, %{result: %Tds.Result{} = result} = s} -> + {:cont, result, s} + + {:error, err, %{transaction: :started} = s} -> + {:error, err, %{s | transaction: :failed}} + + {:error, err, s} -> + {:error, err, s} + end + rescue + exception -> + {:error, exception, s} + after + Process.delete(:resultset) end @spec handle_deallocate( @@ -322,17 +355,123 @@ defmodule Tds.Protocol do ) :: {:ok, Tds.Result.t(), new_state :: t()} | {:error | :disconnect, Exception.t(), new_state :: t()} - def handle_deallocate(_query, _cursor, _opts, state) do - {:error, Tds.Error.exception("Cursor operations are not supported in TDS"), state} + def handle_deallocate(_query, cursor, _opts, %{sock: _sock} = s) do + params = [ + %Tds.Parameter{name: "@cursor", type: :integer, direction: :input, value: cursor} + ] + + msg = msg_rpc(proc: :sp_cursorclose, params: params) + + s = %{s | state: :executing} + + case msg_send(msg, s) do + {:ok, %{result: result} = s} -> + {:ok, result, %{s | state: :ready}} + + {:error, err, %{transaction: :started} = s} -> + {:error, err, %{s | transaction: :failed}} + + {:error, err, s} -> + {:error, err, s} + end end @spec handle_declare(Query.t(), params :: any, opts :: Keyword.t(), state :: t) :: {:ok, Query.t(), cursor :: any, new_state :: t} | {:error | :disconnect, Exception.t(), new_state :: t} - def handle_declare(_query, _params, _opts, state) do - {:error, Tds.Error.exception("Cursor operations are not supported in TDS"), state} + def handle_declare(%Query{statement: statement} = query, params, opts, %{sock: _sock} = s) do + resultset? = Keyword.get(opts, :resultset, false) + + prepared_params = Tds.Parameter.prepared_params(params) + + cursor_param = %Tds.Parameter{ + name: "@cursor", + type: :integer, + direction: :output, + value: nil + } + + scrollopt = + if prepared_params != "" do + 0x1004 + else + 0x0004 + end + + rpc_params = + [ + cursor_param, + %Tds.Parameter{ + name: "@stmt", + type: :string, + direction: :input, + value: statement + }, + %Tds.Parameter{ + name: "@scrollopt", + type: :integer, + direction: :input, + value: scrollopt + }, + %Tds.Parameter{ + name: "@ccopt", + type: :integer, + direction: :input, + value: 0x2001 + }, + %Tds.Parameter{ + name: "@rowcount", + type: :integer, + direction: :output, + value: nil + } + ] ++ + if prepared_params != "" do + [ + %Tds.Parameter{ + name: "@params", + type: :string, + direction: :input, + value: prepared_params + } + ] ++ Tds.Parameter.prepare_params(params) + else + [] + end + + msg = msg_rpc(proc: :sp_cursoropen, params: rpc_params) + + Process.put(:resultset, resultset?) + + case msg_send(msg, %{s | state: :executing}) do + {:ok, %{result: %Tds.Result{} = result} = s} -> + Process.delete(:resultset) + cursor_handle = extract_cursor_handle(result) + {:ok, query, cursor_handle, %{s | state: :ready}} + + {:error, err, %{transaction: :started} = s} -> + Process.delete(:resultset) + {:error, err, %{s | transaction: :failed}} + + {:error, err, s} -> + Process.delete(:resultset) + {:error, err, s} + end + rescue + exception -> + Process.delete(:resultset) + {:error, exception, s} end + defp extract_cursor_handle(%Tds.Result{params: params}) do + case Enum.find(params, &(&1.name == "@cursor" and &1.direction == :output)) do + %Tds.Parameter{value: cursor_id} when is_integer(cursor_id) -> cursor_id + _ -> nil + end + end + + defp extract_cursor_handle(_), do: nil + # CONNECTION defp instance(opts, s) do @@ -762,7 +901,7 @@ defmodule Tds.Protocol do |> send_query(state) end - def message(:executing, msg_result(set: set), s) do + def message(:executing, msg_result(set: set, params: params), s) do resultset? = Process.get(:resultset, false) result = @@ -772,6 +911,13 @@ defmodule Tds.Protocol do {[h | _t], _false} -> h end + result = + if result != nil and params != [] do + %{result | params: params} + else + result + end + {:ok, mark_ready(%{s | result: result})} end diff --git a/lib/tds/result.ex b/lib/tds/result.ex index c3daf34..2f31155 100644 --- a/lib/tds/result.ex +++ b/lib/tds/result.ex @@ -8,16 +8,18 @@ defmodule Tds.Result do * `rows`: The result set as a list of tuples. Each tuple corresponds to a row, while each element in the tuple corresponds to a column. * `num_rows`: The number of fetched or affected rows. + * `params`: Output parameters returned from stored procedures or cursors. """ @typedoc "The result of a database query." @type t :: %__MODULE__{ columns: nil | [String.t()], rows: nil | [[any()]], - num_rows: integer + num_rows: integer, + params: [Tds.Parameter.t()] } - defstruct columns: nil, rows: nil, num_rows: 0 + defstruct columns: nil, rows: nil, num_rows: 0, params: [] if Code.ensure_loaded?(Table.Reader) do defimpl Table.Reader, for: Tds.Result do diff --git a/lib/tds/tokens.ex b/lib/tds/tokens.ex index 31e26e2..42828a3 100644 --- a/lib/tds/tokens.ex +++ b/lib/tds/tokens.ex @@ -47,7 +47,8 @@ defmodule Tds.Tokens do {token_data, tail, collmetadata} = case token do 0x81 -> decode_colmetadata(tail, collmetadata) - # 0xA5 -> decode_colinfo(tail, collmetadata) + 0xA4 -> decode_tabname(tail, collmetadata) + 0xA5 -> decode_colinfo(tail, collmetadata) 0xFD -> decode_done(tail, collmetadata) 0xFE -> decode_doneproc(tail, collmetadata) 0xFF -> decode_doneinproc(tail, collmetadata) @@ -496,6 +497,20 @@ defmodule Tds.Tokens do decode_column_order(tail, n - 1, [col_id | acc]) end + defp decode_tabname( + <>, + collmetadata + ) do + {{:tabname, :ok}, tail, collmetadata} + end + + defp decode_colinfo( + <>, + collmetadata + ) do + {{:colinfo, :ok}, tail, collmetadata} + end + ## Row and Column Decoders defp bitmap_list(tail, n) when n <= 0 do diff --git a/test/stream_test.exs b/test/stream_test.exs new file mode 100644 index 0000000..15c2e50 --- /dev/null +++ b/test/stream_test.exs @@ -0,0 +1,272 @@ +defmodule Tds.StreamTest do + use ExUnit.Case, async: true + import Tds.TestHelper + + @moduletag capture_log: true + + setup context do + opts = [ + isolation_level: :snapshot, + idle: :active, + backoff_type: :stop, + prepare: context[:prepare] || :named + ] + + opts = Keyword.merge(Tds.TestHelper.opts(), opts) + + {:ok, pid} = Tds.start_link(opts) + {:ok, [pid: pid]} + end + + @tag :stream + test "stream rows from simple query", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_test', 'U') IS NOT NULL DROP TABLE stream_test; CREATE TABLE stream_test (id INT PRIMARY KEY, name NVARCHAR(100));", + [] + ) + + :ok = + query( + "INSERT INTO stream_test (id, name) VALUES (1, N'one'), (2, N'two'), (3, N'three'), (4, N'four'), (5, N'five');", + [] + ) + + Tds.transaction(pid, fn conn -> + stream = Tds.stream(conn, "SELECT id, name FROM stream_test ORDER BY id", []) + results = Enum.to_list(stream) + + assert length(results) > 0 + + all_rows = + results + |> Enum.flat_map(fn %Tds.Result{rows: rows} -> rows || [] end) + + assert length(all_rows) == 5 + end) + + :ok = query("DROP TABLE stream_test", []) + end + + @tag :stream + test "stream with max_rows chunks results", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_test_max', 'U') IS NOT NULL DROP TABLE stream_test_max; CREATE TABLE stream_test_max (id INT PRIMARY KEY, value INT);", + [] + ) + + values = Enum.map_join(1..10, ", ", fn i -> "(#{i}, #{i * 10})" end) + + :ok = query("INSERT INTO stream_test_max (id, value) VALUES #{values};", []) + + Tds.transaction(pid, fn conn -> + stream = + Tds.stream(conn, "SELECT id, value FROM stream_test_max ORDER BY id", [], max_rows: 3) + + results = Enum.to_list(stream) + + assert length(results) >= 1 + + all_rows = + results + |> Enum.flat_map(fn %Tds.Result{rows: rows} -> rows || [] end) + + assert length(all_rows) == 10 + end) + + :ok = query("DROP TABLE stream_test_max", []) + end + + @tag :stream + test "stream with parameterized query", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_test_params', 'U') IS NOT NULL DROP TABLE stream_test_params; CREATE TABLE stream_test_params (id INT PRIMARY KEY, name NVARCHAR(100));", + [] + ) + + :ok = + query( + "INSERT INTO stream_test_params (id, name) VALUES (1, N'alice'), (2, N'bob'), (3, N'charlie');", + [] + ) + + Tds.transaction(pid, fn conn -> + stream = + Tds.stream( + conn, + "SELECT id, name FROM stream_test_params WHERE id > @1 ORDER BY id", + [%Tds.Parameter{name: "@1", type: :integer, value: 1}] + ) + + results = Enum.to_list(stream) + + all_rows = + results + |> Enum.flat_map(fn %Tds.Result{rows: rows} -> rows || [] end) + + assert length(all_rows) == 2 + end) + + :ok = query("DROP TABLE stream_test_params", []) + end + + @tag :stream + test "stream empty result set", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_test_empty', 'U') IS NOT NULL DROP TABLE stream_test_empty; CREATE TABLE stream_test_empty (id INT PRIMARY KEY, name NVARCHAR(100));", + [] + ) + + Tds.transaction(pid, fn conn -> + stream = Tds.stream(conn, "SELECT id, name FROM stream_test_empty", []) + results = Enum.to_list(stream) + + all_rows = + results + |> Enum.flat_map(fn %Tds.Result{rows: rows} -> rows || [] end) + + assert all_rows == [] + end) + + :ok = query("DROP TABLE stream_test_empty", []) + end + + @tag :stream + test "stream with DBConnection.stream/4", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_test_dbc', 'U') IS NOT NULL DROP TABLE stream_test_dbc; CREATE TABLE stream_test_dbc (id INT PRIMARY KEY, name NVARCHAR(50));", + [] + ) + + :ok = + query( + "INSERT INTO stream_test_dbc (id, name) VALUES (1, N'a'), (2, N'b');", + [] + ) + + Tds.transaction(pid, fn conn -> + query = %Tds.Query{statement: "SELECT id, name FROM stream_test_dbc ORDER BY id"} + stream = DBConnection.stream(conn, query, []) + results = Enum.to_list(stream) + + all_rows = + results + |> Enum.flat_map(fn %Tds.Result{rows: rows} -> rows || [] end) + + assert length(all_rows) == 2 + end) + + :ok = query("DROP TABLE stream_test_dbc", []) + end + + @tag :stream + test "MAY take part of stream", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_take', 'U') IS NOT NULL DROP TABLE stream_take; CREATE TABLE stream_take (id INT PRIMARY KEY);", + [] + ) + + :ok = query("INSERT INTO stream_take (id) VALUES (1), (2), (3);", []) + + Tds.transaction(pid, fn conn -> + query = %Tds.Query{statement: "SELECT id FROM stream_take ORDER BY id"} + stream = DBConnection.stream(conn, query, [], max_rows: 1) + + rows = + stream + |> Stream.map(fn %Tds.Result{rows: rows} -> rows end) + |> Enum.take(1) + + assert rows == [[[1, 1]]] + end) + + :ok = query("DROP TABLE stream_take", []) + end + + @tag :stream + test "stream works after prior query on same connection", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_after_query', 'U') IS NOT NULL DROP TABLE stream_after_query; CREATE TABLE stream_after_query (id INT PRIMARY KEY);", + [] + ) + + :ok = query("INSERT INTO stream_after_query (id) VALUES (1), (2);", []) + + Tds.transaction(pid, fn conn -> + {:ok, %Tds.Result{rows: [[42]]}} = Tds.query(conn, "SELECT 42", []) + + stream = Tds.stream(conn, "SELECT id FROM stream_after_query ORDER BY id", []) + results = Enum.to_list(stream) + + all_rows = + results + |> Enum.flat_map(fn %Tds.Result{rows: rows} -> rows || [] end) + + assert length(all_rows) == 2 + end) + + :ok = query("DROP TABLE stream_after_query", []) + end + + @tag :stream + test "connection works after stream error", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('uniques_stream', 'U') IS NOT NULL DROP TABLE uniques_stream; CREATE TABLE uniques_stream (id INT PRIMARY KEY, CONSTRAINT UIX_uniques_stream_id UNIQUE(id));", + [] + ) + + Tds.transaction(pid, fn conn -> + stream = Tds.stream(conn, "INSERT INTO uniques_stream (id) VALUES (1), (1)", []) + + assert_raise Tds.Error, fn -> + Enum.to_list(stream) + end + end) + + assert [[42]] = query("SELECT 42", []) + + :ok = query("DROP TABLE uniques_stream", []) + end + + @tag :stream + test "prepare, stream and close", context do + pid = context[:pid] + + Tds.transaction(pid, fn conn -> + {:ok, query} = Tds.prepare(conn, "SELECT 42") + stream = DBConnection.stream(conn, query, []) + results = Enum.to_list(stream) + + data_results = Enum.filter(results, fn %Tds.Result{num_rows: n} -> n > 0 end) + assert [%Tds.Result{rows: [[42, 1]]}] = data_results + + stream = DBConnection.stream(conn, query, []) + results2 = Enum.to_list(stream) + data_results2 = Enum.filter(results2, fn %Tds.Result{num_rows: n} -> n > 0 end) + assert [%Tds.Result{rows: [[42, 1]]}] = data_results2 + end) + end +end From 8817c2bb6a2ba1164165a7e8d9c22a9920abb172 Mon Sep 17 00:00:00 2001 From: Claudio Alvarado Date: Mon, 18 May 2026 22:42:17 -0400 Subject: [PATCH 2/3] add streaming processing test add documentation --- README.md | 94 +++++++++++++++++++++++++++++++++ test/stream_test.exs | 121 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 215 insertions(+) diff --git a/README.md b/README.md index 465d5c9..c113f88 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,100 @@ iex> Tds.query!(pid, "INSERT INTO MyTable (MyColumn) VALUES (@my_value)", * Automatic decoding and encoding of Elixir values to and from MSSQL's binary format * Support of TDS Versions 7.3, 7.4 +* Streaming/cursor support via `Tds.stream/4` and `DBConnection.stream/4` + +## Streaming + +Tds supports streaming large result sets using SQL Server server-side cursors. This allows you to process rows in batches without loading the entire result set into memory. Streaming requires an active transaction since SQL Server cursors must operate within a transaction context. + +### Basic Usage + +```elixir +Tds.transaction(pid, fn conn -> + stream = Tds.stream(conn, "SELECT id, name FROM users ORDER BY id", []) + results = Enum.to_list(stream) + + # Process each batch + for %Tds.Result{rows: rows} <- results do + for row <- rows do + IO.inspect(row) + end + end +end) +``` + +### Chunked Streaming with `max_rows` + +Control the number of rows fetched per batch with the `:max_rows` option (defaults to 500): + +```elixir +Tds.transaction(pid, fn conn -> + stream = Tds.stream(conn, "SELECT id, name FROM users ORDER BY id", [], max_rows: 100) + results = Enum.to_list(stream) +end) +``` + +### Parameterized Queries + +Streaming works with parameterized queries. When parameters are provided, Tds automatically uses `PARAMETERIZED_STMT` mode for the cursor: + +```elixir +Tds.transaction(pid, fn conn -> + stream = Tds.stream( + conn, + "SELECT id, name FROM users WHERE id > @1 ORDER BY id", + [%Tds.Parameter{name: "@1", type: :integer, value: 100}] + ) + results = Enum.to_list(stream) +end) +``` + +### Using `DBConnection.stream/4` Directly + +You can also use the `DBConnection.stream/4` function with a prepared query: + +```elixir +Tds.transaction(pid, fn conn -> + {:ok, query} = Tds.prepare(conn, "SELECT id FROM users ORDER BY id") + stream = DBConnection.stream(conn, query, [], max_rows: 100) + results = Enum.to_list(stream) + # ... +end) +``` + +### Lazy Enumeration with `Stream.map` + +Use `Stream.map` to process rows lazily without loading all batches: + +```elixir +Tds.transaction(pid, fn conn -> + stream = Tds.stream(conn, "SELECT id FROM users ORDER BY id", [], max_rows: 50) + stream + |> Stream.map(fn %Tds.Result{rows: rows} -> rows end) + |> Enum.take(3) # Only processes the first 3 batches +end) +``` + +### Ecto Integration + +When using `Ecto.Adapters.Tds` (from `ecto_sql`), `Repo.stream/2` works out of the box since it delegates to `DBConnection.stream/4`: + +```elixir +Repo.transaction(fn -> + Repo.stream(User, max_rows: 500) + |> Enum.to_list() +end) +``` + +### Technical Details + +Streaming is implemented via the TDS cursor stored procedures: + +- `sp_cursoropen` (ProcID 2) — Opens a FORWARD_ONLY, READ_ONLY cursor +- `sp_cursorfetch` (ProcID 7) — Fetches the next batch of rows (NEXT fetch type) +- `sp_cursorclose` (ProcID 9) — Closes and deallocates the cursor + +For parameterized queries, the cursor uses `PARAMETERIZED_STMT` mode (scrollopt flag `0x1004`) which allows passing query parameters via `@params` (similar to `sp_executesql`). ## Configuration diff --git a/test/stream_test.exs b/test/stream_test.exs index 15c2e50..96be217 100644 --- a/test/stream_test.exs +++ b/test/stream_test.exs @@ -269,4 +269,125 @@ defmodule Tds.StreamTest do assert [%Tds.Result{rows: [[42, 1]]}] = data_results2 end) end + + @tag :stream + test "stream processes rows lazily in batches", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_lazy', 'U') IS NOT NULL DROP TABLE stream_lazy; CREATE TABLE stream_lazy (id INT PRIMARY KEY, val INT);", + [] + ) + + values = Enum.map_join(1..20, ", ", fn i -> "(#{i}, #{i * 100})" end) + :ok = query("INSERT INTO stream_lazy (id, val) VALUES #{values};", []) + + Tds.transaction(pid, fn conn -> + stream = + Tds.stream(conn, "SELECT id, val FROM stream_lazy ORDER BY id", [], max_rows: 4) + + batches = + stream + |> Stream.map(fn %Tds.Result{rows: rows, num_rows: n} -> {rows, n} end) + |> Enum.to_list() + + data_batches = Enum.filter(batches, fn {_, n} -> n > 0 end) + + assert length(data_batches) == 5 + + for {rows, n} <- data_batches do + assert n == 4 + assert length(rows) == 4 + end + + all_ids = + data_batches + |> Enum.flat_map(fn {rows, _} -> Enum.map(rows, fn [id, _val, _rowstat] -> id end) end) + + assert all_ids == Enum.to_list(1..20) + end) + + :ok = query("DROP TABLE stream_lazy", []) + end + + @tag :stream + test "stream processes rows with side effects per batch", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_side_effects', 'U') IS NOT NULL DROP TABLE stream_side_effects; CREATE TABLE stream_side_effects (id INT PRIMARY KEY);", + [] + ) + + :ok = query("INSERT INTO stream_side_effects (id) VALUES (1), (2), (3), (4), (5), (6);", []) + + Tds.transaction(pid, fn conn -> + stream = + Tds.stream(conn, "SELECT id FROM stream_side_effects ORDER BY id", [], max_rows: 2) + + {:ok, agent} = Agent.start_link(fn -> [] end) + + stream + |> Stream.each(fn %Tds.Result{rows: rows, num_rows: n} -> + if n > 0 do + ids = Enum.map(rows, fn [id, _rowstat] -> id end) + Agent.update(agent, fn acc -> acc ++ ids end) + end + end) + |> Stream.run() + + collected_ids = Agent.get(agent, & &1) + Agent.stop(agent) + + assert collected_ids == [1, 2, 3, 4, 5, 6] + end) + + :ok = query("DROP TABLE stream_side_effects", []) + end + + @tag :stream + test "stream chunks arrive progressively and can be transformed", context do + pid = context[:pid] + + :ok = + query( + "IF OBJECT_ID('stream_transform', 'U') IS NOT NULL DROP TABLE stream_transform; CREATE TABLE stream_transform (id INT PRIMARY KEY, name NVARCHAR(50));", + [] + ) + + :ok = + query( + "INSERT INTO stream_transform (id, name) VALUES (1, N'alice'), (2, N'bob'), (3, N'carol'), (4, N'dave'), (5, N'eve'), (6, N'frank');", + [] + ) + + Tds.transaction(pid, fn conn -> + stream = + Tds.stream(conn, "SELECT id, name FROM stream_transform ORDER BY id", [], max_rows: 2) + + names = + stream + |> Stream.flat_map(fn %Tds.Result{rows: rows, num_rows: n} -> + if n > 0 do + Enum.map(rows, fn [id, name, _rowstat] -> {id, name} end) + else + [] + end + end) + |> Enum.to_list() + + assert names == [ + {1, "alice"}, + {2, "bob"}, + {3, "carol"}, + {4, "dave"}, + {5, "eve"}, + {6, "frank"} + ] + end) + + :ok = query("DROP TABLE stream_transform", []) + end end From 2bf8b664e8d60a728d211aedd2c467975fe72211 Mon Sep 17 00:00:00 2001 From: Claudio Alvarado Date: Mon, 18 May 2026 23:28:47 -0400 Subject: [PATCH 3/3] fix(ci): resolve signed-by key conflict in mssql-tools install GitHub Actions runners have a pre-existing /etc/apt/sources.list.d/microsoft-prod.list that references signed-by=/usr/share/keyrings/microsoft-prod.gpg. Adding the same repo without signed-by causes a conflicting values error. Fix by removing the pre-existing source list, using gpg --dearmor, and adding signed-by consistently. --- .github/workflows/elixir.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/elixir.yml b/.github/workflows/elixir.yml index 60b1512..37b2d46 100644 --- a/.github/workflows/elixir.yml +++ b/.github/workflows/elixir.yml @@ -51,10 +51,11 @@ jobs: steps: - name: Install MSSql Client Tools run: | - curl https://packages.microsoft.com/keys/microsoft.asc | sudo tee /etc/apt/trusted.gpg.d/microsoft.asc - curl https://packages.microsoft.com/config/ubuntu/22.04/prod.list | sudo tee /etc/apt/sources.list.d/mssql-release.list + sudo rm -f /etc/apt/sources.list.d/microsoft-prod.list + curl https://packages.microsoft.com/keys/microsoft.asc | sudo gpg --batch --yes --dearmor -o /usr/share/keyrings/microsoft-prod.gpg + echo "deb [signed-by=/usr/share/keyrings/microsoft-prod.gpg arch=amd64,arm64,armhf] https://packages.microsoft.com/ubuntu/22.04/prod jammy main" | sudo tee /etc/apt/sources.list.d/mssql-release.list sudo apt-get update - sudo apt-get install mssql-tools18 unixodbc-dev + sudo ACCEPT_EULA=Y apt-get install -y mssql-tools18 unixodbc-dev - uses: actions/checkout@v2