Skip to content

Commit 2bdfabf

Browse files
Merge branch 'main' into vchan/fix-macro-dispatch
2 parents c90674d + 7cfddd8 commit 2bdfabf

File tree

91 files changed

+2837
-619
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

91 files changed

+2837
-619
lines changed

docs/integrations/engines/trino.md

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,21 @@ hive.metastore.glue.default-warehouse-dir=s3://my-bucket/
8181

8282
### Connection options
8383

84-
| Option | Description | Type | Required |
85-
|----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
86-
| `type` | Engine type name - must be `trino` | string | Y |
87-
| `user` | The username (of the account) to log in to your cluster. When connecting to Starburst Galaxy clusters, you must include the role of the user as a suffix to the username. | string | Y |
88-
| `host` | The hostname of your cluster. Don't include the `http://` or `https://` prefix. | string | Y |
89-
| `catalog` | The name of a catalog in your cluster. | string | Y |
90-
| `http_scheme` | The HTTP scheme to use when connecting to your cluster. By default, it's `https` and can only be `http` for no-auth or basic auth. | string | N |
91-
| `port` | The port to connect to your cluster. By default, it's `443` for `https` scheme and `80` for `http` | int | N |
92-
| `roles` | Mapping of catalog name to a role | dict | N |
93-
| `http_headers` | Additional HTTP headers to send with each request. | dict | N |
94-
| `session_properties` | Trino session properties. Run `SHOW SESSION` to see all options. | dict | N |
95-
| `retries` | Number of retries to attempt when a request fails. Default: `3` | int | N |
96-
| `timezone` | Timezone to use for the connection. Default: client-side local timezone | string | N |
84+
| Option | Description | Type | Required |
85+
|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
86+
| `type` | Engine type name - must be `trino` | string | Y |
87+
| `user` | The username (of the account) to log in to your cluster. When connecting to Starburst Galaxy clusters, you must include the role of the user as a suffix to the username. | string | Y |
88+
| `host` | The hostname of your cluster. Don't include the `http://` or `https://` prefix. | string | Y |
89+
| `catalog` | The name of a catalog in your cluster. | string | Y |
90+
| `http_scheme` | The HTTP scheme to use when connecting to your cluster. By default, it's `https` and can only be `http` for no-auth or basic auth. | string | N |
91+
| `port` | The port to connect to your cluster. By default, it's `443` for `https` scheme and `80` for `http` | int | N |
92+
| `roles` | Mapping of catalog name to a role | dict | N |
93+
| `http_headers` | Additional HTTP headers to send with each request. | dict | N |
94+
| `session_properties` | Trino session properties. Run `SHOW SESSION` to see all options. | dict | N |
95+
| `retries` | Number of retries to attempt when a request fails. Default: `3` | int | N |
96+
| `timezone` | Timezone to use for the connection. Default: client-side local timezone | string | N |
97+
| `schema_location_mapping` | A mapping of regex patterns to S3 locations to use for the `LOCATION` property when creating schemas. See [Table and Schema locations](#table-and-schema-locations) for more details. | dict | N |
98+
| `catalog_type_overrides` | A mapping of catalog names to their connector type. This is used to enable/disable connector specific behavior. See [Catalog Type Overrides](#catalog-type-overrides) for more details. | dict | N |
9799

98100
## Table and Schema locations
99101

@@ -204,6 +206,25 @@ SELECT ...
204206

205207
This will cause SQLMesh to set the specified `LOCATION` when issuing a `CREATE TABLE` statement.
206208

209+
## Catalog Type Overrides
210+
211+
SQLMesh attempts to determine the connector type of a catalog by querying the `system.metadata.catalogs` table and checking the `connector_name` column.
212+
It checks if the connector name is `hive` for Hive connector behavior or contains `iceberg` or `delta_lake` for Iceberg or Delta Lake connector behavior respectively.
213+
However, the connector name may not always be a reliable way to determine the connector type, for example when using a custom connector or a fork of an existing connector.
214+
To handle such cases, you can use the `catalog_type_overrides` connection property to explicitly specify the connector type for specific catalogs.
215+
For example, to specify that the `datalake` catalog is using the Iceberg connector and the `analytics` catalog is using the Hive connector, you can configure the connection as follows:
216+
217+
```yaml title="config.yaml"
218+
gateways:
219+
trino:
220+
connection:
221+
type: trino
222+
...
223+
catalog_type_overrides:
224+
datalake: iceberg
225+
analytics: hive
226+
```
227+
207228
## Authentication
208229

209230
=== "No Auth"

pyproject.toml

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = [
2424
"requests",
2525
"rich[jupyter]",
2626
"ruamel.yaml",
27-
"sqlglot[rs]~=27.13.2",
27+
"sqlglot[rs]~=27.15.3",
2828
"tenacity",
2929
"time-machine",
3030
"json-stream"
@@ -56,7 +56,7 @@ dev = [
5656
"agate",
5757
"beautifulsoup4",
5858
"clickhouse-connect",
59-
"cryptography",
59+
"cryptography<46.0.0",
6060
"databricks-sql-connector",
6161
"dbt-bigquery",
6262
"dbt-core",
@@ -119,7 +119,7 @@ postgres = ["psycopg2"]
119119
redshift = ["redshift_connector"]
120120
slack = ["slack_sdk"]
121121
snowflake = [
122-
"cryptography",
122+
"cryptography<46.0.0",
123123
"snowflake-connector-python[pandas,secure-local-storage]",
124124
"snowflake-snowpark-python",
125125
]
@@ -135,7 +135,7 @@ lsp = [
135135
# Duplicate of web
136136
"fastapi==0.115.5",
137137
"watchfiles>=0.19.0",
138-
"uvicorn[standard]==0.22.0",
138+
# "uvicorn[standard]==0.22.0",
139139
"sse-starlette>=0.2.2",
140140
"pyarrow",
141141
# For lsp
@@ -225,7 +225,8 @@ module = [
225225
"pydantic_core.*",
226226
"dlt.*",
227227
"bigframes.*",
228-
"json_stream.*"
228+
"json_stream.*",
229+
"duckdb.*"
229230
]
230231
ignore_missing_imports = true
231232

sqlmesh/core/config/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class ConnectionConfig(abc.ABC, BaseConfig):
101101
pre_ping: bool
102102
pretty_sql: bool = False
103103
schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None
104+
catalog_type_overrides: t.Optional[t.Dict[str, str]] = None
104105

105106
# Whether to share a single connection across threads or create a new connection per thread.
106107
shared_connection: t.ClassVar[bool] = False
@@ -176,6 +177,7 @@ def create_engine_adapter(
176177
pretty_sql=self.pretty_sql,
177178
shared_connection=self.shared_connection,
178179
schema_differ_overrides=self.schema_differ_overrides,
180+
catalog_type_overrides=self.catalog_type_overrides,
179181
**self._extra_engine_config,
180182
)
181183

sqlmesh/core/console.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -551,6 +551,22 @@ def log_skipped_models(self, snapshot_names: t.Set[str]) -> None:
551551
def log_failed_models(self, errors: t.List[NodeExecutionFailedError]) -> None:
552552
"""Display list of models that failed during evaluation to the user."""
553553

554+
@abc.abstractmethod
555+
def log_models_updated_during_restatement(
556+
self,
557+
snapshots: t.List[t.Tuple[SnapshotTableInfo, SnapshotTableInfo]],
558+
environment_naming_info: EnvironmentNamingInfo,
559+
default_catalog: t.Optional[str],
560+
) -> None:
561+
"""Display a list of models where new versions got deployed to the specified :environment while we were restating data the old versions
562+
563+
Args:
564+
snapshots: a list of (snapshot_we_restated, snapshot_it_got_replaced_with_during_restatement) tuples
565+
environment: which environment got updated while we were restating models
566+
environment_naming_info: how snapshots are named in that environment (for display name purposes)
567+
default_catalog: the configured default catalog (for display name purposes)
568+
"""
569+
554570
@abc.abstractmethod
555571
def loading_start(self, message: t.Optional[str] = None) -> uuid.UUID:
556572
"""Starts loading and returns a unique ID that can be used to stop the loading. Optionally can display a message."""
@@ -771,6 +787,14 @@ def log_skipped_models(self, snapshot_names: t.Set[str]) -> None:
771787
def log_failed_models(self, errors: t.List[NodeExecutionFailedError]) -> None:
772788
pass
773789

790+
def log_models_updated_during_restatement(
791+
self,
792+
snapshots: t.List[t.Tuple[SnapshotTableInfo, SnapshotTableInfo]],
793+
environment_naming_info: EnvironmentNamingInfo,
794+
default_catalog: t.Optional[str],
795+
) -> None:
796+
pass
797+
774798
def log_destructive_change(
775799
self,
776800
snapshot_name: str,
@@ -1998,7 +2022,34 @@ def _prompt_categorize(
19982022
plan = plan_builder.build()
19992023

20002024
if plan.restatements:
2001-
self._print("\n[bold]Restating models\n")
2025+
# A plan can have restatements for the following reasons:
2026+
# - The user specifically called `sqlmesh plan` with --restate-model.
2027+
# This creates a "restatement plan" which disallows all other changes and simply force-backfills
2028+
# the selected models and their downstream dependencies using the versions of the models stored in state.
2029+
# - There are no specific restatements (so changes are allowed) AND dev previews need to be computed.
2030+
# The "restatements" feature is currently reused for dev previews.
2031+
if plan.selected_models_to_restate:
2032+
# There were legitimate restatements, no dev previews
2033+
tree = Tree(
2034+
"[bold]Models selected for restatement:[/bold]\n"
2035+
"This causes backfill of the model itself as well as affected downstream models"
2036+
)
2037+
model_fqn_to_snapshot = {s.name: s for s in plan.snapshots.values()}
2038+
for model_fqn in plan.selected_models_to_restate:
2039+
snapshot = model_fqn_to_snapshot[model_fqn]
2040+
display_name = snapshot.display_name(
2041+
plan.environment_naming_info,
2042+
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
2043+
dialect=self.dialect,
2044+
)
2045+
tree.add(
2046+
display_name
2047+
) # note: we deliberately don't show any intervals here; they get shown in the backfill section
2048+
self._print(tree)
2049+
else:
2050+
# We are computing dev previews, do not confuse the user by printing out something to do
2051+
# with restatements. Dev previews are already highlighted in the backfill step
2052+
pass
20022053
else:
20032054
self.show_environment_difference_summary(
20042055
plan.context_diff,
@@ -2225,6 +2276,30 @@ def log_failed_models(self, errors: t.List[NodeExecutionFailedError]) -> None:
22252276
for node_name, msg in error_messages.items():
22262277
self._print(f" [red]{node_name}[/red]\n\n{msg}")
22272278

2279+
def log_models_updated_during_restatement(
2280+
self,
2281+
snapshots: t.List[t.Tuple[SnapshotTableInfo, SnapshotTableInfo]],
2282+
environment_naming_info: EnvironmentNamingInfo,
2283+
default_catalog: t.Optional[str] = None,
2284+
) -> None:
2285+
if snapshots:
2286+
tree = Tree(
2287+
f"[yellow]The following models had new versions deployed while data was being restated:[/yellow]"
2288+
)
2289+
2290+
for restated_snapshot, updated_snapshot in snapshots:
2291+
display_name = restated_snapshot.display_name(
2292+
environment_naming_info,
2293+
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
2294+
dialect=self.dialect,
2295+
)
2296+
current_branch = tree.add(display_name)
2297+
current_branch.add(f"restated version: '{restated_snapshot.version}'")
2298+
current_branch.add(f"currently active version: '{updated_snapshot.version}'")
2299+
2300+
self._print(tree)
2301+
self._print("") # newline spacer
2302+
22282303
def log_destructive_change(
22292304
self,
22302305
snapshot_name: str,

sqlmesh/core/context.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1429,6 +1429,7 @@ def plan_builder(
14291429
explain: t.Optional[bool] = None,
14301430
ignore_cron: t.Optional[bool] = None,
14311431
min_intervals: t.Optional[int] = None,
1432+
always_include_local_changes: t.Optional[bool] = None,
14321433
) -> PlanBuilder:
14331434
"""Creates a plan builder.
14341435
@@ -1467,6 +1468,8 @@ def plan_builder(
14671468
diff_rendered: Whether the diff should compare raw vs rendered models
14681469
min_intervals: Adjust the plan start date on a per-model basis in order to ensure at least this many intervals are covered
14691470
on every model when checking for missing intervals
1471+
always_include_local_changes: Usually when restatements are present, local changes in the filesystem are ignored.
1472+
However, it can be desirable to deploy changes + restatements in the same plan, so this flag overrides the default behaviour.
14701473
14711474
Returns:
14721475
The plan builder.
@@ -1583,13 +1586,20 @@ def plan_builder(
15831586
"Selector did not return any models. Please check your model selection and try again."
15841587
)
15851588

1589+
if always_include_local_changes is None:
1590+
# default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes
1591+
force_no_diff = restate_models is not None or (
1592+
backfill_models is not None and not backfill_models
1593+
)
1594+
else:
1595+
force_no_diff = not always_include_local_changes
1596+
15861597
snapshots = self._snapshots(models_override)
15871598
context_diff = self._context_diff(
15881599
environment or c.PROD,
15891600
snapshots=snapshots,
15901601
create_from=create_from,
1591-
force_no_diff=restate_models is not None
1592-
or (backfill_models is not None and not backfill_models),
1602+
force_no_diff=force_no_diff,
15931603
ensure_finalized_snapshots=self.config.plan.use_finalized_state,
15941604
diff_rendered=diff_rendered,
15951605
always_recreate_environment=self.config.plan.always_recreate_environment,
@@ -1644,13 +1654,22 @@ def plan_builder(
16441654
elif forward_only is None:
16451655
forward_only = self.config.plan.forward_only
16461656

1657+
# When handling prod restatements, only clear intervals from other model versions if we are using full virtual environments
1658+
# If we are not, then there is no point, because none of the data in dev environments can be promoted by definition
1659+
restate_all_snapshots = (
1660+
expanded_restate_models is not None
1661+
and not is_dev
1662+
and self.config.virtual_environment_mode.is_full
1663+
)
1664+
16471665
return self.PLAN_BUILDER_TYPE(
16481666
context_diff=context_diff,
16491667
start=start,
16501668
end=end,
16511669
execution_time=execution_time,
16521670
apply=self.apply,
16531671
restate_models=expanded_restate_models,
1672+
restate_all_snapshots=restate_all_snapshots,
16541673
backfill_models=backfill_models,
16551674
no_gaps=no_gaps,
16561675
skip_backfill=skip_backfill,

sqlmesh/core/dialect.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,7 @@ def _parse_id_var(
174174

175175
while (
176176
identifier
177+
and not identifier.args.get("quoted")
177178
and self._is_connected()
178179
and (
179180
self._match_texts(("{", SQLMESH_MACRO_PREFIX))
@@ -349,13 +350,15 @@ def _parse_select(
349350
parse_subquery_alias: bool = True,
350351
parse_set_operation: bool = True,
351352
consume_pipe: bool = True,
353+
from_: t.Optional[exp.From] = None,
352354
) -> t.Optional[exp.Expression]:
353355
select = self.__parse_select( # type: ignore
354356
nested=nested,
355357
table=table,
356358
parse_subquery_alias=parse_subquery_alias,
357359
parse_set_operation=parse_set_operation,
358360
consume_pipe=consume_pipe,
361+
from_=from_,
359362
)
360363

361364
if (

0 commit comments

Comments
 (0)