From ebcb1c77a68fb6f4e81096e4c7dd7659147035fe Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 02:08:53 +0530 Subject: [PATCH 01/21] feat: replace cursor.execute() with run_sql utility for executing SQL queries using conn.execute_string .instead --- .../_snowflake/write_audit_publish.py | 25 ++++++++++++++--- .../metaflow/get_snowflake_connection.py | 17 ++++++++--- src/ds_platform_utils/metaflow/pandas.py | 28 +++++++++++-------- .../metaflow/write_audit_publish.py | 16 +++++++++-- src/ds_platform_utils/shared/utils.py | 12 ++++++++ 5 files changed, 75 insertions(+), 23 deletions(-) create mode 100644 src/ds_platform_utils/shared/utils.py diff --git a/src/ds_platform_utils/_snowflake/write_audit_publish.py b/src/ds_platform_utils/_snowflake/write_audit_publish.py index 6475bed..8a7c65d 100644 --- a/src/ds_platform_utils/_snowflake/write_audit_publish.py +++ b/src/ds_platform_utils/_snowflake/write_audit_publish.py @@ -8,6 +8,7 @@ from snowflake.connector.cursor import SnowflakeCursor from ds_platform_utils.metaflow._consts import NON_PROD_SCHEMA, PROD_SCHEMA +from ds_platform_utils.shared.utils import run_sql def write_audit_publish( # noqa: PLR0913 (too-many-arguments) this fn is an exception @@ -201,7 +202,8 @@ def run_query(query: str, cursor: Optional[SnowflakeCursor] = None) -> None: return # Count statements so we can tell Snowflake exactly how many to expect - cursor.execute(query, num_statements=0) # 0 means any number of statements + # cursor.execute(query, num_statements=0) # 0 means any number of statements + run_sql(cursor.connection, query) cursor.connection.commit() @@ -216,7 +218,11 @@ def run_audit_query(query: str, cursor: Optional[SnowflakeCursor] = None) -> dic if cursor is None: return {"mock_result": True} - cursor.execute(query) + # cursor.execute(query) + cursor = run_sql(cursor.connection, query) + if cursor is None: + return {} + result = cursor.fetchone() if not result: return {} @@ -243,11 +249,22 @@ def fetch_table_preview( if not cursor: return [{"mock_col": "mock_val"}] - cursor.execute(f""" + # cursor.execute(f""" + # SELECT * + # FROM {database}.{schema}.{table_name} + # LIMIT {n_rows}; + # """) + cursor = run_sql( + cursor.connection, + f""" SELECT * FROM {database}.{schema}.{table_name} LIMIT {n_rows}; - """) + """, + ) + if cursor is None: + return [] + columns = [col[0] for col in cursor.description] rows = cursor.fetchall() return [dict(zip(columns, row)) for row in rows] diff --git a/src/ds_platform_utils/metaflow/get_snowflake_connection.py b/src/ds_platform_utils/metaflow/get_snowflake_connection.py index 8eaeba1..a2ef1e3 100644 --- a/src/ds_platform_utils/metaflow/get_snowflake_connection.py +++ b/src/ds_platform_utils/metaflow/get_snowflake_connection.py @@ -4,6 +4,8 @@ from metaflow import Snowflake, current from snowflake.connector import SnowflakeConnection +from ds_platform_utils.shared.utils import run_sql + #################### # --- Metaflow --- # #################### @@ -67,10 +69,17 @@ def _create_snowflake_connection( queries.append(f"ALTER SESSION SET QUERY_TAG = '{query_tag}';") # Execute all queries in single batch - with conn.cursor() as cursor: - sql = "\n".join(queries) - _debug_print_query(sql) - cursor.execute(sql, num_statements=0) + # with conn.cursor() as cursor: + # sql = "\n".join(queries) + # _debug_print_query(sql) + # cursor.execute(sql, num_statements=0) + + # Merge into single SQL batch + sql = "\n".join(queries) + _debug_print_query(sql) + + if sql.strip(): + run_sql(conn, sql) return conn diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 743d48d..417ee74 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -18,6 +18,7 @@ add_comment_to_each_sql_statement, get_select_dev_query_tags, ) +from ds_platform_utils.shared.utils import run_sql TWarehouse = Literal[ "OUTERBOUNDS_DATA_SCIENCE_ADS_PROD_XS_WH", @@ -111,15 +112,16 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) # set warehouse if warehouse is not None: - with conn.cursor() as cur: - cur.execute(f"USE WAREHOUSE {warehouse};") + # with conn.cursor() as cur: + # cur.execute(f"USE WAREHOUSE {warehouse};") + run_sql(conn, f"USE WAREHOUSE {warehouse};") - # set query tag for cost tracking in select.dev - # REASON: because write_pandas() doesn't allow modifying the SQL query to add SQL comments in it directly, - # so we set a session query tag instead. - tags = get_select_dev_query_tags() - query_tag_str = json.dumps(tags) - cur.execute(f"ALTER SESSION SET QUERY_TAG = '{query_tag_str}';") + # set query tag for cost tracking in select.dev + # REASON: because write_pandas() doesn't allow modifying the SQL query to add SQL comments in it directly, + # so we set a session query tag instead. + tags = get_select_dev_query_tags() + query_tag_str = json.dumps(tags) + run_sql(conn, f"ALTER SESSION SET QUERY_TAG = '{query_tag_str}';") # https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.Session.write_pandas write_pandas( @@ -199,15 +201,17 @@ def query_pandas_from_snowflake( conn: SnowflakeConnection = get_snowflake_connection(use_utc) with conn.cursor() as cur: - if warehouse is not None: - cur.execute(f"USE WAREHOUSE {warehouse};") + # if warehouse is not None: + # cur.execute(f"USE WAREHOUSE {warehouse};") + run_sql(conn, f"USE WAREHOUSE {warehouse};") # force_return_table=True -- returns a Pyarrow Table always even if the result is empty - result: pyarrow.Table = cur.execute(query).fetch_arrow_all(force_return_table=True) + # result: pyarrow.Table = cur.execute(query).fetch_arrow_all(force_return_table=True) + cur = run_sql(conn, query) + result: pyarrow.Table = cur.fetch_arrow_all(force_return_table=True) df = result.to_pandas() df.columns = df.columns.str.lower() current.card.append(Markdown("### Query Result")) current.card.append(Table.from_dataframe(df.head())) - return df diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index 04423a6..22563b8 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -11,6 +11,7 @@ from snowflake.connector.cursor import SnowflakeCursor from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection +from ds_platform_utils.shared.utils import run_sql if TYPE_CHECKING: from ds_platform_utils._snowflake.write_audit_publish import ( @@ -216,7 +217,8 @@ def publish( # noqa: PLR0913, D417 with conn.cursor() as cur: if warehouse is not None: - cur.execute(f"USE WAREHOUSE {warehouse}") + # cur.execute(f"USE WAREHOUSE {warehouse}") + run_sql(conn, f"USE WAREHOUSE {warehouse}") last_op_was_write = False for operation in write_audit_publish( @@ -334,11 +336,19 @@ def fetch_table_preview( :param table_name: Table name :param cursor: Snowflake cursor """ - cursor.execute(f""" + # cursor.execute(f""" + # SELECT * + # FROM {database}.{schema}.{table_name} + # LIMIT {n_rows}; + # """) + cursor = run_sql( + cursor.connection, + f""" SELECT * FROM {database}.{schema}.{table_name} LIMIT {n_rows}; - """) + """, + ) columns = [col[0] for col in cursor.description] rows = cursor.fetchall() diff --git a/src/ds_platform_utils/shared/utils.py b/src/ds_platform_utils/shared/utils.py new file mode 100644 index 0000000..7bbf257 --- /dev/null +++ b/src/ds_platform_utils/shared/utils.py @@ -0,0 +1,12 @@ +"""Utility functions which are shared.""" + + +def run_sql(cn, sql: str): + """Runs SQL using execute_string and returns the *last* cursor. + + mimicking Snowflake's `cursor.execute()` behavior. + """ + last_cursor = None + for cur in cn.execute_string(sql): + last_cursor = cur + return last_cursor From 28bd3ec8032bfe22892597fe774a509ba96f1f4f Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 02:11:39 +0530 Subject: [PATCH 02/21] bump version --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 0f163f4..5ed724a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ds-platform-utils" -version = "0.2.3" +version = "0.2.4" description = "Utility library for Pattern Data Science." readme = "README.md" authors = [ From f84e8c3e619cafe25166a9ad05fd28469be7dd03 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 02:49:43 +0530 Subject: [PATCH 03/21] feat: added some copilot code cirrection suggetions and for the critical shared utility `run_sql` utility added unit tests --- src/ds_platform_utils/metaflow/pandas.py | 17 +- .../metaflow/write_audit_publish.py | 43 ++--- src/ds_platform_utils/shared/utils.py | 14 +- tests/unit_tests/shared/__init__.py | 1 + tests/unit_tests/shared/test__utils.py | 161 ++++++++++++++++++ 5 files changed, 207 insertions(+), 29 deletions(-) create mode 100644 tests/unit_tests/shared/__init__.py create mode 100644 tests/unit_tests/shared/test__utils.py diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 417ee74..477b867 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -201,17 +201,20 @@ def query_pandas_from_snowflake( conn: SnowflakeConnection = get_snowflake_connection(use_utc) with conn.cursor() as cur: - # if warehouse is not None: - # cur.execute(f"USE WAREHOUSE {warehouse};") - run_sql(conn, f"USE WAREHOUSE {warehouse};") + if warehouse is not None: + # cur.execute(f"USE WAREHOUSE {warehouse};") + run_sql(conn, f"USE WAREHOUSE {warehouse};") # force_return_table=True -- returns a Pyarrow Table always even if the result is empty # result: pyarrow.Table = cur.execute(query).fetch_arrow_all(force_return_table=True) cur = run_sql(conn, query) - result: pyarrow.Table = cur.fetch_arrow_all(force_return_table=True) - - df = result.to_pandas() - df.columns = df.columns.str.lower() + if cur is None: + # No statements to execute, return empty DataFrame + df = pd.DataFrame() + else: + result: pyarrow.Table = cur.fetch_arrow_all(force_return_table=True) + df = result.to_pandas() + df.columns = df.columns.str.lower() current.card.append(Markdown("### Query Result")) current.card.append(Table.from_dataframe(df.head())) diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index 22563b8..84077ac 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -341,23 +341,26 @@ def fetch_table_preview( # FROM {database}.{schema}.{table_name} # LIMIT {n_rows}; # """) - cursor = run_sql( - cursor.connection, - f""" - SELECT * - FROM {database}.{schema}.{table_name} - LIMIT {n_rows}; - """, - ) - columns = [col[0] for col in cursor.description] - rows = cursor.fetchall() - - # Create header row plus data rows - table_rows = [[Artifact(col) for col in columns]] # Header row - for row in rows: - table_rows.append([Artifact(val) for val in row]) # Data rows - - return [ - Markdown(f"### Table Preview: ({database}.{schema}.{table_name})"), - Table(table_rows), - ] + if cursor is None: + return [] + else: + cursor = run_sql( + cursor.connection, + f""" + SELECT * + FROM {database}.{schema}.{table_name} + LIMIT {n_rows}; + """, + ) + columns = [col[0] for col in cursor.description] + rows = cursor.fetchall() + + # Create header row plus data rows + table_rows = [[Artifact(col) for col in columns]] # Header row + for row in rows: + table_rows.append([Artifact(val) for val in row]) # Data rows + + return [ + Markdown(f"### Table Preview: ({database}.{schema}.{table_name})"), + Table(table_rows), + ] diff --git a/src/ds_platform_utils/shared/utils.py b/src/ds_platform_utils/shared/utils.py index 7bbf257..31d7841 100644 --- a/src/ds_platform_utils/shared/utils.py +++ b/src/ds_platform_utils/shared/utils.py @@ -1,12 +1,22 @@ """Utility functions which are shared.""" +from typing import Optional -def run_sql(cn, sql: str): +from snowflake.connector import SnowflakeConnection +from snowflake.connector.cursor import SnowflakeCursor + + +def run_sql(conn: SnowflakeConnection, sql: str) -> Optional[SnowflakeCursor]: """Runs SQL using execute_string and returns the *last* cursor. mimicking Snowflake's `cursor.execute()` behavior. + + :param conn: Snowflake connection object + :param sql: SQL query or queries to execute + :return: The last cursor from executing the SQL statements, or None if no statements were executed + """ last_cursor = None - for cur in cn.execute_string(sql): + for cur in conn.execute_string(sql): last_cursor = cur return last_cursor diff --git a/tests/unit_tests/shared/__init__.py b/tests/unit_tests/shared/__init__.py new file mode 100644 index 0000000..b54df80 --- /dev/null +++ b/tests/unit_tests/shared/__init__.py @@ -0,0 +1 @@ +"""Unit tests for shared utilities.""" diff --git a/tests/unit_tests/shared/test__utils.py b/tests/unit_tests/shared/test__utils.py new file mode 100644 index 0000000..336cdee --- /dev/null +++ b/tests/unit_tests/shared/test__utils.py @@ -0,0 +1,161 @@ +"""Unit tests for shared utility functions.""" + +from unittest.mock import MagicMock, Mock + +import pytest + +from src.ds_platform_utils.shared.utils import run_sql + + +class TestRunSql: + """Test suite for run_sql utility function.""" + + def test_returns_last_cursor_with_multiple_statements(self): + """Test that run_sql returns the last cursor when multiple SQL statements are executed.""" + # Setup + mock_conn = Mock() + cursor1 = Mock() + cursor2 = Mock() + cursor3 = Mock() + mock_conn.execute_string.return_value = [cursor1, cursor2, cursor3] + + sql = "SELECT 1; SELECT 2; SELECT 3;" + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is cursor3 + + def test_returns_single_cursor_with_single_statement(self): + """Test that run_sql returns the cursor when a single SQL statement is executed.""" + # Setup + mock_conn = Mock() + cursor = Mock() + mock_conn.execute_string.return_value = [cursor] + + sql = "SELECT * FROM table;" + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is cursor + + def test_returns_none_when_no_statements_executed(self): + """Test that run_sql returns None when no statements are executed.""" + # Setup + mock_conn = Mock() + mock_conn.execute_string.return_value = [] + + sql = "" + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is None + + def test_handles_empty_sql_string(self): + """Test that run_sql correctly handles empty SQL strings.""" + # Setup + mock_conn = Mock() + mock_conn.execute_string.return_value = [] + + sql = "" + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is None + + def test_handles_whitespace_only_sql(self): + """Test that run_sql correctly handles SQL strings with only whitespace.""" + # Setup + mock_conn = Mock() + mock_conn.execute_string.return_value = [] + + sql = " \n\t " + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is None + + def test_handles_sql_with_comments_only(self): + """Test that run_sql correctly handles SQL with only comments.""" + # Setup + mock_conn = Mock() + mock_conn.execute_string.return_value = [] + + sql = "-- This is a comment\n/* This is another comment */" + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is None + + def test_preserves_cursor_iteration_order(self): + """Test that run_sql iterates through all cursors in order.""" + # Setup + mock_conn = Mock() + cursors = [Mock(name=f"cursor{i}") for i in range(5)] + mock_conn.execute_string.return_value = cursors + + sql = "SELECT 1; SELECT 2; SELECT 3; SELECT 4; SELECT 5;" + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is cursors[-1] + + def test_passes_connection_object_correctly(self): + """Test that run_sql correctly uses the provided connection object.""" + # Setup + mock_conn = MagicMock() + cursor = Mock() + mock_conn.execute_string.return_value = [cursor] + + sql = "SELECT * FROM table;" + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is cursor + + def test_handles_complex_multistatement_sql(self): + """Test that run_sql handles complex multi-statement SQL with various statement types.""" + # Setup + mock_conn = Mock() + cursor1 = Mock() + cursor2 = Mock() + cursor3 = Mock() + cursor4 = Mock() + mock_conn.execute_string.return_value = [cursor1, cursor2, cursor3, cursor4] + + sql = """ + CREATE TABLE temp_table AS SELECT * FROM source; + INSERT INTO target_table SELECT * FROM temp_table; + UPDATE target_table SET status = 'processed'; + DROP TABLE temp_table; + """ + + # Execute + result = run_sql(mock_conn, sql) + + # Verify + mock_conn.execute_string.assert_called_once_with(sql) + assert result is cursor4 From 657276cfb6e4ea82afa86b24ecb8edd439820d94 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 02:54:02 +0530 Subject: [PATCH 04/21] lint fix --- src/ds_platform_utils/metaflow/pandas.py | 2 +- tests/unit_tests/shared/test__utils.py | 2 -- uv.lock | 2 +- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 477b867..1452f3f 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -212,7 +212,7 @@ def query_pandas_from_snowflake( # No statements to execute, return empty DataFrame df = pd.DataFrame() else: - result: pyarrow.Table = cur.fetch_arrow_all(force_return_table=True) + result: pyarrow.Table = cur.fetch_arrow_all(force_return_table=True) df = result.to_pandas() df.columns = df.columns.str.lower() diff --git a/tests/unit_tests/shared/test__utils.py b/tests/unit_tests/shared/test__utils.py index 336cdee..888afb6 100644 --- a/tests/unit_tests/shared/test__utils.py +++ b/tests/unit_tests/shared/test__utils.py @@ -2,8 +2,6 @@ from unittest.mock import MagicMock, Mock -import pytest - from src.ds_platform_utils.shared.utils import run_sql diff --git a/uv.lock b/uv.lock index de6dc7f..6de8948 100644 --- a/uv.lock +++ b/uv.lock @@ -478,7 +478,7 @@ wheels = [ [[package]] name = "ds-platform-utils" -version = "0.2.3" +version = "0.2.4" source = { editable = "." } dependencies = [ { name = "jinja2" }, From ea578dccadb1124870d941179e1eca449d40b907 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 03:16:57 +0530 Subject: [PATCH 05/21] feat: add --no-pylint option to test command in pandas read/write flow tests to resolve test failures due to pylint --- tests/functional_tests/metaflow/test__pandas.py | 2 +- tests/functional_tests/metaflow/test_pandas_utc.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional_tests/metaflow/test__pandas.py b/tests/functional_tests/metaflow/test__pandas.py index 2e92af4..20aa80e 100644 --- a/tests/functional_tests/metaflow/test__pandas.py +++ b/tests/functional_tests/metaflow/test__pandas.py @@ -99,7 +99,7 @@ def end(self): @pytest.mark.slow def test_pandas_read_write_flow(): """Test that the publish flow runs successfully.""" - cmd = [sys.executable, __file__, "--environment=local", "--with=card", "run"] + cmd = [sys.executable, __file__, "--environment=local", "--with=card", "--no-pylint", "run"] print("\n=== Metaflow Output ===") for line in execute_with_output(cmd): diff --git a/tests/functional_tests/metaflow/test_pandas_utc.py b/tests/functional_tests/metaflow/test_pandas_utc.py index f07f4cd..36dfe73 100644 --- a/tests/functional_tests/metaflow/test_pandas_utc.py +++ b/tests/functional_tests/metaflow/test_pandas_utc.py @@ -156,7 +156,7 @@ def end(self): @pytest.mark.slow def test_pandas_read_write_flow(): """Test that the publish flow runs successfully.""" - cmd = [sys.executable, __file__, "--environment=local", "--with=card", "run"] + cmd = [sys.executable, __file__, "--environment=local", "--with=card", "--no-pylint", "run"] print("\n=== Metaflow Output ===") for line in execute_with_output(cmd): From 01b3d6829e1f1b86430c6fc31b290ab78228b765 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 11:52:37 +0530 Subject: [PATCH 06/21] fix: refactor query_pandas_from_snowflake to use run_sql for executing SQL queries and added return df which I forgot before and was encountring Nonetype error --- src/ds_platform_utils/metaflow/pandas.py | 37 ++++++++++++------------ 1 file changed, 19 insertions(+), 18 deletions(-) diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 1452f3f..970f62b 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -200,21 +200,22 @@ def query_pandas_from_snowflake( current.card.append(Markdown(f"```sql\n{query}\n```")) conn: SnowflakeConnection = get_snowflake_connection(use_utc) - with conn.cursor() as cur: - if warehouse is not None: - # cur.execute(f"USE WAREHOUSE {warehouse};") - run_sql(conn, f"USE WAREHOUSE {warehouse};") - - # force_return_table=True -- returns a Pyarrow Table always even if the result is empty - # result: pyarrow.Table = cur.execute(query).fetch_arrow_all(force_return_table=True) - cur = run_sql(conn, query) - if cur is None: - # No statements to execute, return empty DataFrame - df = pd.DataFrame() - else: - result: pyarrow.Table = cur.fetch_arrow_all(force_return_table=True) - df = result.to_pandas() - df.columns = df.columns.str.lower() - - current.card.append(Markdown("### Query Result")) - current.card.append(Table.from_dataframe(df.head())) + if warehouse is not None: + # cur.execute(f"USE WAREHOUSE {warehouse};") + run_sql(conn, f"USE WAREHOUSE {warehouse};") + + # force_return_table=True -- returns a Pyarrow Table always even if the result is empty + # result: pyarrow.Table = cur.execute(query).fetch_arrow_all(force_return_table=True) + cursor_result = run_sql(conn, query) + if cursor_result is None: + # No statements to execute, return empty DataFrame + df = pd.DataFrame() + else: + result: pyarrow.Table = cursor_result.fetch_arrow_all(force_return_table=True) + df = result.to_pandas() + df.columns = df.columns.str.lower() + + current.card.append(Markdown("### Query Result")) + current.card.append(Table.from_dataframe(df.head())) + + return df From 489a1a6500146b532c5d38c4884f513958a2e2f7 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 12:05:47 +0530 Subject: [PATCH 07/21] fix: remove --no-pylint option from test command in pandas read/write flow tests --- tests/functional_tests/metaflow/test__pandas.py | 2 +- tests/functional_tests/metaflow/test_pandas_utc.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/functional_tests/metaflow/test__pandas.py b/tests/functional_tests/metaflow/test__pandas.py index 20aa80e..2e92af4 100644 --- a/tests/functional_tests/metaflow/test__pandas.py +++ b/tests/functional_tests/metaflow/test__pandas.py @@ -99,7 +99,7 @@ def end(self): @pytest.mark.slow def test_pandas_read_write_flow(): """Test that the publish flow runs successfully.""" - cmd = [sys.executable, __file__, "--environment=local", "--with=card", "--no-pylint", "run"] + cmd = [sys.executable, __file__, "--environment=local", "--with=card", "run"] print("\n=== Metaflow Output ===") for line in execute_with_output(cmd): diff --git a/tests/functional_tests/metaflow/test_pandas_utc.py b/tests/functional_tests/metaflow/test_pandas_utc.py index 36dfe73..f07f4cd 100644 --- a/tests/functional_tests/metaflow/test_pandas_utc.py +++ b/tests/functional_tests/metaflow/test_pandas_utc.py @@ -156,7 +156,7 @@ def end(self): @pytest.mark.slow def test_pandas_read_write_flow(): """Test that the publish flow runs successfully.""" - cmd = [sys.executable, __file__, "--environment=local", "--with=card", "--no-pylint", "run"] + cmd = [sys.executable, __file__, "--environment=local", "--with=card", "run"] print("\n=== Metaflow Output ===") for line in execute_with_output(cmd): From 529642deb74b72a70ba4e2b820b4fde4b558561e Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 12:20:39 +0530 Subject: [PATCH 08/21] chore: removed old commented code --- .../_snowflake/write_audit_publish.py | 11 ++--------- .../metaflow/get_snowflake_connection.py | 6 ------ src/ds_platform_utils/metaflow/pandas.py | 6 +----- .../metaflow/write_audit_publish.py | 14 +++++--------- 4 files changed, 8 insertions(+), 29 deletions(-) diff --git a/src/ds_platform_utils/_snowflake/write_audit_publish.py b/src/ds_platform_utils/_snowflake/write_audit_publish.py index 8a7c65d..e355008 100644 --- a/src/ds_platform_utils/_snowflake/write_audit_publish.py +++ b/src/ds_platform_utils/_snowflake/write_audit_publish.py @@ -200,9 +200,8 @@ def run_query(query: str, cursor: Optional[SnowflakeCursor] = None) -> None: if cursor is None: print(f"Would execute query:\n{query}") return - - # Count statements so we can tell Snowflake exactly how many to expect - # cursor.execute(query, num_statements=0) # 0 means any number of statements + + # run the query using run_sql utility which handles multiple statements via execute_string run_sql(cursor.connection, query) cursor.connection.commit() @@ -218,7 +217,6 @@ def run_audit_query(query: str, cursor: Optional[SnowflakeCursor] = None) -> dic if cursor is None: return {"mock_result": True} - # cursor.execute(query) cursor = run_sql(cursor.connection, query) if cursor is None: return {} @@ -249,11 +247,6 @@ def fetch_table_preview( if not cursor: return [{"mock_col": "mock_val"}] - # cursor.execute(f""" - # SELECT * - # FROM {database}.{schema}.{table_name} - # LIMIT {n_rows}; - # """) cursor = run_sql( cursor.connection, f""" diff --git a/src/ds_platform_utils/metaflow/get_snowflake_connection.py b/src/ds_platform_utils/metaflow/get_snowflake_connection.py index a2ef1e3..5fc2ed1 100644 --- a/src/ds_platform_utils/metaflow/get_snowflake_connection.py +++ b/src/ds_platform_utils/metaflow/get_snowflake_connection.py @@ -68,12 +68,6 @@ def _create_snowflake_connection( if query_tag: queries.append(f"ALTER SESSION SET QUERY_TAG = '{query_tag}';") - # Execute all queries in single batch - # with conn.cursor() as cursor: - # sql = "\n".join(queries) - # _debug_print_query(sql) - # cursor.execute(sql, num_statements=0) - # Merge into single SQL batch sql = "\n".join(queries) _debug_print_query(sql) diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 970f62b..99c099c 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -112,8 +112,6 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) # set warehouse if warehouse is not None: - # with conn.cursor() as cur: - # cur.execute(f"USE WAREHOUSE {warehouse};") run_sql(conn, f"USE WAREHOUSE {warehouse};") # set query tag for cost tracking in select.dev @@ -201,16 +199,14 @@ def query_pandas_from_snowflake( conn: SnowflakeConnection = get_snowflake_connection(use_utc) if warehouse is not None: - # cur.execute(f"USE WAREHOUSE {warehouse};") run_sql(conn, f"USE WAREHOUSE {warehouse};") - # force_return_table=True -- returns a Pyarrow Table always even if the result is empty - # result: pyarrow.Table = cur.execute(query).fetch_arrow_all(force_return_table=True) cursor_result = run_sql(conn, query) if cursor_result is None: # No statements to execute, return empty DataFrame df = pd.DataFrame() else: + # force_return_table=True -- returns a Pyarrow Table always even if the result is empty result: pyarrow.Table = cursor_result.fetch_arrow_all(force_return_table=True) df = result.to_pandas() df.columns = df.columns.str.lower() diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index 84077ac..9c9cb31 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -217,7 +217,6 @@ def publish( # noqa: PLR0913, D417 with conn.cursor() as cur: if warehouse is not None: - # cur.execute(f"USE WAREHOUSE {warehouse}") run_sql(conn, f"USE WAREHOUSE {warehouse}") last_op_was_write = False @@ -336,15 +335,10 @@ def fetch_table_preview( :param table_name: Table name :param cursor: Snowflake cursor """ - # cursor.execute(f""" - # SELECT * - # FROM {database}.{schema}.{table_name} - # LIMIT {n_rows}; - # """) if cursor is None: return [] else: - cursor = run_sql( + result_cursor = run_sql( cursor.connection, f""" SELECT * @@ -352,8 +346,10 @@ def fetch_table_preview( LIMIT {n_rows}; """, ) - columns = [col[0] for col in cursor.description] - rows = cursor.fetchall() + if result_cursor is None: + return [] + columns = [col[0] for col in result_cursor.description] + rows = result_cursor.fetchall() # Create header row plus data rows table_rows = [[Artifact(col) for col in columns]] # Header row From fc67c370cfdcfa0edbb2642962947faff90c35f3 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 12:25:30 +0530 Subject: [PATCH 09/21] lint fix --- src/ds_platform_utils/_snowflake/write_audit_publish.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/ds_platform_utils/_snowflake/write_audit_publish.py b/src/ds_platform_utils/_snowflake/write_audit_publish.py index e355008..b5adb2c 100644 --- a/src/ds_platform_utils/_snowflake/write_audit_publish.py +++ b/src/ds_platform_utils/_snowflake/write_audit_publish.py @@ -200,8 +200,8 @@ def run_query(query: str, cursor: Optional[SnowflakeCursor] = None) -> None: if cursor is None: print(f"Would execute query:\n{query}") return - - # run the query using run_sql utility which handles multiple statements via execute_string + + # run the query using run_sql utility which handles multiple statements via execute_string run_sql(cursor.connection, query) cursor.connection.commit() From 8927f143620ee402d33266e7ac7d932aaad27ed5 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 14:29:42 +0530 Subject: [PATCH 10/21] feat: add integration style tests for run_sql integration with publish , publish_pandas and query_pandas_from_snowflake --- .../metaflow/test__run_sql_integration.py | 205 ++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 tests/functional_tests/metaflow/test__run_sql_integration.py diff --git a/tests/functional_tests/metaflow/test__run_sql_integration.py b/tests/functional_tests/metaflow/test__run_sql_integration.py new file mode 100644 index 0000000..c38e23a --- /dev/null +++ b/tests/functional_tests/metaflow/test__run_sql_integration.py @@ -0,0 +1,205 @@ +"""Functional test for run_sql + publish/publish_pandas/query_pandas_from_snowflake.""" + +import subprocess +import sys +from datetime import datetime + +import pandas as pd +import pytest +import pytz +from metaflow import FlowSpec, project, step + +from ds_platform_utils.metaflow import ( + publish, + publish_pandas, + query_pandas_from_snowflake, +) +from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection +from ds_platform_utils.shared.utils import run_sql + + +@project(name="test_run_sql_integration_flow") +class TestRunSqlIntegrationFlow(FlowSpec): + """Metaflow flow that will test run_sql integration via various paths. + + - publish_pandas() + - publish() + - query_pandas_from_snowflake() + - run_sql() with various SQL patterns (edge cases) + """ + + @step + def start(self): + """Start the flow.""" + self.next(self.test_publish_pandas_basic) + + @step + def test_publish_pandas_basic(self): + """Publish a simple DataFrame without warehouse (no run_sql here, just sanity).""" + df = pd.DataFrame( + { + "id": [1, 2, 3], + "name": ["a", "b", "c"], + "created_at": [datetime.now(pytz.UTC)] * 3, + } + ) + + publish_pandas( + table_name="RUN_SQL_ITG_PANDAS", + df=df, + auto_create_table=True, + overwrite=True, + ) + self.next(self.test_publish_pandas_with_warehouse) + + @step + def test_publish_pandas_with_warehouse(self): + """Publish using a specific warehouse. + + This hits run_sql via: + - USE WAREHOUSE ... + - ALTER SESSION SET QUERY_TAG = ... + """ + df = pd.DataFrame( + { + "id": [10, 20], + "name": ["x", "y"], + "created_at": [datetime.now(pytz.UTC)] * 2, + } + ) + + publish_pandas( + table_name="RUN_SQL_ITG_PANDAS_WH", + df=df, + auto_create_table=True, + overwrite=True, + warehouse="OUTERBOUNDS_DATA_SCIENCE_SHARED_DEV_XS_WH", + ) + self.next(self.test_publish_with_wap) + + @step + def test_publish_with_wap(self): + """Test publish() which uses write_audit_publish under the hood. + + This ensures publish still works when run_sql is used for session setup elsewhere. + """ + # Very simple WAP query: just recreate the table from itself + query = """ + CREATE OR REPLACE TABLE PATTERN_DB.{{schema}}.RUN_SQL_ITG_PUBLISH AS + SELECT * FROM PATTERN_DB.{{schema}}.RUN_SQL_ITG_PANDAS_WH; + """ + + publish( + table_name="RUN_SQL_ITG_PUBLISH", + query=query, + warehouse="OUTERBOUNDS_DATA_SCIENCE_SHARED_DEV_XS_WH", + ) + + self.next(self.test_query_pandas_multi_statement) + + @step + def test_query_pandas_multi_statement(self): + """Test query_pandas_from_snowflake with a multi-statement SQL. + + We rely on run_sql() internally (execute_string) and ensure we get + the result of the *last* statement. + """ + multi_stmt_query = """ + -- create a temp table + CREATE OR REPLACE TEMP TABLE PATTERN_DB.{{schema}}._RUN_SQL_ITG_TMP AS + SELECT 42 AS answer; + + -- final select (this should be the last cursor) + SELECT * FROM PATTERN_DB.{{schema}}._RUN_SQL_ITG_TMP; + """ + + df = query_pandas_from_snowflake( + multi_stmt_query, + warehouse="OUTERBOUNDS_DATA_SCIENCE_SHARED_DEV_XS_WH", + ) + + # We expect the last statement's result: a single row with answer = 42 + assert len(df) == 1 + assert "answer" in df.columns + assert df["answer"].iloc[0] == 42 + + self.next(self.test_run_sql_edge_cases) + + @step + def test_run_sql_edge_cases(self): + """Directly test run_sql() with different SQL patterns (edge cases). + + - empty string + - whitespace only + - comments only + - simple single statement + - simple multi-statement + """ + conn = get_snowflake_connection(use_utc=True) + + # 1) Empty string → no statements executed → None + cursor = run_sql(conn, "") + assert cursor is None + + # 2) Whitespace only → also effectively nothing + cursor = run_sql(conn, " \n\t ") + assert cursor is None + + # 3) Comments only → Snowflake treats these as "no executable statements" + cursor = run_sql(conn, "-- comment only\n/* another comment */") + assert cursor is None + + # 4) Single statement (SELECT) + cursor = run_sql(conn, "SELECT 1 AS x;") + assert cursor is not None + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == 1 # x == 1 + + # 5) Multi-statement: ensure we get cursor for *last* stmt + cursor = run_sql(conn, "SELECT 1 AS x; SELECT 2 AS x;") + assert cursor is not None + rows = cursor.fetchall() + # Last cursor should correspond to 'SELECT 2 AS x;' + assert len(rows) == 1 + assert rows[0][0] == 2 + + self.next(self.end) + + @step + def end(self): + """End of flow.""" + pass + + +if __name__ == "__main__": + TestRunSqlIntegrationFlow() + + +@pytest.mark.slow +def test_run_sql_integration_flow(): + """Run the run_sql integration flow via Metaflow CLI.""" + cmd = [sys.executable, __file__, "--environment=local", "--with=card", "run"] + + print("\n=== Metaflow Output ===") + for line in execute_with_output(cmd): + print(line, end="") + + +def execute_with_output(cmd): + """Execute a command and yield output lines as they are produced.""" + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + universal_newlines=True, + bufsize=1, + ) + + for line in iter(process.stdout.readline, ""): + yield line + + process.stdout.close() + return_code = process.wait() + if return_code: + raise subprocess.CalledProcessError(return_code, cmd) From 4a933d6f5e01a283809c23cc860026e0fd7c5955 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 16:00:23 +0530 Subject: [PATCH 11/21] feat: rename run_sql with _execute_sql and moved it to _snowflake private sub module making the method also private and deleted shared directory , also moved test__utils.py to snowflake/ in functional tests --- pyproject.toml | 2 +- .../_snowflake/write_audit_publish.py | 30 ++++++++++--- .../metaflow/get_snowflake_connection.py | 4 +- src/ds_platform_utils/metaflow/pandas.py | 10 ++--- .../metaflow/write_audit_publish.py | 6 +-- src/ds_platform_utils/shared/utils.py | 22 ---------- .../metaflow/test__run_sql_integration.py | 44 +++++++++---------- .../snowflake}/test__utils.py | 0 tests/unit_tests/shared/__init__.py | 1 - uv.lock | 2 +- 10 files changed, 59 insertions(+), 62 deletions(-) delete mode 100644 src/ds_platform_utils/shared/utils.py rename tests/{unit_tests/shared => functional_tests/snowflake}/test__utils.py (100%) delete mode 100644 tests/unit_tests/shared/__init__.py diff --git a/pyproject.toml b/pyproject.toml index 5ed724a..4ba58fd 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "ds-platform-utils" -version = "0.2.4" +version = "0.3.0" description = "Utility library for Pattern Data Science." readme = "README.md" authors = [ diff --git a/src/ds_platform_utils/_snowflake/write_audit_publish.py b/src/ds_platform_utils/_snowflake/write_audit_publish.py index b5adb2c..4e0c680 100644 --- a/src/ds_platform_utils/_snowflake/write_audit_publish.py +++ b/src/ds_platform_utils/_snowflake/write_audit_publish.py @@ -5,10 +5,10 @@ from typing import Any, Generator, Literal, Optional, Union from jinja2 import DebugUndefined, Template +from snowflake.connector import SnowflakeConnection from snowflake.connector.cursor import SnowflakeCursor from ds_platform_utils.metaflow._consts import NON_PROD_SCHEMA, PROD_SCHEMA -from ds_platform_utils.shared.utils import run_sql def write_audit_publish( # noqa: PLR0913 (too-many-arguments) this fn is an exception @@ -164,6 +164,26 @@ def _write_audit_publish( # noqa: PLR0913 (too-many-arguments) this fn is an ex ) +def _execute_sql(conn: SnowflakeConnection, sql: str) -> Optional[SnowflakeCursor]: + """Execute SQL statement(s) using Snowflake's ``connection.execute_string()`` and return the *last* resulting cursor. + + Snowflake's ``execute_string`` allows a single string containing multiple SQL + statements (separated by semicolons) to be executed at once. Unlike + ``cursor.execute()``, which handles exactly one statement and returns a single + cursor object, ``execute_string`` returns a **list of cursors**—one cursor for each + individual SQL statement in the batch. + + :param conn: Snowflake connection object + :param sql: SQL query or batch of semicolon-delimited SQL statements + :return: The cursor corresponding to the last executed statement, or None if no + statements were executed + """ + last_cursor = None + for cur in conn.execute_string(sql): + last_cursor = cur + return last_cursor + + @dataclass class SQLOperation: """SQL operation details.""" @@ -201,8 +221,8 @@ def run_query(query: str, cursor: Optional[SnowflakeCursor] = None) -> None: print(f"Would execute query:\n{query}") return - # run the query using run_sql utility which handles multiple statements via execute_string - run_sql(cursor.connection, query) + # run the query using _execute_sql utility which handles multiple statements via execute_string + _execute_sql(cursor.connection, query) cursor.connection.commit() @@ -217,7 +237,7 @@ def run_audit_query(query: str, cursor: Optional[SnowflakeCursor] = None) -> dic if cursor is None: return {"mock_result": True} - cursor = run_sql(cursor.connection, query) + cursor = _execute_sql(cursor.connection, query) if cursor is None: return {} @@ -247,7 +267,7 @@ def fetch_table_preview( if not cursor: return [{"mock_col": "mock_val"}] - cursor = run_sql( + cursor = _execute_sql( cursor.connection, f""" SELECT * diff --git a/src/ds_platform_utils/metaflow/get_snowflake_connection.py b/src/ds_platform_utils/metaflow/get_snowflake_connection.py index 5fc2ed1..eb555ad 100644 --- a/src/ds_platform_utils/metaflow/get_snowflake_connection.py +++ b/src/ds_platform_utils/metaflow/get_snowflake_connection.py @@ -4,7 +4,7 @@ from metaflow import Snowflake, current from snowflake.connector import SnowflakeConnection -from ds_platform_utils.shared.utils import run_sql +from ds_platform_utils._snowflake.utils import _execute_sql #################### # --- Metaflow --- # @@ -73,7 +73,7 @@ def _create_snowflake_connection( _debug_print_query(sql) if sql.strip(): - run_sql(conn, sql) + _execute_sql(conn, sql) return conn diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 99c099c..ad9b541 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -11,6 +11,7 @@ from snowflake.connector import SnowflakeConnection from snowflake.connector.pandas_tools import write_pandas +from ds_platform_utils._snowflake.utils import _execute_sql from ds_platform_utils.metaflow._consts import NON_PROD_SCHEMA, PROD_SCHEMA from ds_platform_utils.metaflow.get_snowflake_connection import _debug_print_query, get_snowflake_connection from ds_platform_utils.metaflow.write_audit_publish import ( @@ -18,7 +19,6 @@ add_comment_to_each_sql_statement, get_select_dev_query_tags, ) -from ds_platform_utils.shared.utils import run_sql TWarehouse = Literal[ "OUTERBOUNDS_DATA_SCIENCE_ADS_PROD_XS_WH", @@ -112,14 +112,14 @@ def publish_pandas( # noqa: PLR0913 (too many arguments) # set warehouse if warehouse is not None: - run_sql(conn, f"USE WAREHOUSE {warehouse};") + _execute_sql(conn, f"USE WAREHOUSE {warehouse};") # set query tag for cost tracking in select.dev # REASON: because write_pandas() doesn't allow modifying the SQL query to add SQL comments in it directly, # so we set a session query tag instead. tags = get_select_dev_query_tags() query_tag_str = json.dumps(tags) - run_sql(conn, f"ALTER SESSION SET QUERY_TAG = '{query_tag_str}';") + _execute_sql(conn, f"ALTER SESSION SET QUERY_TAG = '{query_tag_str}';") # https://docs.snowflake.com/en/developer-guide/snowpark/reference/python/latest/snowpark/api/snowflake.snowpark.Session.write_pandas write_pandas( @@ -199,9 +199,9 @@ def query_pandas_from_snowflake( conn: SnowflakeConnection = get_snowflake_connection(use_utc) if warehouse is not None: - run_sql(conn, f"USE WAREHOUSE {warehouse};") + _execute_sql(conn, f"USE WAREHOUSE {warehouse};") - cursor_result = run_sql(conn, query) + cursor_result = _execute_sql(conn, query) if cursor_result is None: # No statements to execute, return empty DataFrame df = pd.DataFrame() diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index 9c9cb31..f070973 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -10,8 +10,8 @@ from metaflow.cards import Artifact, Markdown, Table from snowflake.connector.cursor import SnowflakeCursor +from ds_platform_utils._snowflake.utils import _execute_sql from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection -from ds_platform_utils.shared.utils import run_sql if TYPE_CHECKING: from ds_platform_utils._snowflake.write_audit_publish import ( @@ -217,7 +217,7 @@ def publish( # noqa: PLR0913, D417 with conn.cursor() as cur: if warehouse is not None: - run_sql(conn, f"USE WAREHOUSE {warehouse}") + _execute_sql(conn, f"USE WAREHOUSE {warehouse}") last_op_was_write = False for operation in write_audit_publish( @@ -338,7 +338,7 @@ def fetch_table_preview( if cursor is None: return [] else: - result_cursor = run_sql( + result_cursor = _execute_sql( cursor.connection, f""" SELECT * diff --git a/src/ds_platform_utils/shared/utils.py b/src/ds_platform_utils/shared/utils.py deleted file mode 100644 index 31d7841..0000000 --- a/src/ds_platform_utils/shared/utils.py +++ /dev/null @@ -1,22 +0,0 @@ -"""Utility functions which are shared.""" - -from typing import Optional - -from snowflake.connector import SnowflakeConnection -from snowflake.connector.cursor import SnowflakeCursor - - -def run_sql(conn: SnowflakeConnection, sql: str) -> Optional[SnowflakeCursor]: - """Runs SQL using execute_string and returns the *last* cursor. - - mimicking Snowflake's `cursor.execute()` behavior. - - :param conn: Snowflake connection object - :param sql: SQL query or queries to execute - :return: The last cursor from executing the SQL statements, or None if no statements were executed - - """ - last_cursor = None - for cur in conn.execute_string(sql): - last_cursor = cur - return last_cursor diff --git a/tests/functional_tests/metaflow/test__run_sql_integration.py b/tests/functional_tests/metaflow/test__run_sql_integration.py index c38e23a..a9550ed 100644 --- a/tests/functional_tests/metaflow/test__run_sql_integration.py +++ b/tests/functional_tests/metaflow/test__run_sql_integration.py @@ -1,4 +1,4 @@ -"""Functional test for run_sql + publish/publish_pandas/query_pandas_from_snowflake.""" +"""Functional test for _execute_sql + publish/publish_pandas/query_pandas_from_snowflake.""" import subprocess import sys @@ -9,23 +9,23 @@ import pytz from metaflow import FlowSpec, project, step +from ds_platform_utils._snowflake.utils import _execute_sql from ds_platform_utils.metaflow import ( publish, publish_pandas, query_pandas_from_snowflake, ) from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection -from ds_platform_utils.shared.utils import run_sql -@project(name="test_run_sql_integration_flow") -class TestRunSqlIntegrationFlow(FlowSpec): - """Metaflow flow that will test run_sql integration via various paths. +@project(name="test_execute_sql_integration_flow") +class TestExecuteSqlIntegrationFlow(FlowSpec): + """Metaflow flow that will test _execute_sql integration via various paths. - publish_pandas() - publish() - query_pandas_from_snowflake() - - run_sql() with various SQL patterns (edge cases) + - _execute_sql() with various SQL patterns (edge cases) """ @step @@ -35,7 +35,7 @@ def start(self): @step def test_publish_pandas_basic(self): - """Publish a simple DataFrame without warehouse (no run_sql here, just sanity).""" + """Publish a simple DataFrame without warehouse (no _execute_sql here, just sanity).""" df = pd.DataFrame( { "id": [1, 2, 3], @@ -56,7 +56,7 @@ def test_publish_pandas_basic(self): def test_publish_pandas_with_warehouse(self): """Publish using a specific warehouse. - This hits run_sql via: + This hits _execute_sql via: - USE WAREHOUSE ... - ALTER SESSION SET QUERY_TAG = ... """ @@ -81,11 +81,11 @@ def test_publish_pandas_with_warehouse(self): def test_publish_with_wap(self): """Test publish() which uses write_audit_publish under the hood. - This ensures publish still works when run_sql is used for session setup elsewhere. + This ensures publish still works when _execute_sql is used for session setup elsewhere. """ # Very simple WAP query: just recreate the table from itself query = """ - CREATE OR REPLACE TABLE PATTERN_DB.{{schema}}.RUN_SQL_ITG_PUBLISH AS + CREATE OR REPLACE TABLE PATTERN_DB.{{schema}}.{{table_name}} AS SELECT * FROM PATTERN_DB.{{schema}}.RUN_SQL_ITG_PANDAS_WH; """ @@ -101,7 +101,7 @@ def test_publish_with_wap(self): def test_query_pandas_multi_statement(self): """Test query_pandas_from_snowflake with a multi-statement SQL. - We rely on run_sql() internally (execute_string) and ensure we get + We rely on _execute_sql() internally (execute_string) and ensure we get the result of the *last* statement. """ multi_stmt_query = """ @@ -123,11 +123,11 @@ def test_query_pandas_multi_statement(self): assert "answer" in df.columns assert df["answer"].iloc[0] == 42 - self.next(self.test_run_sql_edge_cases) + self.next(self.test_execute_sql_edge_cases) @step - def test_run_sql_edge_cases(self): - """Directly test run_sql() with different SQL patterns (edge cases). + def test_execute_sql_edge_cases(self): + """Directly test _execute_sql() with different SQL patterns (edge cases). - empty string - whitespace only @@ -138,26 +138,26 @@ def test_run_sql_edge_cases(self): conn = get_snowflake_connection(use_utc=True) # 1) Empty string → no statements executed → None - cursor = run_sql(conn, "") + cursor = _execute_sql(conn, "") assert cursor is None # 2) Whitespace only → also effectively nothing - cursor = run_sql(conn, " \n\t ") + cursor = _execute_sql(conn, " \n\t ") assert cursor is None # 3) Comments only → Snowflake treats these as "no executable statements" - cursor = run_sql(conn, "-- comment only\n/* another comment */") + cursor = _execute_sql(conn, "-- comment only\n/* another comment */") assert cursor is None # 4) Single statement (SELECT) - cursor = run_sql(conn, "SELECT 1 AS x;") + cursor = _execute_sql(conn, "SELECT 1 AS x;") assert cursor is not None rows = cursor.fetchall() assert len(rows) == 1 assert rows[0][0] == 1 # x == 1 # 5) Multi-statement: ensure we get cursor for *last* stmt - cursor = run_sql(conn, "SELECT 1 AS x; SELECT 2 AS x;") + cursor = _execute_sql(conn, "SELECT 1 AS x; SELECT 2 AS x;") assert cursor is not None rows = cursor.fetchall() # Last cursor should correspond to 'SELECT 2 AS x;' @@ -173,12 +173,12 @@ def end(self): if __name__ == "__main__": - TestRunSqlIntegrationFlow() + TestExecuteSqlIntegrationFlow() @pytest.mark.slow -def test_run_sql_integration_flow(): - """Run the run_sql integration flow via Metaflow CLI.""" +def test_execute_sql_integration_flow(): + """Run the _execute_sql integration flow via Metaflow CLI.""" cmd = [sys.executable, __file__, "--environment=local", "--with=card", "run"] print("\n=== Metaflow Output ===") diff --git a/tests/unit_tests/shared/test__utils.py b/tests/functional_tests/snowflake/test__utils.py similarity index 100% rename from tests/unit_tests/shared/test__utils.py rename to tests/functional_tests/snowflake/test__utils.py diff --git a/tests/unit_tests/shared/__init__.py b/tests/unit_tests/shared/__init__.py deleted file mode 100644 index b54df80..0000000 --- a/tests/unit_tests/shared/__init__.py +++ /dev/null @@ -1 +0,0 @@ -"""Unit tests for shared utilities.""" diff --git a/uv.lock b/uv.lock index 6de8948..c2d957a 100644 --- a/uv.lock +++ b/uv.lock @@ -478,7 +478,7 @@ wheels = [ [[package]] name = "ds-platform-utils" -version = "0.2.4" +version = "0.3.0" source = { editable = "." } dependencies = [ { name = "jinja2" }, From 8669655ed6c405336f0ddc3b0ed39f6d6454c3a7 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 16:33:42 +0530 Subject: [PATCH 12/21] feat: move _execute_sql to shared module and update references across the codebase --- src/ds_platform_utils/_snowflake/shared.py | 26 ++++++++++++ .../_snowflake/write_audit_publish.py | 22 +--------- .../metaflow/get_snowflake_connection.py | 2 +- src/ds_platform_utils/metaflow/pandas.py | 2 +- .../metaflow/write_audit_publish.py | 2 +- ...on.py => test__execute_sql_integration.py} | 2 +- .../functional_tests/snowflake/test__utils.py | 40 +++++++++---------- 7 files changed, 51 insertions(+), 45 deletions(-) create mode 100644 src/ds_platform_utils/_snowflake/shared.py rename tests/functional_tests/metaflow/{test__run_sql_integration.py => test__execute_sql_integration.py} (99%) diff --git a/src/ds_platform_utils/_snowflake/shared.py b/src/ds_platform_utils/_snowflake/shared.py new file mode 100644 index 0000000..e6e9544 --- /dev/null +++ b/src/ds_platform_utils/_snowflake/shared.py @@ -0,0 +1,26 @@ +""""Shared Snowflake utility functions.""" + +from typing import Optional + +from snowflake.connector import SnowflakeConnection +from snowflake.connector.cursor import SnowflakeCursor + + +def _execute_sql(conn: SnowflakeConnection, sql: str) -> Optional[SnowflakeCursor]: + """Execute SQL statement(s) using Snowflake's ``connection.execute_string()`` and return the *last* resulting cursor. + + Snowflake's ``execute_string`` allows a single string containing multiple SQL + statements (separated by semicolons) to be executed at once. Unlike + ``cursor.execute()``, which handles exactly one statement and returns a single + cursor object, ``execute_string`` returns a **list of cursors**—one cursor for each + individual SQL statement in the batch. + + :param conn: Snowflake connection object + :param sql: SQL query or batch of semicolon-delimited SQL statements + :return: The cursor corresponding to the last executed statement, or None if no + statements were executed + """ + last_cursor = None + for cur in conn.execute_string(sql): + last_cursor = cur + return last_cursor diff --git a/src/ds_platform_utils/_snowflake/write_audit_publish.py b/src/ds_platform_utils/_snowflake/write_audit_publish.py index 4e0c680..3030222 100644 --- a/src/ds_platform_utils/_snowflake/write_audit_publish.py +++ b/src/ds_platform_utils/_snowflake/write_audit_publish.py @@ -5,9 +5,9 @@ from typing import Any, Generator, Literal, Optional, Union from jinja2 import DebugUndefined, Template -from snowflake.connector import SnowflakeConnection from snowflake.connector.cursor import SnowflakeCursor +from ds_platform_utils._snowflake.shared import _execute_sql from ds_platform_utils.metaflow._consts import NON_PROD_SCHEMA, PROD_SCHEMA @@ -164,26 +164,6 @@ def _write_audit_publish( # noqa: PLR0913 (too-many-arguments) this fn is an ex ) -def _execute_sql(conn: SnowflakeConnection, sql: str) -> Optional[SnowflakeCursor]: - """Execute SQL statement(s) using Snowflake's ``connection.execute_string()`` and return the *last* resulting cursor. - - Snowflake's ``execute_string`` allows a single string containing multiple SQL - statements (separated by semicolons) to be executed at once. Unlike - ``cursor.execute()``, which handles exactly one statement and returns a single - cursor object, ``execute_string`` returns a **list of cursors**—one cursor for each - individual SQL statement in the batch. - - :param conn: Snowflake connection object - :param sql: SQL query or batch of semicolon-delimited SQL statements - :return: The cursor corresponding to the last executed statement, or None if no - statements were executed - """ - last_cursor = None - for cur in conn.execute_string(sql): - last_cursor = cur - return last_cursor - - @dataclass class SQLOperation: """SQL operation details.""" diff --git a/src/ds_platform_utils/metaflow/get_snowflake_connection.py b/src/ds_platform_utils/metaflow/get_snowflake_connection.py index eb555ad..3015776 100644 --- a/src/ds_platform_utils/metaflow/get_snowflake_connection.py +++ b/src/ds_platform_utils/metaflow/get_snowflake_connection.py @@ -4,7 +4,7 @@ from metaflow import Snowflake, current from snowflake.connector import SnowflakeConnection -from ds_platform_utils._snowflake.utils import _execute_sql +from ds_platform_utils._snowflake.shared import _execute_sql #################### # --- Metaflow --- # diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index ad9b541..879a082 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -11,7 +11,7 @@ from snowflake.connector import SnowflakeConnection from snowflake.connector.pandas_tools import write_pandas -from ds_platform_utils._snowflake.utils import _execute_sql +from ds_platform_utils._snowflake.shared import _execute_sql from ds_platform_utils.metaflow._consts import NON_PROD_SCHEMA, PROD_SCHEMA from ds_platform_utils.metaflow.get_snowflake_connection import _debug_print_query, get_snowflake_connection from ds_platform_utils.metaflow.write_audit_publish import ( diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index f070973..d2c633d 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -10,7 +10,7 @@ from metaflow.cards import Artifact, Markdown, Table from snowflake.connector.cursor import SnowflakeCursor -from ds_platform_utils._snowflake.utils import _execute_sql +from ds_platform_utils._snowflake.shared import _execute_sql from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection if TYPE_CHECKING: diff --git a/tests/functional_tests/metaflow/test__run_sql_integration.py b/tests/functional_tests/metaflow/test__execute_sql_integration.py similarity index 99% rename from tests/functional_tests/metaflow/test__run_sql_integration.py rename to tests/functional_tests/metaflow/test__execute_sql_integration.py index a9550ed..7bd9453 100644 --- a/tests/functional_tests/metaflow/test__run_sql_integration.py +++ b/tests/functional_tests/metaflow/test__execute_sql_integration.py @@ -9,7 +9,7 @@ import pytz from metaflow import FlowSpec, project, step -from ds_platform_utils._snowflake.utils import _execute_sql +from ds_platform_utils._snowflake.shared import _execute_sql from ds_platform_utils.metaflow import ( publish, publish_pandas, diff --git a/tests/functional_tests/snowflake/test__utils.py b/tests/functional_tests/snowflake/test__utils.py index 888afb6..92db77a 100644 --- a/tests/functional_tests/snowflake/test__utils.py +++ b/tests/functional_tests/snowflake/test__utils.py @@ -2,14 +2,14 @@ from unittest.mock import MagicMock, Mock -from src.ds_platform_utils.shared.utils import run_sql +from ds_platform_utils._snowflake.shared import _execute_sql class TestRunSql: - """Test suite for run_sql utility function.""" + """Test suite for _execute_sql utility function.""" def test_returns_last_cursor_with_multiple_statements(self): - """Test that run_sql returns the last cursor when multiple SQL statements are executed.""" + """Test that _execute_sql returns the last cursor when multiple SQL statements are executed.""" # Setup mock_conn = Mock() cursor1 = Mock() @@ -20,14 +20,14 @@ def test_returns_last_cursor_with_multiple_statements(self): sql = "SELECT 1; SELECT 2; SELECT 3;" # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is cursor3 def test_returns_single_cursor_with_single_statement(self): - """Test that run_sql returns the cursor when a single SQL statement is executed.""" + """Test that _execute_sql returns the cursor when a single SQL statement is executed.""" # Setup mock_conn = Mock() cursor = Mock() @@ -36,14 +36,14 @@ def test_returns_single_cursor_with_single_statement(self): sql = "SELECT * FROM table;" # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is cursor def test_returns_none_when_no_statements_executed(self): - """Test that run_sql returns None when no statements are executed.""" + """Test that _execute_sql returns None when no statements are executed.""" # Setup mock_conn = Mock() mock_conn.execute_string.return_value = [] @@ -51,14 +51,14 @@ def test_returns_none_when_no_statements_executed(self): sql = "" # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is None def test_handles_empty_sql_string(self): - """Test that run_sql correctly handles empty SQL strings.""" + """Test that _execute_sql correctly handles empty SQL strings.""" # Setup mock_conn = Mock() mock_conn.execute_string.return_value = [] @@ -66,14 +66,14 @@ def test_handles_empty_sql_string(self): sql = "" # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is None def test_handles_whitespace_only_sql(self): - """Test that run_sql correctly handles SQL strings with only whitespace.""" + """Test that _execute_sql correctly handles SQL strings with only whitespace.""" # Setup mock_conn = Mock() mock_conn.execute_string.return_value = [] @@ -81,14 +81,14 @@ def test_handles_whitespace_only_sql(self): sql = " \n\t " # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is None def test_handles_sql_with_comments_only(self): - """Test that run_sql correctly handles SQL with only comments.""" + """Test that _execute_sql correctly handles SQL with only comments.""" # Setup mock_conn = Mock() mock_conn.execute_string.return_value = [] @@ -96,14 +96,14 @@ def test_handles_sql_with_comments_only(self): sql = "-- This is a comment\n/* This is another comment */" # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is None def test_preserves_cursor_iteration_order(self): - """Test that run_sql iterates through all cursors in order.""" + """Test that _execute_sql iterates through all cursors in order.""" # Setup mock_conn = Mock() cursors = [Mock(name=f"cursor{i}") for i in range(5)] @@ -112,14 +112,14 @@ def test_preserves_cursor_iteration_order(self): sql = "SELECT 1; SELECT 2; SELECT 3; SELECT 4; SELECT 5;" # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is cursors[-1] def test_passes_connection_object_correctly(self): - """Test that run_sql correctly uses the provided connection object.""" + """Test that _execute_sql correctly uses the provided connection object.""" # Setup mock_conn = MagicMock() cursor = Mock() @@ -128,14 +128,14 @@ def test_passes_connection_object_correctly(self): sql = "SELECT * FROM table;" # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) assert result is cursor def test_handles_complex_multistatement_sql(self): - """Test that run_sql handles complex multi-statement SQL with various statement types.""" + """Test that _execute_sql handles complex multi-statement SQL with various statement types.""" # Setup mock_conn = Mock() cursor1 = Mock() @@ -152,7 +152,7 @@ def test_handles_complex_multistatement_sql(self): """ # Execute - result = run_sql(mock_conn, sql) + result = _execute_sql(mock_conn, sql) # Verify mock_conn.execute_string.assert_called_once_with(sql) From 3eae576c1a0dd84077c13831a2e9f81051ba7bd8 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 16:34:48 +0530 Subject: [PATCH 13/21] lint fix --- src/ds_platform_utils/_snowflake/shared.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/ds_platform_utils/_snowflake/shared.py b/src/ds_platform_utils/_snowflake/shared.py index e6e9544..a81498f 100644 --- a/src/ds_platform_utils/_snowflake/shared.py +++ b/src/ds_platform_utils/_snowflake/shared.py @@ -1,4 +1,4 @@ -""""Shared Snowflake utility functions.""" +"""Shared Snowflake utility functions.""" from typing import Optional From 6306d8ff4981dbc86b075e8cc92a9b2e19b68fbc Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 16:59:18 +0530 Subject: [PATCH 14/21] feat: update _execute_sql integration tests to handle comment-only SQL statements --- .../metaflow/test__execute_sql_integration.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/tests/functional_tests/metaflow/test__execute_sql_integration.py b/tests/functional_tests/metaflow/test__execute_sql_integration.py index 7bd9453..d29f443 100644 --- a/tests/functional_tests/metaflow/test__execute_sql_integration.py +++ b/tests/functional_tests/metaflow/test__execute_sql_integration.py @@ -146,7 +146,9 @@ def test_execute_sql_edge_cases(self): assert cursor is None # 3) Comments only → Snowflake treats these as "no executable statements" - cursor = _execute_sql(conn, "-- comment only\n/* another comment */") + cursor = _execute_sql(conn, "-- comment only") + assert cursor is None + cursor = _execute_sql(conn, "/* comment only */") assert cursor is None # 4) Single statement (SELECT) From 99c792c3bf79a37234023109eee459999ca64902 Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 17:08:35 +0530 Subject: [PATCH 15/21] fix: remove comment only sql test --- .../metaflow/test__execute_sql_integration.py | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/tests/functional_tests/metaflow/test__execute_sql_integration.py b/tests/functional_tests/metaflow/test__execute_sql_integration.py index d29f443..169ab2f 100644 --- a/tests/functional_tests/metaflow/test__execute_sql_integration.py +++ b/tests/functional_tests/metaflow/test__execute_sql_integration.py @@ -145,20 +145,14 @@ def test_execute_sql_edge_cases(self): cursor = _execute_sql(conn, " \n\t ") assert cursor is None - # 3) Comments only → Snowflake treats these as "no executable statements" - cursor = _execute_sql(conn, "-- comment only") - assert cursor is None - cursor = _execute_sql(conn, "/* comment only */") - assert cursor is None - - # 4) Single statement (SELECT) + # 3) Single statement (SELECT) cursor = _execute_sql(conn, "SELECT 1 AS x;") assert cursor is not None rows = cursor.fetchall() assert len(rows) == 1 assert rows[0][0] == 1 # x == 1 - # 5) Multi-statement: ensure we get cursor for *last* stmt + # 4) Multi-statement: ensure we get cursor for *last* stmt cursor = _execute_sql(conn, "SELECT 1 AS x; SELECT 2 AS x;") assert cursor is not None rows = cursor.fetchall() From 3248700f9539a3afd999b98d7cdbba4134075c1a Mon Sep 17 00:00:00 2001 From: vinay79n Date: Wed, 3 Dec 2025 17:15:50 +0530 Subject: [PATCH 16/21] removed redundent test --- tests/functional_tests/snowflake/test__utils.py | 15 --------------- 1 file changed, 15 deletions(-) diff --git a/tests/functional_tests/snowflake/test__utils.py b/tests/functional_tests/snowflake/test__utils.py index 92db77a..021edc0 100644 --- a/tests/functional_tests/snowflake/test__utils.py +++ b/tests/functional_tests/snowflake/test__utils.py @@ -42,21 +42,6 @@ def test_returns_single_cursor_with_single_statement(self): mock_conn.execute_string.assert_called_once_with(sql) assert result is cursor - def test_returns_none_when_no_statements_executed(self): - """Test that _execute_sql returns None when no statements are executed.""" - # Setup - mock_conn = Mock() - mock_conn.execute_string.return_value = [] - - sql = "" - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is None - def test_handles_empty_sql_string(self): """Test that _execute_sql correctly handles empty SQL strings.""" # Setup From 605c4747c461ad42769eeb56b3f3111ceb321ab1 Mon Sep 17 00:00:00 2001 From: avr2002 Date: Thu, 4 Dec 2025 12:31:06 +0530 Subject: [PATCH 17/21] feat: renamed generic "shared" module name. Adder error handling --- .../_snowflake/{shared.py => run_query.py} | 27 ++++++++++++++----- 1 file changed, 21 insertions(+), 6 deletions(-) rename src/ds_platform_utils/_snowflake/{shared.py => run_query.py} (57%) diff --git a/src/ds_platform_utils/_snowflake/shared.py b/src/ds_platform_utils/_snowflake/run_query.py similarity index 57% rename from src/ds_platform_utils/_snowflake/shared.py rename to src/ds_platform_utils/_snowflake/run_query.py index a81498f..1b6b517 100644 --- a/src/ds_platform_utils/_snowflake/shared.py +++ b/src/ds_platform_utils/_snowflake/run_query.py @@ -1,9 +1,11 @@ """Shared Snowflake utility functions.""" -from typing import Optional +import warnings +from typing import Iterable, Optional from snowflake.connector import SnowflakeConnection from snowflake.connector.cursor import SnowflakeCursor +from snowflake.connector.errors import ProgrammingError def _execute_sql(conn: SnowflakeConnection, sql: str) -> Optional[SnowflakeCursor]: @@ -18,9 +20,22 @@ def _execute_sql(conn: SnowflakeConnection, sql: str) -> Optional[SnowflakeCurso :param conn: Snowflake connection object :param sql: SQL query or batch of semicolon-delimited SQL statements :return: The cursor corresponding to the last executed statement, or None if no - statements were executed + statements were executed or if the SQL contains only whitespace/comments """ - last_cursor = None - for cur in conn.execute_string(sql): - last_cursor = cur - return last_cursor + if not sql.strip(): + return None + + try: + cursors: Iterable[SnowflakeCursor] = conn.execute_string(sql.strip()) + + if cursors is None: + return None + + *_, last = cursors + return last + except ProgrammingError as e: + if "Empty SQL statement" in str(e): + # raise a warning and return None + warnings.warn("Empty SQL statement encountered; returning None.", category=UserWarning, stacklevel=2) + return None + raise From d89ff790af33d914ce6c4374b9c89d7a274537c8 Mon Sep 17 00:00:00 2001 From: avr2002 Date: Thu, 4 Dec 2025 12:33:24 +0530 Subject: [PATCH 18/21] feat: refactor _execute_sql import and error handling accessing metaflow current object --- .../metaflow/get_snowflake_connection.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/src/ds_platform_utils/metaflow/get_snowflake_connection.py b/src/ds_platform_utils/metaflow/get_snowflake_connection.py index 3015776..926393f 100644 --- a/src/ds_platform_utils/metaflow/get_snowflake_connection.py +++ b/src/ds_platform_utils/metaflow/get_snowflake_connection.py @@ -4,7 +4,7 @@ from metaflow import Snowflake, current from snowflake.connector import SnowflakeConnection -from ds_platform_utils._snowflake.shared import _execute_sql +from ds_platform_utils._snowflake.run_query import _execute_sql #################### # --- Metaflow --- # @@ -43,7 +43,12 @@ def get_snowflake_connection( In metaflow, each step is a separate Python process, so the connection will automatically be closed at the end of any steps that use this singleton. """ - return _create_snowflake_connection(use_utc=use_utc, query_tag=current.project_name) + if current and hasattr(current, "project_name"): + query_tag = current.project_name + else: + query_tag = None + + return _create_snowflake_connection(use_utc=use_utc, query_tag=query_tag) ##################### @@ -71,9 +76,7 @@ def _create_snowflake_connection( # Merge into single SQL batch sql = "\n".join(queries) _debug_print_query(sql) - - if sql.strip(): - _execute_sql(conn, sql) + _execute_sql(conn, sql) return conn From e957fea18be7a251dcaf3b077dd87d85380c6cbf Mon Sep 17 00:00:00 2001 From: avr2002 Date: Thu, 4 Dec 2025 12:34:04 +0530 Subject: [PATCH 19/21] refactor: using updated module name for _execute_sql function --- .../_snowflake/write_audit_publish.py | 2 +- src/ds_platform_utils/metaflow/pandas.py | 2 +- .../metaflow/write_audit_publish.py | 12 ++++++------ 3 files changed, 8 insertions(+), 8 deletions(-) diff --git a/src/ds_platform_utils/_snowflake/write_audit_publish.py b/src/ds_platform_utils/_snowflake/write_audit_publish.py index 3030222..5ae6046 100644 --- a/src/ds_platform_utils/_snowflake/write_audit_publish.py +++ b/src/ds_platform_utils/_snowflake/write_audit_publish.py @@ -7,7 +7,7 @@ from jinja2 import DebugUndefined, Template from snowflake.connector.cursor import SnowflakeCursor -from ds_platform_utils._snowflake.shared import _execute_sql +from ds_platform_utils._snowflake.run_query import _execute_sql from ds_platform_utils.metaflow._consts import NON_PROD_SCHEMA, PROD_SCHEMA diff --git a/src/ds_platform_utils/metaflow/pandas.py b/src/ds_platform_utils/metaflow/pandas.py index 879a082..d89c0ec 100644 --- a/src/ds_platform_utils/metaflow/pandas.py +++ b/src/ds_platform_utils/metaflow/pandas.py @@ -11,7 +11,7 @@ from snowflake.connector import SnowflakeConnection from snowflake.connector.pandas_tools import write_pandas -from ds_platform_utils._snowflake.shared import _execute_sql +from ds_platform_utils._snowflake.run_query import _execute_sql from ds_platform_utils.metaflow._consts import NON_PROD_SCHEMA, PROD_SCHEMA from ds_platform_utils.metaflow.get_snowflake_connection import _debug_print_query, get_snowflake_connection from ds_platform_utils.metaflow.write_audit_publish import ( diff --git a/src/ds_platform_utils/metaflow/write_audit_publish.py b/src/ds_platform_utils/metaflow/write_audit_publish.py index d2c633d..4672a64 100644 --- a/src/ds_platform_utils/metaflow/write_audit_publish.py +++ b/src/ds_platform_utils/metaflow/write_audit_publish.py @@ -10,7 +10,7 @@ from metaflow.cards import Artifact, Markdown, Table from snowflake.connector.cursor import SnowflakeCursor -from ds_platform_utils._snowflake.shared import _execute_sql +from ds_platform_utils._snowflake.run_query import _execute_sql from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection if TYPE_CHECKING: @@ -98,7 +98,7 @@ def get_select_dev_query_tags() -> Dict[str, str]: stacklevel=2, ) - def extract(prefix: str, default: str = "unknown") -> str: + def _extract(prefix: str, default: str = "unknown") -> str: for tag in fetched_tags: if tag.startswith(prefix + ":"): return tag.split(":", 1)[1] @@ -107,19 +107,19 @@ def extract(prefix: str, default: str = "unknown") -> str: # most of these will be unknown if no tags are set on the flow # (most likely for the flow runs which are triggered manually locally) return { - "app": extract( + "app": _extract( "ds.domain" ), # first tag after 'app:', is the domain of the flow, fetched from current tags of the flow - "workload_id": extract( + "workload_id": _extract( "ds.project" ), # second tag after 'workload_id:', is the project of the flow which it belongs to - "flow_name": current.flow_name, # name of the metaflow flow + "flow_name": current.flow_name, "project": current.project_name, # Project name from the @project decorator, lets us # identify the flow’s project without relying on user tags (added via --tag). "step_name": current.step_name, # name of the current step "run_id": current.run_id, # run_id: unique id of the current run "user": current.username, # username of user who triggered the run (argo-workflows if its a deployed flow) - "domain": extract("ds.domain"), # business unit (domain) of the flow, same as app + "domain": _extract("ds.domain"), # business unit (domain) of the flow, same as app "namespace": current.namespace, # namespace of the flow "perimeter": str(os.environ.get("OB_CURRENT_PERIMETER") or os.environ.get("OBP_PERIMETER")), "is_production": str( From 66314b02e0438ff6f7aa8f8fcc7de99da06a84af Mon Sep 17 00:00:00 2001 From: avr2002 Date: Thu, 4 Dec 2025 12:34:50 +0530 Subject: [PATCH 20/21] test: removed redundant tests and adding new unit test for _execute_sql function --- .../metaflow/test__execute_sql_integration.py | 201 ------------------ .../functional_tests/snowflake/test__utils.py | 144 ------------- .../unit_tests/snowflake/test__execute_sql.py | 59 +++++ 3 files changed, 59 insertions(+), 345 deletions(-) delete mode 100644 tests/functional_tests/metaflow/test__execute_sql_integration.py delete mode 100644 tests/functional_tests/snowflake/test__utils.py create mode 100644 tests/unit_tests/snowflake/test__execute_sql.py diff --git a/tests/functional_tests/metaflow/test__execute_sql_integration.py b/tests/functional_tests/metaflow/test__execute_sql_integration.py deleted file mode 100644 index 169ab2f..0000000 --- a/tests/functional_tests/metaflow/test__execute_sql_integration.py +++ /dev/null @@ -1,201 +0,0 @@ -"""Functional test for _execute_sql + publish/publish_pandas/query_pandas_from_snowflake.""" - -import subprocess -import sys -from datetime import datetime - -import pandas as pd -import pytest -import pytz -from metaflow import FlowSpec, project, step - -from ds_platform_utils._snowflake.shared import _execute_sql -from ds_platform_utils.metaflow import ( - publish, - publish_pandas, - query_pandas_from_snowflake, -) -from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection - - -@project(name="test_execute_sql_integration_flow") -class TestExecuteSqlIntegrationFlow(FlowSpec): - """Metaflow flow that will test _execute_sql integration via various paths. - - - publish_pandas() - - publish() - - query_pandas_from_snowflake() - - _execute_sql() with various SQL patterns (edge cases) - """ - - @step - def start(self): - """Start the flow.""" - self.next(self.test_publish_pandas_basic) - - @step - def test_publish_pandas_basic(self): - """Publish a simple DataFrame without warehouse (no _execute_sql here, just sanity).""" - df = pd.DataFrame( - { - "id": [1, 2, 3], - "name": ["a", "b", "c"], - "created_at": [datetime.now(pytz.UTC)] * 3, - } - ) - - publish_pandas( - table_name="RUN_SQL_ITG_PANDAS", - df=df, - auto_create_table=True, - overwrite=True, - ) - self.next(self.test_publish_pandas_with_warehouse) - - @step - def test_publish_pandas_with_warehouse(self): - """Publish using a specific warehouse. - - This hits _execute_sql via: - - USE WAREHOUSE ... - - ALTER SESSION SET QUERY_TAG = ... - """ - df = pd.DataFrame( - { - "id": [10, 20], - "name": ["x", "y"], - "created_at": [datetime.now(pytz.UTC)] * 2, - } - ) - - publish_pandas( - table_name="RUN_SQL_ITG_PANDAS_WH", - df=df, - auto_create_table=True, - overwrite=True, - warehouse="OUTERBOUNDS_DATA_SCIENCE_SHARED_DEV_XS_WH", - ) - self.next(self.test_publish_with_wap) - - @step - def test_publish_with_wap(self): - """Test publish() which uses write_audit_publish under the hood. - - This ensures publish still works when _execute_sql is used for session setup elsewhere. - """ - # Very simple WAP query: just recreate the table from itself - query = """ - CREATE OR REPLACE TABLE PATTERN_DB.{{schema}}.{{table_name}} AS - SELECT * FROM PATTERN_DB.{{schema}}.RUN_SQL_ITG_PANDAS_WH; - """ - - publish( - table_name="RUN_SQL_ITG_PUBLISH", - query=query, - warehouse="OUTERBOUNDS_DATA_SCIENCE_SHARED_DEV_XS_WH", - ) - - self.next(self.test_query_pandas_multi_statement) - - @step - def test_query_pandas_multi_statement(self): - """Test query_pandas_from_snowflake with a multi-statement SQL. - - We rely on _execute_sql() internally (execute_string) and ensure we get - the result of the *last* statement. - """ - multi_stmt_query = """ - -- create a temp table - CREATE OR REPLACE TEMP TABLE PATTERN_DB.{{schema}}._RUN_SQL_ITG_TMP AS - SELECT 42 AS answer; - - -- final select (this should be the last cursor) - SELECT * FROM PATTERN_DB.{{schema}}._RUN_SQL_ITG_TMP; - """ - - df = query_pandas_from_snowflake( - multi_stmt_query, - warehouse="OUTERBOUNDS_DATA_SCIENCE_SHARED_DEV_XS_WH", - ) - - # We expect the last statement's result: a single row with answer = 42 - assert len(df) == 1 - assert "answer" in df.columns - assert df["answer"].iloc[0] == 42 - - self.next(self.test_execute_sql_edge_cases) - - @step - def test_execute_sql_edge_cases(self): - """Directly test _execute_sql() with different SQL patterns (edge cases). - - - empty string - - whitespace only - - comments only - - simple single statement - - simple multi-statement - """ - conn = get_snowflake_connection(use_utc=True) - - # 1) Empty string → no statements executed → None - cursor = _execute_sql(conn, "") - assert cursor is None - - # 2) Whitespace only → also effectively nothing - cursor = _execute_sql(conn, " \n\t ") - assert cursor is None - - # 3) Single statement (SELECT) - cursor = _execute_sql(conn, "SELECT 1 AS x;") - assert cursor is not None - rows = cursor.fetchall() - assert len(rows) == 1 - assert rows[0][0] == 1 # x == 1 - - # 4) Multi-statement: ensure we get cursor for *last* stmt - cursor = _execute_sql(conn, "SELECT 1 AS x; SELECT 2 AS x;") - assert cursor is not None - rows = cursor.fetchall() - # Last cursor should correspond to 'SELECT 2 AS x;' - assert len(rows) == 1 - assert rows[0][0] == 2 - - self.next(self.end) - - @step - def end(self): - """End of flow.""" - pass - - -if __name__ == "__main__": - TestExecuteSqlIntegrationFlow() - - -@pytest.mark.slow -def test_execute_sql_integration_flow(): - """Run the _execute_sql integration flow via Metaflow CLI.""" - cmd = [sys.executable, __file__, "--environment=local", "--with=card", "run"] - - print("\n=== Metaflow Output ===") - for line in execute_with_output(cmd): - print(line, end="") - - -def execute_with_output(cmd): - """Execute a command and yield output lines as they are produced.""" - process = subprocess.Popen( - cmd, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - universal_newlines=True, - bufsize=1, - ) - - for line in iter(process.stdout.readline, ""): - yield line - - process.stdout.close() - return_code = process.wait() - if return_code: - raise subprocess.CalledProcessError(return_code, cmd) diff --git a/tests/functional_tests/snowflake/test__utils.py b/tests/functional_tests/snowflake/test__utils.py deleted file mode 100644 index 021edc0..0000000 --- a/tests/functional_tests/snowflake/test__utils.py +++ /dev/null @@ -1,144 +0,0 @@ -"""Unit tests for shared utility functions.""" - -from unittest.mock import MagicMock, Mock - -from ds_platform_utils._snowflake.shared import _execute_sql - - -class TestRunSql: - """Test suite for _execute_sql utility function.""" - - def test_returns_last_cursor_with_multiple_statements(self): - """Test that _execute_sql returns the last cursor when multiple SQL statements are executed.""" - # Setup - mock_conn = Mock() - cursor1 = Mock() - cursor2 = Mock() - cursor3 = Mock() - mock_conn.execute_string.return_value = [cursor1, cursor2, cursor3] - - sql = "SELECT 1; SELECT 2; SELECT 3;" - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is cursor3 - - def test_returns_single_cursor_with_single_statement(self): - """Test that _execute_sql returns the cursor when a single SQL statement is executed.""" - # Setup - mock_conn = Mock() - cursor = Mock() - mock_conn.execute_string.return_value = [cursor] - - sql = "SELECT * FROM table;" - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is cursor - - def test_handles_empty_sql_string(self): - """Test that _execute_sql correctly handles empty SQL strings.""" - # Setup - mock_conn = Mock() - mock_conn.execute_string.return_value = [] - - sql = "" - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is None - - def test_handles_whitespace_only_sql(self): - """Test that _execute_sql correctly handles SQL strings with only whitespace.""" - # Setup - mock_conn = Mock() - mock_conn.execute_string.return_value = [] - - sql = " \n\t " - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is None - - def test_handles_sql_with_comments_only(self): - """Test that _execute_sql correctly handles SQL with only comments.""" - # Setup - mock_conn = Mock() - mock_conn.execute_string.return_value = [] - - sql = "-- This is a comment\n/* This is another comment */" - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is None - - def test_preserves_cursor_iteration_order(self): - """Test that _execute_sql iterates through all cursors in order.""" - # Setup - mock_conn = Mock() - cursors = [Mock(name=f"cursor{i}") for i in range(5)] - mock_conn.execute_string.return_value = cursors - - sql = "SELECT 1; SELECT 2; SELECT 3; SELECT 4; SELECT 5;" - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is cursors[-1] - - def test_passes_connection_object_correctly(self): - """Test that _execute_sql correctly uses the provided connection object.""" - # Setup - mock_conn = MagicMock() - cursor = Mock() - mock_conn.execute_string.return_value = [cursor] - - sql = "SELECT * FROM table;" - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is cursor - - def test_handles_complex_multistatement_sql(self): - """Test that _execute_sql handles complex multi-statement SQL with various statement types.""" - # Setup - mock_conn = Mock() - cursor1 = Mock() - cursor2 = Mock() - cursor3 = Mock() - cursor4 = Mock() - mock_conn.execute_string.return_value = [cursor1, cursor2, cursor3, cursor4] - - sql = """ - CREATE TABLE temp_table AS SELECT * FROM source; - INSERT INTO target_table SELECT * FROM temp_table; - UPDATE target_table SET status = 'processed'; - DROP TABLE temp_table; - """ - - # Execute - result = _execute_sql(mock_conn, sql) - - # Verify - mock_conn.execute_string.assert_called_once_with(sql) - assert result is cursor4 diff --git a/tests/unit_tests/snowflake/test__execute_sql.py b/tests/unit_tests/snowflake/test__execute_sql.py new file mode 100644 index 0000000..4816841 --- /dev/null +++ b/tests/unit_tests/snowflake/test__execute_sql.py @@ -0,0 +1,59 @@ +"""Functional test for _execute_sql.""" + +from typing import Generator + +import pytest +from snowflake.connector import SnowflakeConnection + +from ds_platform_utils._snowflake.run_query import _execute_sql +from ds_platform_utils.metaflow.get_snowflake_connection import get_snowflake_connection + + +@pytest.fixture(scope="module") +def snowflake_conn() -> Generator[SnowflakeConnection, None, None]: + """Get a Snowflake connection for testing.""" + yield get_snowflake_connection(use_utc=True) + + +def test_execute_sql_empty_string(snowflake_conn): + """Empty string returns None.""" + cursor = _execute_sql(snowflake_conn, "") + assert cursor is None + + +def test_execute_sql_whitespace_only(snowflake_conn): + """Whitespace-only string returns None.""" + cursor = _execute_sql(snowflake_conn, " \n\t ") + assert cursor is None + + +def test_execute_sql_only_semicolons(snowflake_conn): + """String with only semicolons returns None and raises warning.""" + with pytest.warns(UserWarning, match="Empty SQL statement encountered"): + cursor = _execute_sql(snowflake_conn, " ; ;") + assert cursor is None + + +def test_execute_sql_only_comments(snowflake_conn): + """String with only comments returns None and raises warning.""" + with pytest.warns(UserWarning, match="Empty SQL statement encountered"): + cursor = _execute_sql(snowflake_conn, "/* only comments */") + assert cursor is None + + +def test_execute_sql_single_statement(snowflake_conn): + """Single statement returns cursor with expected result.""" + cursor = _execute_sql(snowflake_conn, "SELECT 1 AS x;") + assert cursor is not None + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == 1 + + +def test_execute_sql_multi_statement(snowflake_conn): + """Multi-statement returns cursor for last statement only.""" + cursor = _execute_sql(snowflake_conn, "SELECT 1 AS x; SELECT 2 AS x;") + assert cursor is not None + rows = cursor.fetchall() + assert len(rows) == 1 + assert rows[0][0] == 2 # Last statement result From dcae2a55e6614536f7d879383f2b66e0b9a30c98 Mon Sep 17 00:00:00 2001 From: avr2002 Date: Thu, 4 Dec 2025 12:36:47 +0530 Subject: [PATCH 21/21] fix: correct workflow name --- .github/workflows/ci-cd-ds-platform-utils.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-cd-ds-platform-utils.yaml b/.github/workflows/ci-cd-ds-platform-utils.yaml index f620ff8..4a7031f 100644 --- a/.github/workflows/ci-cd-ds-platform-utils.yaml +++ b/.github/workflows/ci-cd-ds-platform-utils.yaml @@ -1,4 +1,4 @@ -name: Publish DS Projen +name: Publish DS Platform Utils on: workflow_dispatch: @@ -16,7 +16,7 @@ jobs: - name: Checkout Repository uses: actions/checkout@v4 with: - fetch-depth: 0 # Fetch all history for version tagging + fetch-depth: 0 # Fetch all history for version tagging - name: Set up uv uses: astral-sh/setup-uv@v5 @@ -44,7 +44,7 @@ jobs: cache-dependency-glob: "${{ github.workspace }}/uv.lock" - name: Run pre-commit hooks - run: SKIP=no-commit-to-branch uv run poe lint # using poethepoet needs to be setup before using poe lint + run: SKIP=no-commit-to-branch uv run poe lint # using poethepoet needs to be setup before using poe lint build-wheel: name: Build Wheel