From 74a2d44d0f26290d90a93a0c8059caecbb6185b5 Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Tue, 19 May 2026 01:17:41 +0300 Subject: [PATCH 1/9] feat: add TypesenseSessionService for Typesense-backed session storage --- .../sessions/typesense_session_service.py | 538 ++++++++++++++++++ 1 file changed, 538 insertions(+) create mode 100644 src/google/adk_community/sessions/typesense_session_service.py diff --git a/src/google/adk_community/sessions/typesense_session_service.py b/src/google/adk_community/sessions/typesense_session_service.py new file mode 100644 index 00000000..09be1ec3 --- /dev/null +++ b/src/google/adk_community/sessions/typesense_session_service.py @@ -0,0 +1,538 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from __future__ import annotations + +import asyncio +import copy +import json +import logging +import time +from typing import Any +from typing import Optional +import uuid + +from google.adk.errors.already_exists_error import AlreadyExistsError +from google.adk.events.event import Event +from google.adk.sessions.base_session_service import BaseSessionService +from google.adk.sessions.base_session_service import GetSessionConfig +from google.adk.sessions.base_session_service import ListSessionsResponse +from google.adk.sessions.session import Session +from google.adk.sessions.state import State +from typing_extensions import override + +logger = logging.getLogger('google_adk.' + __name__) + +_SEP = '||' + +_SESSIONS_SCHEMA = { + 'fields': [ + {'name': 'app_name', 'type': 'string', 'facet': True}, + {'name': 'user_id', 'type': 'string', 'facet': True}, + {'name': 'session_id', 'type': 'string', 'facet': True}, + {'name': 'state', 'type': 'string', 'index': False}, + {'name': 'update_time', 'type': 'float'}, + ], +} + +_EVENTS_SCHEMA = { + 'fields': [ + {'name': 'app_name', 'type': 'string', 'facet': True}, + {'name': 'user_id', 'type': 'string', 'facet': True}, + {'name': 'session_id', 'type': 'string', 'facet': True}, + {'name': 'timestamp', 'type': 'float'}, + {'name': 'event_data', 'type': 'string', 'index': False}, + ], +} + +_APP_STATES_SCHEMA = { + 'fields': [ + {'name': 'app_name', 'type': 'string', 'facet': True}, + {'name': 'state', 'type': 'string', 'index': False}, + {'name': 'update_time', 'type': 'float'}, + ], +} + +_USER_STATES_SCHEMA = { + 'fields': [ + {'name': 'app_name', 'type': 'string', 'facet': True}, + {'name': 'user_id', 'type': 'string', 'facet': True}, + {'name': 'state', 'type': 'string', 'index': False}, + {'name': 'update_time', 'type': 'float'}, + ], +} + + +def _session_doc_id(app_name: str, user_id: str, session_id: str) -> str: + return f'{app_name}{_SEP}{user_id}{_SEP}{session_id}' + + +def _event_doc_id( + app_name: str, user_id: str, session_id: str, event_id: str +) -> str: + return f'{app_name}{_SEP}{user_id}{_SEP}{session_id}{_SEP}{event_id}' + + +def _user_state_doc_id(app_name: str, user_id: str) -> str: + return f'{app_name}{_SEP}{user_id}' + + +def _extract_state_delta( + state: dict[str, Any], +) -> dict[str, dict[str, Any]]: + """Split a flat state dict into app, user, and session-scoped sub-dicts.""" + deltas: dict[str, dict[str, Any]] = {'app': {}, 'user': {}, 'session': {}} + for key, value in state.items(): + if key.startswith(State.APP_PREFIX): + deltas['app'][key.removeprefix(State.APP_PREFIX)] = value + elif key.startswith(State.USER_PREFIX): + deltas['user'][key.removeprefix(State.USER_PREFIX)] = value + elif not key.startswith(State.TEMP_PREFIX): + deltas['session'][key] = value + return deltas + + +def _merge_state( + app_state: dict[str, Any], + user_state: dict[str, Any], + session_state: dict[str, Any], +) -> dict[str, Any]: + merged = copy.deepcopy(session_state) + for key, value in app_state.items(): + merged[State.APP_PREFIX + key] = value + for key, value in user_state.items(): + merged[State.USER_PREFIX + key] = value + return merged + + +class TypesenseSessionService(BaseSessionService): + """A session service that uses Typesense for persistent storage. + + Requires the ``typesense`` extra:: + + pip install google-adk-community[typesense] + + Four Typesense collections are managed automatically (created on first use): + + * ``{prefix}_sessions`` + * ``{prefix}_events`` + * ``{prefix}_app_states`` + * ``{prefix}_user_states`` + + Note: ``app_name``, ``user_id``, and ``session_id`` must not contain the + string ``'||'`` which is used as an internal document-ID separator. + """ + + def __init__( + self, + *, + host: str = 'localhost', + port: int = 8108, + protocol: str = 'http', + api_key: str, + collection_prefix: str = 'adk', + ): + try: + import typesense + from typesense.exceptions import ObjectNotFound + + self._typesense = typesense + self._ObjectNotFound = ObjectNotFound + except ImportError as e: + raise ImportError( + 'TypesenseSessionService requires the typesense package.' + ' Install it with: pip install google-adk-community[typesense]' + ) from e + + self._client = typesense.Client({ + 'nodes': [{'host': host, 'port': str(port), 'protocol': protocol}], + 'api_key': api_key, + 'connection_timeout_seconds': 5, + }) + + p = collection_prefix + self._sessions_col = f'{p}_sessions' + self._events_col = f'{p}_events' + self._app_states_col = f'{p}_app_states' + self._user_states_col = f'{p}_user_states' + + self._collections_ready = False + self._collections_lock = asyncio.Lock() + + async def _ensure_collections(self) -> None: + if self._collections_ready: + return + async with self._collections_lock: + if self._collections_ready: + return + await asyncio.to_thread(self._ensure_collections_sync) + self._collections_ready = True + + def _ensure_collections_sync(self) -> None: + for schema_template, col_name in [ + (_SESSIONS_SCHEMA, self._sessions_col), + (_EVENTS_SCHEMA, self._events_col), + (_APP_STATES_SCHEMA, self._app_states_col), + (_USER_STATES_SCHEMA, self._user_states_col), + ]: + try: + self._client.collections[col_name].retrieve() + except self._ObjectNotFound: + schema = dict(schema_template) + schema['name'] = col_name + self._client.collections.create(schema) + + def _get_doc_sync( + self, collection: str, doc_id: str + ) -> Optional[dict[str, Any]]: + try: + return self._client.collections[collection].documents[doc_id].retrieve() + except self._ObjectNotFound: + return None + + async def _search_all( + self, collection: str, filter_by: str, sort_by: Optional[str] = None + ) -> list[dict[str, Any]]: + """Return all documents matching filter_by, handling pagination.""" + params: dict[str, Any] = { + 'q': '*', + 'query_by': 'app_name', + 'filter_by': filter_by, + 'per_page': 250, + } + if sort_by: + params['sort_by'] = sort_by + + docs: list[dict[str, Any]] = [] + page = 1 + while True: + params['page'] = page + result = await asyncio.to_thread( + self._client.collections[collection].documents.search, params + ) + hits = result.get('hits', []) + docs.extend(hit['document'] for hit in hits) + if len(hits) < 250: + break + page += 1 + return docs + + async def _get_app_state(self, app_name: str) -> dict[str, Any]: + doc = await asyncio.to_thread( + self._get_doc_sync, self._app_states_col, app_name + ) + return json.loads(doc['state']) if doc else {} + + async def _get_user_state( + self, app_name: str, user_id: str + ) -> dict[str, Any]: + doc = await asyncio.to_thread( + self._get_doc_sync, + self._user_states_col, + _user_state_doc_id(app_name, user_id), + ) + return json.loads(doc['state']) if doc else {} + + async def _upsert_app_state( + self, app_name: str, delta: dict[str, Any], now: float + ) -> None: + existing = await self._get_app_state(app_name) + merged = existing | delta + await asyncio.to_thread( + self._client.collections[self._app_states_col].documents.upsert, + { + 'id': app_name, + 'app_name': app_name, + 'state': json.dumps(merged), + 'update_time': now, + }, + ) + + async def _upsert_user_state( + self, app_name: str, user_id: str, delta: dict[str, Any], now: float + ) -> None: + existing = await self._get_user_state(app_name, user_id) + merged = existing | delta + await asyncio.to_thread( + self._client.collections[self._user_states_col].documents.upsert, + { + 'id': _user_state_doc_id(app_name, user_id), + 'app_name': app_name, + 'user_id': user_id, + 'state': json.dumps(merged), + 'update_time': now, + }, + ) + + @override + async def create_session( + self, + *, + app_name: str, + user_id: str, + state: Optional[dict[str, Any]] = None, + session_id: Optional[str] = None, + ) -> Session: + await self._ensure_collections() + + session_id = (session_id or '').strip() or str(uuid.uuid4()) + now = time.time() + doc_id = _session_doc_id(app_name, user_id, session_id) + + existing = await asyncio.to_thread( + self._get_doc_sync, self._sessions_col, doc_id + ) + if existing: + raise AlreadyExistsError(f'Session with id {session_id} already exists.') + + state_deltas = _extract_state_delta(state or {}) + app_state_delta = state_deltas['app'] + user_state_delta = state_deltas['user'] + session_state = state_deltas['session'] + + if app_state_delta: + await self._upsert_app_state(app_name, app_state_delta, now) + if user_state_delta: + await self._upsert_user_state(app_name, user_id, user_state_delta, now) + + storage_app_state = await self._get_app_state(app_name) + storage_user_state = await self._get_user_state(app_name, user_id) + + await asyncio.to_thread( + self._client.collections[self._sessions_col].documents.create, + { + 'id': doc_id, + 'app_name': app_name, + 'user_id': user_id, + 'session_id': session_id, + 'state': json.dumps(session_state), + 'update_time': now, + }, + ) + + merged_state = _merge_state( + storage_app_state, storage_user_state, session_state + ) + return Session( + app_name=app_name, + user_id=user_id, + id=session_id, + state=merged_state, + events=[], + last_update_time=now, + ) + + @override + async def get_session( + self, + *, + app_name: str, + user_id: str, + session_id: str, + config: Optional[GetSessionConfig] = None, + ) -> Optional[Session]: + await self._ensure_collections() + + doc_id = _session_doc_id(app_name, user_id, session_id) + session_doc = await asyncio.to_thread( + self._get_doc_sync, self._sessions_col, doc_id + ) + if session_doc is None: + return None + + session_state = json.loads(session_doc['state']) + last_update_time = session_doc['update_time'] + + events: list[Event] = [] + if not (config and config.num_recent_events == 0): + filter_by = ( + f'app_name:={app_name}' + f' && user_id:={user_id}' + f' && session_id:={session_id}' + ) + if config and config.after_timestamp: + filter_by += f' && timestamp:>={config.after_timestamp}' + + if config and config.num_recent_events is not None: + params: dict[str, Any] = { + 'q': '*', + 'query_by': 'app_name', + 'filter_by': filter_by, + 'sort_by': 'timestamp:desc', + 'per_page': min(config.num_recent_events, 250), + 'page': 1, + } + result = await asyncio.to_thread( + self._client.collections[self._events_col].documents.search, + params, + ) + event_docs = [hit['document'] for hit in result.get('hits', [])] + else: + event_docs = await self._search_all( + self._events_col, + filter_by, + sort_by='timestamp:desc', + ) + + events = [ + Event.model_validate_json(doc['event_data']) + for doc in reversed(event_docs) + ] + + app_state = await self._get_app_state(app_name) + user_state = await self._get_user_state(app_name, user_id) + merged_state = _merge_state(app_state, user_state, session_state) + + return Session( + app_name=app_name, + user_id=user_id, + id=session_id, + state=merged_state, + events=events, + last_update_time=last_update_time, + ) + + @override + async def list_sessions( + self, *, app_name: str, user_id: Optional[str] = None + ) -> ListSessionsResponse: + await self._ensure_collections() + + filter_by = f'app_name:={app_name}' + if user_id: + filter_by += f' && user_id:={user_id}' + + session_docs = await self._search_all(self._sessions_col, filter_by) + app_state = await self._get_app_state(app_name) + + user_states_map: dict[str, dict[str, Any]] = {} + if user_id: + user_states_map[user_id] = await self._get_user_state(app_name, user_id) + else: + us_docs = await self._search_all( + self._user_states_col, f'app_name:={app_name}' + ) + for doc in us_docs: + user_states_map[doc['user_id']] = json.loads(doc['state']) + + sessions = [] + for doc in session_docs: + uid = doc['user_id'] + merged_state = _merge_state( + app_state, + user_states_map.get(uid, {}), + json.loads(doc['state']), + ) + sessions.append( + Session( + app_name=app_name, + user_id=uid, + id=doc['session_id'], + state=merged_state, + events=[], + last_update_time=doc['update_time'], + ) + ) + return ListSessionsResponse(sessions=sessions) + + @override + async def delete_session( + self, *, app_name: str, user_id: str, session_id: str + ) -> None: + await self._ensure_collections() + + events_filter = ( + f'app_name:={app_name}' + f' && user_id:={user_id}' + f' && session_id:={session_id}' + ) + await asyncio.to_thread( + self._client.collections[self._events_col].documents.delete, + {'filter_by': events_filter}, + ) + + doc_id = _session_doc_id(app_name, user_id, session_id) + try: + await asyncio.to_thread( + self._client.collections[self._sessions_col].documents[doc_id].delete + ) + except self._ObjectNotFound: + pass + + @override + async def append_event(self, session: Session, event: Event) -> Event: + if event.partial: + return event + + await self._ensure_collections() + + self._apply_temp_state(session, event) + event = self._trim_temp_delta_state(event) + event_timestamp = event.timestamp + + doc_id = _session_doc_id(session.app_name, session.user_id, session.id) + session_doc = await asyncio.to_thread( + self._get_doc_sync, self._sessions_col, doc_id + ) + if session_doc is None: + raise ValueError(f'Session {session.id} not found.') + + if session_doc['update_time'] > session.last_update_time: + raise ValueError( + 'The session has been modified in storage since it was loaded.' + ' Please reload the session before appending more events.' + ) + + session_state_update: dict[str, Any] = {'update_time': event_timestamp} + + if event.actions and event.actions.state_delta: + state_deltas = _extract_state_delta(event.actions.state_delta) + if state_deltas['app']: + await self._upsert_app_state( + session.app_name, state_deltas['app'], event_timestamp + ) + if state_deltas['user']: + await self._upsert_user_state( + session.app_name, + session.user_id, + state_deltas['user'], + event_timestamp, + ) + if state_deltas['session']: + current_state = json.loads(session_doc['state']) + session_state_update['state'] = json.dumps( + current_state | state_deltas['session'] + ) + + await asyncio.to_thread( + self._client.collections[self._sessions_col].documents[doc_id].update, + session_state_update, + ) + + await asyncio.to_thread( + self._client.collections[self._events_col].documents.create, + { + 'id': _event_doc_id( + session.app_name, session.user_id, session.id, event.id + ), + 'app_name': session.app_name, + 'user_id': session.user_id, + 'session_id': session.id, + 'timestamp': event.timestamp, + 'event_data': event.model_dump_json(exclude_none=True), + }, + ) + + session.last_update_time = event_timestamp + await super().append_event(session=session, event=event) + return event From 68c335893d2533411febc158ab02e3d5afbac738 Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Tue, 19 May 2026 01:18:05 +0300 Subject: [PATCH 2/9] feat: add lazy import for TypesenseSessionService in sessions __init__ --- src/google/adk_community/sessions/__init__.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/src/google/adk_community/sessions/__init__.py b/src/google/adk_community/sessions/__init__.py index 90bf28d7..efb9469e 100644 --- a/src/google/adk_community/sessions/__init__.py +++ b/src/google/adk_community/sessions/__init__.py @@ -16,4 +16,12 @@ from .redis_session_service import RedisSessionService -__all__ = ["RedisSessionService"] +__all__ = ["RedisSessionService", "TypesenseSessionService"] + + +def __getattr__(name: str): + if name == "TypesenseSessionService": + from .typesense_session_service import TypesenseSessionService + + return TypesenseSessionService + raise AttributeError(f"module {__name__!r} has no attribute {name!r}") From 3bfaf04ea4ef5d532d8859aa70cc9863d79d8ffc Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Tue, 19 May 2026 01:18:24 +0300 Subject: [PATCH 3/9] feat: add typesense optional dependency --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index 95341de6..8c519edc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -51,6 +51,7 @@ sdc-agents = [ "sdc-agents>=4.3.3; python_version >= '3.11'", ] spraay = ["web3>=6.0.0"] +typesense = ["typesense>=0.20.0"] [tool.pyink] From 21543dc5b33276daa908cd193838b2acde65103b Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Tue, 19 May 2026 01:18:44 +0300 Subject: [PATCH 4/9] test: add unit tests for TypesenseSessionService --- .../test_typesense_session_service.py | 483 ++++++++++++++++++ 1 file changed, 483 insertions(+) create mode 100644 tests/unittests/sessions/test_typesense_session_service.py diff --git a/tests/unittests/sessions/test_typesense_session_service.py b/tests/unittests/sessions/test_typesense_session_service.py new file mode 100644 index 00000000..3f6002c8 --- /dev/null +++ b/tests/unittests/sessions/test_typesense_session_service.py @@ -0,0 +1,483 @@ +# Copyright 2025 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import asyncio +import json +from unittest.mock import AsyncMock +from unittest.mock import MagicMock +from unittest.mock import patch + +from google.adk.events.event import Event +from google.adk.events.event_actions import EventActions +from google.adk.sessions.base_session_service import GetSessionConfig +from google.adk.sessions.session import Session +import pytest +import pytest_asyncio + +from google.adk_community.sessions.typesense_session_service import _session_doc_id +from google.adk_community.sessions.typesense_session_service import _user_state_doc_id +from google.adk_community.sessions.typesense_session_service import TypesenseSessionService + + +def _make_typesense_mock(): + """Build a minimal mock of the typesense module and its Client.""" + mock_ts = MagicMock() + + class _FakeObjectNotFound(Exception): + pass + + mock_ts.exceptions.ObjectNotFound = _FakeObjectNotFound + mock_client = MagicMock() + mock_ts.Client.return_value = mock_client + return mock_ts, mock_client, _FakeObjectNotFound + + +def _make_service(mock_ts, mock_client, ObjectNotFound): + """Instantiate TypesenseSessionService bypassing __init__ for unit tests.""" + svc = TypesenseSessionService.__new__(TypesenseSessionService) + svc._typesense = mock_ts + svc._ObjectNotFound = ObjectNotFound + svc._client = mock_client + svc._sessions_col = 'adk_sessions' + svc._events_col = 'adk_events' + svc._app_states_col = 'adk_app_states' + svc._user_states_col = 'adk_user_states' + svc._collections_ready = True # skip bootstrap in unit tests + svc._collections_lock = asyncio.Lock() + return svc + + +def _make_session_doc( + app_name, user_id, session_id, state=None, update_time=1000.0 +): + doc_id = _session_doc_id(app_name, user_id, session_id) + return { + 'id': doc_id, + 'app_name': app_name, + 'user_id': user_id, + 'session_id': session_id, + 'state': json.dumps(state or {}), + 'update_time': update_time, + } + + +@pytest_asyncio.fixture +def ts_mock(): + return _make_typesense_mock() + + +@pytest_asyncio.fixture +def service(ts_mock): + mock_ts, mock_client, ObjectNotFound = ts_mock + return _make_service(mock_ts, mock_client, ObjectNotFound) + + +class TestTypesenseSessionService: + + @pytest.mark.asyncio + async def test_create_session_returns_session(self, service, ts_mock): + _, mock_client, _ = ts_mock + service._get_doc_sync = MagicMock(return_value=None) + mock_client.collections.__getitem__.return_value.documents.create = ( + MagicMock() + ) + mock_client.collections.__getitem__.return_value.documents.upsert = ( + MagicMock() + ) + + session = await service.create_session( + app_name='my_app', + user_id='user1', + state={'key': 'value'}, + session_id='sess1', + ) + + assert session.app_name == 'my_app' + assert session.user_id == 'user1' + assert session.id == 'sess1' + assert session.state.get('key') == 'value' + assert session.events == [] + + @pytest.mark.asyncio + async def test_create_session_generates_id_when_none(self, service, ts_mock): + _, mock_client, _ = ts_mock + service._get_doc_sync = MagicMock(return_value=None) + mock_client.collections.__getitem__.return_value.documents.create = ( + MagicMock() + ) + + session = await service.create_session(app_name='app', user_id='u') + + assert session.id # auto-generated UUID + + @pytest.mark.asyncio + async def test_create_session_raises_if_already_exists(self, service): + existing_doc = _make_session_doc('app', 'u', 'sess1') + service._get_doc_sync = MagicMock(return_value=existing_doc) + + from google.adk.errors.already_exists_error import AlreadyExistsError + + with pytest.raises(AlreadyExistsError): + await service.create_session( + app_name='app', user_id='u', session_id='sess1' + ) + + @pytest.mark.asyncio + async def test_create_session_splits_state_scopes(self, service, ts_mock): + _, mock_client, _ = ts_mock + upserted: list[dict] = [] + service._get_doc_sync = MagicMock(return_value=None) + mock_client.collections.__getitem__.return_value.documents.upsert = ( + MagicMock(side_effect=lambda doc: upserted.append(doc)) + ) + mock_client.collections.__getitem__.return_value.documents.create = ( + MagicMock() + ) + + await service.create_session( + app_name='app', + user_id='u', + session_id='s', + state={ + 'app:shared': 'app_val', + 'user:pref': 'user_val', + 'local': 'session_val', + 'temp:scratch': 'ignored', + }, + ) + + assert len(upserted) == 2 + app_upsert = next(d for d in upserted if 'user_id' not in d) + assert json.loads(app_upsert['state']) == {'shared': 'app_val'} + user_upsert = next(d for d in upserted if 'user_id' in d) + assert json.loads(user_upsert['state']) == {'pref': 'user_val'} + + @pytest.mark.asyncio + async def test_get_session_returns_none_if_missing(self, service): + service._get_doc_sync = MagicMock(return_value=None) + + result = await service.get_session( + app_name='app', user_id='u', session_id='nonexistent' + ) + assert result is None + + @pytest.mark.asyncio + async def test_get_session_merges_state_scopes(self, service, ts_mock): + _, mock_client, _ = ts_mock + + session_doc = _make_session_doc('app', 'u', 's', state={'local': 'v'}) + app_state_doc = { + 'id': 'app', + 'app_name': 'app', + 'state': json.dumps({'shared': 'app_val'}), + 'update_time': 999.0, + } + user_state_doc = { + 'id': 'app||u', + 'app_name': 'app', + 'user_id': 'u', + 'state': json.dumps({'pref': 'user_val'}), + 'update_time': 999.0, + } + + docs = { + (service._sessions_col, _session_doc_id('app', 'u', 's')): session_doc, + (service._app_states_col, 'app'): app_state_doc, + ( + service._user_states_col, + _user_state_doc_id('app', 'u'), + ): user_state_doc, + } + service._get_doc_sync = MagicMock( + side_effect=lambda col, did: docs.get((col, did)) + ) + mock_client.collections.__getitem__.return_value.documents.search = ( + MagicMock(return_value={'hits': []}) + ) + + session = await service.get_session( + app_name='app', user_id='u', session_id='s' + ) + + assert session is not None + assert session.state['local'] == 'v' + assert session.state['app:shared'] == 'app_val' + assert session.state['user:pref'] == 'user_val' + + @pytest.mark.asyncio + async def test_get_session_with_num_recent_events(self, service, ts_mock): + _, mock_client, _ = ts_mock + + session_doc = _make_session_doc('app', 'u', 's') + service._get_doc_sync = MagicMock( + side_effect=lambda col, did: session_doc + if col == service._sessions_col + else None + ) + + event_json = Event(author='user', timestamp=1.0).model_dump_json( + exclude_none=True + ) + mock_client.collections.__getitem__.return_value.documents.search = ( + MagicMock( + return_value={'hits': [{'document': {'event_data': event_json}}]} + ) + ) + + config = GetSessionConfig(num_recent_events=3) + session = await service.get_session( + app_name='app', user_id='u', session_id='s', config=config + ) + + assert len(session.events) == 1 + params = mock_client.collections.__getitem__.return_value.documents.search.call_args[ + 0 + ][ + 0 + ] + assert params['per_page'] == 3 + assert params['sort_by'] == 'timestamp:desc' + + @pytest.mark.asyncio + async def test_get_session_num_recent_events_zero_skips_search( + self, service, ts_mock + ): + _, mock_client, _ = ts_mock + + session_doc = _make_session_doc('app', 'u', 's') + service._get_doc_sync = MagicMock( + side_effect=lambda col, did: session_doc + if col == service._sessions_col + else None + ) + + config = GetSessionConfig(num_recent_events=0) + session = await service.get_session( + app_name='app', user_id='u', session_id='s', config=config + ) + + assert session.events == [] + mock_client.collections.__getitem__.return_value.documents.search.assert_not_called() + + @pytest.mark.asyncio + async def test_list_sessions_returns_all_for_app(self, service): + docs = [ + _make_session_doc('app', 'u1', 's1'), + _make_session_doc('app', 'u2', 's2'), + ] + + async def fake_search_all(collection, filter_by, sort_by=None): + return docs if collection == service._sessions_col else [] + + service._search_all = fake_search_all + service._get_app_state = AsyncMock(return_value={}) + service._get_user_state = AsyncMock(return_value={}) + + response = await service.list_sessions(app_name='app') + + assert len(response.sessions) == 2 + assert {s.id for s in response.sessions} == {'s1', 's2'} + for s in response.sessions: + assert s.events == [] + + @pytest.mark.asyncio + async def test_list_sessions_filtered_by_user(self, service): + docs = [_make_session_doc('app', 'u1', 's1')] + + async def fake_search_all(collection, filter_by, sort_by=None): + if collection == service._sessions_col: + assert 'u1' in filter_by + return docs + return [] + + service._search_all = fake_search_all + service._get_app_state = AsyncMock(return_value={}) + service._get_user_state = AsyncMock(return_value={}) + + response = await service.list_sessions(app_name='app', user_id='u1') + + assert len(response.sessions) == 1 + assert response.sessions[0].user_id == 'u1' + + @pytest.mark.asyncio + async def test_delete_session_removes_events_and_doc(self, service, ts_mock): + _, mock_client, _ = ts_mock + mock_client.collections.__getitem__.return_value.documents.delete = ( + MagicMock() + ) + mock_client.collections.__getitem__.return_value.documents.__getitem__.return_value.delete = ( + MagicMock() + ) + + await service.delete_session(app_name='app', user_id='u', session_id='s') + + mock_client.collections.__getitem__.return_value.documents.delete.assert_called_once() + mock_client.collections.__getitem__.return_value.documents.__getitem__.return_value.delete.assert_called_once() + + @pytest.mark.asyncio + async def test_delete_session_ignores_missing_doc(self, service, ts_mock): + _, mock_client, ObjectNotFound = ts_mock + mock_client.collections.__getitem__.return_value.documents.delete = ( + MagicMock() + ) + mock_client.collections.__getitem__.return_value.documents.__getitem__.return_value.delete = MagicMock( + side_effect=ObjectNotFound() + ) + + # Must not raise + await service.delete_session( + app_name='app', user_id='u', session_id='missing' + ) + + @pytest.mark.asyncio + async def test_append_event_persists_event(self, service, ts_mock): + _, mock_client, _ = ts_mock + session_doc = _make_session_doc('app', 'u', 's', update_time=1.0) + service._get_doc_sync = MagicMock(return_value=session_doc) + mock_client.collections.__getitem__.return_value.documents.__getitem__.return_value.update = ( + MagicMock() + ) + mock_client.collections.__getitem__.return_value.documents.create = ( + MagicMock() + ) + + session = Session( + app_name='app', + user_id='u', + id='s', + state={}, + events=[], + last_update_time=1.0, + ) + event = Event(author='user', timestamp=2.0) + + result = await service.append_event(session=session, event=event) + + assert result is event + assert session.last_update_time == 2.0 + assert len(session.events) == 1 + mock_client.collections.__getitem__.return_value.documents.create.assert_called_once() + + @pytest.mark.asyncio + async def test_append_event_skips_partial_events(self, service, ts_mock): + _, mock_client, _ = ts_mock + session = Session( + app_name='app', + user_id='u', + id='s', + state={}, + events=[], + last_update_time=1.0, + ) + event = Event(author='user', timestamp=2.0, partial=True) + + result = await service.append_event(session=session, event=event) + + assert result is event + mock_client.collections.__getitem__.return_value.documents.create.assert_not_called() + + @pytest.mark.asyncio + async def test_append_event_raises_on_stale_session(self, service): + session_doc = _make_session_doc('app', 'u', 's', update_time=5.0) + service._get_doc_sync = MagicMock(return_value=session_doc) + + session = Session( + app_name='app', + user_id='u', + id='s', + state={}, + events=[], + last_update_time=1.0, + ) + event = Event(author='user', timestamp=6.0) + + with pytest.raises(ValueError, match='modified in storage'): + await service.append_event(session=session, event=event) + + @pytest.mark.asyncio + async def test_append_event_raises_if_session_missing(self, service): + service._get_doc_sync = MagicMock(return_value=None) + session = Session( + app_name='app', + user_id='u', + id='ghost', + state={}, + events=[], + last_update_time=1.0, + ) + event = Event(author='user', timestamp=2.0) + + with pytest.raises(ValueError, match='not found'): + await service.append_event(session=session, event=event) + + @pytest.mark.asyncio + async def test_append_event_updates_all_state_scopes(self, service, ts_mock): + _, mock_client, _ = ts_mock + session_doc = _make_session_doc('app', 'u', 's', update_time=1.0) + service._get_doc_sync = MagicMock(return_value=session_doc) + mock_client.collections.__getitem__.return_value.documents.__getitem__.return_value.update = ( + MagicMock() + ) + mock_client.collections.__getitem__.return_value.documents.create = ( + MagicMock() + ) + + upserted: list[dict] = [] + mock_client.collections.__getitem__.return_value.documents.upsert = ( + MagicMock(side_effect=lambda doc: upserted.append(doc)) + ) + + session = Session( + app_name='app', + user_id='u', + id='s', + state={}, + events=[], + last_update_time=1.0, + ) + event = Event( + author='user', + timestamp=2.0, + actions=EventActions( + state_delta={ + 'app:shared': 'app_val', + 'user:pref': 'user_val', + 'local': 'session_val', + 'temp:scratch': 'ignored', + } + ), + ) + + await service.append_event(session=session, event=event) + + assert len(upserted) == 2 + app_u = next(d for d in upserted if 'user_id' not in d) + assert json.loads(app_u['state']) == {'shared': 'app_val'} + user_u = next(d for d in upserted if 'user_id' in d) + assert json.loads(user_u['state']) == {'pref': 'user_val'} + + update_call = ( + mock_client.collections.__getitem__.return_value.documents.__getitem__.return_value.update.call_args + ) + updated_fields = update_call[0][0] + if 'state' in updated_fields: + persisted = json.loads(updated_fields['state']) + assert 'temp:scratch' not in persisted + + def test_missing_typesense_raises_import_error(self): + with patch.dict( + 'sys.modules', {'typesense': None, 'typesense.exceptions': None} + ): + with pytest.raises(ImportError, match='typesense'): + TypesenseSessionService(api_key='x') From 99c91023d871bc1f779caf1a939c17312692e1d8 Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Tue, 19 May 2026 01:47:24 +0300 Subject: [PATCH 5/9] fix: prevent lost updates and edge case failures in TypesenseSessionService --- .../sessions/typesense_session_service.py | 134 +++++++++++------- .../test_typesense_session_service.py | 2 + 2 files changed, 86 insertions(+), 50 deletions(-) diff --git a/src/google/adk_community/sessions/typesense_session_service.py b/src/google/adk_community/sessions/typesense_session_service.py index 09be1ec3..23375bb1 100644 --- a/src/google/adk_community/sessions/typesense_session_service.py +++ b/src/google/adk_community/sessions/typesense_session_service.py @@ -73,6 +73,14 @@ } +def _validate_no_sep(*values: str) -> None: + for v in values: + if _SEP in v: + raise ValueError( + f'app_name, user_id, and session_id must not contain {_SEP!r}.' + ) + + def _session_doc_id(app_name: str, user_id: str, session_id: str) -> str: return f'{app_name}{_SEP}{user_id}{_SEP}{session_id}' @@ -116,21 +124,27 @@ def _merge_state( class TypesenseSessionService(BaseSessionService): - """A session service that uses Typesense for persistent storage. + """Persistent session storage backed by Typesense. - Requires the ``typesense`` extra:: - - pip install google-adk-community[typesense] - - Four Typesense collections are managed automatically (created on first use): + Stores sessions, events, and shared app/user state in four Typesense + collections that are created automatically on first use: * ``{prefix}_sessions`` * ``{prefix}_events`` * ``{prefix}_app_states`` * ``{prefix}_user_states`` - Note: ``app_name``, ``user_id``, and ``session_id`` must not contain the - string ``'||'`` which is used as an internal document-ID separator. + **Installation**:: + + pip install google-adk-community[typesense] + + **Constraints** + + * ``app_name``, ``user_id``, and ``session_id`` must not contain ``'||'`` + (used as an internal document-ID separator). + * App and user state updates are serialized per-key within a single process. + Multi-process deployments sharing the same Typesense instance may still + lose concurrent state updates because Typesense has no native transactions. """ def __init__( @@ -141,6 +155,7 @@ def __init__( protocol: str = 'http', api_key: str, collection_prefix: str = 'adk', + connection_timeout_seconds: int = 5, ): try: import typesense @@ -157,17 +172,18 @@ def __init__( self._client = typesense.Client({ 'nodes': [{'host': host, 'port': str(port), 'protocol': protocol}], 'api_key': api_key, - 'connection_timeout_seconds': 5, + 'connection_timeout_seconds': connection_timeout_seconds, }) - p = collection_prefix - self._sessions_col = f'{p}_sessions' - self._events_col = f'{p}_events' - self._app_states_col = f'{p}_app_states' - self._user_states_col = f'{p}_user_states' + self._sessions_col = f'{collection_prefix}_sessions' + self._events_col = f'{collection_prefix}_events' + self._app_states_col = f'{collection_prefix}_app_states' + self._user_states_col = f'{collection_prefix}_user_states' self._collections_ready = False self._collections_lock = asyncio.Lock() + self._app_state_locks: dict[str, asyncio.Lock] = {} + self._user_state_locks: dict[str, asyncio.Lock] = {} async def _ensure_collections(self) -> None: if self._collections_ready: @@ -246,33 +262,38 @@ async def _get_user_state( async def _upsert_app_state( self, app_name: str, delta: dict[str, Any], now: float ) -> None: - existing = await self._get_app_state(app_name) - merged = existing | delta - await asyncio.to_thread( - self._client.collections[self._app_states_col].documents.upsert, - { - 'id': app_name, - 'app_name': app_name, - 'state': json.dumps(merged), - 'update_time': now, - }, - ) + lock = self._app_state_locks.setdefault(app_name, asyncio.Lock()) + async with lock: + existing = await self._get_app_state(app_name) + merged = existing | delta + await asyncio.to_thread( + self._client.collections[self._app_states_col].documents.upsert, + { + 'id': app_name, + 'app_name': app_name, + 'state': json.dumps(merged), + 'update_time': now, + }, + ) async def _upsert_user_state( self, app_name: str, user_id: str, delta: dict[str, Any], now: float ) -> None: - existing = await self._get_user_state(app_name, user_id) - merged = existing | delta - await asyncio.to_thread( - self._client.collections[self._user_states_col].documents.upsert, - { - 'id': _user_state_doc_id(app_name, user_id), - 'app_name': app_name, - 'user_id': user_id, - 'state': json.dumps(merged), - 'update_time': now, - }, - ) + key = _user_state_doc_id(app_name, user_id) + lock = self._user_state_locks.setdefault(key, asyncio.Lock()) + async with lock: + existing = await self._get_user_state(app_name, user_id) + merged = existing | delta + await asyncio.to_thread( + self._client.collections[self._user_states_col].documents.upsert, + { + 'id': key, + 'app_name': app_name, + 'user_id': user_id, + 'state': json.dumps(merged), + 'update_time': now, + }, + ) @override async def create_session( @@ -284,6 +305,7 @@ async def create_session( session_id: Optional[str] = None, ) -> Session: await self._ensure_collections() + _validate_no_sep(app_name, user_id) session_id = (session_id or '').strip() or str(uuid.uuid4()) now = time.time() @@ -342,6 +364,7 @@ async def get_session( config: Optional[GetSessionConfig] = None, ) -> Optional[Session]: await self._ensure_collections() + _validate_no_sep(app_name, user_id, session_id) doc_id = _session_doc_id(app_name, user_id, session_id) session_doc = await asyncio.to_thread( @@ -364,19 +387,28 @@ async def get_session( filter_by += f' && timestamp:>={config.after_timestamp}' if config and config.num_recent_events is not None: - params: dict[str, Any] = { - 'q': '*', - 'query_by': 'app_name', - 'filter_by': filter_by, - 'sort_by': 'timestamp:desc', - 'per_page': min(config.num_recent_events, 250), - 'page': 1, - } - result = await asyncio.to_thread( - self._client.collections[self._events_col].documents.search, - params, - ) - event_docs = [hit['document'] for hit in result.get('hits', [])] + event_docs = [] + page = 1 + remaining = config.num_recent_events + while remaining > 0: + params: dict[str, Any] = { + 'q': '*', + 'query_by': 'app_name', + 'filter_by': filter_by, + 'sort_by': 'timestamp:desc', + 'per_page': min(remaining, 250), + 'page': page, + } + result = await asyncio.to_thread( + self._client.collections[self._events_col].documents.search, + params, + ) + hits = result.get('hits', []) + event_docs.extend(hit['document'] for hit in hits) + if len(hits) < min(remaining, 250): + break + remaining -= len(hits) + page += 1 else: event_docs = await self._search_all( self._events_col, @@ -407,6 +439,7 @@ async def list_sessions( self, *, app_name: str, user_id: Optional[str] = None ) -> ListSessionsResponse: await self._ensure_collections() + _validate_no_sep(app_name, *(([user_id]) if user_id else [])) filter_by = f'app_name:={app_name}' if user_id: @@ -450,6 +483,7 @@ async def delete_session( self, *, app_name: str, user_id: str, session_id: str ) -> None: await self._ensure_collections() + _validate_no_sep(app_name, user_id, session_id) events_filter = ( f'app_name:={app_name}' diff --git a/tests/unittests/sessions/test_typesense_session_service.py b/tests/unittests/sessions/test_typesense_session_service.py index 3f6002c8..61b75bff 100644 --- a/tests/unittests/sessions/test_typesense_session_service.py +++ b/tests/unittests/sessions/test_typesense_session_service.py @@ -55,6 +55,8 @@ def _make_service(mock_ts, mock_client, ObjectNotFound): svc._user_states_col = 'adk_user_states' svc._collections_ready = True # skip bootstrap in unit tests svc._collections_lock = asyncio.Lock() + svc._app_state_locks = {} + svc._user_state_locks = {} return svc From c446a95566276c3a2d908f7e1ca2a7974a3d426d Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Tue, 19 May 2026 01:52:12 +0300 Subject: [PATCH 6/9] fix: validate session_id in create_session and catch duplicate-create race --- .../sessions/typesense_session_service.py | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/google/adk_community/sessions/typesense_session_service.py b/src/google/adk_community/sessions/typesense_session_service.py index 23375bb1..f9053b4b 100644 --- a/src/google/adk_community/sessions/typesense_session_service.py +++ b/src/google/adk_community/sessions/typesense_session_service.py @@ -159,10 +159,12 @@ def __init__( ): try: import typesense + from typesense.exceptions import ObjectAlreadyExists from typesense.exceptions import ObjectNotFound self._typesense = typesense self._ObjectNotFound = ObjectNotFound + self._ObjectAlreadyExists = ObjectAlreadyExists except ImportError as e: raise ImportError( 'TypesenseSessionService requires the typesense package.' @@ -305,9 +307,9 @@ async def create_session( session_id: Optional[str] = None, ) -> Session: await self._ensure_collections() - _validate_no_sep(app_name, user_id) session_id = (session_id or '').strip() or str(uuid.uuid4()) + _validate_no_sep(app_name, user_id, session_id) now = time.time() doc_id = _session_doc_id(app_name, user_id, session_id) @@ -330,17 +332,20 @@ async def create_session( storage_app_state = await self._get_app_state(app_name) storage_user_state = await self._get_user_state(app_name, user_id) - await asyncio.to_thread( - self._client.collections[self._sessions_col].documents.create, - { - 'id': doc_id, - 'app_name': app_name, - 'user_id': user_id, - 'session_id': session_id, - 'state': json.dumps(session_state), - 'update_time': now, - }, - ) + try: + await asyncio.to_thread( + self._client.collections[self._sessions_col].documents.create, + { + 'id': doc_id, + 'app_name': app_name, + 'user_id': user_id, + 'session_id': session_id, + 'state': json.dumps(session_state), + 'update_time': now, + }, + ) + except self._ObjectAlreadyExists: + raise AlreadyExistsError(f'Session with id {session_id} already exists.') merged_state = _merge_state( storage_app_state, storage_user_state, session_state From 925544adc14e5dbbcfa88fab28857894a08ebae6 Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Tue, 19 May 2026 01:55:35 +0300 Subject: [PATCH 7/9] fix: deep-copy schema template in _ensure_collections_sync --- src/google/adk_community/sessions/typesense_session_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/google/adk_community/sessions/typesense_session_service.py b/src/google/adk_community/sessions/typesense_session_service.py index f9053b4b..17439fbb 100644 --- a/src/google/adk_community/sessions/typesense_session_service.py +++ b/src/google/adk_community/sessions/typesense_session_service.py @@ -206,7 +206,7 @@ def _ensure_collections_sync(self) -> None: try: self._client.collections[col_name].retrieve() except self._ObjectNotFound: - schema = dict(schema_template) + schema = copy.deepcopy(schema_template) schema['name'] = col_name self._client.collections.create(schema) From 15a41816f8b3845a41b407460ffd691481be0296 Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Thu, 21 May 2026 01:35:06 +0300 Subject: [PATCH 8/9] fix: optimize list_sessions user state fetching and document lock memory tradeoff --- .../sessions/typesense_session_service.py | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/src/google/adk_community/sessions/typesense_session_service.py b/src/google/adk_community/sessions/typesense_session_service.py index 17439fbb..21f0c427 100644 --- a/src/google/adk_community/sessions/typesense_session_service.py +++ b/src/google/adk_community/sessions/typesense_session_service.py @@ -145,6 +145,10 @@ class TypesenseSessionService(BaseSessionService): * App and user state updates are serialized per-key within a single process. Multi-process deployments sharing the same Typesense instance may still lose concurrent state updates because Typesense has no native transactions. + * Per-key asyncio locks are retained for the lifetime of the service + instance and are not evicted. For workloads with a bounded set of + apps and users this is fine; if the process sees an unbounded stream + of unique IDs without restarts, the dicts grow monotonically. """ def __init__( @@ -454,14 +458,9 @@ async def list_sessions( app_state = await self._get_app_state(app_name) user_states_map: dict[str, dict[str, Any]] = {} - if user_id: - user_states_map[user_id] = await self._get_user_state(app_name, user_id) - else: - us_docs = await self._search_all( - self._user_states_col, f'app_name:={app_name}' - ) - for doc in us_docs: - user_states_map[doc['user_id']] = json.loads(doc['state']) + unique_user_ids = {doc['user_id'] for doc in session_docs} + for uid in unique_user_ids: + user_states_map[uid] = await self._get_user_state(app_name, uid) sessions = [] for doc in session_docs: From 55433984596e96a590d9a7b2981d6c3a3d72a28e Mon Sep 17 00:00:00 2001 From: Denis Karadas <54038084+deniskrds@users.noreply.github.com> Date: Thu, 21 May 2026 01:37:04 +0300 Subject: [PATCH 9/9] refactor: simplify function call --- .../adk_community/sessions/typesense_session_service.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/google/adk_community/sessions/typesense_session_service.py b/src/google/adk_community/sessions/typesense_session_service.py index 21f0c427..a41b808e 100644 --- a/src/google/adk_community/sessions/typesense_session_service.py +++ b/src/google/adk_community/sessions/typesense_session_service.py @@ -448,7 +448,9 @@ async def list_sessions( self, *, app_name: str, user_id: Optional[str] = None ) -> ListSessionsResponse: await self._ensure_collections() - _validate_no_sep(app_name, *(([user_id]) if user_id else [])) + _validate_no_sep(app_name) + if user_id: + _validate_no_sep(user_id) filter_by = f'app_name:={app_name}' if user_id: