From 2ff1dc30bf8624cddd898e4f283c954af1ed5600 Mon Sep 17 00:00:00 2001 From: Colin Nolan Date: Thu, 29 Jan 2026 13:56:51 +0000 Subject: [PATCH 1/7] fix: corrects typo in variable name --- postgresql_watcher/watcher.py | 6 +++--- tests/test_postgresql_watcher.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/postgresql_watcher/watcher.py b/postgresql_watcher/watcher.py index 818b38a..7789a35 100644 --- a/postgresql_watcher/watcher.py +++ b/postgresql_watcher/watcher.py @@ -83,7 +83,7 @@ def _create_subscription_process( self._cleanup_connections_and_processes() self.parent_conn, self.child_conn = Pipe() - self.subscription_proces = Process( + self.subscription_process = Process( target=casbin_channel_subscription, args=( self.child_conn, @@ -109,9 +109,9 @@ def start( self, timeout=20, # seconds ): - if not self.subscription_proces.is_alive(): + if not self.subscription_process.is_alive(): # Start listening to messages - self.subscription_proces.start() + self.subscription_process.start() # And wait for the Process to be ready to listen for updates # from PostgreSQL timeout_time = time() + timeout diff --git a/tests/test_postgresql_watcher.py b/tests/test_postgresql_watcher.py index d3f9d70..b4c7c3c 100644 --- a/tests/test_postgresql_watcher.py +++ b/tests/test_postgresql_watcher.py @@ -50,7 +50,7 @@ def test_pg_watcher_init(self): assert isinstance(pg_watcher.parent_conn, connection.PipeConnection) else: assert isinstance(pg_watcher.parent_conn, connection.Connection) - assert isinstance(pg_watcher.subscription_proces, context.Process) + assert isinstance(pg_watcher.subscription_process, context.Process) def test_update_single_pg_watcher(self): pg_watcher = get_watcher("test_update_single_pg_watcher") From d0098d79e09bebfeaacba630ff524ab65d7804de Mon Sep 17 00:00:00 2001 From: Colin Nolan Date: Thu, 29 Jan 2026 13:58:07 +0000 Subject: [PATCH 2/7] fix: join process on termination --- postgresql_watcher/watcher.py | 1 + 1 file changed, 1 insertion(+) diff --git a/postgresql_watcher/watcher.py b/postgresql_watcher/watcher.py index 7789a35..b723f2f 100644 --- a/postgresql_watcher/watcher.py +++ b/postgresql_watcher/watcher.py @@ -134,6 +134,7 @@ def _cleanup_connections_and_processes(self) -> None: self.child_conn = None if self.subscription_process is not None: self.subscription_process.terminate() + self.subscription_process.join() self.subscription_process = None def set_update_callback(self, update_handler: Optional[Callable[[None], None]]): From 96ea26d3dea214ce444214c7d1235ce9ebdaa862 Mon Sep 17 00:00:00 2001 From: Colin Nolan Date: Thu, 29 Jan 2026 14:01:11 +0000 Subject: [PATCH 3/7] feat: adds explit `stop` method to stop the watcher --- postgresql_watcher/watcher.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/postgresql_watcher/watcher.py b/postgresql_watcher/watcher.py index b723f2f..94ce0c6 100644 --- a/postgresql_watcher/watcher.py +++ b/postgresql_watcher/watcher.py @@ -124,6 +124,9 @@ def start( raise PostgresqlWatcherChannelSubscriptionTimeoutError(timeout) sleep(1 / 1000) # wait for 1 ms + def stop(self): + self._cleanup_connections_and_processes() + def _cleanup_connections_and_processes(self) -> None: # Clean up potentially existing Connections and Processes if self.parent_conn is not None: From 5066d1543f2e1ef1380ebcfe01e68c07d9ddf1de Mon Sep 17 00:00:00 2001 From: Colin Nolan Date: Thu, 29 Jan 2026 14:03:31 +0000 Subject: [PATCH 4/7] fix: don't init `self.parent_conn` twice --- postgresql_watcher/watcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/postgresql_watcher/watcher.py b/postgresql_watcher/watcher.py index 94ce0c6..d68597f 100644 --- a/postgresql_watcher/watcher.py +++ b/postgresql_watcher/watcher.py @@ -50,7 +50,6 @@ def __init__( logger (Optional[Logger], optional): Custom logger to use. Defaults to None. """ self.update_callback = None - self.parent_conn = None self.host = host self.port = port self.user = user From 637d5b95ecaaa8eef0ca3cbbb87c6b912b244802 Mon Sep 17 00:00:00 2001 From: Colin Nolan Date: Mon, 2 Feb 2026 12:27:30 +0000 Subject: [PATCH 5/7] fix: only terminate process if has pid --- postgresql_watcher/watcher.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/postgresql_watcher/watcher.py b/postgresql_watcher/watcher.py index d68597f..85ff35b 100644 --- a/postgresql_watcher/watcher.py +++ b/postgresql_watcher/watcher.py @@ -134,7 +134,7 @@ def _cleanup_connections_and_processes(self) -> None: if self.child_conn is not None: self.child_conn.close() self.child_conn = None - if self.subscription_process is not None: + if self.subscription_process is not None and self.subscription_process.pid is not None: self.subscription_process.terminate() self.subscription_process.join() self.subscription_process = None From 5636720d26566ab74e3b0333e89a689faa48daa3 Mon Sep 17 00:00:00 2001 From: Colin Nolan Date: Mon, 2 Feb 2026 12:28:32 +0000 Subject: [PATCH 6/7] feat: allow re-start after stop --- postgresql_watcher/watcher.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/postgresql_watcher/watcher.py b/postgresql_watcher/watcher.py index 85ff35b..50ff524 100644 --- a/postgresql_watcher/watcher.py +++ b/postgresql_watcher/watcher.py @@ -108,6 +108,9 @@ def start( self, timeout=20, # seconds ): + if self.subscription_process is None: + self._create_subscription_process(start_listening=False) + if not self.subscription_process.is_alive(): # Start listening to messages self.subscription_process.start() From b9431fd4a8b676025f6ef9b2da1c13d44b6619de Mon Sep 17 00:00:00 2001 From: Colin Nolan Date: Mon, 2 Feb 2026 12:33:59 +0000 Subject: [PATCH 7/7] test: stop and (re)start --- tests/test_postgresql_watcher.py | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/tests/test_postgresql_watcher.py b/tests/test_postgresql_watcher.py index b4c7c3c..5311ccb 100644 --- a/tests/test_postgresql_watcher.py +++ b/tests/test_postgresql_watcher.py @@ -115,6 +115,28 @@ def test_update_handler_not_called(self): self.assertFalse(main_watcher.should_reload()) self.assertTrue(handler.call_count == 0) + def test_stop_and_restart(self): + channel_name = "test_stop_and_restart" + pg_watcher = get_watcher(channel_name) + + # Verify initially started + self.assertTrue(pg_watcher.subscription_process.is_alive()) + + # Stop the watcher + pg_watcher.stop() + self.assertIsNone(pg_watcher.subscription_process) + + # Restart the watcher + pg_watcher.start() + + # Verify resources are recreated and process is alive + self.assertTrue(pg_watcher.subscription_process.is_alive()) + + # Verify it still works after restart + pg_watcher.update() + sleep(CASBIN_CHANNEL_SELECT_TIMEOUT * 2) + self.assertTrue(pg_watcher.should_reload()) + if __name__ == "__main__": main()