Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions src/google/adk/sessions/database_session_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,12 @@ async def get_session(
)

if config and config.after_timestamp:
after_dt = datetime.fromtimestamp(config.after_timestamp)
if self.db_engine.dialect.name == "sqlite":
after_dt = datetime.fromtimestamp(config.after_timestamp)
else:
after_dt = datetime.fromtimestamp(
config.after_timestamp, timezone.utc
)
stmt = stmt.filter(schema.StorageEvent.timestamp >= after_dt)

stmt = stmt.order_by(schema.StorageEvent.timestamp.desc())
Expand Down Expand Up @@ -513,7 +518,7 @@ async def append_event(self, session: Session, event: Event) -> Event:
event.timestamp, timezone.utc
).replace(tzinfo=None)
else:
update_time = datetime.fromtimestamp(event.timestamp)
update_time = datetime.fromtimestamp(event.timestamp, timezone.utc)
storage_session.update_time = update_time
sql_session.add(schema.StorageEvent.from_event(session, event))

Expand Down
150 changes: 150 additions & 0 deletions src/google/adk/sessions/migration/migrate_postgresql_timestamptz.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Migration script to convert TIMESTAMP to TIMESTAMPTZ for PostgreSQL.

Starting from ADK v1.24.0, DatabaseSessionService creates timezone-aware
datetime objects (with tzinfo=UTC). When using PostgreSQL with asyncpg,
this causes a conflict if existing timestamp columns are defined as
TIMESTAMP WITHOUT TIME ZONE, resulting in:

asyncpg.exceptions.DataError: can't subtract offset-naive and
offset-aware datetimes

This migration alters all timestamp columns in ADK tables to use
TIMESTAMP WITH TIME ZONE. It is safe to run on existing data as
PostgreSQL will interpret existing naive timestamps as being in the
server's timezone (typically UTC).

Usage:
python -m google.adk.sessions.migration.migrate_postgresql_timestamptz \
--db_url postgresql+asyncpg://user:pass@host:port/dbname
"""

from __future__ import annotations

import argparse
import logging
import sys

from sqlalchemy import create_engine
from sqlalchemy import text

from . import _schema_check_utils

logger = logging.getLogger("google_adk." + __name__)

# Columns to migrate: (table_name, column_name)
_TIMESTAMP_COLUMNS = [
("sessions", "create_time"),
("sessions", "update_time"),
("events", "timestamp"),
("app_states", "update_time"),
("user_states", "update_time"),
]


def migrate(db_url: str) -> None:
"""Migrates TIMESTAMP columns to TIMESTAMP WITH TIME ZONE for PostgreSQL.

Args:
db_url: The database URL (sync or async format).
"""
sync_url = _schema_check_utils.to_sync_url(db_url)
engine = create_engine(sync_url)

try:
with engine.begin() as conn:
# Only run on PostgreSQL
if engine.dialect.name != "postgresql":
logger.info(
"Skipping TIMESTAMPTZ migration: not a PostgreSQL database"
" (dialect=%s).",
engine.dialect.name,
)
return

migrated = 0
for table_name, column_name in _TIMESTAMP_COLUMNS:
# Check if table exists
result = conn.execute(
text(
"SELECT data_type FROM information_schema.columns "
"WHERE table_schema = 'public' "
"AND table_name = :table_name "
"AND column_name = :column_name"
),
{"table_name": table_name, "column_name": column_name},
).fetchone()

if result is None:
logger.debug(
"Skipping %s.%s: column not found.", table_name, column_name
)
continue

if result[0] == "timestamp with time zone":
logger.debug(
"Skipping %s.%s: already TIMESTAMP WITH TIME ZONE.",
table_name,
column_name,
)
continue

logger.info(
"Migrating %s.%s from %s to TIMESTAMP WITH TIME ZONE.",
table_name,
column_name,
result[0],
)
conn.execute(
text(
f"ALTER TABLE {table_name} "
f"ALTER COLUMN {column_name} "
f"TYPE TIMESTAMP WITH TIME ZONE"
)
)
migrated += 1

if migrated > 0:
logger.info(
"Successfully migrated %d column(s) to TIMESTAMP WITH TIME ZONE.",
migrated,
)
else:
logger.info("No columns needed migration.")

finally:
engine.dispose()


def main():
parser = argparse.ArgumentParser(
description=(
"Migrate PostgreSQL TIMESTAMP columns to TIMESTAMP WITH TIME ZONE"
" for ADK DatabaseSessionService."
)
)
parser.add_argument(
"--db_url",
required=True,
help="Database URL (e.g., postgresql+asyncpg://user:pass@host:port/db)",
)
args = parser.parse_args()

logging.basicConfig(level=logging.INFO)
migrate(args.db_url)


if __name__ == "__main__":
main()
2 changes: 2 additions & 0 deletions src/google/adk/sessions/schemas/shared.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,6 @@ class PreciseTimestamp(TypeDecorator):
def load_dialect_impl(self, dialect):
if dialect.name == "mysql":
return dialect.type_descriptor(mysql.DATETIME(fsp=6))
elif dialect.name == "postgresql":
return dialect.type_descriptor(postgresql.TIMESTAMP(timezone=True))
return self.impl
69 changes: 69 additions & 0 deletions tests/unittests/sessions/test_precise_timestamp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Tests for PreciseTimestamp dialect-specific type resolution.

These tests verify that PreciseTimestamp maps to the correct database-specific
column types, particularly ensuring PostgreSQL uses TIMESTAMP WITH TIME ZONE
to prevent asyncpg 'offset-naive and offset-aware datetimes' errors.
"""

from google.adk.sessions.schemas.shared import PreciseTimestamp
from sqlalchemy.dialects import mysql
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects import sqlite
from sqlalchemy.types import DateTime


class TestPreciseTimestampDialectImpl:
"""Tests that PreciseTimestamp.load_dialect_impl returns the correct type."""

def test_postgresql_returns_timestamp_with_timezone(self):
"""PostgreSQL must use TIMESTAMP WITH TIME ZONE to accept timezone-aware
datetimes from asyncpg without raising DataError."""
ts = PreciseTimestamp()
dialect = postgresql.dialect()
impl = ts.load_dialect_impl(dialect)
assert isinstance(impl, postgresql.TIMESTAMP)
assert impl.timezone is True

def test_postgresql_not_timestamp_without_timezone(self):
"""Regression test: PostgreSQL must NOT use TIMESTAMP WITHOUT TIME ZONE.

Without timezone=True, asyncpg raises:
DataError: can't subtract offset-naive and offset-aware datetimes
when inserting datetime objects with tzinfo=UTC.
"""
ts = PreciseTimestamp()
dialect = postgresql.dialect()
impl = ts.load_dialect_impl(dialect)
# Ensure it's not the default TIMESTAMP (which has timezone=False)
naive_timestamp = postgresql.TIMESTAMP()
assert naive_timestamp.timezone is False # baseline: default is False
assert impl.timezone is not naive_timestamp.timezone

def test_mysql_returns_datetime_with_fsp6(self):
"""MySQL must use DATETIME(fsp=6) for microsecond precision."""
ts = PreciseTimestamp()
dialect = mysql.dialect()
impl = ts.load_dialect_impl(dialect)
assert isinstance(impl, mysql.DATETIME)
assert impl.fsp == 6

def test_sqlite_returns_default_datetime(self):
"""SQLite falls back to the default DateTime implementation."""
ts = PreciseTimestamp()
dialect = sqlite.dialect()
impl = ts.load_dialect_impl(dialect)
assert isinstance(impl, DateTime)