Commit c5b6624

zxqfd555 authored and Manul from Pathway committed
improve postgres input connector docs (#9967)
GitOrigin-RevId: f05d3aab2351ed5ed1fd27b65ceeea98e826b51a
1 parent 97e0607 commit c5b6624

1 file changed

Lines changed: 117 additions & 0 deletions

python/pathway/io/postgres/__init__.py

@@ -270,6 +270,123 @@ def read(
replication slot internally: it uses a temporary slot that is automatically dropped
when the session ends, and continuously acknowledges processed LSN positions while
the program is running.

The examples above use an integer primary key, but other primary key types are
supported as well.

Suppose you have a ``products`` table where each row is identified by a string
product code such as ``"SKU-001"``, alongside a ``name`` column and a ``price``
column. The schema in this case is:

>>> class ProductsSchema(pw.Schema):
...     sku: str = pw.column_definition(primary_key=True)
...     name: str
...     price: float

Both ``"static"`` and ``"streaming"`` modes are supported, set up in exactly
the same way as for an integer primary key. For a one-time snapshot:

>>> table = pw.io.postgres.read(
...     postgres_settings=connection_string_parts,
...     table_name="products",
...     schema=ProductsSchema,
...     mode="static",
... )

For continuous CDC, create a publication first:

.. code-block:: sql

    CREATE PUBLICATION products_pub FOR TABLE products;

Then configure the streaming connector:

>>> table = pw.io.postgres.read(
...     postgres_settings=connection_string_parts,
...     table_name="products",
...     schema=ProductsSchema,
...     mode="streaming",
...     publication_name="products_pub",
... )

PostgreSQL's ``UUID`` type is also supported. Because Pathway represents UUID
values as strings, the corresponding schema field must be declared as ``str``.
Suppose you have a ``messages`` table whose primary key is a UUID column ``id``,
alongside a string ``body`` column:

>>> class MessagesSchema(pw.Schema):
...     id: str = pw.column_definition(primary_key=True)
...     body: str

Pathway will read the UUID values as standard hyphenated strings, for example
``"a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"``. Both modes are supported. For a
one-time snapshot:

>>> table = pw.io.postgres.read(
...     postgres_settings=connection_string_parts,
...     table_name="messages",
...     schema=MessagesSchema,
...     mode="static",
... )

For continuous CDC, create a publication first:

.. code-block:: sql

    CREATE PUBLICATION messages_pub FOR TABLE messages;

Then configure the streaming connector:

>>> table = pw.io.postgres.read(
...     postgres_settings=connection_string_parts,
...     table_name="messages",
...     schema=MessagesSchema,
...     mode="streaming",
...     publication_name="messages_pub",
... )
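
Because UUID columns arrive as plain strings, downstream code that needs
``uuid.UUID`` objects has to convert them itself. A minimal sketch using only
the Python standard library follows; ``parse_message_id`` is a hypothetical
helper introduced for illustration and is not part of Pathway:

```python
import uuid


def parse_message_id(raw: str) -> uuid.UUID:
    """Convert a hyphenated UUID string, as read from the ``id`` column, to ``uuid.UUID``."""
    # uuid.UUID raises ValueError when the input is not a well-formed UUID.
    return uuid.UUID(raw)


message_id = parse_message_id("a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11")
# Converting back yields the canonical lowercase, hyphenated form.
round_tripped = str(message_id)
```

Validating the string this way also catches malformed values early, since
``uuid.UUID`` rejects input that is not a well-formed UUID.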

Tables with composite primary keys, where the primary key spans multiple columns,
are supported as well. To declare a composite primary key in Pathway, mark every
participating column with ``pw.column_definition(primary_key=True)``. Suppose you
have an ``order_items`` table where each row is uniquely identified by the
combination of ``order_id`` and ``product_id``, both integers, alongside a
``quantity`` column:

>>> class OrderItemsSchema(pw.Schema):
...     order_id: int = pw.column_definition(primary_key=True)
...     product_id: int = pw.column_definition(primary_key=True)
...     quantity: int

Both ``order_id`` and ``product_id`` are marked as primary key columns, matching
the ``PRIMARY KEY (order_id, product_id)`` constraint on the PostgreSQL side. In
streaming mode, this is especially important: when an update or delete event arrives
in the WAL, PostgreSQL (under its default replica identity) identifies the affected
row only by its primary key columns, so all primary key columns must be declared as
such in the schema.
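
To see why every key column has to be declared, consider a toy model of keyed
state maintained from change events. This is an illustrative sketch in plain
Python, not Pathway's actual implementation; the event dictionaries and the
``apply_event`` helper are hypothetical:

```python
from typing import Dict, Tuple

# Rows of ``order_items`` keyed by the full composite key (order_id, product_id).
State = Dict[Tuple[int, int], int]


def apply_event(state: State, event: dict) -> None:
    """Apply one simplified change event to the keyed state in place."""
    key = (event["order_id"], event["product_id"])
    if event["op"] == "delete":
        # A delete event carries only the key columns, so the full key
        # is the only way to locate the row to remove.
        state.pop(key, None)
    else:
        # Inserts and updates carry the new row contents.
        state[key] = event["quantity"]


state: State = {}
apply_event(state, {"op": "insert", "order_id": 1, "product_id": 10, "quantity": 2})
apply_event(state, {"op": "insert", "order_id": 1, "product_id": 11, "quantity": 5})
apply_event(state, {"op": "update", "order_id": 1, "product_id": 10, "quantity": 3})
apply_event(state, {"op": "delete", "order_id": 1, "product_id": 11})
```

After these events, ``state`` holds a single row for key ``(1, 10)`` with
quantity 3. Had only ``order_id`` been treated as the key, the two inserted rows
would have collided and the delete could not have targeted a single row.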

Both modes are supported. For a one-time snapshot:

>>> table = pw.io.postgres.read(
...     postgres_settings=connection_string_parts,
...     table_name="order_items",
...     schema=OrderItemsSchema,
...     mode="static",
... )

For continuous CDC, create a publication first:

.. code-block:: sql

    CREATE PUBLICATION order_items_pub FOR TABLE order_items;

Then configure the streaming connector:

>>> table = pw.io.postgres.read(
...     postgres_settings=connection_string_parts,
...     table_name="order_items",
...     schema=OrderItemsSchema,
...     mode="streaming",
...     publication_name="order_items_pub",
... )
"""
_check_entitlements("postgres-wal-reader")

