Commit 463932a

Feat: Add support for YC managed Airflow (#3396)
1 parent 4fd59ba commit 463932a

4 files changed: +68 -0 lines changed

docs/integrations/airflow.md

Lines changed: 4 additions & 0 deletions
@@ -155,3 +155,7 @@ default_scheduler:
   type: mwaa
   environment: <The MWAA Environment Name>
 ```
+
+### YC Airflow
+
+SQLMesh fully supports Airflow hosted on Yandex [managed Airflow instances](https://yandex.cloud/en/services/managed-airflow) - see the [configuration reference page](../reference/configuration.md#yc-airflow) for more information.

docs/reference/configuration.md

Lines changed: 11 additions & 0 deletions
@@ -227,6 +227,17 @@ See [Airflow Integration Guide](../integrations/airflow.md) for information abou
 
 The Google Cloud Composer scheduler type shares the same configuration options as the `airflow` type, except for `username` and `password`. Cloud Composer relies on `gcloud` authentication, so the `username` and `password` options are not required.
 
+#### YC Airflow
+
+**Type:** `yc_airflow`
+
+Yandex Managed Airflow shares similar configuration options with the standard `airflow` type, with the following exceptions:
+
+- `max_snapshot_ids_per_request`: This option is deprecated and not supported.
+- Authentication: YC Airflow requires additional credentials, including both a `token` and a combination of `username` and `password`.
+
+Unlike the `airflow` type, YC Airflow leverages Yandex Cloud's internal authentication mechanisms. Therefore, all requests to the Airflow API must include a valid Yandex Cloud IAM-token for authentication.
+
 ## Gateway/connection defaults
 
 The default gateway and connection keys specify what should happen when gateways or connections are not explicitly specified. Find additional details in the configuration overview page [gateway/connection defaults section](../guides/configuration.md#gatewayconnection-defaults).
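
The documentation change above lists which settings a `yc_airflow` scheduler needs. As a minimal sketch, assuming the secrets live in environment variables, the mapping that would sit under `default_scheduler` could be assembled as below. The function name `build_yc_airflow_scheduler` and the `YC_*` environment variable names are hypothetical and not part of SQLMesh; only the keys `type`, `airflow_url`, `username`, `password`, and `token` come from the commit.

```python
import os

# Keys the commit declares without defaults on YCAirflowSchedulerConfig.
REQUIRED_KEYS = ("airflow_url", "username", "password", "token")


def build_yc_airflow_scheduler(env=None):
    """Assemble a plain dict mirroring the `default_scheduler` settings.

    The environment variable names are illustrative assumptions,
    not names that SQLMesh itself reads.
    """
    env = os.environ if env is None else env
    settings = {
        "type": "yc_airflow",
        "airflow_url": env.get("YC_AIRFLOW_URL"),
        "username": env.get("YC_AIRFLOW_USERNAME"),
        "password": env.get("YC_AIRFLOW_PASSWORD"),
        "token": env.get("YC_IAM_TOKEN"),
    }
    # Fail early when a required credential is missing or empty.
    missing = [key for key in REQUIRED_KEYS if not settings[key]]
    if missing:
        raise ValueError(f"missing yc_airflow settings: {', '.join(missing)}")
    return settings
```

Keeping the token outside the configuration file may be convenient if it is rotated regularly, since only the environment then needs updating.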

sqlmesh/core/config/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -37,4 +37,5 @@
     BuiltInSchedulerConfig as BuiltInSchedulerConfig,
     CloudComposerSchedulerConfig as CloudComposerSchedulerConfig,
     MWAASchedulerConfig as MWAASchedulerConfig,
+    YCAirflowSchedulerConfig as YCAirflowSchedulerConfig,
 )

sqlmesh/core/config/scheduler.py

Lines changed: 52 additions & 0 deletions
@@ -251,6 +251,58 @@ def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
         )
 
 
+class YCAirflowSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig):
+    """The Yandex Cloud Managed Airflow Scheduler configuration.
+
+    Args:
+        airflow_url: The URL of the Airflow Webserver.
+        username: The Airflow username.
+        password: The Airflow password.
+        dag_run_poll_interval_secs: Determines how often a running DAG can be polled (in seconds).
+        dag_creation_poll_interval_secs: Determines how often SQLMesh should check whether a DAG has been created (in seconds).
+        dag_creation_max_retry_attempts: Determines the maximum number of attempts that SQLMesh will make while checking for
+            whether a DAG has been created.
+        backfill_concurrent_tasks: The number of concurrent tasks used for model backfilling during plan application.
+        ddl_concurrent_tasks: The number of concurrent tasks used for DDL operations (table / view creation, deletion, etc).
+        max_snapshot_ids_per_request: The maximum number of snapshot IDs that can be sent in a single HTTP GET request to the Airflow Webserver (Deprecated).
+        use_state_connection: Whether to use the `state_connection` configuration to access the SQLMesh state.
+        default_catalog_override: Overrides the default catalog value for this project. If specified, this value takes precedence
+            over the default catalog value set on the Airflow side.
+        token: The IAM-token for API authentication.
+    """
+
+    airflow_url: str
+    username: str
+    password: str
+    token: str
+    dag_run_poll_interval_secs: int = 10
+    dag_creation_poll_interval_secs: int = 30
+    dag_creation_max_retry_attempts: int = 10
+
+    backfill_concurrent_tasks: int = 4
+    ddl_concurrent_tasks: int = 4
+
+    use_state_connection: bool = False
+
+    default_catalog_override: t.Optional[str] = None
+
+    _concurrent_tasks_validator = concurrent_tasks_validator
+
+    type_: Literal["yc_airflow"] = Field(alias="type", default="yc_airflow")
+
+    def get_client(self, console: t.Optional[Console] = None) -> AirflowClient:
+        session = Session()
+
+        session.auth = (self.username, self.password)
+        session.headers.update({"X-Cloud-Authorization": f"Bearer {self.token}"})
+
+        return AirflowClient(
+            session=session,
+            airflow_url=self.airflow_url,
+            console=console,
+        )
+
+
 class CloudComposerSchedulerConfig(_BaseAirflowSchedulerConfig, BaseConfig, extra="allow"):
     """The Google Cloud Composer configuration.
