|
33 | 33 | disjoint integer range. Verifies the wire path; DuckDB falls back to |
34 | 34 | ``HASH_GROUP_BY`` for GROUP BY queries against it. |
35 | 35 |
|
| 36 | +* :class:`OverlappingRangePartitionedFunction` — declares |
| 37 | + ``OVERLAPPING_PARTITIONS`` (the only fixture that does). Consecutive |
| 38 | + chunks share ``key`` values. Wire-level only; DuckDB falls back to |
| 39 | + ``HASH_GROUP_BY``. |
| 40 | +
|
36 | 41 | All fixtures use the in-memory state pattern (no work-queue / no |
37 | 42 | stream_state) — they're simpler than the v1 partitioned_batch_index |
38 | 43 | since the v2 plan is about correctness of the partition contract, |
@@ -470,3 +475,99 @@ def process( |
470 | 475 | ) |
471 | 476 | out.emit(batch) |
472 | 477 | state.current_idx = rpp |
| 478 | + |
| 479 | + |
| 480 | +# ============================================================================= |
| 481 | +# OVERLAPPING_PARTITIONS — wire-level only |
| 482 | +# ============================================================================= |
| 483 | + |
| 484 | + |
| 485 | +@dataclass(slots=True, frozen=True) |
| 486 | +class _OverlappingArgs: |
| 487 | + """Arguments for ``overlapping_range_partitioned``.""" |
| 488 | + |
| 489 | + partitions: Annotated[int, Arg(0, doc="Number of overlapping partitions", ge=1)] |
| 490 | + rows_per_partition: Annotated[int, Arg("rows_per_partition", default=10, doc="Rows per partition", ge=1)] |
| 491 | + |
| 492 | + |
| 493 | +@dataclass(kw_only=True) |
| 494 | +class _OverlappingState(ArrowSerializableDataclass): |
| 495 | + current_partition_idx: int = -1 |
| 496 | + current_idx: int = 0 |
| 497 | + started: bool = False |
| 498 | + |
| 499 | + |
| 500 | +@bind_fixed_schema |
| 501 | +@_cardinality_from_count |
| 502 | +class OverlappingRangePartitionedFunction(TableFunctionGenerator[_OverlappingArgs, _OverlappingState]): |
| 503 | + """Per-chunk *overlapping* integer ranges on ``key``. |
| 504 | +
|
| 505 | + Each chunk N emits ``key`` values in ``[N*500, N*500 + rows)``. With the |
| 506 | + default ``rows_per_partition`` of 10 the ranges are disjoint, but callers |
| 507 | + pass ``rows_per_partition > 500`` to make consecutive chunks overlap on |
| 508 | + ``key`` — distinguishing this from |
| 509 | + :class:`DisjointRangePartitionedFunction`. |
| 510 | +
|
| 511 | + Declares ``OVERLAPPING_PARTITIONS``. Like DISJOINT, DuckDB has no consumer |
| 512 | + for OVERLAPPING today, so GROUP BY queries fall back to ``HASH_GROUP_BY``; |
| 513 | + the value's purpose here is to keep the wire path (declaration, per-batch |
| 514 | + min/max metadata, C++ extraction → ``get_partition_info``) exercised so |
| 515 | + other-language workers must specify it. This is the only fixture that emits |
| 516 | + ``OVERLAPPING_PARTITIONS``. |
| 517 | + """ |
| 518 | + |
| 519 | + FIXED_SCHEMA: ClassVar[pa.Schema] = pa.schema( |
| 520 | + [ |
| 521 | + partition_field("key", pa.int64()), |
| 522 | + pa.field("value", pa.int64()), |
| 523 | + ] |
| 524 | + ) |
| 525 | + |
| 526 | + class Meta: |
| 527 | + name = "overlapping_range_partitioned" |
| 528 | + description = ( |
| 529 | + "Overlapping per-chunk integer ranges on ``key``. Declares " |
| 530 | + "OVERLAPPING_PARTITIONS (wire-level only; DuckDB falls back to " |
| 531 | + "HASH_GROUP_BY for now)." |
| 532 | + ) |
| 533 | + categories = ["generator", "partitioning", "testing"] |
| 534 | + partition_kind = PartitionKind.OVERLAPPING_PARTITIONS |
| 535 | + |
| 536 | + @classmethod |
| 537 | + def on_init(cls, params: InitParams[_OverlappingArgs]) -> GlobalInitResponse: |
| 538 | + items = [struct.pack(_QUEUE_ITEM_FMT, i) for i in range(params.args.partitions)] |
| 539 | + params.storage.queue_push(items) |
| 540 | + return GlobalInitResponse() |
| 541 | + |
| 542 | + @classmethod |
| 543 | + def initial_state(cls, params: ProcessParams[_OverlappingArgs]) -> _OverlappingState: |
| 544 | + return _OverlappingState() |
| 545 | + |
| 546 | + @classmethod |
| 547 | + def process( |
| 548 | + cls, |
| 549 | + params: ProcessParams[_OverlappingArgs], |
| 550 | + state: _OverlappingState, |
| 551 | + out: OutputCollector, |
| 552 | + ) -> None: |
| 553 | + if not state.started or state.current_idx >= params.args.rows_per_partition: |
| 554 | + item = params.storage.queue_pop() |
| 555 | + if item is None: |
| 556 | + out.finish() |
| 557 | + return |
| 558 | + (state.current_partition_idx,) = struct.unpack(_QUEUE_ITEM_FMT, item) |
| 559 | + state.current_idx = 0 |
| 560 | + state.started = True |
| 561 | + |
| 562 | + rpp = params.args.rows_per_partition |
| 563 | + # Stride of 500 (< rpp when callers want overlap) makes consecutive |
| 564 | + # chunks share key values. |
| 565 | + base = state.current_partition_idx * 500 |
| 566 | + keys = [base + i for i in range(rpp)] |
| 567 | + values = [state.current_partition_idx * 10 + i for i in range(rpp)] |
| 568 | + batch = pa.RecordBatch.from_pydict( |
| 569 | + {"key": keys, "value": values}, |
| 570 | + schema=cls.FIXED_SCHEMA, |
| 571 | + ) |
| 572 | + out.emit(batch) |
| 573 | + state.current_idx = rpp |
0 commit comments