[python] Add streaming reads to paimon CLI (table stream command)#7456
tub wants to merge 5 commits into
JingsongLi
left a comment
Nice CLI addition for pypaimon. The table stream command provides a user-friendly way to tail table changes.
Review:
- Feature set: `--from`, `--select`, `--where`, `--format`, `--poll-interval-ms`, `--include-row-kind`, `--consumer-id`. A comprehensive flag set covering the common streaming read scenarios.
- Library API addition: `StreamReadBuilder.with_scan_from()` is useful beyond CLI. Good separation between library and CLI layers.
- +859 additions is substantial. The split between CLI handler, library API, and tests is clean.
- Consumer-id for at-least-once resume: Persisting scan progress enables resumable tailing. Important for operational use.
- Timestamp resolution at CLI layer: Making the library API accept snapshot IDs while the CLI resolves timestamps is the right layering. Pushing timestamp resolution down to the library is optional.
- Test coverage: `cli_table_stream_test.py`, `stream_read_builder_test.py`, `streaming_table_scan_test.py`. Three test files covering different layers. Good.
- Poll interval: Default 1000ms is reasonable. Consider documenting that very low intervals (< 100ms) may cause unnecessary load on the metadata path.
LGTM. Well-designed feature.
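To make the poll-interval point concrete, here is a minimal synchronous sketch of a tailing loop. It is not the PR's implementation (which goes through `AsyncStreamingTableScan`); `tail`, `poll`, `max_polls`, and the injectable `sleep` are hypothetical names used only for illustration. Each iteration calls `poll()` once, so a smaller interval translates directly into more metadata lookups per second.

```python
import time
from typing import Callable, Iterable, Iterator, Optional


def tail(poll: Callable[[], Iterable[dict]],
         poll_interval_ms: int = 1000,
         max_polls: Optional[int] = None,
         sleep: Callable[[float], None] = time.sleep) -> Iterator[dict]:
    """Yield rows from repeated poll() calls, sleeping between polls.

    Hypothetical helper: poll() stands in for "check for new snapshots
    and read their rows". max_polls exists only so the loop can be
    stopped in tests; a real tail would run until interrupted.
    """
    if poll_interval_ms <= 0:
        raise ValueError("poll_interval_ms must be positive")
    polls = 0
    while max_polls is None or polls < max_polls:
        # One poll corresponds to one round trip to the metadata path.
        yield from poll()
        polls += 1
        # Skip the final sleep when a poll limit has been reached.
        if max_polls is None or polls < max_polls:
            sleep(poll_interval_ms / 1000.0)
```

With a 1000 ms interval this performs about one metadata check per second; at 50 ms it performs about twenty, which is the load the reviewer suggests documenting.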
Please make this not a draft.
leaves12138
left a comment
Thanks for the update. I reviewed the latest version again.
The CLI layering looks reasonable now: table stream keeps the human-friendly parsing in the command layer, resolves timestamp-based --from values to snapshot IDs, and passes a normalized scan position down to StreamReadBuilder. The streaming scan behavior also looks consistent to me: consumer restore takes precedence over scan_from, earliest emits an initial full-scan plan from the earliest snapshot, and numeric snapshot IDs rely on the follow-up scan path.
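The startup precedence described in this paragraph can be sketched roughly as below. This is an illustration only, not the code in `streaming_table_scan.py`: `StartPlan`, `resolve_start`, and their fields are hypothetical, and the handling of `latest` (follow from the newest snapshot without an initial scan) is an assumption rather than something stated in the review.

```python
from dataclasses import dataclass
from typing import Optional, Union


@dataclass(frozen=True)
class StartPlan:
    """Hypothetical description of where a streaming scan begins."""
    source: str                 # "consumer", "earliest", "snapshot", or "latest"
    snapshot_id: Optional[int]  # None means "resolve when the scan starts"
    initial_full_scan: bool     # True only for "earliest" in this sketch


def resolve_start(consumer_next_snapshot: Optional[int],
                  scan_from: Union[str, int, None]) -> StartPlan:
    # 1. A restored consumer position always wins over scan_from.
    if consumer_next_snapshot is not None:
        return StartPlan("consumer", consumer_next_snapshot, False)
    # 2. "earliest" begins with a full-scan plan from the earliest snapshot.
    if scan_from == "earliest":
        return StartPlan("earliest", None, True)
    # 3. A numeric snapshot ID goes straight to the follow-up scan path.
    if isinstance(scan_from, int) and not isinstance(scan_from, bool):
        return StartPlan("snapshot", scan_from, False)
    # 4. "latest" or unset: assumed here to follow from the newest snapshot.
    return StartPlan("latest", None, False)
```

Keeping the decision in one pure function like this makes the "consumer restore wins" rule easy to test without a real table.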
The new tests cover the main CLI paths, output formats, --from parsing, builder propagation, and the streaming scan startup semantics. I do not see a blocking code issue from this pass.
One small follow-up you may consider (not blocking from my side): since StreamReadBuilder.with_scan_from() is a public programmatic API and its docstring restricts values to "latest", "earliest", or a positive integer snapshot ID, it would be a bit nicer to fail fast there for invalid strings or non-positive integers instead of letting the scan loop fail later. The CLI already normalizes its inputs, so this is mostly API hardening.
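A fail-fast check along these lines might look like the sketch below. `validate_scan_from` is a hypothetical helper, not part of the PR; it only encodes the contract quoted from the docstring ("latest", "earliest", or a positive integer snapshot ID).

```python
from typing import Union


def validate_scan_from(value: Union[str, int]) -> Union[str, int]:
    """Return value unchanged if it is a valid scan position, else raise.

    Hypothetical helper illustrating the suggested API hardening.
    """
    # bool is a subclass of int in Python, so reject it explicitly.
    if isinstance(value, bool):
        raise ValueError(f"scan_from must not be a bool, got {value!r}")
    if isinstance(value, int):
        if value <= 0:
            raise ValueError(f"snapshot ID must be positive, got {value}")
        return value
    if isinstance(value, str) and value in ("latest", "earliest"):
        return value
    raise ValueError(
        "scan_from must be 'latest', 'earliest', or a positive integer "
        f"snapshot ID, got {value!r}"
    )
```

Calling something like this at the top of `with_scan_from()` would surface a bad argument at builder construction time instead of inside the scan loop.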
I noticed the latest Python lint jobs are still in progress, so I am leaving this as a review comment rather than approval for now. If those checks turn green, this looks good to merge from my side.
…reamReadBuilder

- Add `paimon table stream <db.table>` CLI subcommand that continuously polls a table and prints new rows as they arrive until Ctrl+C
- Flags: --select, --where, --format (table|json), --from, --poll-interval-ms, --include-row-kind, --consumer-id
- --from accepts 'latest' (default), 'earliest', a numeric snapshot ID, or a timestamp string (YYYY-MM-DD, ISO 8601 with/without timezone)
- Add StreamReadBuilder.with_scan_from() accepting 'latest', 'earliest', or an integer snapshot ID; passed through to AsyncStreamingTableScan with consumer restore taking highest priority over scan_from
- Tests: 7 new unit tests in stream_read_builder_test.py, 4 integration tests in streaming_table_scan_test.py, 20 tests in cli_table_stream_test.py

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…check in table stream CLI Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…pport named timezones Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
… to fix flake8 Co-Authored-By: Claude <noreply@anthropic.com>
…patching module-level SnapshotManager

Upstream 4aec277 removed the SnapshotManager import from streaming_table_scan.py in favor of table.snapshot_manager(). Update the ScanFromTest tests to mock via table.snapshot_manager.return_value instead of @patch at the module level.

Co-Authored-By: Claude <noreply@anthropic.com>
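The mocking change in the last commit can be illustrated with plain `unittest.mock`. This is a sketch, not the actual `ScanFromTest` code: `latest_snapshot_id` is a hypothetical stand-in for scan code that obtains its manager through `table.snapshot_manager()`, and the `get_latest_snapshot()` method name is assumed for the example.

```python
from unittest.mock import MagicMock


def latest_snapshot_id(table):
    """Hypothetical stand-in for code that asks the table for its manager."""
    snapshot = table.snapshot_manager().get_latest_snapshot()
    return None if snapshot is None else snapshot.id


# No module-level @patch is needed: the mock table hands out the manager.
table = MagicMock()
manager = table.snapshot_manager.return_value
manager.get_latest_snapshot.return_value = MagicMock(id=5)

result = latest_snapshot_id(table)

# The code under test called table.snapshot_manager() exactly once, with no args.
table.snapshot_manager.assert_called_once_with()
```

Because the manager is reached through the table object, the test no longer depends on which names `streaming_table_scan.py` happens to import.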
d4b57c1 to a37049f
Rebased & fixed up the tests, sorry for the delay, was out of the office for a while.
Summary

- Add `paimon table stream <db.table>` CLI subcommand that continuously polls a Paimon table and prints new rows as they arrive until Ctrl+C
- Add `StreamReadBuilder.with_scan_from()` to the library so programmatic users can also control starting position (`"latest"`, `"earliest"`, or a snapshot ID integer)
- `--from` is resolved to a snapshot ID at the CLI layer (no timestamps in the library API)

Flags

| Flag | Default | Description |
| --- | --- | --- |
| `--from` | `latest` | `latest`, `earliest`, snapshot ID, or timestamp (YYYY-MM-DD, ISO 8601) |
| `--select` | | |
| `--where` | | |
| `--format` | `table` | `table` or `json` |
| `--poll-interval-ms` | `1000` | |
| `--include-row-kind` | | `_row_kind` column (`+I`, `-U`, `+U`, `-D`) |
| `--consumer-id` | | |

Changes

- `paimon-python/pypaimon/cli/cli_table_stream.py`: new command handler + `parse_from_position()` timestamp resolver
- `paimon-python/pypaimon/cli/cli_table.py`: register `stream` subparser
- `paimon-python/pypaimon/read/stream_read_builder.py`: `with_scan_from()`
- `paimon-python/pypaimon/read/streaming_table_scan.py`: `scan_from` startup resolution (consumer restore always wins)

Test plan

- `pypaimon/tests/stream_read_builder_test.py`: 7 new unit tests for `with_scan_from()`
- `pypaimon/tests/streaming_table_scan_test.py`: 4 new integration tests (earliest, numeric ID, `latest`, consumer-overrides-scan_from)
- `pypaimon/tests/cli_table_stream_test.py`: 20 new tests (CLI integration + `parse_from_position` unit tests)

🤖 Generated with Claude Code
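For readers who want a feel for the `--from` parsing step, the sketch below classifies a raw value into one of the four accepted forms. It is not the PR's `parse_from_position()`: `classify_from` is a hypothetical name, the step that maps a timestamp to a snapshot ID is left out, named timezones are not handled, and treating timezone-less timestamps as UTC is an assumption made only to keep the example deterministic.

```python
from datetime import datetime, timezone
from typing import Optional, Tuple


def classify_from(value: str) -> Tuple[str, Optional[int]]:
    """Classify a --from value as latest, earliest, snapshot, or timestamp.

    Returns (kind, payload): payload is the snapshot ID for "snapshot",
    epoch milliseconds for "timestamp", and None otherwise.
    """
    text = value.strip()
    if text in ("latest", "earliest"):
        return text, None
    if text.isascii() and text.isdigit():
        snapshot_id = int(text)
        if snapshot_id <= 0:
            raise ValueError(f"snapshot ID must be positive, got {value!r}")
        return "snapshot", snapshot_id
    # Older Python versions reject a trailing "Z" in fromisoformat().
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    try:
        parsed = datetime.fromisoformat(text)
    except ValueError:
        raise ValueError(f"unrecognized --from value: {value!r}") from None
    if parsed.tzinfo is None:
        # Assumption for this sketch only: naive timestamps are read as UTC.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return "timestamp", int(parsed.timestamp() * 1000)
```

Only the `"snapshot"` result, `"latest"`, and `"earliest"` would be passed on to `with_scan_from()`; a `"timestamp"` result still has to be resolved to a snapshot ID in the CLI layer first, as the summary describes.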