|
| 1 | +# Filter Pushdown Protocol |
| 2 | + |
| 3 | +Filter pushdown allows VGI **table functions** to receive SQL WHERE clause predicates. Workers can apply filters during data generation, reducing transferred data. |
| 4 | + |
| 5 | +VGI uses a **hybrid JSON + Arrow** format: |
| 6 | +- **JSON** describes filter structure (operators, column references) |
| 7 | +- **Arrow columns** store filter values (preserves exact types) |
| 8 | + |
| 9 | +## Transport |
| 10 | + |
| 11 | +Filters are sent in **Stream 3 (InitInput)** as a binary field containing Arrow IPC bytes: |
| 12 | + |
| 13 | +``` |
| 14 | +InitInput Schema: |
| 15 | +├── projection_ids: list<int32> |
| 16 | +└── filters: binary (nullable) -- Arrow IPC bytes |
| 17 | +``` |
| 18 | + |
| 19 | +Table functions must declare `filter_pushdown: true` in metadata to receive filters. |
| 20 | + |
| 21 | +## Format |
| 22 | + |
| 23 | +The Arrow RecordBatch contains: |
| 24 | + |
| 25 | +| Column | Name | Type | Content | |
| 26 | +|--------|------|------|---------| |
| 27 | +| 0 | `filter_spec` | string | JSON array of filters | |
| 28 | +| 1+ | `_val_0`, `_val_1`, ... | varies | Values referenced by filters | |
| 29 | + |
| 30 | +**Version metadata** on `filter_spec` field: `{"vgi_filter_version": "1"}` |
| 31 | + |
| 32 | +## Example |
| 33 | + |
| 34 | +SQL: `WHERE salary > 50000 AND name = 'Alice'` |
| 35 | + |
| 36 | +**RecordBatch:** |
| 37 | +``` |
| 38 | +filter_spec: "[{...}, {...}]" (string) |
| 39 | +_val_0: 50000 (int64) |
| 40 | +_val_1: "Alice" (string) |
| 41 | +``` |
| 42 | + |
| 43 | +**filter_spec JSON:** |
| 44 | +```json |
| 45 | +[ |
| 46 | + {"column_name": "salary", "column_index": 2, "type": "constant", "op": "gt", "value_ref": 0}, |
| 47 | + {"column_name": "name", "column_index": 0, "type": "constant", "op": "eq", "value_ref": 1} |
| 48 | +] |
| 49 | +``` |
| 50 | + |
| 51 | +The `value_ref` points to value columns: `value_ref: 0` → column `_val_0` (index 1 in batch). |
| 52 | + |
| 53 | +## Filter Types |
| 54 | + |
| 55 | +### constant |
| 56 | +Comparison filter: `col > value` |
| 57 | + |
| 58 | +```json |
| 59 | +{"column_name": "age", "column_index": 1, "type": "constant", "op": "ge", "value_ref": 0} |
| 60 | +``` |
| 61 | + |
| 62 | +**Operators:** `eq` (=), `ne` (!=), `gt` (>), `ge` (>=), `lt` (<), `le` (<=) |
| 63 | + |
| 64 | +### is_null / is_not_null |
| 65 | +NULL check: `col IS NULL` or `col IS NOT NULL` |
| 66 | + |
| 67 | +```json |
| 68 | +{"column_name": "email", "column_index": 3, "type": "is_null"} |
| 69 | +``` |
| 70 | + |
| 71 | +### in |
| 72 | +Set membership: `col IN (v1, v2, v3)` |
| 73 | + |
| 74 | +```json |
| 75 | +{"column_name": "status", "column_index": 4, "type": "in", "value_ref": 0} |
| 76 | +``` |
| 77 | + |
| 78 | +The value column is a list type containing all IN values: `_val_0: ["active", "pending", "review"]` |
| 79 | + |
| 80 | +### and / or |
| 81 | +Conjunction combining multiple filters on same column: |
| 82 | + |
| 83 | +```json |
| 84 | +{"column_name": "age", "column_index": 1, "type": "and", "children": [ |
| 85 | + {"column_name": "age", "column_index": 1, "type": "constant", "op": "ge", "value_ref": 0}, |
| 86 | + {"column_name": "age", "column_index": 1, "type": "constant", "op": "lt", "value_ref": 1} |
| 87 | +]} |
| 88 | +``` |
| 89 | + |
| 90 | +### struct |
| 91 | +Nested field filter: `address.city = 'Seattle'` |
| 92 | + |
| 93 | +```json |
| 94 | +{"column_name": "address", "column_index": 5, "type": "struct", |
| 95 | + "child_index": 1, "child_name": "city", |
| 96 | + "child_filter": {"column_name": "address", "column_index": 5, "type": "constant", "op": "eq", "value_ref": 0}} |
| 97 | +``` |
| 98 | + |
| 99 | +## Unsupported Filter Types |
| 100 | + |
| 101 | +Some DuckDB filter types cannot be serialized for pushdown. When these are encountered, VGI skips filter pushdown entirely and the `filters` field is null: |
| 102 | + |
| 103 | +- **DynamicFilter** - Created by TOP-N queries (`ORDER BY ... LIMIT N`). The filter value mutates during query execution. |
| 104 | +- **BloomFilter** - Created by join optimization. Contains a large binary buffer. |
| 105 | +- **ExpressionFilter** - Created by complex predicates like `UPPER(col) = 'X'`. Contains expression trees that may reference functions unavailable in the worker. |
| 106 | + |
| 107 | +## Deserialization |
| 108 | + |
| 109 | +```python |
| 110 | +import pyarrow as pa |
| 111 | +import json |
| 112 | + |
| 113 | +def deserialize_filters(ipc_bytes: bytes): |
| 114 | + reader = pa.ipc.open_stream(ipc_bytes) |
| 115 | + batch = reader.read_next_batch() |
| 116 | + |
| 117 | + # Check version |
| 118 | + version = batch.schema.field(0).metadata.get(b"vgi_filter_version", b"").decode() |
| 119 | + assert version == "1", f"Unknown filter version: {version}" |
| 120 | + |
| 121 | + # Parse filters |
| 122 | + filters = json.loads(batch.column(0)[0].as_py()) |
| 123 | + |
| 124 | + # Get value by ref: value_ref N → column N+1 |
| 125 | + # Returns Arrow scalar to preserve exact type |
| 126 | + def get_value(ref: int) -> pa.Scalar: |
| 127 | + return batch.column(ref + 1)[0] |
| 128 | + |
| 129 | + return filters, get_value |
| 130 | +``` |
| 131 | + |
| 132 | +## JSON Schema |
| 133 | + |
| 134 | +```json |
| 135 | +{ |
| 136 | + "$schema": "https://json-schema.org/draft/2020-12/schema", |
| 137 | + "$id": "https://vgi-protocol.dev/filter-pushdown/v1", |
| 138 | + "title": "VGI Filter Specification", |
| 139 | + "type": "array", |
| 140 | + "items": {"$ref": "#/$defs/filter"}, |
| 141 | + |
| 142 | + "$defs": { |
| 143 | + "filter": { |
| 144 | + "type": "object", |
| 145 | + "required": ["column_name", "column_index", "type"], |
| 146 | + "properties": { |
| 147 | + "column_name": {"type": "string"}, |
| 148 | + "column_index": {"type": "integer", "minimum": 0}, |
| 149 | + "type": {"enum": ["constant", "is_null", "is_not_null", "in", "and", "or", "struct"]}, |
| 150 | + "op": {"enum": ["eq", "ne", "gt", "ge", "lt", "le"]}, |
| 151 | + "value_ref": {"type": "integer", "minimum": 0}, |
| 152 | + "children": {"type": "array", "items": {"$ref": "#/$defs/filter"}}, |
| 153 | + "child_index": {"type": "integer", "minimum": 0}, |
| 154 | + "child_name": {"type": "string"}, |
| 155 | + "child_filter": {"$ref": "#/$defs/filter"} |
| 156 | + } |
| 157 | + } |
| 158 | + } |
| 159 | +} |
| 160 | +``` |
| 161 | + |
| 162 | +## Worker Implementation Notes |
| 163 | + |
| 164 | +- **Partial application OK**: Apply filters you can handle; DuckDB always re-verifies results |
| 165 | +- **Unsupported filters**: Return all rows for that column, let DuckDB filter locally |
| 166 | +- **Type fidelity**: Values preserve exact Arrow types (decimal, timestamp with timezone, nested types) |
0 commit comments