Skip to content

Commit 3f8d15b

Browse files
committed
feat: make temporal partition_mapper timezone aware
1 parent 44cad95 commit 3f8d15b

9 files changed

Lines changed: 91 additions & 67 deletions

File tree

airflow-core/src/airflow/example_dags/example_asset_partition.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def team_c_player_stats():
7777
dag_id="clean_and_combine_player_stats",
7878
schedule=PartitionedAssetTimetable(
7979
assets=team_a_player_stats & team_b_player_stats & team_c_player_stats,
80-
default_partition_mapper=HourlyMapper(),
80+
default_partition_mapper=HourlyMapper(timezone="UTC"),
8181
),
8282
catchup=False,
8383
tags=["player-stats", "cleanup"],
@@ -121,9 +121,9 @@ def compute_player_odds():
121121
schedule=PartitionedAssetTimetable(
122122
assets=(combined_player_stats & team_a_player_stats & Asset.ref(name="team_b_player_stats")),
123123
partition_mapper_config={
124-
combined_player_stats: YearlyMapper(), # incompatible on purpose
125-
team_a_player_stats: HourlyMapper(),
126-
Asset.ref(name="team_b_player_stats"): HourlyMapper(),
124+
combined_player_stats: YearlyMapper(timezone="UTC"), # incompatible on purpose
125+
team_a_player_stats: HourlyMapper(timezone="UTC"),
126+
Asset.ref(name="team_b_player_stats"): HourlyMapper(timezone="UTC"),
127127
},
128128
),
129129
catchup=False,

airflow-core/src/airflow/partition_mappers/temporal.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@
1818

1919
from abc import ABC, abstractmethod
2020
from datetime import datetime, timedelta
21-
from typing import Any
21+
from typing import TYPE_CHECKING, Any
2222

23+
from airflow._shared.timezones.timezone import parse_timezone
2324
from airflow.partition_mappers.base import PartitionMapper
2425

26+
if TYPE_CHECKING:
27+
from pendulum import FixedTimezone, Timezone
28+
2529

2630
class _BaseTemporalMapper(PartitionMapper, ABC):
2731
"""Base class for Temporal Partition Mappers."""
@@ -30,14 +34,24 @@ class _BaseTemporalMapper(PartitionMapper, ABC):
3034

3135
def __init__(
3236
self,
33-
input_format: str = "%Y-%m-%dT%H:%M:%S",
37+
*,
38+
timezone: str | Timezone | FixedTimezone,
39+
input_format: str = "%Y-%m-%dT%H:%M:%S%z",
3440
output_format: str | None = None,
3541
):
3642
self.input_format = input_format
3743
self.output_format = output_format or self.default_output_format
44+
if isinstance(timezone, str):
45+
timezone = parse_timezone(timezone)
46+
self._timezone: Timezone | FixedTimezone = timezone
3847

3948
def to_downstream(self, key: str) -> str:
4049
dt = datetime.strptime(key, self.input_format)
50+
if dt.tzinfo is None:
51+
dt = dt.replace(tzinfo=self._timezone)
52+
else:
53+
dt = dt.astimezone(self._timezone)
54+
4155
normalized = self.normalize(dt)
4256
return self.format(normalized)
4357

@@ -50,14 +64,18 @@ def format(self, dt: datetime) -> str:
5064
return dt.strftime(self.output_format)
5165

5266
def serialize(self) -> dict[str, Any]:
67+
from airflow.serialization.encoders import encode_timezone
68+
5369
return {
70+
"timezone": encode_timezone(self._timezone),
5471
"input_format": self.input_format,
5572
"output_format": self.output_format,
5673
}
5774

5875
@classmethod
5976
def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
6077
return cls(
78+
timezone=parse_timezone(data["timezone"]),
6179
input_format=data["input_format"],
6280
output_format=data["output_format"],
6381
)
@@ -66,7 +84,7 @@ def deserialize(cls, data: dict[str, Any]) -> PartitionMapper:
6684
class HourlyMapper(_BaseTemporalMapper):
6785
"""Map a time-based partition key to hour."""
6886

69-
default_output_format = "%Y-%m-%dT%H"
87+
default_output_format = "%Y-%m-%dT%H%z"
7088

7189
def normalize(self, dt: datetime) -> datetime:
7290
return dt.replace(minute=0, second=0, microsecond=0)
@@ -75,7 +93,7 @@ def normalize(self, dt: datetime) -> datetime:
7593
class DailyMapper(_BaseTemporalMapper):
7694
"""Map a time-based partition key to day."""
7795

78-
default_output_format = "%Y-%m-%d"
96+
default_output_format = "%Y-%m-%d%z"
7997

8098
def normalize(self, dt: datetime) -> datetime:
8199
return dt.replace(hour=0, minute=0, second=0, microsecond=0)
@@ -84,7 +102,7 @@ def normalize(self, dt: datetime) -> datetime:
84102
class WeeklyMapper(_BaseTemporalMapper):
85103
"""Map a time-based partition key to week."""
86104

87-
default_output_format = "%Y-%m-%d (W%V)"
105+
default_output_format = "%Y-%m-%d (W%V)%z"
88106

89107
def normalize(self, dt: datetime) -> datetime:
90108
start = dt - timedelta(days=dt.weekday())
@@ -94,7 +112,7 @@ def normalize(self, dt: datetime) -> datetime:
94112
class MonthlyMapper(_BaseTemporalMapper):
95113
"""Map a time-based partition key to month."""
96114

97-
default_output_format = "%Y-%m"
115+
default_output_format = "%Y-%m%z"
98116

99117
def normalize(self, dt: datetime) -> datetime:
100118
return dt.replace(
@@ -109,7 +127,7 @@ def normalize(self, dt: datetime) -> datetime:
109127
class QuarterlyMapper(_BaseTemporalMapper):
110128
"""Map a time-based partition key to quarter."""
111129

112-
default_output_format = "%Y-Q{quarter}"
130+
default_output_format = "%Y-Q{quarter}%z"
113131

114132
def normalize(self, dt: datetime) -> datetime:
115133
quarter = (dt.month - 1) // 3
@@ -131,7 +149,7 @@ def format(self, dt: datetime) -> str:
131149
class YearlyMapper(_BaseTemporalMapper):
132150
"""Map a time-based partition key to year."""
133151

134-
default_output_format = "%Y"
152+
default_output_format = "%Y%z"
135153

136154
def normalize(self, dt: datetime) -> datetime:
137155
return dt.replace(

airflow-core/src/airflow/serialization/encoders.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,7 @@ def _(
407407
| YearlyMapper,
408408
) -> dict[str, Any]:
409409
return {
410+
"timezone": partition_mapper.timezone,
410411
"input_format": partition_mapper.input_format,
411412
"output_format": partition_mapper.output_format,
412413
}

airflow-core/src/airflow/timetables/trigger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,7 @@ def __init__(
401401
run_offset: int | datetime.timedelta | relativedelta | None = None,
402402
run_immediately: bool | datetime.timedelta = False,
403403
# todo: AIP-76 we can't infer partition date from this, so we need to store it separately.
404-
key_format: str = r"%Y-%m-%dT%H:%M:%S",
404+
key_format: str = r"%Y-%m-%dT%H:%M:%S%z",
405405
) -> None:
406406
super().__init__(cron, timezone=timezone, run_immediately=run_immediately)
407407
if not isinstance(run_offset, (int, NoneType)):

airflow-core/tests/unit/cli/commands/test_dag_command.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -220,11 +220,11 @@ def test_show_dag_imgcat(self, mock_render_dag, mock_popen, stdout_capture):
220220
"timedelta(days=-5)",
221221
"CronPartitionTimetable('0 0 * * *', timezone='UTC')",
222222
"False",
223-
jan_1.strftime(r"%Y-%m-%dT%H:%M:%S"),
223+
jan_1.strftime(r"%Y-%m-%dT%H:%M:%S%z"),
224224
os.linesep.join(
225225
[
226-
jan_1.strftime(r"%Y-%m-%dT%H:%M:%S"),
227-
(jan_1 + timedelta(days=1)).strftime(r"%Y-%m-%dT%H:%M:%S"),
226+
jan_1.strftime(r"%Y-%m-%dT%H:%M:%S%z"),
227+
(jan_1 + timedelta(days=1)).strftime(r"%Y-%m-%dT%H:%M:%S%z"),
228228
],
229229
),
230230
id="partitioned",

airflow-core/tests/unit/partition_mappers/test_temporal.py

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,38 +33,39 @@ class TestTemporalMappers:
3333
@pytest.mark.parametrize(
3434
("mapper_cls", "expected_downstream_key"),
3535
[
36-
(HourlyMapper, "2026-02-10T14"),
37-
(DailyMapper, "2026-02-10"),
38-
(WeeklyMapper, "2026-02-09 (W07)"),
39-
(MonthlyMapper, "2026-02"),
40-
(QuarterlyMapper, "2026-Q1"),
41-
(YearlyMapper, "2026"),
36+
(HourlyMapper, "2026-02-10T14+0000"),
37+
(DailyMapper, "2026-02-10+0000"),
38+
(WeeklyMapper, "2026-02-09 (W07)+0000"),
39+
(MonthlyMapper, "2026-02+0000"),
40+
(QuarterlyMapper, "2026-Q1+0000"),
41+
(YearlyMapper, "2026+0000"),
4242
],
4343
)
4444
def test_to_downstream(
4545
self,
4646
mapper_cls: type[_BaseTemporalMapper],
4747
expected_downstream_key: str,
4848
):
49-
pm = mapper_cls()
50-
assert pm.to_downstream("2026-02-10T14:30:45") == expected_downstream_key
49+
pm = mapper_cls(timezone="UTC")
50+
assert pm.to_downstream("2026-02-10T14:30:45+0000") == expected_downstream_key
5151

5252
@pytest.mark.parametrize(
5353
("mapper_cls", "expected_outut_format"),
5454
[
55-
(HourlyMapper, "%Y-%m-%dT%H"),
56-
(DailyMapper, "%Y-%m-%d"),
57-
(WeeklyMapper, "%Y-%m-%d (W%V)"),
58-
(MonthlyMapper, "%Y-%m"),
59-
(QuarterlyMapper, "%Y-Q{quarter}"),
60-
(YearlyMapper, "%Y"),
55+
(HourlyMapper, "%Y-%m-%dT%H%z"),
56+
(DailyMapper, "%Y-%m-%d%z"),
57+
(WeeklyMapper, "%Y-%m-%d (W%V)%z"),
58+
(MonthlyMapper, "%Y-%m%z"),
59+
(QuarterlyMapper, "%Y-Q{quarter}%z"),
60+
(YearlyMapper, "%Y%z"),
6161
],
6262
)
6363
def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_format: str):
64-
pm = mapper_cls()
64+
pm = mapper_cls(timezone="UTC")
6565
assert pm.serialize() == {
66-
"input_format": "%Y-%m-%dT%H:%M:%S",
66+
"input_format": "%Y-%m-%dT%H:%M:%S%z",
6767
"output_format": expected_outut_format,
68+
"timezone": "UTC",
6869
}
6970

7071
@pytest.mark.parametrize(
@@ -73,11 +74,8 @@ def test_serialize(self, mapper_cls: type[_BaseTemporalMapper], expected_outut_f
7374
)
7475
def test_deserialize(self, mapper_cls):
7576
pm = mapper_cls.deserialize(
76-
{
77-
"input_format": "%Y-%m-%dT%H:%M:%S",
78-
"output_format": "customized-format",
79-
}
77+
{"input_format": "%Y-%m-%dT%H:%M:%S%z", "output_format": "customized-format", "timezone": "UTC"}
8078
)
8179
assert isinstance(pm, mapper_cls)
82-
assert pm.input_format == "%Y-%m-%dT%H:%M:%S"
80+
assert pm.input_format == "%Y-%m-%dT%H:%M:%S%z"
8381
assert pm.output_format == "customized-format"

airflow-core/tests/unit/serialization/test_serialized_objects.py

Lines changed: 22 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -751,39 +751,39 @@ def test_encode_timezone():
751751
(IdentityMapper, [], "airflow.partition_mappers.identity.IdentityMapper", {}),
752752
(
753753
HourlyMapper,
754-
[],
754+
["UTC"],
755755
"airflow.partition_mappers.temporal.HourlyMapper",
756-
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%dT%H"},
756+
{"input_format": "%Y-%m-%dT%H:%M:%S%z", "output_format": "%Y-%m-%dT%H%z", "timezone": "UTC"},
757757
),
758758
(
759759
DailyMapper,
760-
[],
760+
["UTC"],
761761
"airflow.partition_mappers.temporal.DailyMapper",
762-
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d"},
762+
{"input_format": "%Y-%m-%dT%H:%M:%S%z", "output_format": "%Y-%m-%d%z", "timezone": "UTC"},
763763
),
764764
(
765765
WeeklyMapper,
766-
[],
766+
["UTC"],
767767
"airflow.partition_mappers.temporal.WeeklyMapper",
768-
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m-%d (W%V)"},
768+
{"input_format": "%Y-%m-%dT%H:%M:%S%z", "output_format": "%Y-%m-%d (W%V)%z", "timezone": "UTC"},
769769
),
770770
(
771771
MonthlyMapper,
772-
[],
772+
["UTC"],
773773
"airflow.partition_mappers.temporal.MonthlyMapper",
774-
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-%m"},
774+
{"input_format": "%Y-%m-%dT%H:%M:%S%z", "output_format": "%Y-%m%z", "timezone": "UTC"},
775775
),
776776
(
777777
QuarterlyMapper,
778-
[],
778+
["UTC"],
779779
"airflow.partition_mappers.temporal.QuarterlyMapper",
780-
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y-Q{quarter}"},
780+
{"input_format": "%Y-%m-%dT%H:%M:%S%z", "output_format": "%Y-Q{quarter}%z", "timezone": "UTC"},
781781
),
782782
(
783783
YearlyMapper,
784-
[],
784+
["UTC"],
785785
"airflow.partition_mappers.temporal.YearlyMapper",
786-
{"input_format": "%Y-%m-%dT%H:%M:%S", "output_format": "%Y"},
786+
{"input_format": "%Y-%m-%dT%H:%M:%S%z", "output_format": "%Y%z", "timezone": "UTC"},
787787
),
788788
],
789789
)
@@ -798,22 +798,22 @@ def test_encode_partition_mapper(cls, args, encode_type, encode_var):
798798

799799

800800
@pytest.mark.parametrize(
801-
("sdk_cls", "core_cls"),
801+
("sdk_cls", "args", "core_cls"),
802802
[
803-
(IdentityMapper, CoreIdentityMapper),
804-
(HourlyMapper, CoureHourlyMapper),
805-
(DailyMapper, CoreDailyMapper),
806-
(WeeklyMapper, CoreWeeklyMapper),
807-
(MonthlyMapper, CoreMonthlyMapper),
808-
(QuarterlyMapper, CoreQuarterlyMapper),
809-
(YearlyMapper, CoreYearlyMapper),
803+
(IdentityMapper, [], CoreIdentityMapper),
804+
(HourlyMapper, ["UTC"], CoureHourlyMapper),
805+
(DailyMapper, ["UTC"], CoreDailyMapper),
806+
(WeeklyMapper, ["UTC"], CoreWeeklyMapper),
807+
(MonthlyMapper, ["UTC"], CoreMonthlyMapper),
808+
(QuarterlyMapper, ["UTC"], CoreQuarterlyMapper),
809+
(YearlyMapper, ["UTC"], CoreYearlyMapper),
810810
],
811811
)
812-
def test_decode_partition_mapper(sdk_cls, core_cls):
812+
def test_decode_partition_mapper(sdk_cls, args, core_cls):
813813
from airflow.serialization.decoders import decode_partition_mapper
814814
from airflow.serialization.encoders import encode_partition_mapper
815815

816-
partition_mapper = sdk_cls()
816+
partition_mapper = sdk_cls(*args)
817817
encoded_pm = encode_partition_mapper(partition_mapper)
818818

819819
core_pm = decode_partition_mapper(encoded_pm)

task-sdk/src/airflow/sdk/definitions/partition_mappers/temporal.py

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -16,52 +16,59 @@
1616
# under the License.
1717
from __future__ import annotations
1818

19+
from typing import TYPE_CHECKING
20+
1921
from airflow.sdk.definitions.partition_mappers.base import PartitionMapper
2022

23+
if TYPE_CHECKING:
24+
from pendulum.tz.timezone import FixedTimezone, Timezone
25+
2126

2227
class _BaseTemporalMapper(PartitionMapper):
2328
default_output_format: str
2429

2530
def __init__(
2631
self,
27-
input_format: str = "%Y-%m-%dT%H:%M:%S",
32+
timezone: str | Timezone | FixedTimezone,
33+
input_format: str = "%Y-%m-%dT%H:%M:%S%z",
2834
output_format: str | None = None,
2935
) -> None:
36+
self.timezone = timezone
3037
self.input_format = input_format
3138
self.output_format = output_format or self.default_output_format
3239

3340

3441
class HourlyMapper(_BaseTemporalMapper):
3542
"""Map a time-based partition key to hour."""
3643

37-
default_output_format = "%Y-%m-%dT%H"
44+
default_output_format = "%Y-%m-%dT%H%z"
3845

3946

4047
class DailyMapper(_BaseTemporalMapper):
4148
"""Map a time-based partition key to day."""
4249

43-
default_output_format = "%Y-%m-%d"
50+
default_output_format = "%Y-%m-%d%z"
4451

4552

4653
class WeeklyMapper(_BaseTemporalMapper):
4754
"""Map a time-based partition key to week."""
4855

49-
default_output_format = "%Y-%m-%d (W%V)"
56+
default_output_format = "%Y-%m-%d (W%V)%z"
5057

5158

5259
class MonthlyMapper(_BaseTemporalMapper):
5360
"""Map a time-based partition key to month."""
5461

55-
default_output_format = "%Y-%m"
62+
default_output_format = "%Y-%m%z"
5663

5764

5865
class QuarterlyMapper(_BaseTemporalMapper):
5966
"""Map a time-based partition key to quarter."""
6067

61-
default_output_format = "%Y-Q{quarter}"
68+
default_output_format = "%Y-Q{quarter}%z"
6269

6370

6471
class YearlyMapper(_BaseTemporalMapper):
6572
"""Map a time-based partition key to year."""
6673

67-
default_output_format = "%Y"
74+
default_output_format = "%Y%z"

task-sdk/src/airflow/sdk/definitions/timetables/trigger.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ def __init__(
168168
run_offset: int | datetime.timedelta | relativedelta | None = None,
169169
run_immediately: bool | datetime.timedelta = False,
170170
# todo: AIP-76 we can't infer partition date from this, so we need to store it separately
171-
key_format: str = r"%Y-%m-%dT%H:%M:%S",
171+
key_format: str = r"%Y-%m-%dT%H:%M:%S%z",
172172
) -> None:
173173
if not isinstance(run_offset, (int, NoneType)):
174174
# todo: AIP-76 implement timedelta / relative delta?

0 commit comments

Comments
 (0)