Skip to content

Commit b18945f

Browse files
authored
Merge branch 'main' into indirect_change_to_materialized_view_is_breaking
2 parents 3dc0657 + d7dda8f commit b18945f

File tree

30 files changed

+406
-174
lines changed

30 files changed

+406
-174
lines changed

docs/integrations/engines/trino.md

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -81,19 +81,21 @@ hive.metastore.glue.default-warehouse-dir=s3://my-bucket/
8181

8282
### Connection options
8383

84-
| Option | Description | Type | Required |
85-
|----------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
86-
| `type` | Engine type name - must be `trino` | string | Y |
87-
| `user` | The username (of the account) to log in to your cluster. When connecting to Starburst Galaxy clusters, you must include the role of the user as a suffix to the username. | string | Y |
88-
| `host` | The hostname of your cluster. Don't include the `http://` or `https://` prefix. | string | Y |
89-
| `catalog` | The name of a catalog in your cluster. | string | Y |
90-
| `http_scheme` | The HTTP scheme to use when connecting to your cluster. By default, it's `https` and can only be `http` for no-auth or basic auth. | string | N |
91-
| `port` | The port to connect to your cluster. By default, it's `443` for `https` scheme and `80` for `http` | int | N |
92-
| `roles` | Mapping of catalog name to a role | dict | N |
93-
| `http_headers` | Additional HTTP headers to send with each request. | dict | N |
94-
| `session_properties` | Trino session properties. Run `SHOW SESSION` to see all options. | dict | N |
95-
| `retries` | Number of retries to attempt when a request fails. Default: `3` | int | N |
96-
| `timezone` | Timezone to use for the connection. Default: client-side local timezone | string | N |
84+
| Option | Description | Type | Required |
85+
|---------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:------:|:--------:|
86+
| `type` | Engine type name - must be `trino` | string | Y |
87+
| `user` | The username (of the account) to log in to your cluster. When connecting to Starburst Galaxy clusters, you must include the role of the user as a suffix to the username. | string | Y |
88+
| `host` | The hostname of your cluster. Don't include the `http://` or `https://` prefix. | string | Y |
89+
| `catalog` | The name of a catalog in your cluster. | string | Y |
90+
| `http_scheme` | The HTTP scheme to use when connecting to your cluster. Defaults to `https`; `http` can only be used with no-auth or basic auth. | string | N |
91+
| `port` | The port to connect to your cluster. Defaults to `443` for the `https` scheme and `80` for the `http` scheme. | int | N |
92+
| `roles` | Mapping of catalog name to a role | dict | N |
93+
| `http_headers` | Additional HTTP headers to send with each request. | dict | N |
94+
| `session_properties` | Trino session properties. Run `SHOW SESSION` to see all options. | dict | N |
95+
| `retries` | Number of retries to attempt when a request fails. Default: `3` | int | N |
96+
| `timezone` | Timezone to use for the connection. Default: client-side local timezone | string | N |
97+
| `schema_location_mapping` | A mapping of regex patterns to S3 locations to use for the `LOCATION` property when creating schemas. See [Table and Schema locations](#table-and-schema-locations) for more details. | dict | N |
98+
| `catalog_type_overrides` | A mapping of catalog names to their connector type. This is used to enable/disable connector-specific behavior. See [Catalog Type Overrides](#catalog-type-overrides) for more details. | dict | N |
9799

98100
## Table and Schema locations
99101

@@ -204,6 +206,25 @@ SELECT ...
204206

205207
This will cause SQLMesh to set the specified `LOCATION` when issuing a `CREATE TABLE` statement.
206208

209+
## Catalog Type Overrides
210+
211+
SQLMesh attempts to determine the connector type of a catalog by querying the `system.metadata.catalogs` table and checking the `connector_name` column.
212+
The catalog gets Hive connector behavior if the connector name is `hive`, Iceberg connector behavior if the name contains `iceberg`, and Delta Lake connector behavior if the name contains `delta_lake`.
213+
However, the connector name may not always be a reliable way to determine the connector type, for example when using a custom connector or a fork of an existing connector.
214+
To handle such cases, you can use the `catalog_type_overrides` connection property to explicitly specify the connector type for specific catalogs.
215+
For example, to specify that the `datalake` catalog is using the Iceberg connector and the `analytics` catalog is using the Hive connector, you can configure the connection as follows:
216+
217+
```yaml title="config.yaml"
218+
gateways:
219+
trino:
220+
connection:
221+
type: trino
222+
...
223+
catalog_type_overrides:
224+
datalake: iceberg
225+
analytics: hive
226+
```
227+
207228
## Authentication
208229

209230
=== "No Auth"

sqlmesh/core/config/connection.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@ class ConnectionConfig(abc.ABC, BaseConfig):
101101
pre_ping: bool
102102
pretty_sql: bool = False
103103
schema_differ_overrides: t.Optional[t.Dict[str, t.Any]] = None
104+
catalog_type_overrides: t.Optional[t.Dict[str, str]] = None
104105

105106
# Whether to share a single connection across threads or create a new connection per thread.
106107
shared_connection: t.ClassVar[bool] = False
@@ -176,6 +177,7 @@ def create_engine_adapter(
176177
pretty_sql=self.pretty_sql,
177178
shared_connection=self.shared_connection,
178179
schema_differ_overrides=self.schema_differ_overrides,
180+
catalog_type_overrides=self.catalog_type_overrides,
179181
**self._extra_engine_config,
180182
)
181183

sqlmesh/core/console.py

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2022,7 +2022,34 @@ def _prompt_categorize(
20222022
plan = plan_builder.build()
20232023

20242024
if plan.restatements:
2025-
self._print("\n[bold]Restating models\n")
2025+
# A plan can have restatements for the following reasons:
2026+
# - The user specifically called `sqlmesh plan` with --restate-model.
2027+
# This creates a "restatement plan" which disallows all other changes and simply force-backfills
2028+
# the selected models and their downstream dependencies using the versions of the models stored in state.
2029+
# - There are no specific restatements (so changes are allowed) AND dev previews need to be computed.
2030+
# The "restatements" feature is currently reused for dev previews.
2031+
if plan.selected_models_to_restate:
2032+
# There were legitimate restatements, no dev previews
2033+
tree = Tree(
2034+
"[bold]Models selected for restatement:[/bold]\n"
2035+
"This causes backfill of the model itself as well as affected downstream models"
2036+
)
2037+
model_fqn_to_snapshot = {s.name: s for s in plan.snapshots.values()}
2038+
for model_fqn in plan.selected_models_to_restate:
2039+
snapshot = model_fqn_to_snapshot[model_fqn]
2040+
display_name = snapshot.display_name(
2041+
plan.environment_naming_info,
2042+
default_catalog if self.verbosity < Verbosity.VERY_VERBOSE else None,
2043+
dialect=self.dialect,
2044+
)
2045+
tree.add(
2046+
display_name
2047+
) # note: we deliberately don't show any intervals here; they get shown in the backfill section
2048+
self._print(tree)
2049+
else:
2050+
# We are computing dev previews, do not confuse the user by printing out something to do
2051+
# with restatements. Dev previews are already highlighted in the backfill step
2052+
pass
20262053
else:
20272054
self.show_environment_difference_summary(
20282055
plan.context_diff,

sqlmesh/core/engine_adapter/base.py

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,10 @@ def schema_differ(self) -> SchemaDiffer:
223223
}
224224
)
225225

226+
@property
227+
def _catalog_type_overrides(self) -> t.Dict[str, str]:
228+
return self._extra_config.get("catalog_type_overrides") or {}
229+
226230
@classmethod
227231
def _casted_columns(
228232
cls,
@@ -430,7 +434,11 @@ def get_catalog_type(self, catalog: t.Optional[str]) -> str:
430434
raise UnsupportedCatalogOperationError(
431435
f"{self.dialect} does not support catalogs and a catalog was provided: {catalog}"
432436
)
433-
return self.DEFAULT_CATALOG_TYPE
437+
return (
438+
self._catalog_type_overrides.get(catalog, self.DEFAULT_CATALOG_TYPE)
439+
if catalog
440+
else self.DEFAULT_CATALOG_TYPE
441+
)
434442

435443
def get_catalog_type_from_table(self, table: TableName) -> str:
436444
"""Get the catalog type from a table name if it has a catalog specified, otherwise return the current catalog type"""
@@ -1633,6 +1641,30 @@ def _insert_overwrite_by_condition(
16331641
target_columns_to_types=target_columns_to_types,
16341642
order_projections=False,
16351643
)
1644+
elif insert_overwrite_strategy.is_merge:
1645+
columns = [exp.column(col) for col in target_columns_to_types]
1646+
when_not_matched_by_source = exp.When(
1647+
matched=False,
1648+
source=True,
1649+
condition=where,
1650+
then=exp.Delete(),
1651+
)
1652+
when_not_matched_by_target = exp.When(
1653+
matched=False,
1654+
source=False,
1655+
then=exp.Insert(
1656+
this=exp.Tuple(expressions=columns),
1657+
expression=exp.Tuple(expressions=columns),
1658+
),
1659+
)
1660+
self._merge(
1661+
target_table=table_name,
1662+
query=query,
1663+
on=exp.false(),
1664+
whens=exp.Whens(
1665+
expressions=[when_not_matched_by_source, when_not_matched_by_target]
1666+
),
1667+
)
16361668
else:
16371669
insert_exp = exp.insert(
16381670
query,

sqlmesh/core/engine_adapter/bigquery.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99

1010
from sqlmesh.core.dialect import to_schema
1111
from sqlmesh.core.engine_adapter.mixins import (
12-
InsertOverwriteWithMergeMixin,
1312
ClusteredByMixin,
1413
RowDiffMixin,
1514
TableAlterClusterByOperation,
@@ -20,6 +19,7 @@
2019
DataObjectType,
2120
SourceQuery,
2221
set_catalog,
22+
InsertOverwriteStrategy,
2323
)
2424
from sqlmesh.core.node import IntervalUnit
2525
from sqlmesh.core.schema_diff import TableAlterOperation, NestedSupport
@@ -54,7 +54,7 @@
5454

5555

5656
@set_catalog()
57-
class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, RowDiffMixin):
57+
class BigQueryEngineAdapter(ClusteredByMixin, RowDiffMixin):
5858
"""
5959
BigQuery Engine Adapter using the `google-cloud-bigquery` library's DB API.
6060
"""
@@ -68,6 +68,7 @@ class BigQueryEngineAdapter(InsertOverwriteWithMergeMixin, ClusteredByMixin, Row
6868
MAX_COLUMN_COMMENT_LENGTH = 1024
6969
SUPPORTS_QUERY_EXECUTION_TRACKING = True
7070
SUPPORTED_DROP_CASCADE_OBJECT_KINDS = ["SCHEMA"]
71+
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE
7172

7273
SCHEMA_DIFFER_KWARGS = {
7374
"compatible_types": {

sqlmesh/core/engine_adapter/fabric.py

Lines changed: 1 addition & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,15 @@
77
from functools import cached_property
88
from sqlglot import exp
99
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_result
10+
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
1011
from sqlmesh.core.engine_adapter.mssql import MSSQLEngineAdapter
1112
from sqlmesh.core.engine_adapter.shared import (
1213
InsertOverwriteStrategy,
13-
SourceQuery,
1414
)
15-
from sqlmesh.core.engine_adapter.base import EngineAdapter
1615
from sqlmesh.utils.errors import SQLMeshError
1716
from sqlmesh.utils.connection_pool import ConnectionPool
1817

1918

20-
if t.TYPE_CHECKING:
21-
from sqlmesh.core._typing import TableName
22-
23-
24-
from sqlmesh.core.engine_adapter.mixins import LogicalMergeMixin
25-
2619
logger = logging.getLogger(__name__)
2720

2821

@@ -58,26 +51,6 @@ def _target_catalog(self) -> t.Optional[str]:
5851
def _target_catalog(self, value: t.Optional[str]) -> None:
5952
self._connection_pool.set_attribute("target_catalog", value)
6053

61-
def _insert_overwrite_by_condition(
62-
self,
63-
table_name: TableName,
64-
source_queries: t.List[SourceQuery],
65-
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
66-
where: t.Optional[exp.Condition] = None,
67-
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
68-
**kwargs: t.Any,
69-
) -> None:
70-
# Override to avoid MERGE statement which isn't fully supported in Fabric
71-
return EngineAdapter._insert_overwrite_by_condition(
72-
self,
73-
table_name=table_name,
74-
source_queries=source_queries,
75-
target_columns_to_types=target_columns_to_types,
76-
where=where,
77-
insert_overwrite_strategy_override=InsertOverwriteStrategy.DELETE_INSERT,
78-
**kwargs,
79-
)
80-
8154
@property
8255
def api_client(self) -> FabricHttpClient:
8356
# the requests Session is not guaranteed to be threadsafe

sqlmesh/core/engine_adapter/mixins.py

Lines changed: 0 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
from sqlglot.helper import seq_get
1010

1111
from sqlmesh.core.engine_adapter.base import EngineAdapter
12-
from sqlmesh.core.engine_adapter.shared import InsertOverwriteStrategy, SourceQuery
1312
from sqlmesh.core.node import IntervalUnit
1413
from sqlmesh.core.dialect import schema_
1514
from sqlmesh.core.schema_diff import TableAlterOperation
@@ -75,52 +74,6 @@ def _fetch_native_df(
7574
return df
7675

7776

78-
class InsertOverwriteWithMergeMixin(EngineAdapter):
79-
def _insert_overwrite_by_condition(
80-
self,
81-
table_name: TableName,
82-
source_queries: t.List[SourceQuery],
83-
target_columns_to_types: t.Optional[t.Dict[str, exp.DataType]] = None,
84-
where: t.Optional[exp.Condition] = None,
85-
insert_overwrite_strategy_override: t.Optional[InsertOverwriteStrategy] = None,
86-
**kwargs: t.Any,
87-
) -> None:
88-
"""
89-
Some engines do not support `INSERT OVERWRITE` but instead support
90-
doing an "INSERT OVERWRITE" using a Merge expression but with the
91-
predicate being `False`.
92-
"""
93-
target_columns_to_types = target_columns_to_types or self.columns(table_name)
94-
for source_query in source_queries:
95-
with source_query as query:
96-
query = self._order_projections_and_filter(
97-
query, target_columns_to_types, where=where
98-
)
99-
columns = [exp.column(col) for col in target_columns_to_types]
100-
when_not_matched_by_source = exp.When(
101-
matched=False,
102-
source=True,
103-
condition=where,
104-
then=exp.Delete(),
105-
)
106-
when_not_matched_by_target = exp.When(
107-
matched=False,
108-
source=False,
109-
then=exp.Insert(
110-
this=exp.Tuple(expressions=columns),
111-
expression=exp.Tuple(expressions=columns),
112-
),
113-
)
114-
self._merge(
115-
target_table=table_name,
116-
query=query,
117-
on=exp.false(),
118-
whens=exp.Whens(
119-
expressions=[when_not_matched_by_source, when_not_matched_by_target]
120-
),
121-
)
122-
123-
12477
class HiveMetastoreTablePropertiesMixin(EngineAdapter):
12578
MAX_TABLE_COMMENT_LENGTH = 4000
12679
MAX_COLUMN_COMMENT_LENGTH = 4000

sqlmesh/core/engine_adapter/mssql.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
)
1717
from sqlmesh.core.engine_adapter.mixins import (
1818
GetCurrentCatalogFromFunctionMixin,
19-
InsertOverwriteWithMergeMixin,
2019
PandasNativeFetchDFSupportMixin,
2120
VarcharSizeWorkaroundMixin,
2221
RowDiffMixin,
@@ -41,7 +40,6 @@
4140
class MSSQLEngineAdapter(
4241
EngineAdapterWithIndexSupport,
4342
PandasNativeFetchDFSupportMixin,
44-
InsertOverwriteWithMergeMixin,
4543
GetCurrentCatalogFromFunctionMixin,
4644
VarcharSizeWorkaroundMixin,
4745
RowDiffMixin,
@@ -74,6 +72,7 @@ class MSSQLEngineAdapter(
7472
},
7573
}
7674
VARIABLE_LENGTH_DATA_TYPES = {"binary", "varbinary", "char", "varchar", "nchar", "nvarchar"}
75+
INSERT_OVERWRITE_STRATEGY = InsertOverwriteStrategy.MERGE
7776

7877
@property
7978
def catalog_support(self) -> CatalogSupport:

sqlmesh/core/engine_adapter/shared.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,6 +243,8 @@ class InsertOverwriteStrategy(Enum):
243243
# Issue a single INSERT query to replace a data range. The assumption is that the query engine will transparently match partition bounds
244244
# and replace data rather than append to it. Trino is an example of this when `hive.insert-existing-partitions-behavior=OVERWRITE` is configured
245245
INTO_IS_OVERWRITE = 4
246+
# Do the INSERT OVERWRITE using merge since the engine doesn't support it natively
247+
MERGE = 5
246248

247249
@property
248250
def is_delete_insert(self) -> bool:
@@ -260,6 +262,10 @@ def is_replace_where(self) -> bool:
260262
def is_into_is_overwrite(self) -> bool:
261263
return self == InsertOverwriteStrategy.INTO_IS_OVERWRITE
262264

265+
@property
266+
def is_merge(self) -> bool:
267+
return self == InsertOverwriteStrategy.MERGE
268+
263269

264270
class SourceQuery:
265271
def __init__(

0 commit comments

Comments
 (0)