From a147b7de22c43fe38971b308ce468ea1a2f83191 Mon Sep 17 00:00:00 2001 From: MBM Date: Thu, 21 May 2026 16:27:29 +0530 Subject: [PATCH] fix: fix signal.signal() ValueError in Trino worker threads Signed-off-by: MBM --- .../tests/test_trino_queries.py | 37 +++++++++++++++++++ .../trino_offline_store/trino_queries.py | 6 ++- 2 files changed, 41 insertions(+), 2 deletions(-) create mode 100644 sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/test_trino_queries.py diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/test_trino_queries.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/test_trino_queries.py new file mode 100644 index 00000000000..883ce71ada1 --- /dev/null +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/tests/test_trino_queries.py @@ -0,0 +1,37 @@ +import threading +from unittest.mock import MagicMock + +from feast.infra.offline_stores.contrib.trino_offline_store.trino_queries import ( + Query, +) + + +def test_query_init_in_main_thread_registers_signals(): + """signal.signal() should work fine in main thread.""" + cursor = MagicMock() + # Should not raise any exception in main thread + query = Query(query_text="SELECT 1", cursor=cursor) + assert query.query_text == "SELECT 1" + + +def test_query_init_in_worker_thread_does_not_raise(): + """Regression test: signal.signal() fails in non-main threads.""" + # signal.signal() raises ValueError when called outside the main thread. + # This test verifies the fix guards against that by running Query.__init__ + # in a worker thread and ensuring no exception is raised. + + errors = [] + cursor = MagicMock() + + def create_query(): + try: + query = Query(query_text="SELECT 1", cursor=cursor) + assert query.query_text == "SELECT 1" + except ValueError as e: + errors.append(e) + + thread = threading.Thread(target=create_query) + thread.start() + thread.join() + + assert not errors, f"Unexpected ValueError in worker thread: {errors[0]}" diff --git a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py index 302745fc0e9..ef23a5cf021 100644 --- a/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py +++ b/sdk/python/feast/infra/offline_stores/contrib/trino_offline_store/trino_queries.py @@ -1,6 +1,7 @@ from __future__ import annotations import signal +import threading from dataclasses import dataclass from enum import Enum from typing import Any, Dict, List, Optional @@ -92,8 +93,9 @@ def __init__(self, query_text: str, cursor: Cursor): self.status = QueryStatus.PENDING self._cursor = cursor - signal.signal(signal.SIGINT, self.cancel) - signal.signal(signal.SIGTERM, self.cancel) + if threading.current_thread() is threading.main_thread(): + signal.signal(signal.SIGINT, self.cancel) + signal.signal(signal.SIGTERM, self.cancel) def execute(self) -> Results: try: