Skip to content

Commit 29e8bb9

Browse files
rustyconoverclaude
andcommitted
Add table function cardinality support and empty batch handling
- Table functions now emit an empty batch if process() yields nothing, ensuring clients don't block waiting for data - Client filters out empty batches before yielding to callers - Worker sends cardinality info in bind result for TableFunctionGenerator - SequenceFunction now has configurable batch_size parameter (default 1000) - Added numpy for efficient range generation in SequenceFunction - Updated protocol documentation with empty output handling details Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent 93fceab commit 29e8bb9

9 files changed

Lines changed: 203 additions & 23 deletions

File tree

CLAUDE.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -390,14 +390,15 @@ class SequenceFunction(TableFunctionGenerator):
390390
max_workers = 1
391391

392392
count: Annotated[int, Arg(0, doc="Number of integers")]
393+
batch_size: Annotated[int, Arg(1, default=1000, doc="Batch size for output")]
393394

394395
@property
395396
def output_schema(self) -> pa.Schema:
396397
return pa.schema([("n", pa.int64())])
397398

398399
def process(self):
399-
for start in range(0, self.count, 1000):
400-
end = min(start + 1000, self.count)
400+
for start in range(0, self.count, self.batch_size):
401+
end = min(start + self.batch_size, self.count)
401402
yield Output(pa.RecordBatch.from_pydict(
402403
{"n": list(range(start, end))}, schema=self.output_schema
403404
))

docs/protocol.md

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,7 @@ During data transfer, each direction uses a **single long-lived IPC stream** con
9292
| 3 | Client → Worker | Handshake | InitInput | 1 |
9393
| 4 | Worker → Client | Handshake | InitResult | 1 |
9494
| 5 | Client → Worker | Data | Input batches | 0..N |
95-
| 6 | Worker → Client | Data | Output batches with status | 1..N |
95+
| 6 | Worker → Client | Data | Output batches with status | 1..N (always ≥1) |
9696

9797
**Notes:**
9898
- Stream 5 is absent for Table functions (no input)
@@ -182,6 +182,11 @@ Client Worker
182182
└───────────────────────────────────────┘
183183
```
184184

185+
**Empty output handling:**
186+
- If `process()` yields no batches, the worker emits an empty batch with `FINISHED` status
187+
- This ensures the client receives a completion signal and doesn't block waiting for data
188+
- The client filters out empty batches before yielding to callers (protocol detail not exposed to users)
189+
185190
### Table-In-Out Function Protocol
186191

187192
Table-in-out functions transform input batches with an optional finalize phase.
@@ -451,12 +456,14 @@ Workers support multiple invocations within a single process. After completing o
451456
- Schema validation before sending to client
452457
- Row count validation for scalar functions
453458
- Automatic error wrapping with stack traces
459+
- Table functions always emit at least one batch (empty if no output) to signal completion
454460

455461
**Client responsibilities:**
456462
- Spawn worker subprocess with correct command
457463
- Send Invocation as first message
458464
- Handle all status values correctly
459465
- Close stdin (not just IPC stream) to signal worker shutdown
466+
- Filter out empty batches from table functions before yielding to callers
460467

461468
**Worker responsibilities:**
462469
- Parse Invocation and lookup function

pyproject.toml

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,13 @@ version = "0.1.0"
44
description = "Vector Gateway Interface - Connect DuckDB to external programs via Apache Arrow"
55
readme = "README.md"
66
requires-python = ">=3.12.4"
7-
dependencies = ["click", "pyarrow", "structlog", "platformdirs"]
7+
dependencies = [
8+
"click",
9+
"pyarrow",
10+
"structlog",
11+
"platformdirs",
12+
"numpy>=2.4.1",
13+
]
814

915
[project.optional-dependencies]
1016
dev = ["mypy", "pyarrow-stubs", "pytest", "pytest-cov", "pytest-examples", "pytest-mypy", "pytest-ruff", "pytest-xdist", "ruff"]

tests/table/generator/test_sequence_function.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66

77
from tests.conftest import make_invocation
88
from vgi.arguments import Arguments
9+
from vgi.client import Client
910
from vgi.examples.table import SequenceFunction
1011
from vgi.testing import assert_table_function_output, batch
1112

@@ -79,3 +80,58 @@ def test_large_sequence_batches(
7980
table = pa.Table.from_batches(outputs)
8081
assert table.num_rows == 2500
8182
assert table.column("n").to_pylist() == list(range(2500))
83+
84+
def test_custom_batch_size(self, run_table_function_mode: RunnerWithMode) -> None:
85+
"""Custom batch size should control output batch sizes."""
86+
runner, mode = run_table_function_mode
87+
# Generate 250 values with batch size of 100
88+
outputs, logs = runner(SequenceFunction, (250, 100))
89+
90+
# Should produce 3 batches: 100, 100, 50
91+
assert len(outputs) == 3
92+
assert outputs[0].num_rows == 100
93+
assert outputs[1].num_rows == 100
94+
assert outputs[2].num_rows == 50
95+
96+
table = pa.Table.from_batches(outputs)
97+
assert table.column("n").to_pylist() == list(range(250))
98+
99+
def test_batch_size_larger_than_count(
100+
self, run_table_function_mode: RunnerWithMode
101+
) -> None:
102+
"""Batch size larger than count should produce single batch."""
103+
runner, mode = run_table_function_mode
104+
outputs, logs = runner(SequenceFunction, (50, 1000))
105+
106+
assert len(outputs) == 1
107+
assert outputs[0].num_rows == 50
108+
assert outputs[0].column("n").to_pylist() == list(range(50))
109+
110+
111+
class TestSequenceFunctionClient:
112+
"""Tests for SequenceFunction via Client (wire protocol)."""
113+
114+
def test_cardinality_returned_in_bind_result(self) -> None:
115+
"""Cardinality should be returned in bind_result via Client."""
116+
bind_results: list[pa.RecordBatch] = []
117+
118+
def capture_bind_result(result: pa.RecordBatch) -> None:
119+
bind_results.append(result)
120+
121+
with Client("vgi-example-worker") as client:
122+
list(
123+
client.table_function(
124+
function_name="sequence",
125+
arguments=Arguments(positional=(pa.scalar(100),)),
126+
bind_result_callback=capture_bind_result,
127+
)
128+
)
129+
130+
assert len(bind_results) == 1
131+
bind_result = bind_results[0]
132+
133+
# Verify cardinality fields are present and correct
134+
assert "cardinality_estimated" in bind_result.schema.names
135+
assert "cardinality_max" in bind_result.schema.names
136+
assert bind_result.column("cardinality_estimated")[0].as_py() == 100
137+
assert bind_result.column("cardinality_max")[0].as_py() == 100

0 commit comments

Comments
 (0)