Skip to content

Commit 665a655

Browse files
Merge branch 'main' into fix/get-data-objects
2 parents b8b983a + 4c5a6d3 commit 665a655

File tree

109 files changed

+1106
-477
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

109 files changed

+1106
-477
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.24.2",
27+
"sqlglot[rs]~=27.27.0",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"

sqlmesh/__init__.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,15 +188,23 @@ def configure_logging(
188188
write_to_file: bool = True,
189189
log_file_dir: t.Optional[t.Union[str, Path]] = None,
190190
ignore_warnings: bool = False,
191+
log_level: t.Optional[t.Union[str, int]] = None,
191192
) -> None:
192193
# Remove noisy grpc logs that are not useful for users
193194
os.environ["GRPC_VERBOSITY"] = os.environ.get("GRPC_VERBOSITY", "NONE")
194195

195196
logger = logging.getLogger()
196197
debug = force_debug or debug_mode_enabled()
197198

198-
# base logger needs to be the lowest level that we plan to log
199-
level = logging.DEBUG if debug else logging.INFO
199+
if log_level is not None:
200+
if isinstance(log_level, str):
201+
level = logging._nameToLevel.get(log_level.upper()) or logging.INFO
202+
else:
203+
level = log_level
204+
else:
205+
# base logger needs to be the lowest level that we plan to log
206+
level = logging.DEBUG if debug else logging.INFO
207+
200208
logger.setLevel(level)
201209

202210
if debug:

sqlmesh/core/config/connection.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
"clickhouse",
5959
}
6060
MOTHERDUCK_TOKEN_REGEX = re.compile(r"(\?|\&)(motherduck_token=)(\S*)")
61+
PASSWORD_REGEX = re.compile(r"(password=)(\S+)")
6162

6263

6364
def _get_engine_import_validator(
@@ -479,13 +480,13 @@ def create_engine_adapter(
479480
adapter = BaseDuckDBConnectionConfig._data_file_to_adapter.get(key)
480481
if adapter is not None:
481482
logger.info(
482-
f"Using existing DuckDB adapter due to overlapping data file: {self._mask_motherduck_token(key)}"
483+
f"Using existing DuckDB adapter due to overlapping data file: {self._mask_sensitive_data(key)}"
483484
)
484485
return adapter
485486

486487
if data_files:
487488
masked_files = {
488-
self._mask_motherduck_token(file if isinstance(file, str) else file.path)
489+
self._mask_sensitive_data(file if isinstance(file, str) else file.path)
489490
for file in data_files
490491
}
491492
logger.info(f"Creating new DuckDB adapter for data files: {masked_files}")
@@ -507,10 +508,14 @@ def get_catalog(self) -> t.Optional[str]:
507508
return list(self.catalogs)[0]
508509
return None
509510

510-
def _mask_motherduck_token(self, string: str) -> str:
511-
return MOTHERDUCK_TOKEN_REGEX.sub(
512-
lambda m: f"{m.group(1)}{m.group(2)}{'*' * len(m.group(3))}", string
511+
def _mask_sensitive_data(self, string: str) -> str:
512+
# Mask MotherDuck tokens with fixed number of asterisks
513+
result = MOTHERDUCK_TOKEN_REGEX.sub(
514+
lambda m: f"{m.group(1)}{m.group(2)}{'*' * 8 if m.group(3) else ''}", string
513515
)
516+
# Mask PostgreSQL/MySQL passwords with fixed number of asterisks
517+
result = PASSWORD_REGEX.sub(lambda m: f"{m.group(1)}{'*' * 8}", result)
518+
return result
514519

515520

516521
class MotherDuckConnectionConfig(BaseDuckDBConnectionConfig):

sqlmesh/core/config/loader.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ def load_config_from_paths(
178178

179179
dbt_python_config = sqlmesh_config(
180180
project_root=dbt_project_file.parent,
181+
profiles_dir=kwargs.pop("profiles_dir", None),
181182
dbt_profile_name=kwargs.pop("profile", None),
182183
dbt_target_name=kwargs.pop("target", None),
183184
variables=variables,

sqlmesh/core/context.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@
139139
)
140140
from sqlmesh.utils.config import print_config
141141
from sqlmesh.utils.jinja import JinjaMacroRegistry
142+
from sqlmesh.utils.windows import IS_WINDOWS, fix_windows_path
142143

143144
if t.TYPE_CHECKING:
144145
import pandas as pd
@@ -2590,12 +2591,15 @@ def table_name(
25902591
)
25912592

25922593
def clear_caches(self) -> None:
2593-
for path in self.configs:
2594-
cache_path = path / c.CACHE
2595-
if cache_path.exists():
2596-
rmtree(cache_path)
2597-
if self.cache_dir.exists():
2598-
rmtree(self.cache_dir)
2594+
paths_to_remove = [path / c.CACHE for path in self.configs]
2595+
paths_to_remove.append(self.cache_dir)
2596+
2597+
if IS_WINDOWS:
2598+
paths_to_remove = [fix_windows_path(path) for path in paths_to_remove]
2599+
2600+
for path in paths_to_remove:
2601+
if path.exists():
2602+
rmtree(path)
25992603

26002604
if isinstance(self._state_sync, CachingStateSync):
26012605
self._state_sync.clear_cache()

sqlmesh/core/engine_adapter/base.py

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818

1919
from sqlglot import Dialect, exp
2020
from sqlglot.errors import ErrorLevel
21-
from sqlglot.helper import ensure_list
21+
from sqlglot.helper import ensure_list, seq_get
2222
from sqlglot.optimizer.qualify_columns import quote_identifiers
2323

2424
from sqlmesh.core.dialect import (
@@ -551,11 +551,13 @@ def replace_query(
551551
target_table,
552552
source_queries,
553553
target_columns_to_types,
554+
**kwargs,
554555
)
555556
return self._insert_overwrite_by_condition(
556557
target_table,
557558
source_queries,
558559
target_columns_to_types,
560+
**kwargs,
559561
)
560562

561563
def create_index(
@@ -1614,7 +1616,7 @@ def _insert_overwrite_by_time_partition(
16141616
**kwargs: t.Any,
16151617
) -> None:
16161618
return self._insert_overwrite_by_condition(
1617-
table_name, source_queries, target_columns_to_types, where
1619+
table_name, source_queries, target_columns_to_types, where, **kwargs
16181620
)
16191621

16201622
def _values_to_sql(
@@ -1772,7 +1774,7 @@ def scd_type_2_by_column(
17721774
valid_from_col: exp.Column,
17731775
valid_to_col: exp.Column,
17741776
execution_time: t.Union[TimeLike, exp.Column],
1775-
check_columns: t.Union[exp.Star, t.Sequence[exp.Column]],
1777+
check_columns: t.Union[exp.Star, t.Sequence[exp.Expression]],
17761778
invalidate_hard_deletes: bool = True,
17771779
execution_time_as_valid_from: bool = False,
17781780
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
@@ -1810,7 +1812,7 @@ def _scd_type_2(
18101812
execution_time: t.Union[TimeLike, exp.Column],
18111813
invalidate_hard_deletes: bool = True,
18121814
updated_at_col: t.Optional[exp.Column] = None,
1813-
check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Column]]] = None,
1815+
check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Expression]]] = None,
18141816
updated_at_as_valid_from: bool = False,
18151817
execution_time_as_valid_from: bool = False,
18161818
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
@@ -1885,8 +1887,10 @@ def remove_managed_columns(
18851887
# they are equal or not, the extra check is not a problem and we gain simplified logic here.
18861888
# If we want to change this, then we just need to check the expressions in unique_key and pull out the
18871889
# column names and then remove them from the unmanaged_columns
1888-
if check_columns and check_columns == exp.Star():
1889-
check_columns = [exp.column(col) for col in unmanaged_columns_to_types]
1890+
if check_columns:
1891+
# Handle both Star directly and [Star()] (which can happen during serialization/deserialization)
1892+
if isinstance(seq_get(ensure_list(check_columns), 0), exp.Star):
1893+
check_columns = [exp.column(col) for col in unmanaged_columns_to_types]
18901894
execution_ts = (
18911895
exp.cast(execution_time, time_data_type, dialect=self.dialect)
18921896
if isinstance(execution_time, exp.Column)
@@ -1923,7 +1927,8 @@ def remove_managed_columns(
19231927
col_qualified.set("table", exp.to_identifier("joined"))
19241928

19251929
t_col = col_qualified.copy()
1926-
t_col.this.set("this", f"t_{col.name}")
1930+
for column in t_col.find_all(exp.Column):
1931+
column.this.set("this", f"t_{column.name}")
19271932

19281933
row_check_conditions.extend(
19291934
[

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class MSSQLEngineAdapter(
5656
COMMENT_CREATION_TABLE = CommentCreationTable.UNSUPPORTED
5757
COMMENT_CREATION_VIEW = CommentCreationView.UNSUPPORTED
5858
SUPPORTS_REPLACE_TABLE = False
59+
MAX_IDENTIFIER_LENGTH = 128
5960
SUPPORTS_QUERY_EXECUTION_TRACKING = True
6061
SCHEMA_DIFFER_KWARGS = {
6162
"parameterized_type_defaults": {
@@ -422,7 +423,9 @@ def _insert_overwrite_by_condition(
422423
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
423424
**kwargs: t.Any,
424425
) -> None:
425-
if not where or where == exp.true():
426+
# note that this is passed as table_properties here rather than physical_properties
427+
use_merge_strategy = kwargs.get("table_properties", {}).get("mssql_merge_exists")
428+
if (not where or where == exp.true()) and not use_merge_strategy:
426429
# this is a full table replacement, call the base strategy to do DELETE+INSERT
427430
# which will result in TRUNCATE+INSERT due to how we have overridden self.delete_from()
428431
return EngineAdapter._insert_overwrite_by_condition(
@@ -435,7 +438,7 @@ def _insert_overwrite_by_condition(
435438
**kwargs,
436439
)
437440

438-
# For actual conditional overwrites, use MERGE from InsertOverwriteWithMergeMixin
441+
# For conditional overwrites or when mssql_merge_exists is set use MERGE
439442
return super()._insert_overwrite_by_condition(
440443
table_name=table_name,
441444
source_queries=source_queries,

sqlmesh/core/engine_adapter/trino.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ def _scd_type_2(
302302
execution_time: t.Union[TimeLike, exp.Column],
303303
invalidate_hard_deletes: bool = True,
304304
updated_at_col: t.Optional[exp.Column] = None,
305-
check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Column]]] = None,
305+
check_columns: t.Optional[t.Union[exp.Star, t.Sequence[exp.Expression]]] = None,
306306
updated_at_as_valid_from: bool = False,
307307
execution_time_as_valid_from: bool = False,
308308
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,

sqlmesh/core/model/kind.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
PydanticModel,
2424
SQLGlotBool,
2525
SQLGlotColumn,
26-
SQLGlotListOfColumnsOrStar,
26+
SQLGlotListOfFieldsOrStar,
2727
SQLGlotListOfFields,
2828
SQLGlotPositiveInt,
2929
SQLGlotString,
@@ -852,7 +852,7 @@ def to_expression(
852852

853853
class SCDType2ByColumnKind(_SCDType2Kind):
854854
name: t.Literal[ModelKindName.SCD_TYPE_2_BY_COLUMN] = ModelKindName.SCD_TYPE_2_BY_COLUMN
855-
columns: SQLGlotListOfColumnsOrStar
855+
columns: SQLGlotListOfFieldsOrStar
856856
execution_time_as_valid_from: SQLGlotBool = False
857857
updated_at_name: t.Optional[SQLGlotColumn] = None
858858

sqlmesh/core/snapshot/evaluator.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2188,7 +2188,13 @@ def _get_target_and_source_columns(
21882188
if model.on_destructive_change.is_ignore or model.on_additive_change.is_ignore:
21892189
# We need to identify the columns that are only in the source so we create an empty table with
21902190
# the user query to determine that
2191-
with self.adapter.temp_table(model.ctas_query(**render_kwargs)) as temp_table:
2191+
temp_table_name = exp.table_(
2192+
"diff",
2193+
db=model.physical_schema,
2194+
)
2195+
with self.adapter.temp_table(
2196+
model.ctas_query(**render_kwargs), name=temp_table_name
2197+
) as temp_table:
21922198
source_columns = list(self.adapter.columns(temp_table))
21932199
else:
21942200
source_columns = None

0 commit comments

Comments (0)