Skip to content

Database

ShaerWare edited this page Mar 3, 2026 · 6 revisions

Database (База данных)

Система использует SQLite в качестве основной СУБД через async SQLAlchemy + aiosqlite. Три независимых файла БД:

База данных Путь Назначение
Основная data/secretary.db Пользователи, сессии, чаты, боты, виджеты, CRM, канбан и т.д.
Telegram Sales data/sales.db (single) / data/sales_{bot_id}.db (multi) Воронка продаж Telegram-бота (пользователи, события, discovery, платежи)
WhatsApp Sales data/wa_sales.db (single) / data/wa_sales_{instance_id}.db (multi) Воронка продаж WhatsApp-бота (пользователи, события, discovery)

Миграция legacy-путей: До v1.1 Telegram sales DB по умолчанию создавалась в корне проекта (sales.db). Теперь все БД хранятся в data/. При старте бота старый файл автоматически мигрирует в data/sales.db вместе с WAL/SHM сайдкарами.

PRAGMA-инициализация

При подключении к каждой базе данных автоматически устанавливаются оптимальные PRAGMA:

PRAGMA Значение Назначение
journal_mode WAL Конкурентное чтение во время записи (вместо полной блокировки)
foreign_keys ON Включение проверки внешних ключей и ON DELETE CASCADE
busy_timeout 5000 5 секунд ожидания при блокировке (вместо мгновенной ошибки)
synchronous NORMAL Оптимальный баланс скорости и надёжности для WAL-режима

Основная БД (SQLAlchemy)

PRAGMA устанавливаются через event listener на engine.sync_engine:

# db/database.py
@event.listens_for(engine.sync_engine, "connect")
def _set_sqlite_pragmas(dbapi_conn, connection_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA journal_mode=WAL")
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.execute("PRAGMA busy_timeout=5000")
    cursor.execute("PRAGMA synchronous=NORMAL")
    cursor.close()

С StaticPool (единственное постоянное соединение) listener срабатывает ровно 1 раз.

Sales БД (aiosqlite)

PRAGMA устанавливаются напрямую в SalesDatabase.init():

# telegram_bot/sales/database.py, whatsapp_bot/sales/database.py
await self._db.execute("PRAGMA journal_mode=WAL")
await self._db.execute("PRAGMA foreign_keys=ON")
await self._db.execute("PRAGMA busy_timeout=5000")
await self._db.execute("PRAGMA synchronous=NORMAL")

WAL-режим

Write-Ahead Logging (WAL) — режим журналирования, который позволяет читателям не блокироваться при записи. Особенности:

  • При WAL появляются дополнительные файлы: secretary.db-wal и secretary.db-shm
  • При корректном завершении все три БД выполняют PRAGMA wal_checkpoint(TRUNCATE):
    • Основная: close_db() в db/database.py
    • Sales (Telegram/WhatsApp): SalesDatabase.close() в соответствующих sales/database.py
  • Бэкапы безопасны: BackupService автоматически вызывает WAL checkpoint перед копированием каждого .db-файла и включает -wal/-shm сайдкары как safety net (см. Backup)

SQLITE_BUSY retry

SQLite WAL-режим с busy_timeout=5000 обрабатывает большинство конкурентных записей автоматически. Если 5 секунд ожидания исчерпаны (высокая нагрузка), OperationalError("database is locked") обрабатывается декоратором @retry_on_busy():

Параметр Значение
Модуль db/retry.py
Макс. попыток 3 (+ первая)
Задержка Экспоненциальная: 0.1s → 0.2s → 0.4s
Логирование WARNING на каждый retry, ERROR при исчерпании
Метрика busy_retry_count в /health

Декоратор применён ко всем 16 write-методам в db/integration.py:

  • AsyncChatManager — 10 методов (create/update/delete session, add/edit/delete message, branch, switch)
  • AsyncTelegramSessionManager.set_session
  • AsyncAuditLogger.log
  • AsyncPaymentManager.log_payment
  • AsyncUserSessionManager — 3 метода (create, revoke, cleanup)

Каждый менеджер-метод = полная транзакция (async with AsyncSessionLocal(): ... commit()), поэтому retry автоматически получает свежую сессию.

Health Check

Эндпоинт GET /health возвращает расширенную информацию о БД. Запрос обёрнут в asyncio.wait_for(timeout=5s) — при залоченной БД вернёт {"status": "timeout"} вместо зависания.

{
  "database": {
    "sqlite": {
      "status": "ok",
      "path": "/app/data/secretary.db",
      "size_bytes": 3489792,
      "size_mb": 3.33,
      "wal_size_bytes": 0,
      "integrity": true,
      "pragma": {
        "journal_mode": "wal",
        "foreign_keys": true,
        "busy_timeout": 5000
      },
      "busy_retry_count": 0
    }
  }
}
Поле Описание
integrity PRAGMA integrity_check(1) — быстрая проверка (только первая ошибка)
wal_size_bytes Размер WAL-файла (0 = данные перенесены в основной файл)
pragma Текущие PRAGMA (если не совпадают с ожидаемыми — проблема инициализации)
busy_retry_count Кумулятивное число SQLITE_BUSY retries с момента старта

Обслуживание

Периодический VACUUM

SQLite не возвращает место на диск после удалений. VACUUM пересобирает файл БД, освобождая неиспользуемое пространство.

  • Расписание: background task в orchestrator.py — первый запуск через 24 часа, затем каждые 7 дней
  • Реализация: db/database.py:run_vacuum() через AUTOCOMMIT isolation (VACUUM нельзя выполнять в транзакции)
  • Лог: "Database VACUUM completed in X.Xms"

Graceful Shutdown

close_db() выполняет PRAGMA wal_checkpoint(TRUNCATE) перед engine.dispose() — все данные из WAL переносятся в основной файл, WAL обнуляется.

Unit of Work (транзакции)

Основная БД использует паттерн Unit of Work: репозитории только делают flush() (отправляют SQL в БД без коммита), а коммит выполняет вызывающий код — менеджер (db/integration.py), роутер или сервис.

┌──────────────────────────────────────────┐
│ Caller (manager / router / service)      │
│  async with AsyncSessionLocal() as s:    │
│      repo = SomeRepo(s)                  │
│      await repo.create(entity)  # flush  │
│      await repo.update(other)   # flush  │
│      await session.commit()     # ← ✅   │
└──────────────────────────────────────────┘

Зачем?

  • Атомарность: несколько операций коммитятся как одна транзакция — либо все, либо ни одна
  • Консистентность: данные не попадают в БД частично при ошибке между операциями
  • Контроль: вызывающий код решает, когда фиксировать изменения

Правила

Слой flush() commit() rollback()
Репозитории (db/repositories/) Да Нет Нет
BaseRepository (create, update, delete) Да (автоматически) Нет Нет
Менеджеры (db/integration.py) Нет Да (после всех операций) Нет (обрабатывается session context)
Роутеры (app/routers/) Нет Да (write-блоки с AsyncSessionLocal) Нет
get_async_session() (FastAPI DI) Нет Да (автоматически при успехе) Да (при исключении)
get_session_context() Нет Да (автоматически при успехе) Да (при исключении)

Пример

# db/repositories/user.py — только flush
async def create(self, entity):
    self.session.add(entity)
    await self.session.flush()  # SQL INSERT, но не зафиксирован
    return entity

# db/integration.py — коммит в менеджере
async def create_user(self, ...):
    async with AsyncSessionLocal() as session:
        repo = UserRepository(session)
        user = await repo.create(User(...))
        await session.commit()  # ← фиксация транзакции
        return user.to_dict()

Антипаттерны

# ❌ НЕЛЬЗЯ: коммит в репозитории
async def create(self, entity):
    self.session.add(entity)
    await self.session.commit()  # нарушает Unit of Work

# ❌ НЕЛЬЗЯ: забыть коммит в роутере
async with AsyncSessionLocal() as session:
    repo = SomeRepo(session)
    await repo.create(entity)
    # нет commit → данные потеряются!

Модели

53 SQLAlchemy модели разделены по 16 доменным файлам modules/{domain}/models.py. Base определён в db/database.py. db/models.py — фасад-реэкспорт для backward compatibility.

Домен Файл Модели
core modules/core/models.py User, UserIdentity, UserSession, Role, RolePermission, Workspace, WorkspaceMember, WorkspaceInvite, SystemConfig
chat modules/chat/models.py ChatSession, ChatMessage, ChatSessionShare, ResourceShare
telegram modules/channels/telegram/models.py BotInstance, TelegramSession + 13 Bot*-моделей
whatsapp modules/channels/whatsapp/models.py WhatsAppInstance
widget modules/channels/widget/models.py WidgetInstance
llm modules/llm/models.py CloudLLMProvider, LLMPreset
knowledge modules/knowledge/models.py KnowledgeCollection, KnowledgeDocument, FAQEntry, GitHubRepoProject
speech modules/speech/models.py TTSPreset
monitoring modules/monitoring/models.py AuditLog, UsageLog, UsageLimits
crm modules/crm/models.py AmoCRMConfig, AmoCRMSyncLog
ecommerce modules/ecommerce/models.py WooCommerceConfig
kanban modules/kanban/models.py KanbanProject, KanbanTask, KanbanTaskDependency, KanbanChecklistItem
claude_code modules/claude_code/models.py ClaudeCodeSession, ClaudeCodeProject
sales modules/sales/models.py PaymentLog
admin modules/admin/models.py UserConsent
telephony modules/telephony/models.py GSMCallLog, GSMSMSLog

Добавление новой модели

  1. Создать/редактировать modules/{domain}/models.pyfrom db.database import Base
  2. Добавить реэкспорт в db/models.py (import + __all__)
  3. Создать Alembic-миграцию: alembic revision --autogenerate -m "description"

Подробнее: PR #499, issue #491

Миграции

Три системы миграций:

Система Когда использовать
Alembic (предпочтительно) Schema changes: ALTER TABLE, новые таблицы, индексы
scripts/migrate_*.py Data migrations: бэкфиллы, переносы данных, ренейм значений
Base.metadata.create_all Автоматически при старте — создаёт отсутствующие таблицы (не изменяет существующие)

Важно: Новые data migration скрипты обязаны использовать шаблон scripts/_migration_template.py — он обеспечивает BEGIN IMMEDIATE / ROLLBACK при ошибке (нет частичных миграций). Миграции запускать только при остановленном приложении.

# Alembic (schema changes)
alembic upgrade head
alembic revision --autogenerate -m "description"
alembic history

# Data migrations (из шаблона)
cp scripts/_migration_template.py scripts/migrate_<name>.py
# Отредактировать migrate(), затем:
python scripts/migrate_<name>.py   # транзакция, откат при ошибке

Диагностика

Проверка PRAGMA

# Внутри Docker
docker exec ai-secretary python -c "
import asyncio
from db.database import AsyncSessionLocal, init_db
from sqlalchemy import text

async def check():
    await init_db()
    async with AsyncSessionLocal() as s:
        for p in ['journal_mode', 'foreign_keys', 'busy_timeout', 'synchronous']:
            r = (await s.execute(text(f'PRAGMA {p}'))).scalar()
            print(f'{p} = {r}')

asyncio.run(check())
"

Ожидаемый вывод:

journal_mode = wal
foreign_keys = 1
busy_timeout = 5000
synchronous = 1

Проверка целостности

sqlite3 data/secretary.db "PRAGMA integrity_check;"
# Ответ: ok

Проверка FK

sqlite3 data/secretary.db "PRAGMA foreign_key_check;"
# Пустой вывод = нет нарушений FK

Тесты

65 тестов в tests/unit/:

pytest tests/unit/ -v -o asyncio_mode=auto

SQLITE_BUSY retry тесты (test_retry_on_busy.py)

Тест Что проверяет
test_succeeds_on_first_try Нет retry при отсутствии ошибки
test_retries_on_locked Fail 2× → success, счётчик +2
test_exhausted_reraises Все retry исчерпаны → оригинальное исключение
test_non_busy_error_not_retried Другие OperationalError не ретраятся
test_get_busy_retry_count Getter счётчика работает
test_void_return Декоратор работает с None-returning функциями
test_passes_args_and_kwargs Аргументы пробрасываются корректно

Unit of Work тесты (test_unit_of_work.py)

Тест Что проверяет
test_repo_does_not_commit Repo flush без commit → rollback теряет данные
test_repo_flush_visible_in_session Flush делает данные видимыми в пределах сессии
test_commit_persists_data Commit сохраняет данные после закрытия сессии
test_multi_op_atomicity Несколько операций коммитятся/откатываются атомарно
test_repo_delete_does_not_commit Rollback отменяет удаление через repo
test_get_async_session_commits_on_success get_async_session() авто-коммитит при успехе
test_get_async_session_rollbacks_on_exception get_async_session() авто-откатывает при ошибке
test_get_session_context_commits get_session_context() авто-коммитит при успехе
test_get_session_context_rollbacks get_session_context() авто-откатывает при ошибке
test_base_repo_update_flush_with_autoflush_false update() делает flush при autoflush=False

PRAGMA тесты (test_sqlite_pragmas.py)

Тест Что проверяет
test_main_engine_pragmas Все 4 PRAGMA значения после init
test_pragmas_persist_across_sessions PRAGMA сохраняются между сессиями (StaticPool)
test_foreign_keys_enforced INSERT с невалидным FK вызывает IntegrityError
test_wal_checkpoint_on_close WAL checkpoint очищает WAL-файл
test_event_listener_fires_once_with_static_pool Listener вызывается ровно 1 раз
test_telegram_sales_db_pragmas PRAGMA в Telegram sales DB
test_whatsapp_sales_db_pragmas PRAGMA в WhatsApp sales DB
test_sales_db_fk_enforced FK enforcement в sales DB

Бэкап тесты (test_backup_safety.py)

Тест Что проверяет
test_checkpoint_wal_clears_wal_file WAL checkpoint обнуляет WAL-файл
test_checkpoint_wal_nonexistent_db Checkpoint на несуществующей БД не падает
test_backup_includes_sales_dbs ZIP содержит main DB + все sales DBs
test_backup_includes_wal_sidecars ZIP содержит непустые WAL/SHM сайдкары
test_backup_skips_empty_sidecars Пустые сайдкары не включаются
test_manifest_version_1_1 Manifest version = 1.1
test_restore_cleans_stale_wal Restore удаляет stale WAL/SHM
test_restore_handles_sales_dbs Restore распаковывает sales DBs
test_restore_creates_bak_files Restore создаёт .bak копии
test_get_system_info_includes_sales_dbs system_info содержит sales DBs

Sales DB тесты (test_sales_db_close.py)

Тест Что проверяет
test_telegram_sales_close_checkpoints close() делает WAL checkpoint
test_whatsapp_sales_close_checkpoints close() делает WAL checkpoint
test_telegram_sales_default_path Default = data/sales.db
test_whatsapp_sales_default_path Default = data/wa_sales.db
test_telegram_legacy_migration sales.db мигрирует в data/sales.db
test_telegram_close_idempotent Двойной close() не падает

Backup | Wiki-RAG

Clone this wiki locally