Skip to content

Commit cffdc1f

Browse files
hashwnathclaude
andcommitted
Fix mypy errors, ruff formatting, and add requested tests
Fix QueryOrDF import path, replace reduce with exp.and_ for type safety, add 3-column composite key test and expression-based composite key test via insert_overwrite_by_partition. Signed-off-by: Hashwanth S <s.hashwanth531@gmail.com> Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: Hashwanth S <s.hashwanth531@gmail.com>
1 parent a6907c0 commit cffdc1f

2 files changed

Lines changed: 82 additions & 14 deletions

File tree

sqlmesh/core/engine_adapter/mysql.py

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,6 @@
22

33
import logging
44
import typing as t
5-
from functools import reduce
6-
75
from sqlglot import exp, parse_one
86

97
from sqlmesh.core.dialect import to_schema
@@ -22,7 +20,8 @@
2220
)
2321

2422
if t.TYPE_CHECKING:
25-
from sqlmesh.core._typing import QueryOrDF, SchemaName, TableName
23+
from sqlmesh.core._typing import SchemaName, TableName
24+
from sqlmesh.core.engine_adapter._typing import QueryOrDF
2625

2726
logger = logging.getLogger(__name__)
2827

@@ -198,7 +197,12 @@ def _replace_by_key(
198197
) -> None:
199198
if len(key) <= 1:
200199
return super()._replace_by_key(
201-
target_table, source_table, target_columns_to_types, key, is_unique_key, source_columns
200+
target_table,
201+
source_table,
202+
target_columns_to_types,
203+
key,
204+
is_unique_key,
205+
source_columns,
202206
)
203207

204208
if target_columns_to_types is None:
@@ -222,14 +226,13 @@ def _replace_by_key(
222226
try:
223227
# Build a JOIN-based DELETE instead of using CONCAT_WS.
224228
# CONCAT_WS prevents MySQL/MariaDB from using indexes, causing full table scans.
225-
on_condition = reduce(
226-
lambda a, b: exp.And(this=a, expression=b),
227-
[
229+
on_condition = exp.and_(
230+
*[
228231
self._qualify_columns(k, target_alias).eq(
229232
self._qualify_columns(k, temp_alias)
230233
)
231234
for k in key
232-
],
235+
]
233236
)
234237

235238
target_table_aliased = exp.to_table(target_table).as_(target_alias, quoted=True)

tests/core/engine_adapter/test_mysql.py

Lines changed: 71 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,9 +92,7 @@ def test_replace_by_key_composite_uses_join_delete(
9292
):
9393
"""Composite key DELETE uses JOIN instead of CONCAT_WS to allow index usage."""
9494
adapter = make_mocked_engine_adapter(MySQLEngineAdapter)
95-
temp_table_mock = mocker.patch(
96-
"sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table"
97-
)
95+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table")
9896
temp_table_mock.return_value = exp.to_table("temporary")
9997

10098
adapter.merge(
@@ -135,14 +133,81 @@ def test_replace_by_key_composite_uses_join_delete(
135133
)
136134

137135

136+
def test_replace_by_key_three_column_composite_key(
137+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
138+
):
139+
"""3-column composite key matching the original issue scenario (#5711)."""
140+
adapter = make_mocked_engine_adapter(MySQLEngineAdapter)
141+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table")
142+
temp_table_mock.return_value = exp.to_table("temporary")
143+
144+
adapter.merge(
145+
target_table="target",
146+
source_table=t.cast(exp.Select, parse_one("SELECT id, region, ts, val FROM source")),
147+
target_columns_to_types={
148+
"id": exp.DataType(this=exp.DataType.Type.INT),
149+
"region": exp.DataType(this=exp.DataType.Type.VARCHAR),
150+
"ts": exp.DataType(this=exp.DataType.Type.TIMESTAMP),
151+
"val": exp.DataType(this=exp.DataType.Type.INT),
152+
},
153+
unique_key=[parse_one("id"), parse_one("region"), parse_one("ts")],
154+
)
155+
156+
sql_calls = to_sql_calls(adapter)
157+
158+
assert any("CONCAT_WS" in s for s in sql_calls) is False
159+
assert any("INNER JOIN" in s for s in sql_calls) is True
160+
161+
adapter.cursor.execute.assert_has_calls(
162+
[
163+
call(
164+
"DELETE `_target` FROM `target` AS `_target` INNER JOIN `temporary` AS `_temp` ON `_target`.`id` = `_temp`.`id` AND `_target`.`region` = `_temp`.`region` AND `_target`.`ts` = `_temp`.`ts`"
165+
),
166+
]
167+
)
168+
169+
170+
def test_replace_by_key_expression_based_composite_key(
171+
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
172+
):
173+
"""Expression-based composite keys (e.g. DATE_TRUNC + column) via insert_overwrite_by_partition."""
174+
adapter = make_mocked_engine_adapter(MySQLEngineAdapter)
175+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table")
176+
temp_table_mock.return_value = exp.to_table("temporary")
177+
178+
adapter.insert_overwrite_by_partition(
179+
table_name="target",
180+
query_or_df=t.cast(exp.Select, parse_one("SELECT id, ts, val FROM source")),
181+
partitioned_by=[
182+
parse_one("DATE_TRUNC('month', ts)"),
183+
parse_one("id"),
184+
],
185+
target_columns_to_types={
186+
"id": exp.DataType(this=exp.DataType.Type.INT),
187+
"ts": exp.DataType(this=exp.DataType.Type.TIMESTAMP),
188+
"val": exp.DataType(this=exp.DataType.Type.INT),
189+
},
190+
)
191+
192+
sql_calls = to_sql_calls(adapter)
193+
194+
assert any("CONCAT_WS" in s for s in sql_calls) is False
195+
assert any("INNER JOIN" in s for s in sql_calls) is True
196+
197+
# DATE_TRUNC transpiles to STR_TO_DATE in MySQL; both key parts should be qualified
198+
delete_sql = [s for s in sql_calls if "DELETE" in s][0]
199+
assert "`_target`.`ts`" in delete_sql
200+
assert "`_temp`.`ts`" in delete_sql
201+
assert "`_target`.`id`" in delete_sql
202+
assert "`_temp`.`id`" in delete_sql
203+
204+
138205
def test_replace_by_key_single_key_uses_in(
139206
make_mocked_engine_adapter: t.Callable, mocker: MockerFixture
140207
):
141208
"""Single key DELETE still uses the IN-based approach (indexes work fine for single column)."""
142209
adapter = make_mocked_engine_adapter(MySQLEngineAdapter)
143-
temp_table_mock = mocker.patch(
144-
"sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table"
145-
)
210+
temp_table_mock = mocker.patch("sqlmesh.core.engine_adapter.base.EngineAdapter._get_temp_table")
146211
temp_table_mock.return_value = exp.to_table("temporary")
147212

148213
adapter.merge(

0 commit comments

Comments
 (0)