Skip to content

Commit 58c2737

Browse files
rustyconoverclaude
andcommitted
buffering: make source producers HTTP-state-serializable
Buffering source/finalize producers held a live BoundStorage (SQLite-backed) view in their StreamState. Over HTTP that state is serialized into a continuation token between /init and /exchange, and a DB connection can't be serialized (infinite recursion). Keep the storage view transient and re-acquire it from (executionId, attachId) on resume: - new BufferingStorageHolder: process-wide handle to the worker's FunctionStorage backend, registered by VgiServiceImpl; bind(executionId, attachId) re-creates a BoundStorage. - BufferingFinalizeProducer: storage view is transient; executionId/attachId/ finalizeStateId are serialized; storage() lazily re-binds after a round-trip; adds a no-arg ctor. - thread attachId through TableBufferingFinalizeParams. - the 7 finalize-producer subclasses: no-arg ctor, storage() accessor, non-final cursor fields so the reflection-based serializer can repopulate them. No behaviour change on the in-memory transports (storage view is set at init and storage() returns it). Brings the over-http suite from 30 -> 7 table-function failures (with the framework state-serializer fixes). Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent 377b3ef commit 58c2737

11 files changed

Lines changed: 106 additions & 14 deletions

vgi-example-worker/src/main/java/farm/query/vgi/example/accumulate/AccumulateFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,12 +166,14 @@ public TableProducerState createFinalizeProducer(TableBufferingFinalizeParams pa
166166
private static final class OutDrainProducer extends BufferingFinalizeProducer {
167167
private long afterId = 0;
168168

169+
private OutDrainProducer() {}
170+
169171
OutDrainProducer(TableBufferingFinalizeParams params) {
170172
super(params);
171173
}
172174

173175
@Override public void produceTick(OutputCollector out, CallContext ctx) {
174-
List<FunctionStorage.LogEntry> rows = storage.stateLogScan(NS_OUT, KEY, afterId, 1);
176+
List<FunctionStorage.LogEntry> rows = storage().stateLogScan(NS_OUT, KEY, afterId, 1);
175177
if (rows.isEmpty()) {
176178
out.finish();
177179
return;

vgi-example-worker/src/main/java/farm/query/vgi/example/buffering/AbstractBufferAndDrain.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,16 +58,18 @@ public abstract class AbstractBufferAndDrain implements TableBufferingFunction {
5858

5959
/** Drains a {@code (ns, "")} state-log, one buffered batch per tick. */
6060
static final class LogDrainProducer extends BufferingFinalizeProducer {
61-
private final byte[] ns;
61+
private byte[] ns;
6262
private long afterId = -1;
6363

64+
LogDrainProducer() {}
65+
6466
LogDrainProducer(TableBufferingFinalizeParams params, byte[] ns) {
6567
super(params);
6668
this.ns = ns;
6769
}
6870

6971
@Override public void produceTick(OutputCollector out, CallContext ctx) {
70-
List<FunctionStorage.LogEntry> rows = storage.stateLogScan(ns, KEY, afterId, 1);
72+
List<FunctionStorage.LogEntry> rows = storage().stateLogScan(ns, KEY, afterId, 1);
7173
if (rows.isEmpty()) {
7274
out.finish();
7375
return;

vgi-example-worker/src/main/java/farm/query/vgi/example/buffering/BufferEmitWideFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,11 @@ public final class BufferEmitWideFunction implements TableBufferingFunction {
6767
}
6868

6969
private static final class WideProducer extends BufferingFinalizeProducer {
70-
private final long rows;
70+
private long rows;
7171
private boolean emitted = false;
7272

73+
private WideProducer() {}
74+
7375
WideProducer(TableBufferingFinalizeParams params, long rows) {
7476
super(params);
7577
this.rows = rows;

vgi-example-worker/src/main/java/farm/query/vgi/example/buffering/LargeStateFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,12 +55,14 @@ public final class LargeStateFunction extends AbstractBufferAndDrain {
5555
private static final class TotalSizeProducer extends BufferingFinalizeProducer {
5656
private boolean emitted = false;
5757

58+
private TotalSizeProducer() {}
59+
5860
TotalSizeProducer(TableBufferingFinalizeParams params) { super(params); }
5961

6062
@Override public void produceTick(OutputCollector out, CallContext ctx) {
6163
if (emitted) { out.finish(); return; }
6264
long total = 0;
63-
for (FunctionStorage.LogEntry e : storage.stateLogScan(NS_LARGE, KEY, -1, Integer.MAX_VALUE)) {
65+
for (FunctionStorage.LogEntry e : storage().stateLogScan(NS_LARGE, KEY, -1, Integer.MAX_VALUE)) {
6466
total += e.value().length;
6567
}
6668
List<FieldVector> vectors = new ArrayList<>();

vgi-example-worker/src/main/java/farm/query/vgi/example/buffering/OrderedSourceFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,9 +70,11 @@ public final class OrderedSourceFunction implements TableBufferingFunction {
7070

7171
/** Emits exactly one row carrying the integer decoded from finalize_state_id. */
7272
private static final class OneShotProducer extends BufferingFinalizeProducer {
73-
private final long value;
73+
private long value;
7474
private boolean emitted = false;
7575

76+
private OneShotProducer() {}
77+
7678
OneShotProducer(TableBufferingFinalizeParams params) {
7779
super(params);
7880
this.value = new BigInteger(1, params.finalizeStateId()).longValue();

vgi-example-worker/src/main/java/farm/query/vgi/example/buffering/SlowCancellableBufferingFunction.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,12 @@ public final class SlowCancellableBufferingFunction implements TableBufferingFun
6565
}
6666

6767
private static final class SlowProducer extends BufferingFinalizeProducer {
68-
private final long sleepMs;
69-
private final long total;
68+
private long sleepMs;
69+
private long total;
7070
private long emitted = 0;
7171

72+
private SlowProducer() {}
73+
7274
SlowProducer(TableBufferingFinalizeParams params, long sleepMs, long total) {
7375
super(params);
7476
this.sleepMs = sleepMs;

vgi-example-worker/src/main/java/farm/query/vgi/example/buffering/SumAllColumnsBufferingFunction.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,8 @@ public class SumAllColumnsBufferingFunction extends AbstractBufferAndDrain {
103103
private static final class SumProducer extends BufferingFinalizeProducer {
104104
private boolean emitted = false;
105105

106+
private SumProducer() {}
107+
106108
SumProducer(TableBufferingFinalizeParams params) { super(params); }
107109

108110
@Override public void produceTick(OutputCollector out, CallContext ctx) {
@@ -117,7 +119,7 @@ private static final class SumProducer extends BufferingFinalizeProducer {
117119
if (dst instanceof BigIntVector bi) bi.setSafe(0, 0L);
118120
else if (dst instanceof Float8Vector fl) fl.setSafe(0, 0.0);
119121
}
120-
for (FunctionStorage.LogEntry e : storage.stateLogScan(NS_RAW, KEY, -1, Integer.MAX_VALUE)) {
122+
for (FunctionStorage.LogEntry e : storage().stateLogScan(NS_RAW, KEY, -1, Integer.MAX_VALUE)) {
121123
try (VectorSchemaRoot src = BatchUtil.readSingleBatch(e.value(), Allocators.root())) {
122124
int rows = src.getRowCount();
123125
for (Field f : outputSchema.getFields()) {

vgi/src/main/java/farm/query/vgi/buffering/BufferingFinalizeProducer.java

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,10 +26,20 @@
2626
*/
2727
public abstract class BufferingFinalizeProducer extends TableProducerState {
2828

29-
/** Read view over the batches buffered during the Sink phase. */
30-
protected final BoundStorage storage;
29+
/** Read view over the batches buffered during the Sink phase. Transient: a
30+
* live storage view (SQLite connection) can't be serialized into an HTTP
31+
* state token, so it's re-acquired from {@link #executionId}/{@link #attachId}
32+
* via {@link #storage()} when this producer is resumed on /exchange. */
33+
protected transient BoundStorage storageView;
34+
/** Execution id the storage is scoped to (survives the state token). */
35+
protected byte[] executionId;
36+
/** Attach plaintext for per-attach shard routing (survives the state token). */
37+
protected byte[] attachId;
3138
/** State id selecting which combined partition this producer drains. */
32-
protected final byte[] finalizeStateId;
39+
protected byte[] finalizeStateId;
40+
41+
/** No-arg constructor for HTTP state-token deserialization. */
42+
protected BufferingFinalizeProducer() {}
3343

3444
/**
3545
* Captures the storage view + {@code finalize_state_id} and forwards the
@@ -39,10 +49,24 @@ public abstract class BufferingFinalizeProducer extends TableProducerState {
3949
*/
4050
protected BufferingFinalizeProducer(TableBufferingFinalizeParams params) {
4151
super(params.initParams());
42-
this.storage = params.storage();
52+
this.storageView = params.storage();
53+
this.executionId = params.executionId();
54+
this.attachId = params.attachId();
4355
this.finalizeStateId = params.finalizeStateId();
4456
}
4557

58+
/**
59+
* The storage view, re-acquired from {@code (executionId, attachId)} if this
60+
* producer was resumed from an HTTP state token (the transient view is null
61+
* after deserialization).
62+
*
63+
* @return the per-execution storage view
64+
*/
65+
protected BoundStorage storage() {
66+
if (storageView == null) storageView = BufferingStorageHolder.bind(executionId, attachId);
67+
return storageView;
68+
}
69+
4670
/**
4771
* Narrows {@code full} to {@link #outputSchema} (by field name), applies
4872
* pushdown filters, and emits the result. The emitted root's vectors are
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2026 Query Farm LLC - https://query.farm
2+
3+
package farm.query.vgi.buffering;
4+
5+
import farm.query.vgi.storage.BoundStorage;
6+
import farm.query.vgi.storage.FunctionStorage;
7+
8+
/**
9+
* Process-wide handle to the worker's {@link FunctionStorage} backend, so a
10+
* buffering source producer can re-bind its {@link BoundStorage} after an HTTP
11+
* state-token round-trip. The HTTP transport is stateless: it serializes a
12+
* producer's state between {@code /init} and {@code /exchange}, and a live
13+
* storage view (a SQLite connection) can't be serialized. So buffering source
14+
* producers keep their {@code storage} view {@code transient} and re-acquire it
15+
* here from the {@code (executionId, attachId)} they did serialize.
16+
*
17+
* <p>A single worker process serves one storage backend, so a static handle is
18+
* sufficient; {@code VgiServiceImpl} registers it at construction.</p>
19+
*/
20+
public final class BufferingStorageHolder {
21+
22+
private static volatile FunctionStorage backend;
23+
24+
private BufferingStorageHolder() {}
25+
26+
/**
27+
* Register the worker's storage backend. Called once when the service is built.
28+
*
29+
* @param b the worker's {@link FunctionStorage} backend
30+
*/
31+
public static void register(FunctionStorage b) { backend = b; }
32+
33+
/**
34+
* Re-bind a {@link BoundStorage} for {@code executionId} on the registered
35+
* backend (used when resuming a buffering source producer from a state token).
36+
*
37+
* @param executionId the buffering execution id the storage is scoped to
38+
* @param attachId the attach plaintext (for per-attach shard routing), or {@code null}
39+
* @return a storage view bound to {@code executionId}
40+
* @throws IllegalStateException if no backend has been registered
41+
*/
42+
public static BoundStorage bind(byte[] executionId, byte[] attachId) {
43+
FunctionStorage b = backend;
44+
if (b == null) {
45+
throw new IllegalStateException("no buffering storage backend registered");
46+
}
47+
return new BoundStorage(b, executionId, attachId);
48+
}
49+
}

vgi/src/main/java/farm/query/vgi/buffering/TableBufferingFinalizeParams.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@
1414
*
1515
* @param executionId the opaque {@code execution_id} identifying this buffering execution.
1616
* @param finalizeStateId the opaque {@code finalize_state_id} naming the output stream this producer drains.
17+
* @param attachId the attach plaintext for per-attach shard routing (lets a source producer re-bind storage after an HTTP state-token round-trip).
1718
* @param storage the storage view bound to {@code executionId}, for reading back stashed state.
1819
* @param initParams the table-init parameters carrying the projection-narrowed output schema and pushdown filter.
1920
*/
2021
public record TableBufferingFinalizeParams(
2122
byte[] executionId,
2223
byte[] finalizeStateId,
24+
byte[] attachId,
2325
BoundStorage storage,
2426
TableInitParams initParams) {}

0 commit comments

Comments
 (0)