From 13e828a8e861dbdb3bfb0b6f6d72afe36ff9cacc Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 10:45:28 +0100 Subject: [PATCH 1/8] build: add sqlc store generation --- internal/store/sqlc/queries.sql | 375 +++++++ internal/store/sqlc/schema.sql | 163 +++ internal/store/storedb/db.go | 31 + internal/store/storedb/models.go | 169 +++ internal/store/storedb/queries.sql.go | 1376 +++++++++++++++++++++++++ scripts/generate-sqlc.sh | 4 + sqlc.yaml | 14 + 7 files changed, 2132 insertions(+) create mode 100644 internal/store/sqlc/queries.sql create mode 100644 internal/store/sqlc/schema.sql create mode 100644 internal/store/storedb/db.go create mode 100644 internal/store/storedb/models.go create mode 100644 internal/store/storedb/queries.sql.go create mode 100755 scripts/generate-sqlc.sh create mode 100644 sqlc.yaml diff --git a/internal/store/sqlc/queries.sql b/internal/store/sqlc/queries.sql new file mode 100644 index 0000000..5512893 --- /dev/null +++ b/internal/store/sqlc/queries.sql @@ -0,0 +1,375 @@ +-- name: GetSyncState :one +select coalesce(cursor, '') as cursor +from sync_state +where scope = ?; + +-- name: SetSyncState :exec +insert into sync_state(scope, cursor, updated_at) +values(?, ?, ?) +on conflict(scope) do update set + cursor = excluded.cursor, + updated_at = excluded.updated_at; + +-- name: DeleteSyncState :exec +delete from sync_state +where scope = ?; + +-- name: ChannelMessageBounds :one +select cast(coalesce(min(id), '') as text) as oldest_id, + cast(coalesce(max(id), '') as text) as newest_id +from messages +where channel_id = ?; + +-- name: UpdateAttachmentMedia :exec +update message_attachments +set media_path = ?, + content_sha256 = ?, + content_size = ?, + fetched_at = ?, + fetch_status = ?, + fetch_error = ?, + updated_at = ? +where attachment_id = ?; + +-- name: UpdateAttachmentFetchStatus :exec +update message_attachments +set fetched_at = ?, + fetch_status = ?, + fetch_error = ?, + updated_at = ? 
+where attachment_id = ?; + +-- name: CountEmbeddingBacklog :one +select count(*) as count +from embedding_jobs +where state = 'pending'; + +-- name: HasMessageEmbeddings :one +select exists( + select 1 + from message_embeddings + where provider = ? + and model = ? + and input_version = ? +) as present; + +-- name: CountGuilds :one +select count(*) as count from guilds; + +-- name: CountChannels :one +select count(*) as count from channels; + +-- name: CountMessages :one +select count(*) as count from messages; + +-- name: CountMembers :one +select count(*) as count from members; + +-- name: CountThreads :one +select count(*) as count +from channels +where kind like 'thread_%'; + +-- name: GetSyncUpdatedAt :one +select coalesce(updated_at, '') as updated_at +from sync_state +where scope = ?; + +-- name: GetGuildName :one +select name +from guilds +where id = ?; + +-- name: ListGuildIDs :many +select id +from guilds +order by id; + +-- name: CountChannelsByGuild :one +select count(*) as count +from channels +where guild_id = ?; + +-- name: CountMembersByGuild :one +select count(*) as count +from members +where guild_id = ?; + +-- name: ListMembers :many +select guild_id, user_id, username, coalesce(global_name, '') as global_name, + coalesce(display_name, '') as display_name, coalesce(nick, '') as nick, + coalesce(discriminator, '') as discriminator, coalesce(avatar, '') as avatar, + role_ids_json, bot, coalesce(joined_at, '') as joined_at, raw_json +from members +order by coalesce(nullif(display_name, ''), nullif(nick, ''), nullif(global_name, ''), username), username +limit ?; + +-- name: ListMembersByGuild :many +select guild_id, user_id, username, coalesce(global_name, '') as global_name, + coalesce(display_name, '') as display_name, coalesce(nick, '') as nick, + coalesce(discriminator, '') as discriminator, coalesce(avatar, '') as avatar, + role_ids_json, bot, coalesce(joined_at, '') as joined_at, raw_json +from members +where guild_id = ? 
+order by coalesce(nullif(display_name, ''), nullif(nick, ''), nullif(global_name, ''), username), username +limit ?; + +-- name: ListMembersByUserID :many +select guild_id, user_id, username, coalesce(global_name, '') as global_name, + coalesce(display_name, '') as display_name, coalesce(nick, '') as nick, + coalesce(discriminator, '') as discriminator, coalesce(avatar, '') as avatar, + role_ids_json, bot, coalesce(joined_at, '') as joined_at, raw_json +from members +where user_id = ? +order by guild_id, username; + +-- name: ListChannels :many +select id, guild_id, coalesce(parent_id, '') as parent_id, kind, name, + coalesce(topic, '') as topic, position, is_nsfw, is_archived, + is_locked, is_private_thread, coalesce(thread_parent_id, '') as thread_parent_id, + coalesce(archive_timestamp, '') as archive_timestamp +from channels +order by guild_id, position, name; + +-- name: ListChannelsByGuild :many +select id, guild_id, coalesce(parent_id, '') as parent_id, kind, name, + coalesce(topic, '') as topic, position, is_nsfw, is_archived, + is_locked, is_private_thread, coalesce(thread_parent_id, '') as thread_parent_id, + coalesce(archive_timestamp, '') as archive_timestamp +from channels +where guild_id = ? +order by guild_id, position, name; + +-- name: ListIncompleteMessageChannelIDs :many +select c.id +from channels c +where c.kind in ('text', 'news', 'announcement', 'thread_public', 'thread_private', 'thread_news', 'thread_announcement') + and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':history_complete' + ) + and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':unavailable' + ) +order by c.id; + +-- name: ListIncompleteMessageChannelIDsByGuild :many +select c.id +from channels c +where c.kind in ('text', 'news', 'announcement', 'thread_public', 'thread_private', 'thread_news', 'thread_announcement') + and c.guild_id = ? 
+ and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':history_complete' + ) + and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':unavailable' + ) +order by c.id; + +-- name: UpsertGuild :exec +insert into guilds(id, name, icon, raw_json, updated_at) +values(?, ?, ?, ?, ?) +on conflict(id) do update set + name = excluded.name, + icon = excluded.icon, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at; + +-- name: UpsertChannel :exec +insert into channels( + id, guild_id, parent_id, kind, name, topic, position, is_nsfw, + is_archived, is_locked, is_private_thread, thread_parent_id, + archive_timestamp, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +on conflict(id) do update set + guild_id = excluded.guild_id, + parent_id = excluded.parent_id, + kind = excluded.kind, + name = excluded.name, + topic = excluded.topic, + position = excluded.position, + is_nsfw = excluded.is_nsfw, + is_archived = excluded.is_archived, + is_locked = excluded.is_locked, + is_private_thread = excluded.is_private_thread, + thread_parent_id = excluded.thread_parent_id, + archive_timestamp = excluded.archive_timestamp, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at; + +-- name: DeleteMembersByGuild :exec +delete from members +where guild_id = ?; + +-- name: InsertMember :exec +insert into members( + guild_id, user_id, username, global_name, display_name, nick, discriminator, + avatar, bot, joined_at, role_ids_json, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); + +-- name: UpsertMember :exec +insert into members( + guild_id, user_id, username, global_name, display_name, nick, discriminator, + avatar, bot, joined_at, role_ids_json, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+on conflict(guild_id, user_id) do update set + username = excluded.username, + global_name = excluded.global_name, + display_name = excluded.display_name, + nick = excluded.nick, + discriminator = excluded.discriminator, + avatar = excluded.avatar, + bot = excluded.bot, + joined_at = excluded.joined_at, + role_ids_json = excluded.role_ids_json, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at; + +-- name: DeleteMember :exec +delete from members +where guild_id = ? and user_id = ?; + +-- name: DeleteOrphanChannels :exec +delete from channels +where channels.guild_id = ? + and not exists ( + select 1 + from messages + where messages.channel_id = channels.id + ); + +-- name: DeleteEmbeddingJobsByGuild :exec +delete from embedding_jobs +where message_id in (select id from messages where guild_id = ?); + +-- name: DeleteMessageEmbeddingsByGuild :exec +delete from message_embeddings +where message_id in (select id from messages where guild_id = ?); + +-- name: DeleteMessageEventsByGuild :exec +delete from message_events +where guild_id = ?; + +-- name: DeleteAttachmentsByGuild :exec +delete from message_attachments +where guild_id = ?; + +-- name: DeleteMentionEventsByGuild :exec +delete from mention_events +where guild_id = ?; + +-- name: DeleteMessagesByGuild :exec +delete from messages +where guild_id = ?; + +-- name: DeleteChannelsByGuild :exec +delete from channels +where guild_id = ?; + +-- name: DeleteGuild :exec +delete from guilds +where id = ?; + +-- name: GetMessageNormalizedContent :one +select normalized_content +from messages +where id = ?; + +-- name: CountEmbeddingJobsByMessage :one +select count(*) as count +from embedding_jobs +where message_id = ?; + +-- name: UpsertMessage :exec +insert into messages( + id, guild_id, channel_id, author_id, message_type, created_at, edited_at, deleted_at, + content, normalized_content, reply_to_message_id, pinned, has_attachments, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 
?, ?) +on conflict(id) do update set + guild_id = excluded.guild_id, + channel_id = excluded.channel_id, + author_id = excluded.author_id, + message_type = excluded.message_type, + created_at = excluded.created_at, + edited_at = excluded.edited_at, + deleted_at = coalesce(excluded.deleted_at, messages.deleted_at), + content = excluded.content, + normalized_content = excluded.normalized_content, + reply_to_message_id = excluded.reply_to_message_id, + pinned = excluded.pinned, + has_attachments = excluded.has_attachments, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at; + +-- name: UpsertEmbeddingJobPending :exec +insert into embedding_jobs(message_id, state, attempts, updated_at) +values(?, 'pending', 0, ?) +on conflict(message_id) do update set + state = 'pending', + attempts = 0, + last_error = '', + locked_at = null, + updated_at = excluded.updated_at; + +-- name: MarkMessageDeleted :exec +update messages +set deleted_at = ?, updated_at = ? +where id = ?; + +-- name: InsertMessageEvent :exec +insert into message_events(guild_id, channel_id, message_id, event_type, event_at, payload_json) +values(?, ?, ?, ?, ?, ?); + +-- name: ListExistingAttachmentMedia :many +select attachment_id, coalesce(media_path, '') as media_path, + coalesce(content_sha256, '') as content_sha256, content_size, + coalesce(fetched_at, '') as fetched_at, coalesce(fetch_status, '') as fetch_status, + coalesce(fetch_error, '') as fetch_error +from message_attachments +where message_id = ?; + +-- name: DeleteAttachmentsByMessage :exec +delete from message_attachments +where message_id = ?; + +-- name: InsertMessageAttachment :exec +insert into message_attachments( + attachment_id, message_id, guild_id, channel_id, author_id, filename, + content_type, size, url, proxy_url, text_content, media_path, content_sha256, + content_size, fetched_at, fetch_status, fetch_error, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?); + +-- name: 
DeleteMentionEventsByMessage :exec +delete from mention_events +where message_id = ?; + +-- name: InsertMentionEvent :exec +insert into mention_events( + message_id, guild_id, channel_id, author_id, target_type, target_id, target_name, event_at +) values(?, ?, ?, ?, ?, ?, ?, ?); + +-- name: InsertMissingEmbeddingJobs :exec +insert or ignore into embedding_jobs( + message_id, state, attempts, provider, model, input_version, last_error, locked_at, updated_at +) +select id, 'pending', 0, ?, ?, ?, '', null, ? +from messages; + +-- name: RequeueAllEmbeddingJobs :execrows +update embedding_jobs +set state = 'pending', + attempts = 0, + provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? +where message_id in (select id from messages); diff --git a/internal/store/sqlc/schema.sql b/internal/store/sqlc/schema.sql new file mode 100644 index 0000000..8f07184 --- /dev/null +++ b/internal/store/sqlc/schema.sql @@ -0,0 +1,163 @@ +create table guilds ( + id text primary key, + name text not null, + icon text, + raw_json text not null, + updated_at text not null +); + +create table channels ( + id text primary key, + guild_id text not null, + parent_id text, + kind text not null, + name text not null, + topic text, + position integer, + is_nsfw integer not null default 0, + is_archived integer not null default 0, + is_locked integer not null default 0, + is_private_thread integer not null default 0, + thread_parent_id text, + archive_timestamp text, + raw_json text not null, + updated_at text not null +); + +create table members ( + guild_id text not null, + user_id text not null, + username text not null, + global_name text, + display_name text, + nick text, + discriminator text, + avatar text, + bot integer not null default 0, + joined_at text, + role_ids_json text not null, + raw_json text not null, + updated_at text not null, + bio text not null default '', + pronouns text not null default '', + location text not null default 
'', + website text not null default '', + x_handle text not null default '', + github_login text not null default '', + urls_json text not null default '[]', + profile_updated_at text, + primary key (guild_id, user_id) +); + +create table messages ( + id text primary key, + guild_id text not null, + channel_id text not null, + author_id text, + message_type integer not null, + created_at text not null, + edited_at text, + deleted_at text, + content text not null, + normalized_content text not null, + reply_to_message_id text, + pinned integer not null default 0, + has_attachments integer not null default 0, + raw_json text not null, + updated_at text not null +); + +create table message_events ( + event_id integer primary key autoincrement, + guild_id text not null, + channel_id text not null, + message_id text not null, + event_type text not null, + event_at text not null, + payload_json text not null +); + +create table message_attachments ( + attachment_id text primary key, + message_id text not null, + guild_id text not null, + channel_id text not null, + author_id text, + filename text not null, + content_type text, + size integer not null default 0, + url text, + proxy_url text, + text_content text not null default '', + media_path text, + content_sha256 text, + content_size integer not null default 0, + fetched_at text, + fetch_status text not null default '', + fetch_error text not null default '', + updated_at text not null +); + +create table mention_events ( + event_id integer primary key autoincrement, + message_id text not null, + guild_id text not null, + channel_id text not null, + author_id text, + target_type text not null, + target_id text not null, + target_name text not null default '', + event_at text not null +); + +create table sync_state ( + scope text primary key, + cursor text, + updated_at text not null +); + +create table embedding_jobs ( + message_id text primary key, + state text not null, + attempts integer not null default 0, + 
provider text not null default '', + model text not null default '', + input_version text not null default '', + last_error text not null default '', + locked_at text, + updated_at text not null +); + +create table message_embeddings ( + message_id text not null, + provider text not null, + model text not null, + input_version text not null, + dimensions integer not null, + embedding_blob blob not null, + embedded_at text not null, + primary key (message_id, provider, model, input_version) +); + +-- sqlc only needs parseable table shapes. Runtime migrations create real FTS5 +-- virtual tables and maintain rowids. +create table message_fts ( + rowid integer primary key, + message_id text, + guild_id text, + channel_id text, + author_id text, + author_name text, + channel_name text, + content text +); + +create table member_fts ( + rowid integer primary key, + member_key text, + guild_id text, + user_id text, + username text, + display_name text, + profile_text text +); diff --git a/internal/store/storedb/db.go b/internal/store/storedb/db.go new file mode 100644 index 0000000..e2fb046 --- /dev/null +++ b/internal/store/storedb/db.go @@ -0,0 +1,31 @@ +// Code generated by sqlc. DO NOT EDIT. 
+// versions: +// sqlc v1.31.1 + +package storedb + +import ( + "context" + "database/sql" +) + +type DBTX interface { + ExecContext(context.Context, string, ...interface{}) (sql.Result, error) + PrepareContext(context.Context, string) (*sql.Stmt, error) + QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error) + QueryRowContext(context.Context, string, ...interface{}) *sql.Row +} + +func New(db DBTX) *Queries { + return &Queries{db: db} +} + +type Queries struct { + db DBTX +} + +func (q *Queries) WithTx(tx *sql.Tx) *Queries { + return &Queries{ + db: tx, + } +} diff --git a/internal/store/storedb/models.go b/internal/store/storedb/models.go new file mode 100644 index 0000000..dd10739 --- /dev/null +++ b/internal/store/storedb/models.go @@ -0,0 +1,169 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 + +package storedb + +import ( + "database/sql" +) + +type Channel struct { + ID string + GuildID string + ParentID sql.NullString + Kind string + Name string + Topic sql.NullString + Position sql.NullInt64 + IsNsfw int64 + IsArchived int64 + IsLocked int64 + IsPrivateThread int64 + ThreadParentID sql.NullString + ArchiveTimestamp sql.NullString + RawJson string + UpdatedAt string +} + +type EmbeddingJob struct { + MessageID string + State string + Attempts int64 + Provider string + Model string + InputVersion string + LastError string + LockedAt sql.NullString + UpdatedAt string +} + +type Guild struct { + ID string + Name string + Icon sql.NullString + RawJson string + UpdatedAt string +} + +type Member struct { + GuildID string + UserID string + Username string + GlobalName sql.NullString + DisplayName sql.NullString + Nick sql.NullString + Discriminator sql.NullString + Avatar sql.NullString + Bot int64 + JoinedAt sql.NullString + RoleIdsJson string + RawJson string + UpdatedAt string + Bio string + Pronouns string + Location string + Website string + XHandle string + GithubLogin string + UrlsJson string + ProfileUpdatedAt 
sql.NullString +} + +type MemberFt struct { + Rowid int64 + MemberKey sql.NullString + GuildID sql.NullString + UserID sql.NullString + Username sql.NullString + DisplayName sql.NullString + ProfileText sql.NullString +} + +type MentionEvent struct { + EventID int64 + MessageID string + GuildID string + ChannelID string + AuthorID sql.NullString + TargetType string + TargetID string + TargetName string + EventAt string +} + +type Message struct { + ID string + GuildID string + ChannelID string + AuthorID sql.NullString + MessageType int64 + CreatedAt string + EditedAt sql.NullString + DeletedAt sql.NullString + Content string + NormalizedContent string + ReplyToMessageID sql.NullString + Pinned int64 + HasAttachments int64 + RawJson string + UpdatedAt string +} + +type MessageAttachment struct { + AttachmentID string + MessageID string + GuildID string + ChannelID string + AuthorID sql.NullString + Filename string + ContentType sql.NullString + Size int64 + Url sql.NullString + ProxyUrl sql.NullString + TextContent string + MediaPath sql.NullString + ContentSha256 sql.NullString + ContentSize int64 + FetchedAt sql.NullString + FetchStatus string + FetchError string + UpdatedAt string +} + +type MessageEmbedding struct { + MessageID string + Provider string + Model string + InputVersion string + Dimensions int64 + EmbeddingBlob []byte + EmbeddedAt string +} + +type MessageEvent struct { + EventID int64 + GuildID string + ChannelID string + MessageID string + EventType string + EventAt string + PayloadJson string +} + +type MessageFt struct { + Rowid int64 + MessageID sql.NullString + GuildID sql.NullString + ChannelID sql.NullString + AuthorID sql.NullString + AuthorName sql.NullString + ChannelName sql.NullString + Content sql.NullString +} + +type SyncState struct { + Scope string + Cursor sql.NullString + UpdatedAt string +} diff --git a/internal/store/storedb/queries.sql.go b/internal/store/storedb/queries.sql.go new file mode 100644 index 0000000..0d39769 --- 
/dev/null +++ b/internal/store/storedb/queries.sql.go @@ -0,0 +1,1376 @@ +// Code generated by sqlc. DO NOT EDIT. +// versions: +// sqlc v1.31.1 +// source: queries.sql + +package storedb + +import ( + "context" + "database/sql" +) + +const channelMessageBounds = `-- name: ChannelMessageBounds :one +select cast(coalesce(min(id), '') as text) as oldest_id, + cast(coalesce(max(id), '') as text) as newest_id +from messages +where channel_id = ? +` + +type ChannelMessageBoundsRow struct { + OldestID string + NewestID string +} + +func (q *Queries) ChannelMessageBounds(ctx context.Context, channelID string) (ChannelMessageBoundsRow, error) { + row := q.db.QueryRowContext(ctx, channelMessageBounds, channelID) + var i ChannelMessageBoundsRow + err := row.Scan(&i.OldestID, &i.NewestID) + return i, err +} + +const countChannels = `-- name: CountChannels :one +select count(*) as count from channels +` + +func (q *Queries) CountChannels(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countChannels) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countChannelsByGuild = `-- name: CountChannelsByGuild :one +select count(*) as count +from channels +where guild_id = ? +` + +func (q *Queries) CountChannelsByGuild(ctx context.Context, guildID string) (int64, error) { + row := q.db.QueryRowContext(ctx, countChannelsByGuild, guildID) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countEmbeddingBacklog = `-- name: CountEmbeddingBacklog :one +select count(*) as count +from embedding_jobs +where state = 'pending' +` + +func (q *Queries) CountEmbeddingBacklog(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countEmbeddingBacklog) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countEmbeddingJobsByMessage = `-- name: CountEmbeddingJobsByMessage :one +select count(*) as count +from embedding_jobs +where message_id = ? 
+` + +func (q *Queries) CountEmbeddingJobsByMessage(ctx context.Context, messageID string) (int64, error) { + row := q.db.QueryRowContext(ctx, countEmbeddingJobsByMessage, messageID) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countGuilds = `-- name: CountGuilds :one +select count(*) as count from guilds +` + +func (q *Queries) CountGuilds(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countGuilds) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countMembers = `-- name: CountMembers :one +select count(*) as count from members +` + +func (q *Queries) CountMembers(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countMembers) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countMembersByGuild = `-- name: CountMembersByGuild :one +select count(*) as count +from members +where guild_id = ? +` + +func (q *Queries) CountMembersByGuild(ctx context.Context, guildID string) (int64, error) { + row := q.db.QueryRowContext(ctx, countMembersByGuild, guildID) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countMessages = `-- name: CountMessages :one +select count(*) as count from messages +` + +func (q *Queries) CountMessages(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countMessages) + var count int64 + err := row.Scan(&count) + return count, err +} + +const countThreads = `-- name: CountThreads :one +select count(*) as count +from channels +where kind like 'thread_%' +` + +func (q *Queries) CountThreads(ctx context.Context) (int64, error) { + row := q.db.QueryRowContext(ctx, countThreads) + var count int64 + err := row.Scan(&count) + return count, err +} + +const deleteAttachmentsByGuild = `-- name: DeleteAttachmentsByGuild :exec +delete from message_attachments +where guild_id = ? 
+` + +func (q *Queries) DeleteAttachmentsByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteAttachmentsByGuild, guildID) + return err +} + +const deleteAttachmentsByMessage = `-- name: DeleteAttachmentsByMessage :exec +delete from message_attachments +where message_id = ? +` + +func (q *Queries) DeleteAttachmentsByMessage(ctx context.Context, messageID string) error { + _, err := q.db.ExecContext(ctx, deleteAttachmentsByMessage, messageID) + return err +} + +const deleteChannelsByGuild = `-- name: DeleteChannelsByGuild :exec +delete from channels +where guild_id = ? +` + +func (q *Queries) DeleteChannelsByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteChannelsByGuild, guildID) + return err +} + +const deleteEmbeddingJobsByGuild = `-- name: DeleteEmbeddingJobsByGuild :exec +delete from embedding_jobs +where message_id in (select id from messages where guild_id = ?) +` + +func (q *Queries) DeleteEmbeddingJobsByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteEmbeddingJobsByGuild, guildID) + return err +} + +const deleteGuild = `-- name: DeleteGuild :exec +delete from guilds +where id = ? +` + +func (q *Queries) DeleteGuild(ctx context.Context, id string) error { + _, err := q.db.ExecContext(ctx, deleteGuild, id) + return err +} + +const deleteMember = `-- name: DeleteMember :exec +delete from members +where guild_id = ? and user_id = ? +` + +type DeleteMemberParams struct { + GuildID string + UserID string +} + +func (q *Queries) DeleteMember(ctx context.Context, arg DeleteMemberParams) error { + _, err := q.db.ExecContext(ctx, deleteMember, arg.GuildID, arg.UserID) + return err +} + +const deleteMembersByGuild = `-- name: DeleteMembersByGuild :exec +delete from members +where guild_id = ? 
+` + +func (q *Queries) DeleteMembersByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteMembersByGuild, guildID) + return err +} + +const deleteMentionEventsByGuild = `-- name: DeleteMentionEventsByGuild :exec +delete from mention_events +where guild_id = ? +` + +func (q *Queries) DeleteMentionEventsByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteMentionEventsByGuild, guildID) + return err +} + +const deleteMentionEventsByMessage = `-- name: DeleteMentionEventsByMessage :exec +delete from mention_events +where message_id = ? +` + +func (q *Queries) DeleteMentionEventsByMessage(ctx context.Context, messageID string) error { + _, err := q.db.ExecContext(ctx, deleteMentionEventsByMessage, messageID) + return err +} + +const deleteMessageEmbeddingsByGuild = `-- name: DeleteMessageEmbeddingsByGuild :exec +delete from message_embeddings +where message_id in (select id from messages where guild_id = ?) +` + +func (q *Queries) DeleteMessageEmbeddingsByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteMessageEmbeddingsByGuild, guildID) + return err +} + +const deleteMessageEventsByGuild = `-- name: DeleteMessageEventsByGuild :exec +delete from message_events +where guild_id = ? +` + +func (q *Queries) DeleteMessageEventsByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteMessageEventsByGuild, guildID) + return err +} + +const deleteMessagesByGuild = `-- name: DeleteMessagesByGuild :exec +delete from messages +where guild_id = ? +` + +func (q *Queries) DeleteMessagesByGuild(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteMessagesByGuild, guildID) + return err +} + +const deleteOrphanChannels = `-- name: DeleteOrphanChannels :exec +delete from channels +where channels.guild_id = ? 
+ and not exists ( + select 1 + from messages + where messages.channel_id = channels.id + ) +` + +func (q *Queries) DeleteOrphanChannels(ctx context.Context, guildID string) error { + _, err := q.db.ExecContext(ctx, deleteOrphanChannels, guildID) + return err +} + +const deleteSyncState = `-- name: DeleteSyncState :exec +delete from sync_state +where scope = ? +` + +func (q *Queries) DeleteSyncState(ctx context.Context, scope string) error { + _, err := q.db.ExecContext(ctx, deleteSyncState, scope) + return err +} + +const getGuildName = `-- name: GetGuildName :one +select name +from guilds +where id = ? +` + +func (q *Queries) GetGuildName(ctx context.Context, id string) (string, error) { + row := q.db.QueryRowContext(ctx, getGuildName, id) + var name string + err := row.Scan(&name) + return name, err +} + +const getMessageNormalizedContent = `-- name: GetMessageNormalizedContent :one +select normalized_content +from messages +where id = ? +` + +func (q *Queries) GetMessageNormalizedContent(ctx context.Context, id string) (string, error) { + row := q.db.QueryRowContext(ctx, getMessageNormalizedContent, id) + var normalized_content string + err := row.Scan(&normalized_content) + return normalized_content, err +} + +const getSyncState = `-- name: GetSyncState :one +select coalesce(cursor, '') as cursor +from sync_state +where scope = ? +` + +func (q *Queries) GetSyncState(ctx context.Context, scope string) (string, error) { + row := q.db.QueryRowContext(ctx, getSyncState, scope) + var cursor string + err := row.Scan(&cursor) + return cursor, err +} + +const getSyncUpdatedAt = `-- name: GetSyncUpdatedAt :one +select coalesce(updated_at, '') as updated_at +from sync_state +where scope = ? 
+` + +func (q *Queries) GetSyncUpdatedAt(ctx context.Context, scope string) (string, error) { + row := q.db.QueryRowContext(ctx, getSyncUpdatedAt, scope) + var updated_at string + err := row.Scan(&updated_at) + return updated_at, err +} + +const hasMessageEmbeddings = `-- name: HasMessageEmbeddings :one +select exists( + select 1 + from message_embeddings + where provider = ? + and model = ? + and input_version = ? +) as present +` + +type HasMessageEmbeddingsParams struct { + Provider string + Model string + InputVersion string +} + +func (q *Queries) HasMessageEmbeddings(ctx context.Context, arg HasMessageEmbeddingsParams) (bool, error) { + row := q.db.QueryRowContext(ctx, hasMessageEmbeddings, arg.Provider, arg.Model, arg.InputVersion) + var present bool + err := row.Scan(&present) + return present, err +} + +const insertMember = `-- name: InsertMember :exec +insert into members( + guild_id, user_id, username, global_name, display_name, nick, discriminator, + avatar, bot, joined_at, role_ids_json, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +` + +type InsertMemberParams struct { + GuildID string + UserID string + Username string + GlobalName sql.NullString + DisplayName sql.NullString + Nick sql.NullString + Discriminator sql.NullString + Avatar sql.NullString + Bot int64 + JoinedAt sql.NullString + RoleIdsJson string + RawJson string + UpdatedAt string +} + +func (q *Queries) InsertMember(ctx context.Context, arg InsertMemberParams) error { + _, err := q.db.ExecContext(ctx, insertMember, + arg.GuildID, + arg.UserID, + arg.Username, + arg.GlobalName, + arg.DisplayName, + arg.Nick, + arg.Discriminator, + arg.Avatar, + arg.Bot, + arg.JoinedAt, + arg.RoleIdsJson, + arg.RawJson, + arg.UpdatedAt, + ) + return err +} + +const insertMentionEvent = `-- name: InsertMentionEvent :exec +insert into mention_events( + message_id, guild_id, channel_id, author_id, target_type, target_id, target_name, event_at +) values(?, ?, ?, ?, ?, ?, ?, ?) 
+` + +type InsertMentionEventParams struct { + MessageID string + GuildID string + ChannelID string + AuthorID sql.NullString + TargetType string + TargetID string + TargetName string + EventAt string +} + +func (q *Queries) InsertMentionEvent(ctx context.Context, arg InsertMentionEventParams) error { + _, err := q.db.ExecContext(ctx, insertMentionEvent, + arg.MessageID, + arg.GuildID, + arg.ChannelID, + arg.AuthorID, + arg.TargetType, + arg.TargetID, + arg.TargetName, + arg.EventAt, + ) + return err +} + +const insertMessageAttachment = `-- name: InsertMessageAttachment :exec +insert into message_attachments( + attachment_id, message_id, guild_id, channel_id, author_id, filename, + content_type, size, url, proxy_url, text_content, media_path, content_sha256, + content_size, fetched_at, fetch_status, fetch_error, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +` + +type InsertMessageAttachmentParams struct { + AttachmentID string + MessageID string + GuildID string + ChannelID string + AuthorID sql.NullString + Filename string + ContentType sql.NullString + Size int64 + Url sql.NullString + ProxyUrl sql.NullString + TextContent string + MediaPath sql.NullString + ContentSha256 sql.NullString + ContentSize int64 + FetchedAt sql.NullString + FetchStatus string + FetchError string + UpdatedAt string +} + +func (q *Queries) InsertMessageAttachment(ctx context.Context, arg InsertMessageAttachmentParams) error { + _, err := q.db.ExecContext(ctx, insertMessageAttachment, + arg.AttachmentID, + arg.MessageID, + arg.GuildID, + arg.ChannelID, + arg.AuthorID, + arg.Filename, + arg.ContentType, + arg.Size, + arg.Url, + arg.ProxyUrl, + arg.TextContent, + arg.MediaPath, + arg.ContentSha256, + arg.ContentSize, + arg.FetchedAt, + arg.FetchStatus, + arg.FetchError, + arg.UpdatedAt, + ) + return err +} + +const insertMessageEvent = `-- name: InsertMessageEvent :exec +insert into message_events(guild_id, channel_id, message_id, event_type, event_at, 
payload_json) +values(?, ?, ?, ?, ?, ?) +` + +type InsertMessageEventParams struct { + GuildID string + ChannelID string + MessageID string + EventType string + EventAt string + PayloadJson string +} + +func (q *Queries) InsertMessageEvent(ctx context.Context, arg InsertMessageEventParams) error { + _, err := q.db.ExecContext(ctx, insertMessageEvent, + arg.GuildID, + arg.ChannelID, + arg.MessageID, + arg.EventType, + arg.EventAt, + arg.PayloadJson, + ) + return err +} + +const insertMissingEmbeddingJobs = `-- name: InsertMissingEmbeddingJobs :exec +insert or ignore into embedding_jobs( + message_id, state, attempts, provider, model, input_version, last_error, locked_at, updated_at +) +select id, 'pending', 0, ?, ?, ?, '', null, ? +from messages +` + +type InsertMissingEmbeddingJobsParams struct { + Provider string + Model string + InputVersion string + UpdatedAt string +} + +func (q *Queries) InsertMissingEmbeddingJobs(ctx context.Context, arg InsertMissingEmbeddingJobsParams) error { + _, err := q.db.ExecContext(ctx, insertMissingEmbeddingJobs, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.UpdatedAt, + ) + return err +} + +const listChannels = `-- name: ListChannels :many +select id, guild_id, coalesce(parent_id, '') as parent_id, kind, name, + coalesce(topic, '') as topic, position, is_nsfw, is_archived, + is_locked, is_private_thread, coalesce(thread_parent_id, '') as thread_parent_id, + coalesce(archive_timestamp, '') as archive_timestamp +from channels +order by guild_id, position, name +` + +type ListChannelsRow struct { + ID string + GuildID string + ParentID string + Kind string + Name string + Topic string + Position sql.NullInt64 + IsNsfw int64 + IsArchived int64 + IsLocked int64 + IsPrivateThread int64 + ThreadParentID string + ArchiveTimestamp string +} + +func (q *Queries) ListChannels(ctx context.Context) ([]ListChannelsRow, error) { + rows, err := q.db.QueryContext(ctx, listChannels) + if err != nil { + return nil, err + } + defer 
rows.Close() + var items []ListChannelsRow + for rows.Next() { + var i ListChannelsRow + if err := rows.Scan( + &i.ID, + &i.GuildID, + &i.ParentID, + &i.Kind, + &i.Name, + &i.Topic, + &i.Position, + &i.IsNsfw, + &i.IsArchived, + &i.IsLocked, + &i.IsPrivateThread, + &i.ThreadParentID, + &i.ArchiveTimestamp, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listChannelsByGuild = `-- name: ListChannelsByGuild :many +select id, guild_id, coalesce(parent_id, '') as parent_id, kind, name, + coalesce(topic, '') as topic, position, is_nsfw, is_archived, + is_locked, is_private_thread, coalesce(thread_parent_id, '') as thread_parent_id, + coalesce(archive_timestamp, '') as archive_timestamp +from channels +where guild_id = ? +order by guild_id, position, name +` + +type ListChannelsByGuildRow struct { + ID string + GuildID string + ParentID string + Kind string + Name string + Topic string + Position sql.NullInt64 + IsNsfw int64 + IsArchived int64 + IsLocked int64 + IsPrivateThread int64 + ThreadParentID string + ArchiveTimestamp string +} + +func (q *Queries) ListChannelsByGuild(ctx context.Context, guildID string) ([]ListChannelsByGuildRow, error) { + rows, err := q.db.QueryContext(ctx, listChannelsByGuild, guildID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListChannelsByGuildRow + for rows.Next() { + var i ListChannelsByGuildRow + if err := rows.Scan( + &i.ID, + &i.GuildID, + &i.ParentID, + &i.Kind, + &i.Name, + &i.Topic, + &i.Position, + &i.IsNsfw, + &i.IsArchived, + &i.IsLocked, + &i.IsPrivateThread, + &i.ThreadParentID, + &i.ArchiveTimestamp, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const 
listExistingAttachmentMedia = `-- name: ListExistingAttachmentMedia :many +select attachment_id, coalesce(media_path, '') as media_path, + coalesce(content_sha256, '') as content_sha256, content_size, + coalesce(fetched_at, '') as fetched_at, coalesce(fetch_status, '') as fetch_status, + coalesce(fetch_error, '') as fetch_error +from message_attachments +where message_id = ? +` + +type ListExistingAttachmentMediaRow struct { + AttachmentID string + MediaPath string + ContentSha256 string + ContentSize int64 + FetchedAt string + FetchStatus string + FetchError string +} + +func (q *Queries) ListExistingAttachmentMedia(ctx context.Context, messageID string) ([]ListExistingAttachmentMediaRow, error) { + rows, err := q.db.QueryContext(ctx, listExistingAttachmentMedia, messageID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListExistingAttachmentMediaRow + for rows.Next() { + var i ListExistingAttachmentMediaRow + if err := rows.Scan( + &i.AttachmentID, + &i.MediaPath, + &i.ContentSha256, + &i.ContentSize, + &i.FetchedAt, + &i.FetchStatus, + &i.FetchError, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listGuildIDs = `-- name: ListGuildIDs :many +select id +from guilds +order by id +` + +func (q *Queries) ListGuildIDs(ctx context.Context) ([]string, error) { + rows, err := q.db.QueryContext(ctx, listGuildIDs) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listIncompleteMessageChannelIDs = `-- name: ListIncompleteMessageChannelIDs :many +select c.id +from 
channels c +where c.kind in ('text', 'news', 'announcement', 'thread_public', 'thread_private', 'thread_news', 'thread_announcement') + and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':history_complete' + ) + and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':unavailable' + ) +order by c.id +` + +func (q *Queries) ListIncompleteMessageChannelIDs(ctx context.Context) ([]string, error) { + rows, err := q.db.QueryContext(ctx, listIncompleteMessageChannelIDs) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listIncompleteMessageChannelIDsByGuild = `-- name: ListIncompleteMessageChannelIDsByGuild :many +select c.id +from channels c +where c.kind in ('text', 'news', 'announcement', 'thread_public', 'thread_private', 'thread_news', 'thread_announcement') + and c.guild_id = ? 
+ and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':history_complete' + ) + and not exists ( + select 1 + from sync_state s + where s.scope = 'channel:' || c.id || ':unavailable' + ) +order by c.id +` + +func (q *Queries) ListIncompleteMessageChannelIDsByGuild(ctx context.Context, guildID string) ([]string, error) { + rows, err := q.db.QueryContext(ctx, listIncompleteMessageChannelIDsByGuild, guildID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []string + for rows.Next() { + var id string + if err := rows.Scan(&id); err != nil { + return nil, err + } + items = append(items, id) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listMembers = `-- name: ListMembers :many +select guild_id, user_id, username, coalesce(global_name, '') as global_name, + coalesce(display_name, '') as display_name, coalesce(nick, '') as nick, + coalesce(discriminator, '') as discriminator, coalesce(avatar, '') as avatar, + role_ids_json, bot, coalesce(joined_at, '') as joined_at, raw_json +from members +order by coalesce(nullif(display_name, ''), nullif(nick, ''), nullif(global_name, ''), username), username +limit ? 
+` + +type ListMembersRow struct { + GuildID string + UserID string + Username string + GlobalName string + DisplayName string + Nick string + Discriminator string + Avatar string + RoleIdsJson string + Bot int64 + JoinedAt string + RawJson string +} + +func (q *Queries) ListMembers(ctx context.Context, limit int64) ([]ListMembersRow, error) { + rows, err := q.db.QueryContext(ctx, listMembers, limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListMembersRow + for rows.Next() { + var i ListMembersRow + if err := rows.Scan( + &i.GuildID, + &i.UserID, + &i.Username, + &i.GlobalName, + &i.DisplayName, + &i.Nick, + &i.Discriminator, + &i.Avatar, + &i.RoleIdsJson, + &i.Bot, + &i.JoinedAt, + &i.RawJson, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listMembersByGuild = `-- name: ListMembersByGuild :many +select guild_id, user_id, username, coalesce(global_name, '') as global_name, + coalesce(display_name, '') as display_name, coalesce(nick, '') as nick, + coalesce(discriminator, '') as discriminator, coalesce(avatar, '') as avatar, + role_ids_json, bot, coalesce(joined_at, '') as joined_at, raw_json +from members +where guild_id = ? +order by coalesce(nullif(display_name, ''), nullif(nick, ''), nullif(global_name, ''), username), username +limit ? 
+` + +type ListMembersByGuildParams struct { + GuildID string + Limit int64 +} + +type ListMembersByGuildRow struct { + GuildID string + UserID string + Username string + GlobalName string + DisplayName string + Nick string + Discriminator string + Avatar string + RoleIdsJson string + Bot int64 + JoinedAt string + RawJson string +} + +func (q *Queries) ListMembersByGuild(ctx context.Context, arg ListMembersByGuildParams) ([]ListMembersByGuildRow, error) { + rows, err := q.db.QueryContext(ctx, listMembersByGuild, arg.GuildID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListMembersByGuildRow + for rows.Next() { + var i ListMembersByGuildRow + if err := rows.Scan( + &i.GuildID, + &i.UserID, + &i.Username, + &i.GlobalName, + &i.DisplayName, + &i.Nick, + &i.Discriminator, + &i.Avatar, + &i.RoleIdsJson, + &i.Bot, + &i.JoinedAt, + &i.RawJson, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const listMembersByUserID = `-- name: ListMembersByUserID :many +select guild_id, user_id, username, coalesce(global_name, '') as global_name, + coalesce(display_name, '') as display_name, coalesce(nick, '') as nick, + coalesce(discriminator, '') as discriminator, coalesce(avatar, '') as avatar, + role_ids_json, bot, coalesce(joined_at, '') as joined_at, raw_json +from members +where user_id = ? 
+order by guild_id, username +` + +type ListMembersByUserIDRow struct { + GuildID string + UserID string + Username string + GlobalName string + DisplayName string + Nick string + Discriminator string + Avatar string + RoleIdsJson string + Bot int64 + JoinedAt string + RawJson string +} + +func (q *Queries) ListMembersByUserID(ctx context.Context, userID string) ([]ListMembersByUserIDRow, error) { + rows, err := q.db.QueryContext(ctx, listMembersByUserID, userID) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListMembersByUserIDRow + for rows.Next() { + var i ListMembersByUserIDRow + if err := rows.Scan( + &i.GuildID, + &i.UserID, + &i.Username, + &i.GlobalName, + &i.DisplayName, + &i.Nick, + &i.Discriminator, + &i.Avatar, + &i.RoleIdsJson, + &i.Bot, + &i.JoinedAt, + &i.RawJson, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const markMessageDeleted = `-- name: MarkMessageDeleted :exec +update messages +set deleted_at = ?, updated_at = ? +where id = ? +` + +type MarkMessageDeletedParams struct { + DeletedAt sql.NullString + UpdatedAt string + ID string +} + +func (q *Queries) MarkMessageDeleted(ctx context.Context, arg MarkMessageDeletedParams) error { + _, err := q.db.ExecContext(ctx, markMessageDeleted, arg.DeletedAt, arg.UpdatedAt, arg.ID) + return err +} + +const requeueAllEmbeddingJobs = `-- name: RequeueAllEmbeddingJobs :execrows +update embedding_jobs +set state = 'pending', + attempts = 0, + provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? 
+where message_id in (select id from messages) +` + +type RequeueAllEmbeddingJobsParams struct { + Provider string + Model string + InputVersion string + UpdatedAt string +} + +func (q *Queries) RequeueAllEmbeddingJobs(ctx context.Context, arg RequeueAllEmbeddingJobsParams) (int64, error) { + result, err := q.db.ExecContext(ctx, requeueAllEmbeddingJobs, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.UpdatedAt, + ) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const setSyncState = `-- name: SetSyncState :exec +insert into sync_state(scope, cursor, updated_at) +values(?, ?, ?) +on conflict(scope) do update set + cursor = excluded.cursor, + updated_at = excluded.updated_at +` + +type SetSyncStateParams struct { + Scope string + Cursor sql.NullString + UpdatedAt string +} + +func (q *Queries) SetSyncState(ctx context.Context, arg SetSyncStateParams) error { + _, err := q.db.ExecContext(ctx, setSyncState, arg.Scope, arg.Cursor, arg.UpdatedAt) + return err +} + +const updateAttachmentFetchStatus = `-- name: UpdateAttachmentFetchStatus :exec +update message_attachments +set fetched_at = ?, + fetch_status = ?, + fetch_error = ?, + updated_at = ? +where attachment_id = ? +` + +type UpdateAttachmentFetchStatusParams struct { + FetchedAt sql.NullString + FetchStatus string + FetchError string + UpdatedAt string + AttachmentID string +} + +func (q *Queries) UpdateAttachmentFetchStatus(ctx context.Context, arg UpdateAttachmentFetchStatusParams) error { + _, err := q.db.ExecContext(ctx, updateAttachmentFetchStatus, + arg.FetchedAt, + arg.FetchStatus, + arg.FetchError, + arg.UpdatedAt, + arg.AttachmentID, + ) + return err +} + +const updateAttachmentMedia = `-- name: UpdateAttachmentMedia :exec +update message_attachments +set media_path = ?, + content_sha256 = ?, + content_size = ?, + fetched_at = ?, + fetch_status = ?, + fetch_error = ?, + updated_at = ? +where attachment_id = ? 
+` + +type UpdateAttachmentMediaParams struct { + MediaPath sql.NullString + ContentSha256 sql.NullString + ContentSize int64 + FetchedAt sql.NullString + FetchStatus string + FetchError string + UpdatedAt string + AttachmentID string +} + +func (q *Queries) UpdateAttachmentMedia(ctx context.Context, arg UpdateAttachmentMediaParams) error { + _, err := q.db.ExecContext(ctx, updateAttachmentMedia, + arg.MediaPath, + arg.ContentSha256, + arg.ContentSize, + arg.FetchedAt, + arg.FetchStatus, + arg.FetchError, + arg.UpdatedAt, + arg.AttachmentID, + ) + return err +} + +const upsertChannel = `-- name: UpsertChannel :exec +insert into channels( + id, guild_id, parent_id, kind, name, topic, position, is_nsfw, + is_archived, is_locked, is_private_thread, thread_parent_id, + archive_timestamp, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) +on conflict(id) do update set + guild_id = excluded.guild_id, + parent_id = excluded.parent_id, + kind = excluded.kind, + name = excluded.name, + topic = excluded.topic, + position = excluded.position, + is_nsfw = excluded.is_nsfw, + is_archived = excluded.is_archived, + is_locked = excluded.is_locked, + is_private_thread = excluded.is_private_thread, + thread_parent_id = excluded.thread_parent_id, + archive_timestamp = excluded.archive_timestamp, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at +` + +type UpsertChannelParams struct { + ID string + GuildID string + ParentID sql.NullString + Kind string + Name string + Topic sql.NullString + Position sql.NullInt64 + IsNsfw int64 + IsArchived int64 + IsLocked int64 + IsPrivateThread int64 + ThreadParentID sql.NullString + ArchiveTimestamp sql.NullString + RawJson string + UpdatedAt string +} + +func (q *Queries) UpsertChannel(ctx context.Context, arg UpsertChannelParams) error { + _, err := q.db.ExecContext(ctx, upsertChannel, + arg.ID, + arg.GuildID, + arg.ParentID, + arg.Kind, + arg.Name, + arg.Topic, + arg.Position, + arg.IsNsfw, + 
arg.IsArchived, + arg.IsLocked, + arg.IsPrivateThread, + arg.ThreadParentID, + arg.ArchiveTimestamp, + arg.RawJson, + arg.UpdatedAt, + ) + return err +} + +const upsertEmbeddingJobPending = `-- name: UpsertEmbeddingJobPending :exec +insert into embedding_jobs(message_id, state, attempts, updated_at) +values(?, 'pending', 0, ?) +on conflict(message_id) do update set + state = 'pending', + attempts = 0, + last_error = '', + locked_at = null, + updated_at = excluded.updated_at +` + +type UpsertEmbeddingJobPendingParams struct { + MessageID string + UpdatedAt string +} + +func (q *Queries) UpsertEmbeddingJobPending(ctx context.Context, arg UpsertEmbeddingJobPendingParams) error { + _, err := q.db.ExecContext(ctx, upsertEmbeddingJobPending, arg.MessageID, arg.UpdatedAt) + return err +} + +const upsertGuild = `-- name: UpsertGuild :exec +insert into guilds(id, name, icon, raw_json, updated_at) +values(?, ?, ?, ?, ?) +on conflict(id) do update set + name = excluded.name, + icon = excluded.icon, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at +` + +type UpsertGuildParams struct { + ID string + Name string + Icon sql.NullString + RawJson string + UpdatedAt string +} + +func (q *Queries) UpsertGuild(ctx context.Context, arg UpsertGuildParams) error { + _, err := q.db.ExecContext(ctx, upsertGuild, + arg.ID, + arg.Name, + arg.Icon, + arg.RawJson, + arg.UpdatedAt, + ) + return err +} + +const upsertMember = `-- name: UpsertMember :exec +insert into members( + guild_id, user_id, username, global_name, display_name, nick, discriminator, + avatar, bot, joined_at, role_ids_json, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+on conflict(guild_id, user_id) do update set + username = excluded.username, + global_name = excluded.global_name, + display_name = excluded.display_name, + nick = excluded.nick, + discriminator = excluded.discriminator, + avatar = excluded.avatar, + bot = excluded.bot, + joined_at = excluded.joined_at, + role_ids_json = excluded.role_ids_json, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at +` + +type UpsertMemberParams struct { + GuildID string + UserID string + Username string + GlobalName sql.NullString + DisplayName sql.NullString + Nick sql.NullString + Discriminator sql.NullString + Avatar sql.NullString + Bot int64 + JoinedAt sql.NullString + RoleIdsJson string + RawJson string + UpdatedAt string +} + +func (q *Queries) UpsertMember(ctx context.Context, arg UpsertMemberParams) error { + _, err := q.db.ExecContext(ctx, upsertMember, + arg.GuildID, + arg.UserID, + arg.Username, + arg.GlobalName, + arg.DisplayName, + arg.Nick, + arg.Discriminator, + arg.Avatar, + arg.Bot, + arg.JoinedAt, + arg.RoleIdsJson, + arg.RawJson, + arg.UpdatedAt, + ) + return err +} + +const upsertMessage = `-- name: UpsertMessage :exec +insert into messages( + id, guild_id, channel_id, author_id, message_type, created_at, edited_at, deleted_at, + content, normalized_content, reply_to_message_id, pinned, has_attachments, raw_json, updated_at +) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+on conflict(id) do update set + guild_id = excluded.guild_id, + channel_id = excluded.channel_id, + author_id = excluded.author_id, + message_type = excluded.message_type, + created_at = excluded.created_at, + edited_at = excluded.edited_at, + deleted_at = coalesce(excluded.deleted_at, messages.deleted_at), + content = excluded.content, + normalized_content = excluded.normalized_content, + reply_to_message_id = excluded.reply_to_message_id, + pinned = excluded.pinned, + has_attachments = excluded.has_attachments, + raw_json = excluded.raw_json, + updated_at = excluded.updated_at +` + +type UpsertMessageParams struct { + ID string + GuildID string + ChannelID string + AuthorID sql.NullString + MessageType int64 + CreatedAt string + EditedAt sql.NullString + DeletedAt sql.NullString + Content string + NormalizedContent string + ReplyToMessageID sql.NullString + Pinned int64 + HasAttachments int64 + RawJson string + UpdatedAt string +} + +func (q *Queries) UpsertMessage(ctx context.Context, arg UpsertMessageParams) error { + _, err := q.db.ExecContext(ctx, upsertMessage, + arg.ID, + arg.GuildID, + arg.ChannelID, + arg.AuthorID, + arg.MessageType, + arg.CreatedAt, + arg.EditedAt, + arg.DeletedAt, + arg.Content, + arg.NormalizedContent, + arg.ReplyToMessageID, + arg.Pinned, + arg.HasAttachments, + arg.RawJson, + arg.UpdatedAt, + ) + return err +} diff --git a/scripts/generate-sqlc.sh b/scripts/generate-sqlc.sh new file mode 100755 index 0000000..19e7c53 --- /dev/null +++ b/scripts/generate-sqlc.sh @@ -0,0 +1,4 @@ +#!/usr/bin/env bash +set -euo pipefail + +go run github.com/sqlc-dev/sqlc/cmd/sqlc@v1.31.1 generate diff --git a/sqlc.yaml b/sqlc.yaml new file mode 100644 index 0000000..065b59f --- /dev/null +++ b/sqlc.yaml @@ -0,0 +1,14 @@ +version: "2" +sql: + - engine: "sqlite" + schema: "internal/store/sqlc/schema.sql" + queries: "internal/store/sqlc/queries.sql" + gen: + go: + package: "storedb" + out: "internal/store/storedb" + sql_package: "database/sql" + 
emit_json_tags: false + emit_prepared_queries: false + emit_interface: false + emit_exact_table_names: false From 63375ea573251dbf7f0936e3ee28a9783a525237 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 10:47:26 +0100 Subject: [PATCH 2/8] refactor: use sqlc for stable store reads --- internal/store/attachments.go | 41 ++--- internal/store/embeddings.go | 5 +- internal/store/query.go | 322 +++++++++++++++++----------------- internal/store/store.go | 7 +- internal/store/write.go | 25 +-- 5 files changed, 206 insertions(+), 194 deletions(-) diff --git a/internal/store/attachments.go b/internal/store/attachments.go index a16c59f..f4b5347 100644 --- a/internal/store/attachments.go +++ b/internal/store/attachments.go @@ -4,6 +4,8 @@ import ( "context" "strings" "time" + + "github.com/openclaw/discrawl/internal/store/storedb" ) type AttachmentListOptions struct { @@ -241,29 +243,24 @@ func (s *Store) ExpandAttachmentChannelIDs(ctx context.Context, channelIDs []str } func (s *Store) UpdateAttachmentMedia(ctx context.Context, update AttachmentMediaUpdate) error { - _, err := s.db.ExecContext(ctx, ` - update message_attachments - set media_path = ?, - content_sha256 = ?, - content_size = ?, - fetched_at = ?, - fetch_status = ?, - fetch_error = ?, - updated_at = ? - where attachment_id = ? 
- `, nullable(update.MediaPath), nullable(update.ContentSHA256), update.ContentSize, nullable(update.FetchedAt), - update.FetchStatus, update.FetchError, time.Now().UTC().Format(timeLayout), update.AttachmentID) - return err + return s.q.UpdateAttachmentMedia(ctx, storedb.UpdateAttachmentMediaParams{ + MediaPath: nullString(update.MediaPath), + ContentSha256: nullString(update.ContentSHA256), + ContentSize: update.ContentSize, + FetchedAt: nullString(update.FetchedAt), + FetchStatus: update.FetchStatus, + FetchError: update.FetchError, + UpdatedAt: time.Now().UTC().Format(timeLayout), + AttachmentID: update.AttachmentID, + }) } func (s *Store) UpdateAttachmentFetchStatus(ctx context.Context, attachmentID, fetchedAt, status, message string) error { - _, err := s.db.ExecContext(ctx, ` - update message_attachments - set fetched_at = ?, - fetch_status = ?, - fetch_error = ?, - updated_at = ? - where attachment_id = ? - `, nullable(fetchedAt), status, message, time.Now().UTC().Format(timeLayout), attachmentID) - return err + return s.q.UpdateAttachmentFetchStatus(ctx, storedb.UpdateAttachmentFetchStatusParams{ + FetchedAt: nullString(fetchedAt), + FetchStatus: status, + FetchError: message, + UpdatedAt: time.Now().UTC().Format(timeLayout), + AttachmentID: attachmentID, + }) } diff --git a/internal/store/embeddings.go b/internal/store/embeddings.go index 888428a..bb2e4b9 100644 --- a/internal/store/embeddings.go +++ b/internal/store/embeddings.go @@ -495,9 +495,8 @@ func DecodeEmbeddingVector(blob []byte) ([]float32, error) { } func (s *Store) EmbeddingBacklog(ctx context.Context) (int, error) { - var count int - err := s.db.QueryRowContext(ctx, `select count(*) from embedding_jobs where state = 'pending'`).Scan(&count) - return count, err + count, err := s.q.CountEmbeddingBacklog(ctx) + return int(count), err } func (s *Store) RequeueAllEmbeddingJobs(ctx context.Context, opts EmbeddingDrainOptions) (int, error) { diff --git a/internal/store/query.go 
b/internal/store/query.go index 4d80db2..63be6ed 100644 --- a/internal/store/query.go +++ b/internal/store/query.go @@ -11,6 +11,7 @@ import ( "time" "github.com/openclaw/crawlkit/vector" + "github.com/openclaw/discrawl/internal/store/storedb" ) const ( @@ -42,28 +43,22 @@ type SemanticSearchOptions struct { } func (s *Store) GetSyncState(ctx context.Context, scope string) (string, error) { - var cursor sql.NullString - err := s.db.QueryRowContext(ctx, `select cursor from sync_state where scope = ?`, scope).Scan(&cursor) + cursor, err := s.q.GetSyncState(ctx, scope) if err != nil { if err == sql.ErrNoRows { return "", nil } return "", err } - return cursor.String, nil + return cursor, nil } func (s *Store) ChannelMessageBounds(ctx context.Context, channelID string) (string, string, error) { - var oldest sql.NullString - var newest sql.NullString - if err := s.db.QueryRowContext(ctx, ` - select min(id), max(id) - from messages - where channel_id = ? - `, channelID).Scan(&oldest, &newest); err != nil { + row, err := s.q.ChannelMessageBounds(ctx, channelID) + if err != nil { return "", "", err } - return oldest.String, newest.String, nil + return row.OldestID, row.NewestID, nil } func (s *Store) SearchMessages(ctx context.Context, opts SearchOptions) ([]SearchResult, error) { @@ -398,17 +393,11 @@ func (s *Store) HasMessageEmbeddings(ctx context.Context, provider, model, input } queryCtx, cancel := withQueryTimeout(ctx) defer cancel() - var exists int - err := s.db.QueryRowContext(queryCtx, ` - select exists( - select 1 - from message_embeddings - where provider = ? - and model = ? - and input_version = ? 
- ) - `, provider, model, inputVersion).Scan(&exists) - return exists == 1, err + return s.q.HasMessageEmbeddings(queryCtx, storedb.HasMessageEmbeddingsParams{ + Provider: provider, + Model: model, + InputVersion: inputVersion, + }) } func (s *Store) CheckMessageFTS(ctx context.Context) error { @@ -498,186 +487,205 @@ func (s *Store) Members(ctx context.Context, guildID, query string, limit int) ( if limit <= 0 { limit = 100 } - args := []any{} - clauses := []string{"1=1"} + var out []MemberRow if guildID != "" { - clauses = append(clauses, "guild_id = ?") - args = append(args, guildID) + rows, err := s.q.ListMembersByGuild(ctx, storedb.ListMembersByGuildParams{GuildID: guildID, Limit: int64(limit)}) + if err != nil { + return nil, err + } + out = make([]MemberRow, 0, len(rows)) + for _, row := range rows { + out = append(out, MemberRow{ + GuildID: row.GuildID, + UserID: row.UserID, + Username: row.Username, + GlobalName: row.GlobalName, + DisplayName: row.DisplayName, + Nick: row.Nick, + Discriminator: row.Discriminator, + Avatar: row.Avatar, + RoleIDsJSON: row.RoleIdsJson, + Bot: row.Bot == 1, + JoinedAt: parseTime(row.JoinedAt), + RawJSON: row.RawJson, + }) + } + return out, nil } - args = append(args, limit) - rows, err := s.db.QueryContext(ctx, ` - select guild_id, user_id, username, coalesce(global_name, ''), coalesce(display_name, ''), - coalesce(nick, ''), coalesce(discriminator, ''), coalesce(avatar, ''), - role_ids_json, bot, coalesce(joined_at, ''), raw_json - from members - where `+strings.Join(clauses, " and ")+` - order by coalesce(nullif(display_name, ''), nullif(nick, ''), nullif(global_name, ''), username), username - limit ? - `, args...) 
+ rows, err := s.q.ListMembers(ctx, int64(limit)) if err != nil { return nil, err } - defer func() { _ = rows.Close() }() - return scanMemberRows(rows) + out = make([]MemberRow, 0, len(rows)) + for _, row := range rows { + out = append(out, MemberRow{ + GuildID: row.GuildID, + UserID: row.UserID, + Username: row.Username, + GlobalName: row.GlobalName, + DisplayName: row.DisplayName, + Nick: row.Nick, + Discriminator: row.Discriminator, + Avatar: row.Avatar, + RoleIDsJSON: row.RoleIdsJson, + Bot: row.Bot == 1, + JoinedAt: parseTime(row.JoinedAt), + RawJSON: row.RawJson, + }) + } + return out, nil } func (s *Store) MemberByID(ctx context.Context, userID string) ([]MemberRow, error) { - rows, err := s.db.QueryContext(ctx, ` - select guild_id, user_id, username, coalesce(global_name, ''), coalesce(display_name, ''), - coalesce(nick, ''), coalesce(discriminator, ''), coalesce(avatar, ''), - role_ids_json, bot, coalesce(joined_at, ''), raw_json - from members - where user_id = ? - order by guild_id, username - `, userID) + rows, err := s.q.ListMembersByUserID(ctx, userID) if err != nil { return nil, err } - defer func() { _ = rows.Close() }() - return scanMemberRows(rows) + out := make([]MemberRow, 0, len(rows)) + for _, row := range rows { + out = append(out, MemberRow{ + GuildID: row.GuildID, + UserID: row.UserID, + Username: row.Username, + GlobalName: row.GlobalName, + DisplayName: row.DisplayName, + Nick: row.Nick, + Discriminator: row.Discriminator, + Avatar: row.Avatar, + RoleIDsJSON: row.RoleIdsJson, + Bot: row.Bot == 1, + JoinedAt: parseTime(row.JoinedAt), + RawJSON: row.RawJson, + }) + } + return out, nil } func (s *Store) Channels(ctx context.Context, guildID string) ([]ChannelRow, error) { - args := []any{} - query := ` - select id, guild_id, coalesce(parent_id, ''), kind, name, coalesce(topic, ''), position, - is_nsfw, is_archived, is_locked, is_private_thread, coalesce(thread_parent_id, ''), coalesce(archive_timestamp, '') - from channels - ` + var out 
[]ChannelRow if guildID != "" { - query += ` where guild_id = ?` - args = append(args, guildID) + rows, err := s.q.ListChannelsByGuild(ctx, guildID) + if err != nil { + return nil, err + } + out = make([]ChannelRow, 0, len(rows)) + for _, row := range rows { + out = append(out, ChannelRow{ + ID: row.ID, + GuildID: row.GuildID, + ParentID: row.ParentID, + Kind: row.Kind, + Name: row.Name, + Topic: row.Topic, + Position: int(row.Position.Int64), + IsNSFW: row.IsNsfw == 1, + IsArchived: row.IsArchived == 1, + IsLocked: row.IsLocked == 1, + IsPrivateThread: row.IsPrivateThread == 1, + ThreadParentID: row.ThreadParentID, + ArchiveTimestamp: parseTime(row.ArchiveTimestamp), + }) + } + return out, nil } - query += ` order by guild_id, position, name` - rows, err := s.db.QueryContext(ctx, query, args...) + rows, err := s.q.ListChannels(ctx) if err != nil { return nil, err } - defer func() { _ = rows.Close() }() - var out []ChannelRow - for rows.Next() { - var row ChannelRow - var archived int - var locked int - var nsfw int - var priv int - var archiveTS string - if err := rows.Scan(&row.ID, &row.GuildID, &row.ParentID, &row.Kind, &row.Name, &row.Topic, &row.Position, &nsfw, &archived, &locked, &priv, &row.ThreadParentID, &archiveTS); err != nil { - return nil, err - } - row.IsNSFW = nsfw == 1 - row.IsArchived = archived == 1 - row.IsLocked = locked == 1 - row.IsPrivateThread = priv == 1 - row.ArchiveTimestamp = parseTime(archiveTS) - out = append(out, row) + out = make([]ChannelRow, 0, len(rows)) + for _, row := range rows { + out = append(out, ChannelRow{ + ID: row.ID, + GuildID: row.GuildID, + ParentID: row.ParentID, + Kind: row.Kind, + Name: row.Name, + Topic: row.Topic, + Position: int(row.Position.Int64), + IsNSFW: row.IsNsfw == 1, + IsArchived: row.IsArchived == 1, + IsLocked: row.IsLocked == 1, + IsPrivateThread: row.IsPrivateThread == 1, + ThreadParentID: row.ThreadParentID, + ArchiveTimestamp: parseTime(row.ArchiveTimestamp), + }) } - return out, rows.Err() + 
return out, nil } func (s *Store) GuildChannelCount(ctx context.Context, guildID string) (int, error) { - var count int - if err := s.db.QueryRowContext(ctx, ` - select count(*) - from channels - where guild_id = ? - `, guildID).Scan(&count); err != nil { - return 0, err - } - return count, nil + count, err := s.q.CountChannelsByGuild(ctx, guildID) + return int(count), err } func (s *Store) GuildMemberCount(ctx context.Context, guildID string) (int, error) { - var count int - if err := s.db.QueryRowContext(ctx, ` - select count(*) - from members - where guild_id = ? - `, guildID).Scan(&count); err != nil { - return 0, err - } - return count, nil + count, err := s.q.CountMembersByGuild(ctx, guildID) + return int(count), err } func (s *Store) IncompleteMessageChannelIDs(ctx context.Context, guildID string) ([]string, error) { - args := []any{} - query := ` - select c.id - from channels c - where c.kind in ('text', 'news', 'announcement', 'thread_public', 'thread_private', 'thread_news', 'thread_announcement') - ` if guildID != "" { - query += ` and c.guild_id = ?` - args = append(args, guildID) - } - query += ` - and not exists ( - select 1 - from sync_state s - where s.scope = 'channel:' || c.id || ':history_complete' - ) - and not exists ( - select 1 - from sync_state s - where s.scope = 'channel:' || c.id || ':unavailable' - ) - order by c.id - ` - rows, err := s.db.QueryContext(ctx, query, args...) 
- if err != nil { - return nil, err + return s.q.ListIncompleteMessageChannelIDsByGuild(ctx, guildID) } - defer func() { _ = rows.Close() }() - var out []string - for rows.Next() { - var id string - if err := rows.Scan(&id); err != nil { - return nil, err - } - out = append(out, id) - } - return out, rows.Err() + return s.q.ListIncompleteMessageChannelIDs(ctx) } func (s *Store) Status(ctx context.Context, dbPath, defaultGuildID string) (Status, error) { status := Status{DBPath: dbPath, DefaultGuildID: defaultGuildID} - queries := map[string]*int{ - `select count(*) from guilds`: &status.GuildCount, - `select count(*) from channels`: &status.ChannelCount, - `select count(*) from messages`: &status.MessageCount, - `select count(*) from members`: &status.MemberCount, - `select count(*) from embedding_jobs where state = 'pending'`: &status.EmbeddingBacklog, - } - for query, target := range queries { - if err := s.db.QueryRowContext(ctx, query).Scan(target); err != nil { - return Status{}, err - } + guildCount, err := s.q.CountGuilds(ctx) + if err != nil { + return Status{}, err } - if err := s.db.QueryRowContext(ctx, `select count(*) from channels where kind like 'thread_%'`).Scan(&status.ThreadCount); err != nil { + channelCount, err := s.q.CountChannels(ctx) + if err != nil { return Status{}, err } - var lastSync string - _ = s.db.QueryRowContext(ctx, `select updated_at from sync_state where scope = 'sync:last_success'`).Scan(&lastSync) - status.LastSyncAt = parseTime(lastSync) - var lastTail string - _ = s.db.QueryRowContext(ctx, `select updated_at from sync_state where scope = 'tail:last_event'`).Scan(&lastTail) - status.LastTailEventAt = parseTime(lastTail) - if defaultGuildID != "" { - _ = s.db.QueryRowContext(ctx, `select name from guilds where id = ?`, defaultGuildID).Scan(&status.DefaultGuildName) + messageCount, err := s.q.CountMessages(ctx) + if err != nil { + return Status{}, err } - rows, err := s.db.QueryContext(ctx, `select id from guilds order by id`) + 
memberCount, err := s.q.CountMembers(ctx) if err != nil { return Status{}, err } - defer func() { _ = rows.Close() }() - for rows.Next() { - var guildID string - if err := rows.Scan(&guildID); err != nil { + embeddingBacklog, err := s.q.CountEmbeddingBacklog(ctx) + if err != nil { + return Status{}, err + } + threadCount, err := s.q.CountThreads(ctx) + if err != nil { + return Status{}, err + } + status.GuildCount = int(guildCount) + status.ChannelCount = int(channelCount) + status.MessageCount = int(messageCount) + status.MemberCount = int(memberCount) + status.EmbeddingBacklog = int(embeddingBacklog) + status.ThreadCount = int(threadCount) + + lastSync, err := s.q.GetSyncUpdatedAt(ctx, "sync:last_success") + if err != nil && err != sql.ErrNoRows { + return Status{}, err + } + status.LastSyncAt = parseTime(lastSync) + lastTail, err := s.q.GetSyncUpdatedAt(ctx, "tail:last_event") + if err != nil && err != sql.ErrNoRows { + return Status{}, err + } + status.LastTailEventAt = parseTime(lastTail) + if defaultGuildID != "" { + name, err := s.q.GetGuildName(ctx, defaultGuildID) + if err != nil && err != sql.ErrNoRows { return Status{}, err } - status.AccessibleGuildIDs = append(status.AccessibleGuildIDs, guildID) + status.DefaultGuildName = name + } + guildIDs, err := s.q.ListGuildIDs(ctx) + if err != nil { + return Status{}, err } - return status, rows.Err() + status.AccessibleGuildIDs = guildIDs + return status, nil } func (s *Store) ReadOnlyQuery(ctx context.Context, query string) ([]string, [][]string, error) { diff --git a/internal/store/store.go b/internal/store/store.go index ddd0559..73984e5 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -10,6 +10,7 @@ import ( "time" crawlstore "github.com/openclaw/crawlkit/store" + "github.com/openclaw/discrawl/internal/store/storedb" ) const ( @@ -23,6 +24,7 @@ var ErrSchemaVersionMismatch = errors.New("database schema version mismatch") type Store struct { db *sql.DB + q *storedb.Queries path string } 
@@ -119,7 +121,7 @@ func Open(ctx context.Context, path string) (*Store, error) { return nil, err } db := base.DB() - store := &Store{db: db, path: path} + store := &Store{db: db, q: storedb.New(db), path: path} if err := store.migrate(ctx); err != nil { _ = base.Close() return nil, err @@ -132,7 +134,8 @@ func OpenReadOnly(ctx context.Context, path string) (*Store, error) { if err != nil { return nil, err } - store := &Store{db: base.DB(), path: path} + db := base.DB() + store := &Store{db: db, q: storedb.New(db), path: path} if version, err := store.schemaVersion(ctx); err != nil { _ = base.Close() return nil, err diff --git a/internal/store/write.go b/internal/store/write.go index 9766a04..261c2a1 100644 --- a/internal/store/write.go +++ b/internal/store/write.go @@ -5,6 +5,8 @@ import ( "database/sql" "encoding/json" "time" + + "github.com/openclaw/discrawl/internal/store/storedb" ) type GuildRecord struct { @@ -585,19 +587,15 @@ func replaceMentionEventsTx(ctx context.Context, tx *sql.Tx, messageID string, m } func (s *Store) SetSyncState(ctx context.Context, scope, cursor string) error { - _, err := s.db.ExecContext(ctx, ` - insert into sync_state(scope, cursor, updated_at) - values(?, ?, ?) 
- on conflict(scope) do update set - cursor=excluded.cursor, - updated_at=excluded.updated_at - `, scope, cursor, time.Now().UTC().Format(timeLayout)) - return err + return s.q.SetSyncState(ctx, storedb.SetSyncStateParams{ + Scope: scope, + Cursor: nullString(cursor), + UpdatedAt: time.Now().UTC().Format(timeLayout), + }) } func (s *Store) DeleteSyncState(ctx context.Context, scope string) error { - _, err := s.db.ExecContext(ctx, `delete from sync_state where scope = ?`, scope) - return err + return s.q.DeleteSyncState(ctx, scope) } func rollback(tx *sql.Tx) { @@ -620,6 +618,13 @@ func nullable(v string) any { return v } +func nullString(v string) sql.NullString { + if v == "" { + return sql.NullString{} + } + return sql.NullString{String: v, Valid: true} +} + func upsertMemberFTSTx(ctx context.Context, tx *sql.Tx, member MemberRecord) error { rowID := memberFTSRowID(member.GuildID, member.UserID) if _, err := tx.ExecContext(ctx, `delete from member_fts where rowid = ?`, rowID); err != nil { From bcd2929b8ac6eeb4b76f62d15edd08339bd9a504 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 10:49:32 +0100 Subject: [PATCH 3/8] refactor: use sqlc for store writes --- internal/store/write.go | 453 ++++++++++++++++++---------------------- 1 file changed, 208 insertions(+), 245 deletions(-) diff --git a/internal/store/write.go b/internal/store/write.go index 261c2a1..f32ed9a 100644 --- a/internal/store/write.go +++ b/internal/store/write.go @@ -113,46 +113,17 @@ type WriteOptions struct { } func (s *Store) UpsertGuild(ctx context.Context, guild GuildRecord) error { - now := time.Now().UTC().Format(timeLayout) - _, err := s.db.ExecContext(ctx, ` - insert into guilds(id, name, icon, raw_json, updated_at) - values(?, ?, ?, ?, ?) 
- on conflict(id) do update set - name=excluded.name, - icon=excluded.icon, - raw_json=excluded.raw_json, - updated_at=excluded.updated_at - `, guild.ID, guild.Name, guild.Icon, guild.RawJSON, now) - return err + return s.q.UpsertGuild(ctx, storedb.UpsertGuildParams{ + ID: guild.ID, + Name: guild.Name, + Icon: nullString(guild.Icon), + RawJson: guild.RawJSON, + UpdatedAt: time.Now().UTC().Format(timeLayout), + }) } func (s *Store) UpsertChannel(ctx context.Context, channel ChannelRecord) error { - now := time.Now().UTC().Format(timeLayout) - _, err := s.db.ExecContext(ctx, ` - insert into channels( - id, guild_id, parent_id, kind, name, topic, position, is_nsfw, - is_archived, is_locked, is_private_thread, thread_parent_id, - archive_timestamp, raw_json, updated_at - ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - on conflict(id) do update set - guild_id=excluded.guild_id, - parent_id=excluded.parent_id, - kind=excluded.kind, - name=excluded.name, - topic=excluded.topic, - position=excluded.position, - is_nsfw=excluded.is_nsfw, - is_archived=excluded.is_archived, - is_locked=excluded.is_locked, - is_private_thread=excluded.is_private_thread, - thread_parent_id=excluded.thread_parent_id, - archive_timestamp=excluded.archive_timestamp, - raw_json=excluded.raw_json, - updated_at=excluded.updated_at - `, channel.ID, channel.GuildID, channel.ParentID, channel.Kind, channel.Name, channel.Topic, channel.Position, - boolInt(channel.IsNSFW), boolInt(channel.IsArchived), boolInt(channel.IsLocked), boolInt(channel.IsPrivateThread), - channel.ThreadParentID, nullable(channel.ArchiveTimestamp), channel.RawJSON, now) - return err + return s.q.UpsertChannel(ctx, upsertChannelParams(channel, time.Now().UTC().Format(timeLayout))) } func (s *Store) ReplaceMembers(ctx context.Context, guildID string, members []MemberRecord) error { @@ -161,27 +132,16 @@ func (s *Store) ReplaceMembers(ctx context.Context, guildID string, members []Me return err } defer rollback(tx) - if _, err 
:= tx.ExecContext(ctx, `delete from members where guild_id = ?`, guildID); err != nil { + qtx := s.q.WithTx(tx) + if err := qtx.DeleteMembersByGuild(ctx, guildID); err != nil { return err } if _, err := tx.ExecContext(ctx, `delete from member_fts where guild_id = ?`, guildID); err != nil { return err } now := time.Now().UTC().Format(timeLayout) - stmt, err := tx.PrepareContext(ctx, ` - insert into members( - guild_id, user_id, username, global_name, display_name, nick, discriminator, - avatar, bot, joined_at, role_ids_json, raw_json, updated_at - ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `) - if err != nil { - return err - } - defer func() { _ = stmt.Close() }() for _, member := range members { - if _, err := stmt.ExecContext(ctx, member.GuildID, member.UserID, member.Username, nullable(member.GlobalName), - nullable(member.DisplayName), nullable(member.Nick), nullable(member.Discriminator), nullable(member.Avatar), - boolInt(member.Bot), nullable(member.JoinedAt), member.RoleIDsJSON, member.RawJSON, now); err != nil { + if err := qtx.InsertMember(ctx, insertMemberParams(member, now)); err != nil { return err } if err := upsertMemberFTSTx(ctx, tx, member); err != nil { @@ -197,27 +157,9 @@ func (s *Store) UpsertMember(ctx context.Context, member MemberRecord) error { return err } defer rollback(tx) + qtx := s.q.WithTx(tx) now := time.Now().UTC().Format(timeLayout) - if _, err := tx.ExecContext(ctx, ` - insert into members( - guild_id, user_id, username, global_name, display_name, nick, discriminator, - avatar, bot, joined_at, role_ids_json, raw_json, updated_at - ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- on conflict(guild_id, user_id) do update set - username=excluded.username, - global_name=excluded.global_name, - display_name=excluded.display_name, - nick=excluded.nick, - discriminator=excluded.discriminator, - avatar=excluded.avatar, - bot=excluded.bot, - joined_at=excluded.joined_at, - role_ids_json=excluded.role_ids_json, - raw_json=excluded.raw_json, - updated_at=excluded.updated_at - `, member.GuildID, member.UserID, member.Username, nullable(member.GlobalName), nullable(member.DisplayName), - nullable(member.Nick), nullable(member.Discriminator), nullable(member.Avatar), boolInt(member.Bot), - nullable(member.JoinedAt), member.RoleIDsJSON, member.RawJSON, now); err != nil { + if err := qtx.UpsertMember(ctx, upsertMemberParams(member, now)); err != nil { return err } if err := upsertMemberFTSTx(ctx, tx, member); err != nil { @@ -232,37 +174,45 @@ func (s *Store) DeleteGuildData(ctx context.Context, guildID string) error { return err } defer rollback(tx) - for _, stmt := range []string{ - `delete from embedding_jobs where message_id in (select id from messages where guild_id = ?)`, - `delete from message_embeddings where message_id in (select id from messages where guild_id = ?)`, - `delete from message_fts where guild_id = ?`, - `delete from message_events where guild_id = ?`, - `delete from message_attachments where guild_id = ?`, - `delete from mention_events where guild_id = ?`, - `delete from messages where guild_id = ?`, - `delete from member_fts where guild_id = ?`, - `delete from members where guild_id = ?`, - `delete from channels where guild_id = ?`, - `delete from guilds where id = ?`, - } { - if _, err := tx.ExecContext(ctx, stmt, guildID); err != nil { - return err - } + qtx := s.q.WithTx(tx) + if err := qtx.DeleteEmbeddingJobsByGuild(ctx, guildID); err != nil { + return err + } + if err := qtx.DeleteMessageEmbeddingsByGuild(ctx, guildID); err != nil { + return err + } + if _, err := tx.ExecContext(ctx, `delete from message_fts where guild_id = 
?`, guildID); err != nil { + return err + } + if err := qtx.DeleteMessageEventsByGuild(ctx, guildID); err != nil { + return err + } + if err := qtx.DeleteAttachmentsByGuild(ctx, guildID); err != nil { + return err + } + if err := qtx.DeleteMentionEventsByGuild(ctx, guildID); err != nil { + return err + } + if err := qtx.DeleteMessagesByGuild(ctx, guildID); err != nil { + return err + } + if _, err := tx.ExecContext(ctx, `delete from member_fts where guild_id = ?`, guildID); err != nil { + return err + } + if err := qtx.DeleteMembersByGuild(ctx, guildID); err != nil { + return err + } + if err := qtx.DeleteChannelsByGuild(ctx, guildID); err != nil { + return err + } + if err := qtx.DeleteGuild(ctx, guildID); err != nil { + return err } return tx.Commit() } func (s *Store) DeleteOrphanChannels(ctx context.Context, guildID string) error { - _, err := s.db.ExecContext(ctx, ` - delete from channels - where guild_id = ? - and not exists ( - select 1 - from messages - where messages.channel_id = channels.id - ) - `, guildID) - return err + return s.q.DeleteOrphanChannels(ctx, guildID) } func (s *Store) DeleteMember(ctx context.Context, guildID, userID string) error { @@ -271,7 +221,8 @@ func (s *Store) DeleteMember(ctx context.Context, guildID, userID string) error return err } defer rollback(tx) - if _, err := tx.ExecContext(ctx, `delete from members where guild_id = ? 
and user_id = ?`, guildID, userID); err != nil { + qtx := s.q.WithTx(tx) + if err := qtx.DeleteMember(ctx, storedb.DeleteMemberParams{GuildID: guildID, UserID: userID}); err != nil { return err } if _, err := tx.ExecContext(ctx, `delete from member_fts where rowid = ?`, memberFTSRowID(guildID, userID)); err != nil { @@ -290,7 +241,7 @@ func (s *Store) UpsertMessageWithOptions(ctx context.Context, message MessageRec return err } defer rollback(tx) - if err := upsertMessageTx(ctx, tx, message, opts); err != nil { + if err := upsertMessageTx(ctx, tx, s.q.WithTx(tx), message, opts); err != nil { return err } return tx.Commit() @@ -305,23 +256,24 @@ func (s *Store) UpsertMessages(ctx context.Context, messages []MessageMutation) return err } defer rollback(tx) + qtx := s.q.WithTx(tx) for _, message := range messages { if err := ctx.Err(); err != nil { return err } - if err := upsertMessageTx(ctx, tx, message.Record, message.Options); err != nil { + if err := upsertMessageTx(ctx, tx, qtx, message.Record, message.Options); err != nil { return err } - if err := replaceAttachmentsTx(ctx, tx, message.Record.ID, message.Attachments); err != nil { + if err := replaceAttachmentsTx(ctx, qtx, message.Record.ID, message.Attachments); err != nil { return err } - if err := replaceMentionEventsTx(ctx, tx, message.Record.ID, message.Mentions); err != nil { + if err := replaceMentionEventsTx(ctx, qtx, message.Record.ID, message.Mentions); err != nil { return err } if message.Options.AppendEvent && message.EventType != "" { if err := appendEventTx( ctx, - tx, + qtx, message.Record.GuildID, message.Record.ChannelID, message.Record.ID, @@ -335,55 +287,27 @@ func (s *Store) UpsertMessages(ctx context.Context, messages []MessageMutation) return tx.Commit() } -func upsertMessageTx(ctx context.Context, tx *sql.Tx, message MessageRecord, opts WriteOptions) error { +func upsertMessageTx(ctx context.Context, tx *sql.Tx, qtx *storedb.Queries, message MessageRecord, opts WriteOptions) error { now 
:= time.Now().UTC().Format(timeLayout) var previousNormalized sql.NullString previousErr := sql.ErrNoRows jobExists := false if opts.EnqueueEmbedding { - previousErr = tx.QueryRowContext(ctx, ` - select normalized_content - from messages - where id = ? - `, message.ID).Scan(&previousNormalized) + normalized, err := qtx.GetMessageNormalizedContent(ctx, message.ID) + previousErr = err if previousErr != nil && previousErr != sql.ErrNoRows { return previousErr } if previousErr == nil { - var existingJobs int - if err := tx.QueryRowContext(ctx, ` - select count(*) - from embedding_jobs - where message_id = ? - `, message.ID).Scan(&existingJobs); err != nil { + previousNormalized = sql.NullString{String: normalized, Valid: true} + existingJobs, err := qtx.CountEmbeddingJobsByMessage(ctx, message.ID) + if err != nil { return err } jobExists = existingJobs > 0 } } - if _, err := tx.ExecContext(ctx, ` - insert into messages( - id, guild_id, channel_id, author_id, message_type, created_at, edited_at, deleted_at, - content, normalized_content, reply_to_message_id, pinned, has_attachments, raw_json, updated_at - ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- on conflict(id) do update set - guild_id=excluded.guild_id, - channel_id=excluded.channel_id, - author_id=excluded.author_id, - message_type=excluded.message_type, - created_at=excluded.created_at, - edited_at=excluded.edited_at, - deleted_at=coalesce(excluded.deleted_at, messages.deleted_at), - content=excluded.content, - normalized_content=excluded.normalized_content, - reply_to_message_id=excluded.reply_to_message_id, - pinned=excluded.pinned, - has_attachments=excluded.has_attachments, - raw_json=excluded.raw_json, - updated_at=excluded.updated_at - `, message.ID, message.GuildID, message.ChannelID, nullable(message.AuthorID), message.MessageType, message.CreatedAt, - nullable(message.EditedAt), nullable(message.DeletedAt), message.Content, message.NormalizedContent, - nullable(message.ReplyToMessageID), boolInt(message.Pinned), boolInt(message.HasAttachments), message.RawJSON, now); err != nil { + if err := qtx.UpsertMessage(ctx, upsertMessageParams(message, now)); err != nil { return err } if rowID, ok := messageFTSRowID(message.ID); ok { @@ -399,16 +323,7 @@ func upsertMessageTx(ctx context.Context, tx *sql.Tx, message MessageRecord, opt } queueEmbedding := opts.EnqueueEmbedding && (previousErr == sql.ErrNoRows || previousNormalized.String != message.NormalizedContent || !jobExists) if queueEmbedding { - if _, err := tx.ExecContext(ctx, ` - insert into embedding_jobs(message_id, state, attempts, updated_at) - values(?, 'pending', 0, ?) 
- on conflict(message_id) do update set - state = 'pending', - attempts = 0, - last_error = '', - locked_at = null, - updated_at = excluded.updated_at - `, message.ID, now); err != nil { + if err := qtx.UpsertEmbeddingJobPending(ctx, storedb.UpsertEmbeddingJobPendingParams{MessageID: message.ID, UpdatedAt: now}); err != nil { return err } } @@ -421,19 +336,20 @@ func (s *Store) MarkMessageDeleted(ctx context.Context, guildID, channelID, mess return err } defer rollback(tx) + qtx := s.q.WithTx(tx) now := time.Now().UTC().Format(timeLayout) - if _, err := tx.ExecContext(ctx, ` - update messages - set deleted_at = ?, updated_at = ? - where id = ? - `, now, now, messageID); err != nil { + if err := qtx.MarkMessageDeleted(ctx, storedb.MarkMessageDeletedParams{ + DeletedAt: sql.NullString{String: now, Valid: true}, + UpdatedAt: now, + ID: messageID, + }); err != nil { return err } body, err := json.Marshal(payload) if err != nil { return err } - if err := appendEventTx(ctx, tx, guildID, channelID, messageID, "delete", string(body)); err != nil { + if err := appendEventTx(ctx, qtx, guildID, channelID, messageID, "delete", string(body)); err != nil { return err } return tx.Commit() @@ -444,44 +360,32 @@ func (s *Store) AppendMessageEvent(ctx context.Context, guildID, channelID, mess if err != nil { return err } - _, err = s.db.ExecContext(ctx, ` - insert into message_events(guild_id, channel_id, message_id, event_type, event_at, payload_json) - values(?, ?, ?, ?, ?, ?) - `, guildID, channelID, messageID, eventType, time.Now().UTC().Format(timeLayout), string(body)) - return err + return appendEventTx(ctx, s.q, guildID, channelID, messageID, eventType, string(body)) } -func appendEventTx(ctx context.Context, tx *sql.Tx, guildID, channelID, messageID, eventType, payload string) error { - _, err := tx.ExecContext(ctx, ` - insert into message_events(guild_id, channel_id, message_id, event_type, event_at, payload_json) - values(?, ?, ?, ?, ?, ?) 
- `, guildID, channelID, messageID, eventType, time.Now().UTC().Format(timeLayout), payload) - return err +func appendEventTx(ctx context.Context, q *storedb.Queries, guildID, channelID, messageID, eventType, payload string) error { + return q.InsertMessageEvent(ctx, storedb.InsertMessageEventParams{ + GuildID: guildID, + ChannelID: channelID, + MessageID: messageID, + EventType: eventType, + EventAt: time.Now().UTC().Format(timeLayout), + PayloadJson: payload, + }) } -func replaceAttachmentsTx(ctx context.Context, tx *sql.Tx, messageID string, attachments []AttachmentRecord) error { - existing, err := existingAttachmentMediaTx(ctx, tx, messageID) +func replaceAttachmentsTx(ctx context.Context, qtx *storedb.Queries, messageID string, attachments []AttachmentRecord) error { + existing, err := existingAttachmentMediaTx(ctx, qtx, messageID) if err != nil { return err } - if _, err := tx.ExecContext(ctx, `delete from message_attachments where message_id = ?`, messageID); err != nil { + if err := qtx.DeleteAttachmentsByMessage(ctx, messageID); err != nil { return err } if len(attachments) == 0 { return nil } now := time.Now().UTC().Format(timeLayout) - stmt, err := tx.PrepareContext(ctx, ` - insert into message_attachments( - attachment_id, message_id, guild_id, channel_id, author_id, filename, - content_type, size, url, proxy_url, text_content, media_path, content_sha256, - content_size, fetched_at, fetch_status, fetch_error, updated_at - ) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
- `) - if err != nil { - return err - } - defer func() { _ = stmt.Close() }() for _, attachment := range attachments { if media, ok := existing[attachment.AttachmentID]; ok && attachment.MediaPath == "" { attachment.MediaPath = media.MediaPath @@ -491,95 +395,55 @@ func replaceAttachmentsTx(ctx context.Context, tx *sql.Tx, messageID string, att attachment.FetchStatus = media.FetchStatus attachment.FetchError = media.FetchError } - if _, err := stmt.ExecContext( - ctx, - attachment.AttachmentID, - attachment.MessageID, - attachment.GuildID, - attachment.ChannelID, - nullable(attachment.AuthorID), - attachment.Filename, - nullable(attachment.ContentType), - attachment.Size, - nullable(attachment.URL), - nullable(attachment.ProxyURL), - attachment.TextContent, - nullable(attachment.MediaPath), - nullable(attachment.ContentSHA256), - attachment.ContentSize, - nullable(attachment.FetchedAt), - attachment.FetchStatus, - attachment.FetchError, - now, - ); err != nil { + if err := qtx.InsertMessageAttachment(ctx, insertMessageAttachmentParams(attachment, now)); err != nil { return err } } return nil } -func existingAttachmentMediaTx(ctx context.Context, tx *sql.Tx, messageID string) (map[string]AttachmentRecord, error) { - rows, err := tx.QueryContext(ctx, ` - select attachment_id, coalesce(media_path, ''), coalesce(content_sha256, ''), - content_size, coalesce(fetched_at, ''), coalesce(fetch_status, ''), coalesce(fetch_error, '') - from message_attachments - where message_id = ? 
- `, messageID) +func existingAttachmentMediaTx(ctx context.Context, qtx *storedb.Queries, messageID string) (map[string]AttachmentRecord, error) { + rows, err := qtx.ListExistingAttachmentMedia(ctx, messageID) if err != nil { return nil, err } - defer func() { _ = rows.Close() }() out := map[string]AttachmentRecord{} - for rows.Next() { - var record AttachmentRecord - if err := rows.Scan( - &record.AttachmentID, - &record.MediaPath, - &record.ContentSHA256, - &record.ContentSize, - &record.FetchedAt, - &record.FetchStatus, - &record.FetchError, - ); err != nil { - return nil, err + for _, row := range rows { + out[row.AttachmentID] = AttachmentRecord{ + AttachmentID: row.AttachmentID, + MediaPath: row.MediaPath, + ContentSHA256: row.ContentSha256, + ContentSize: row.ContentSize, + FetchedAt: row.FetchedAt, + FetchStatus: row.FetchStatus, + FetchError: row.FetchError, } - out[record.AttachmentID] = record } - return out, rows.Err() + return out, nil } -func replaceMentionEventsTx(ctx context.Context, tx *sql.Tx, messageID string, mentions []MentionEventRecord) error { - if _, err := tx.ExecContext(ctx, `delete from mention_events where message_id = ?`, messageID); err != nil { +func replaceMentionEventsTx(ctx context.Context, qtx *storedb.Queries, messageID string, mentions []MentionEventRecord) error { + if err := qtx.DeleteMentionEventsByMessage(ctx, messageID); err != nil { return err } if len(mentions) == 0 { return nil } - stmt, err := tx.PrepareContext(ctx, ` - insert into mention_events( - message_id, guild_id, channel_id, author_id, target_type, target_id, target_name, event_at - ) values(?, ?, ?, ?, ?, ?, ?, ?) 
- `) - if err != nil { - return err - } - defer func() { _ = stmt.Close() }() for _, mention := range mentions { eventAt := mention.EventAt if eventAt == "" { eventAt = time.Now().UTC().Format(timeLayout) } - if _, err := stmt.ExecContext( - ctx, - mention.MessageID, - mention.GuildID, - mention.ChannelID, - nullable(mention.AuthorID), - mention.TargetType, - mention.TargetID, - mention.TargetName, - eventAt, - ); err != nil { + if err := qtx.InsertMentionEvent(ctx, storedb.InsertMentionEventParams{ + MessageID: mention.MessageID, + GuildID: mention.GuildID, + ChannelID: mention.ChannelID, + AuthorID: nullString(mention.AuthorID), + TargetType: mention.TargetType, + TargetID: mention.TargetID, + TargetName: mention.TargetName, + EventAt: eventAt, + }); err != nil { return err } } @@ -589,7 +453,7 @@ func replaceMentionEventsTx(ctx context.Context, tx *sql.Tx, messageID string, m func (s *Store) SetSyncState(ctx context.Context, scope, cursor string) error { return s.q.SetSyncState(ctx, storedb.SetSyncStateParams{ Scope: scope, - Cursor: nullString(cursor), + Cursor: sql.NullString{String: cursor, Valid: true}, UpdatedAt: time.Now().UTC().Format(timeLayout), }) } @@ -625,6 +489,105 @@ func nullString(v string) sql.NullString { return sql.NullString{String: v, Valid: true} } +func upsertChannelParams(channel ChannelRecord, now string) storedb.UpsertChannelParams { + return storedb.UpsertChannelParams{ + ID: channel.ID, + GuildID: channel.GuildID, + ParentID: nullString(channel.ParentID), + Kind: channel.Kind, + Name: channel.Name, + Topic: nullString(channel.Topic), + Position: sql.NullInt64{Int64: int64(channel.Position), Valid: true}, + IsNsfw: int64(boolInt(channel.IsNSFW)), + IsArchived: int64(boolInt(channel.IsArchived)), + IsLocked: int64(boolInt(channel.IsLocked)), + IsPrivateThread: int64(boolInt(channel.IsPrivateThread)), + ThreadParentID: nullString(channel.ThreadParentID), + ArchiveTimestamp: nullString(channel.ArchiveTimestamp), + RawJson: 
channel.RawJSON, + UpdatedAt: now, + } +} + +func insertMemberParams(member MemberRecord, now string) storedb.InsertMemberParams { + return storedb.InsertMemberParams{ + GuildID: member.GuildID, + UserID: member.UserID, + Username: member.Username, + GlobalName: nullString(member.GlobalName), + DisplayName: nullString(member.DisplayName), + Nick: nullString(member.Nick), + Discriminator: nullString(member.Discriminator), + Avatar: nullString(member.Avatar), + Bot: int64(boolInt(member.Bot)), + JoinedAt: nullString(member.JoinedAt), + RoleIdsJson: member.RoleIDsJSON, + RawJson: member.RawJSON, + UpdatedAt: now, + } +} + +func upsertMemberParams(member MemberRecord, now string) storedb.UpsertMemberParams { + return storedb.UpsertMemberParams{ + GuildID: member.GuildID, + UserID: member.UserID, + Username: member.Username, + GlobalName: nullString(member.GlobalName), + DisplayName: nullString(member.DisplayName), + Nick: nullString(member.Nick), + Discriminator: nullString(member.Discriminator), + Avatar: nullString(member.Avatar), + Bot: int64(boolInt(member.Bot)), + JoinedAt: nullString(member.JoinedAt), + RoleIdsJson: member.RoleIDsJSON, + RawJson: member.RawJSON, + UpdatedAt: now, + } +} + +func upsertMessageParams(message MessageRecord, now string) storedb.UpsertMessageParams { + return storedb.UpsertMessageParams{ + ID: message.ID, + GuildID: message.GuildID, + ChannelID: message.ChannelID, + AuthorID: nullString(message.AuthorID), + MessageType: int64(message.MessageType), + CreatedAt: message.CreatedAt, + EditedAt: nullString(message.EditedAt), + DeletedAt: nullString(message.DeletedAt), + Content: message.Content, + NormalizedContent: message.NormalizedContent, + ReplyToMessageID: nullString(message.ReplyToMessageID), + Pinned: int64(boolInt(message.Pinned)), + HasAttachments: int64(boolInt(message.HasAttachments)), + RawJson: message.RawJSON, + UpdatedAt: now, + } +} + +func insertMessageAttachmentParams(attachment AttachmentRecord, now string) 
storedb.InsertMessageAttachmentParams { + return storedb.InsertMessageAttachmentParams{ + AttachmentID: attachment.AttachmentID, + MessageID: attachment.MessageID, + GuildID: attachment.GuildID, + ChannelID: attachment.ChannelID, + AuthorID: nullString(attachment.AuthorID), + Filename: attachment.Filename, + ContentType: nullString(attachment.ContentType), + Size: attachment.Size, + Url: nullString(attachment.URL), + ProxyUrl: nullString(attachment.ProxyURL), + TextContent: attachment.TextContent, + MediaPath: nullString(attachment.MediaPath), + ContentSha256: nullString(attachment.ContentSHA256), + ContentSize: attachment.ContentSize, + FetchedAt: nullString(attachment.FetchedAt), + FetchStatus: attachment.FetchStatus, + FetchError: attachment.FetchError, + UpdatedAt: now, + } +} + func upsertMemberFTSTx(ctx context.Context, tx *sql.Tx, member MemberRecord) error { rowID := memberFTSRowID(member.GuildID, member.UserID) if _, err := tx.ExecContext(ctx, `delete from member_fts where rowid = ?`, rowID); err != nil { From 2d3e003fae6436c033788067a2dff870654ad3a8 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 10:51:21 +0100 Subject: [PATCH 4/8] refactor: use sqlc for embedding jobs --- internal/store/embeddings.go | 230 ++++++++---------- internal/store/sqlc/queries.sql | 102 ++++++++ internal/store/storedb/queries.sql.go | 326 ++++++++++++++++++++++++++ 3 files changed, 520 insertions(+), 138 deletions(-) diff --git a/internal/store/embeddings.go b/internal/store/embeddings.go index bb2e4b9..c4cfbc8 100644 --- a/internal/store/embeddings.go +++ b/internal/store/embeddings.go @@ -9,6 +9,7 @@ import ( "github.com/openclaw/crawlkit/embed" "github.com/openclaw/crawlkit/vector" + "github.com/openclaw/discrawl/internal/store/storedb" ) const ( @@ -163,62 +164,39 @@ func emptyEmbeddingIdentity(job embeddingJob) bool { } func (s *Store) pendingEmbeddingJobs(ctx context.Context, limit int, staleBefore string) ([]embeddingJob, error) { - rows, err := 
s.db.QueryContext(ctx, ` - select - j.message_id, - m.normalized_content, - j.attempts, - j.provider, - j.model, - j.input_version - from embedding_jobs j - join messages m on m.id = j.message_id - where j.state = 'pending' - and (j.locked_at is null or j.locked_at = '' or j.locked_at < ?) - order by j.updated_at, j.message_id - limit ? - `, staleBefore, limit) + rows, err := s.q.ListPendingEmbeddingJobs(ctx, storedb.ListPendingEmbeddingJobsParams{ + LockedAt: nullString(staleBefore), + Limit: int64(limit), + }) if err != nil { return nil, err } - defer func() { _ = rows.Close() }() - var jobs []embeddingJob - for rows.Next() { - var job embeddingJob - if err := rows.Scan(&job.MessageID, &job.NormalizedContent, &job.Attempts, &job.Provider, &job.Model, &job.InputVersion); err != nil { - return nil, err - } - jobs = append(jobs, job) - } - return jobs, rows.Err() + jobs := make([]embeddingJob, 0, len(rows)) + for _, row := range rows { + jobs = append(jobs, embeddingJob{ + MessageID: row.MessageID, + NormalizedContent: row.NormalizedContent, + Attempts: int(row.Attempts), + Provider: row.Provider, + Model: row.Model, + InputVersion: row.InputVersion, + }) + } + return jobs, nil } func (s *Store) resetEmbeddingJobIdentity(ctx context.Context, messageID string, opts EmbeddingDrainOptions, resetAttempts bool) error { + arg := storedb.ResetEmbeddingJobIdentityParams{ + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + UpdatedAt: opts.Now().Format(timeLayout), + MessageID: messageID, + } if resetAttempts { - _, err := s.db.ExecContext(ctx, ` - update embedding_jobs - set provider = ?, - model = ?, - input_version = ?, - attempts = 0, - last_error = '', - locked_at = null, - updated_at = ? - where message_id = ? 
- `, opts.Provider, opts.Model, opts.InputVersion, opts.Now().Format(timeLayout), messageID) - return err + return s.q.ResetEmbeddingJobIdentityAndAttempts(ctx, storedb.ResetEmbeddingJobIdentityAndAttemptsParams(arg)) } - _, err := s.db.ExecContext(ctx, ` - update embedding_jobs - set provider = ?, - model = ?, - input_version = ?, - last_error = '', - locked_at = null, - updated_at = ? - where message_id = ? - `, opts.Provider, opts.Model, opts.InputVersion, opts.Now().Format(timeLayout), messageID) - return err + return s.q.ResetEmbeddingJobIdentity(ctx, arg) } func (s *Store) processEmbeddingBatch(ctx context.Context, provider embed.Provider, opts EmbeddingDrainOptions, jobs []embeddingJob, stats *EmbeddingDrainStats) (bool, error) { @@ -279,19 +257,15 @@ func (s *Store) lockEmbeddingJobs(ctx context.Context, jobs []embeddingJob, lock return nil, err } defer rollback(tx) + qtx := s.q.WithTx(tx) claimed := make([]embeddingJob, 0, len(jobs)) for _, job := range jobs { - result, err := tx.ExecContext(ctx, ` - update embedding_jobs - set locked_at = ?, updated_at = ? - where message_id = ? - and state = 'pending' - and (locked_at is null or locked_at = '' or locked_at < ?) 
- `, lockedAt, lockedAt, job.MessageID, staleBefore) - if err != nil { - return nil, err - } - rows, err := result.RowsAffected() + rows, err := qtx.LockEmbeddingJob(ctx, storedb.LockEmbeddingJobParams{ + LockedAt: nullString(lockedAt), + UpdatedAt: lockedAt, + MessageID: job.MessageID, + StaleBefore: nullString(staleBefore), + }) if err != nil { return nil, err } @@ -331,35 +305,31 @@ func (s *Store) storeEmbeddingBatch(ctx context.Context, opts EmbeddingDrainOpti return err } defer rollback(tx) + qtx := s.q.WithTx(tx) embeddedAt := opts.Now().Format(timeLayout) for i, job := range jobs { blob, err := EncodeEmbeddingVector(vectors[i]) if err != nil { return err } - if _, err := tx.ExecContext(ctx, ` - insert into message_embeddings( - message_id, provider, model, input_version, dimensions, embedding_blob, embedded_at - ) values(?, ?, ?, ?, ?, ?, ?) - on conflict(message_id, provider, model, input_version) do update set - dimensions = excluded.dimensions, - embedding_blob = excluded.embedding_blob, - embedded_at = excluded.embedded_at - `, job.MessageID, opts.Provider, opts.Model, opts.InputVersion, dimensions, blob, embeddedAt); err != nil { + if err := qtx.UpsertMessageEmbedding(ctx, storedb.UpsertMessageEmbeddingParams{ + MessageID: job.MessageID, + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + Dimensions: int64(dimensions), + EmbeddingBlob: blob, + EmbeddedAt: embeddedAt, + }); err != nil { return err } - if _, err := tx.ExecContext(ctx, ` - update embedding_jobs - set state = 'done', - attempts = 0, - provider = ?, - model = ?, - input_version = ?, - last_error = '', - locked_at = null, - updated_at = ? - where message_id = ? 
- `, opts.Provider, opts.Model, opts.InputVersion, embeddedAt, job.MessageID); err != nil { + if err := qtx.MarkEmbeddingJobDone(ctx, storedb.MarkEmbeddingJobDoneParams{ + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + UpdatedAt: embeddedAt, + MessageID: job.MessageID, + }); err != nil { return err } } @@ -372,22 +342,19 @@ func (s *Store) markEmbeddingJobsDone(ctx context.Context, opts EmbeddingDrainOp return err } defer rollback(tx) + qtx := s.q.WithTx(tx) now := opts.Now().Format(timeLayout) for _, job := range jobs { - if _, err := tx.ExecContext(ctx, `delete from message_embeddings where message_id = ?`, job.MessageID); err != nil { + if err := qtx.DeleteMessageEmbeddingsByMessage(ctx, job.MessageID); err != nil { return err } - if _, err := tx.ExecContext(ctx, ` - update embedding_jobs - set state = 'done', - provider = ?, - model = ?, - input_version = ?, - last_error = '', - locked_at = null, - updated_at = ? - where message_id = ? - `, opts.Provider, opts.Model, opts.InputVersion, now, job.MessageID); err != nil { + if err := qtx.MarkEmptyEmbeddingJobDone(ctx, storedb.MarkEmptyEmbeddingJobDoneParams{ + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + UpdatedAt: now, + MessageID: job.MessageID, + }); err != nil { return err } } @@ -400,20 +367,18 @@ func (s *Store) markEmbeddingJobsRateLimited(ctx context.Context, opts Embedding return err } defer rollback(tx) + qtx := s.q.WithTx(tx) now := opts.Now().Format(timeLayout) lastError := trimStoredError(cause) for _, job := range jobs { - if _, err := tx.ExecContext(ctx, ` - update embedding_jobs - set state = 'pending', - provider = ?, - model = ?, - input_version = ?, - last_error = ?, - locked_at = null, - updated_at = ? - where message_id = ? 
- `, opts.Provider, opts.Model, opts.InputVersion, lastError, now, job.MessageID); err != nil { + if err := qtx.MarkEmbeddingJobRateLimited(ctx, storedb.MarkEmbeddingJobRateLimitedParams{ + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + LastError: lastError, + UpdatedAt: now, + MessageID: job.MessageID, + }); err != nil { return err } } @@ -426,6 +391,7 @@ func (s *Store) markEmbeddingJobsFailed(ctx context.Context, opts EmbeddingDrain return err } defer rollback(tx) + qtx := s.q.WithTx(tx) now := opts.Now().Format(timeLayout) lastError := trimStoredError(cause) for _, job := range jobs { @@ -434,18 +400,16 @@ func (s *Store) markEmbeddingJobsFailed(ctx context.Context, opts EmbeddingDrain if attempts >= maxEmbeddingAttempts { state = "failed" } - if _, err := tx.ExecContext(ctx, ` - update embedding_jobs - set state = ?, - attempts = ?, - provider = ?, - model = ?, - input_version = ?, - last_error = ?, - locked_at = null, - updated_at = ? - where message_id = ? - `, state, attempts, opts.Provider, opts.Model, opts.InputVersion, lastError, now, job.MessageID); err != nil { + if err := qtx.MarkEmbeddingJobFailed(ctx, storedb.MarkEmbeddingJobFailedParams{ + State: state, + Attempts: int64(attempts), + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + LastError: lastError, + UpdatedAt: now, + MessageID: job.MessageID, + }); err != nil { return err } } @@ -506,37 +470,27 @@ func (s *Store) RequeueAllEmbeddingJobs(ctx context.Context, opts EmbeddingDrain return 0, err } defer rollback(tx) + qtx := s.q.WithTx(tx) now := opts.Now().Format(timeLayout) - if _, err := tx.ExecContext(ctx, ` - insert or ignore into embedding_jobs( - message_id, state, attempts, provider, model, input_version, last_error, locked_at, updated_at - ) - select id, 'pending', 0, ?, ?, ?, '', null, ? 
- from messages - `, opts.Provider, opts.Model, opts.InputVersion, now); err != nil { + if err := qtx.InsertMissingEmbeddingJobs(ctx, storedb.InsertMissingEmbeddingJobsParams{ + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + UpdatedAt: now, + }); err != nil { return 0, err } - result, err := tx.ExecContext(ctx, ` - update embedding_jobs - set state = 'pending', - attempts = 0, - provider = ?, - model = ?, - input_version = ?, - last_error = '', - locked_at = null, - updated_at = ? - where message_id in (select id from messages) - `, opts.Provider, opts.Model, opts.InputVersion, now) + affected, err := qtx.RequeueAllEmbeddingJobs(ctx, storedb.RequeueAllEmbeddingJobsParams{ + Provider: opts.Provider, + Model: opts.Model, + InputVersion: opts.InputVersion, + UpdatedAt: now, + }) if err != nil { return 0, err } if err := tx.Commit(); err != nil { return 0, err } - affected, err := result.RowsAffected() - if err != nil { - return 0, err - } return int(affected), nil } diff --git a/internal/store/sqlc/queries.sql b/internal/store/sqlc/queries.sql index 5512893..673fbf5 100644 --- a/internal/store/sqlc/queries.sql +++ b/internal/store/sqlc/queries.sql @@ -373,3 +373,105 @@ set state = 'pending', locked_at = null, updated_at = ? where message_id in (select id from messages); + +-- name: ListPendingEmbeddingJobs :many +select + j.message_id, + m.normalized_content, + j.attempts, + j.provider, + j.model, + j.input_version +from embedding_jobs j +join messages m on m.id = j.message_id +where j.state = 'pending' + and (j.locked_at is null or j.locked_at = '' or j.locked_at < ?) +order by j.updated_at, j.message_id +limit ?; + +-- name: ResetEmbeddingJobIdentity :exec +update embedding_jobs +set provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? 
+where message_id = ?; + +-- name: ResetEmbeddingJobIdentityAndAttempts :exec +update embedding_jobs +set provider = ?, + model = ?, + input_version = ?, + attempts = 0, + last_error = '', + locked_at = null, + updated_at = ? +where message_id = ?; + +-- name: LockEmbeddingJob :execrows +update embedding_jobs +set locked_at = sqlc.arg(locked_at), updated_at = sqlc.arg(updated_at) +where message_id = sqlc.arg(message_id) + and state = 'pending' + and (locked_at is null or locked_at = '' or locked_at < sqlc.arg(stale_before)); + +-- name: UpsertMessageEmbedding :exec +insert into message_embeddings( + message_id, provider, model, input_version, dimensions, embedding_blob, embedded_at +) values(?, ?, ?, ?, ?, ?, ?) +on conflict(message_id, provider, model, input_version) do update set + dimensions = excluded.dimensions, + embedding_blob = excluded.embedding_blob, + embedded_at = excluded.embedded_at; + +-- name: DeleteMessageEmbeddingsByMessage :exec +delete from message_embeddings +where message_id = ?; + +-- name: MarkEmbeddingJobDone :exec +update embedding_jobs +set state = 'done', + attempts = 0, + provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? +where message_id = ?; + +-- name: MarkEmptyEmbeddingJobDone :exec +update embedding_jobs +set state = 'done', + provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? +where message_id = ?; + +-- name: MarkEmbeddingJobRateLimited :exec +update embedding_jobs +set state = 'pending', + provider = ?, + model = ?, + input_version = ?, + last_error = ?, + locked_at = null, + updated_at = ? +where message_id = ?; + +-- name: MarkEmbeddingJobFailed :exec +update embedding_jobs +set state = ?, + attempts = ?, + provider = ?, + model = ?, + input_version = ?, + last_error = ?, + locked_at = null, + updated_at = ? 
+where message_id = ?; diff --git a/internal/store/storedb/queries.sql.go b/internal/store/storedb/queries.sql.go index 0d39769..6ffe09e 100644 --- a/internal/store/storedb/queries.sql.go +++ b/internal/store/storedb/queries.sql.go @@ -243,6 +243,16 @@ func (q *Queries) DeleteMessageEmbeddingsByGuild(ctx context.Context, guildID st return err } +const deleteMessageEmbeddingsByMessage = `-- name: DeleteMessageEmbeddingsByMessage :exec +delete from message_embeddings +where message_id = ? +` + +func (q *Queries) DeleteMessageEmbeddingsByMessage(ctx context.Context, messageID string) error { + _, err := q.db.ExecContext(ctx, deleteMessageEmbeddingsByMessage, messageID) + return err +} + const deleteMessageEventsByGuild = `-- name: DeleteMessageEventsByGuild :exec delete from message_events where guild_id = ? @@ -1014,6 +1024,228 @@ func (q *Queries) ListMembersByUserID(ctx context.Context, userID string) ([]Lis return items, nil } +const listPendingEmbeddingJobs = `-- name: ListPendingEmbeddingJobs :many +select + j.message_id, + m.normalized_content, + j.attempts, + j.provider, + j.model, + j.input_version +from embedding_jobs j +join messages m on m.id = j.message_id +where j.state = 'pending' + and (j.locked_at is null or j.locked_at = '' or j.locked_at < ?) +order by j.updated_at, j.message_id +limit ? 
+` + +type ListPendingEmbeddingJobsParams struct { + LockedAt sql.NullString + Limit int64 +} + +type ListPendingEmbeddingJobsRow struct { + MessageID string + NormalizedContent string + Attempts int64 + Provider string + Model string + InputVersion string +} + +func (q *Queries) ListPendingEmbeddingJobs(ctx context.Context, arg ListPendingEmbeddingJobsParams) ([]ListPendingEmbeddingJobsRow, error) { + rows, err := q.db.QueryContext(ctx, listPendingEmbeddingJobs, arg.LockedAt, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListPendingEmbeddingJobsRow + for rows.Next() { + var i ListPendingEmbeddingJobsRow + if err := rows.Scan( + &i.MessageID, + &i.NormalizedContent, + &i.Attempts, + &i.Provider, + &i.Model, + &i.InputVersion, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + +const lockEmbeddingJob = `-- name: LockEmbeddingJob :execrows +update embedding_jobs +set locked_at = ?1, updated_at = ?2 +where message_id = ?3 + and state = 'pending' + and (locked_at is null or locked_at = '' or locked_at < ?4) +` + +type LockEmbeddingJobParams struct { + LockedAt sql.NullString + UpdatedAt string + MessageID string + StaleBefore sql.NullString +} + +func (q *Queries) LockEmbeddingJob(ctx context.Context, arg LockEmbeddingJobParams) (int64, error) { + result, err := q.db.ExecContext(ctx, lockEmbeddingJob, + arg.LockedAt, + arg.UpdatedAt, + arg.MessageID, + arg.StaleBefore, + ) + if err != nil { + return 0, err + } + return result.RowsAffected() +} + +const markEmbeddingJobDone = `-- name: MarkEmbeddingJobDone :exec +update embedding_jobs +set state = 'done', + attempts = 0, + provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? +where message_id = ? 
+` + +type MarkEmbeddingJobDoneParams struct { + Provider string + Model string + InputVersion string + UpdatedAt string + MessageID string +} + +func (q *Queries) MarkEmbeddingJobDone(ctx context.Context, arg MarkEmbeddingJobDoneParams) error { + _, err := q.db.ExecContext(ctx, markEmbeddingJobDone, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.UpdatedAt, + arg.MessageID, + ) + return err +} + +const markEmbeddingJobFailed = `-- name: MarkEmbeddingJobFailed :exec +update embedding_jobs +set state = ?, + attempts = ?, + provider = ?, + model = ?, + input_version = ?, + last_error = ?, + locked_at = null, + updated_at = ? +where message_id = ? +` + +type MarkEmbeddingJobFailedParams struct { + State string + Attempts int64 + Provider string + Model string + InputVersion string + LastError string + UpdatedAt string + MessageID string +} + +func (q *Queries) MarkEmbeddingJobFailed(ctx context.Context, arg MarkEmbeddingJobFailedParams) error { + _, err := q.db.ExecContext(ctx, markEmbeddingJobFailed, + arg.State, + arg.Attempts, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.LastError, + arg.UpdatedAt, + arg.MessageID, + ) + return err +} + +const markEmbeddingJobRateLimited = `-- name: MarkEmbeddingJobRateLimited :exec +update embedding_jobs +set state = 'pending', + provider = ?, + model = ?, + input_version = ?, + last_error = ?, + locked_at = null, + updated_at = ? +where message_id = ? 
+` + +type MarkEmbeddingJobRateLimitedParams struct { + Provider string + Model string + InputVersion string + LastError string + UpdatedAt string + MessageID string +} + +func (q *Queries) MarkEmbeddingJobRateLimited(ctx context.Context, arg MarkEmbeddingJobRateLimitedParams) error { + _, err := q.db.ExecContext(ctx, markEmbeddingJobRateLimited, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.LastError, + arg.UpdatedAt, + arg.MessageID, + ) + return err +} + +const markEmptyEmbeddingJobDone = `-- name: MarkEmptyEmbeddingJobDone :exec +update embedding_jobs +set state = 'done', + provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? +where message_id = ? +` + +type MarkEmptyEmbeddingJobDoneParams struct { + Provider string + Model string + InputVersion string + UpdatedAt string + MessageID string +} + +func (q *Queries) MarkEmptyEmbeddingJobDone(ctx context.Context, arg MarkEmptyEmbeddingJobDoneParams) error { + _, err := q.db.ExecContext(ctx, markEmptyEmbeddingJobDone, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.UpdatedAt, + arg.MessageID, + ) + return err +} + const markMessageDeleted = `-- name: MarkMessageDeleted :exec update messages set deleted_at = ?, updated_at = ? @@ -1064,6 +1296,67 @@ func (q *Queries) RequeueAllEmbeddingJobs(ctx context.Context, arg RequeueAllEmb return result.RowsAffected() } +const resetEmbeddingJobIdentity = `-- name: ResetEmbeddingJobIdentity :exec +update embedding_jobs +set provider = ?, + model = ?, + input_version = ?, + last_error = '', + locked_at = null, + updated_at = ? +where message_id = ? 
+` + +type ResetEmbeddingJobIdentityParams struct { + Provider string + Model string + InputVersion string + UpdatedAt string + MessageID string +} + +func (q *Queries) ResetEmbeddingJobIdentity(ctx context.Context, arg ResetEmbeddingJobIdentityParams) error { + _, err := q.db.ExecContext(ctx, resetEmbeddingJobIdentity, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.UpdatedAt, + arg.MessageID, + ) + return err +} + +const resetEmbeddingJobIdentityAndAttempts = `-- name: ResetEmbeddingJobIdentityAndAttempts :exec +update embedding_jobs +set provider = ?, + model = ?, + input_version = ?, + attempts = 0, + last_error = '', + locked_at = null, + updated_at = ? +where message_id = ? +` + +type ResetEmbeddingJobIdentityAndAttemptsParams struct { + Provider string + Model string + InputVersion string + UpdatedAt string + MessageID string +} + +func (q *Queries) ResetEmbeddingJobIdentityAndAttempts(ctx context.Context, arg ResetEmbeddingJobIdentityAndAttemptsParams) error { + _, err := q.db.ExecContext(ctx, resetEmbeddingJobIdentityAndAttempts, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.UpdatedAt, + arg.MessageID, + ) + return err +} + const setSyncState = `-- name: SetSyncState :exec insert into sync_state(scope, cursor, updated_at) values(?, ?, ?) @@ -1374,3 +1667,36 @@ func (q *Queries) UpsertMessage(ctx context.Context, arg UpsertMessageParams) er ) return err } + +const upsertMessageEmbedding = `-- name: UpsertMessageEmbedding :exec +insert into message_embeddings( + message_id, provider, model, input_version, dimensions, embedding_blob, embedded_at +) values(?, ?, ?, ?, ?, ?, ?) 
+on conflict(message_id, provider, model, input_version) do update set + dimensions = excluded.dimensions, + embedding_blob = excluded.embedding_blob, + embedded_at = excluded.embedded_at +` + +type UpsertMessageEmbeddingParams struct { + MessageID string + Provider string + Model string + InputVersion string + Dimensions int64 + EmbeddingBlob []byte + EmbeddedAt string +} + +func (q *Queries) UpsertMessageEmbedding(ctx context.Context, arg UpsertMessageEmbeddingParams) error { + _, err := q.db.ExecContext(ctx, upsertMessageEmbedding, + arg.MessageID, + arg.Provider, + arg.Model, + arg.InputVersion, + arg.Dimensions, + arg.EmbeddingBlob, + arg.EmbeddedAt, + ) + return err +} From 349ecde72b69f1f5a17ff0fc10f011b205880b27 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 10:52:15 +0100 Subject: [PATCH 5/8] refactor: use sqlc for member profile reads --- internal/store/members_profile.go | 98 +++++++--------------- internal/store/sqlc/queries.sql | 38 +++++++++ internal/store/storedb/queries.sql.go | 113 ++++++++++++++++++++++++++ 3 files changed, 180 insertions(+), 69 deletions(-) diff --git a/internal/store/members_profile.go b/internal/store/members_profile.go index d6388cf..59b7131 100644 --- a/internal/store/members_profile.go +++ b/internal/store/members_profile.go @@ -7,6 +7,8 @@ import ( "hash/fnv" "strings" "time" + + "github.com/openclaw/discrawl/internal/store/storedb" ) type MemberProfile struct { @@ -234,81 +236,39 @@ func (s *Store) MemberProfile(ctx context.Context, guildID, userID string, recen Member: *member, RawJSON: member.RawJSON, } - var first string - var last string - if err := s.db.QueryRowContext(ctx, ` - select count(*), coalesce(min(created_at), ''), coalesce(max(created_at), '') - from messages - where guild_id = ? and author_id = ? 
- `, member.GuildID, member.UserID).Scan(&profile.MessageCount, &first, &last); err != nil { + stats, err := s.q.MemberMessageStats(ctx, storedb.MemberMessageStatsParams{ + GuildID: member.GuildID, + AuthorID: nullString(member.UserID), + }) + if err != nil { return MemberProfile{}, err } - profile.FirstMessageAt = parseTime(first) - profile.LastMessageAt = parseTime(last) + profile.MessageCount = int(stats.MessageCount) + profile.FirstMessageAt = parseTime(stats.FirstMessageAt) + profile.LastMessageAt = parseTime(stats.LastMessageAt) - recentRows, err := s.db.QueryContext(ctx, ` - select - m.id, - m.guild_id, - m.channel_id, - coalesce(c.name, ''), - coalesce(m.author_id, ''), - coalesce( - nullif(mem.display_name, ''), - nullif(mem.nick, ''), - nullif(mem.global_name, ''), - nullif(mem.username, ''), - nullif(json_extract(m.raw_json, '$.author.global_name'), ''), - nullif(json_extract(m.raw_json, '$.author.username'), ''), - '' - ), - case - when trim(coalesce(m.content, '')) <> '' then m.content - else m.normalized_content - end, - m.created_at, - coalesce(m.reply_to_message_id, ''), - m.has_attachments, - m.pinned - from messages m - left join channels c on c.id = m.channel_id - left join members mem on mem.guild_id = m.guild_id and mem.user_id = m.author_id - where m.guild_id = ? and m.author_id = ? - order by m.created_at desc, m.id desc - limit ? 
- `, member.GuildID, member.UserID, recentLimit) + recentRows, err := s.q.ListRecentMemberMessages(ctx, storedb.ListRecentMemberMessagesParams{ + GuildID: member.GuildID, + AuthorID: nullString(member.UserID), + Limit: int64(recentLimit), + }) if err != nil { return MemberProfile{}, err } - defer func() { _ = recentRows.Close() }() - - for recentRows.Next() { - var row MessageRow - var created string - var hasAttachments int - var pinned int - if err := recentRows.Scan( - &row.MessageID, - &row.GuildID, - &row.ChannelID, - &row.ChannelName, - &row.AuthorID, - &row.AuthorName, - &row.Content, - &created, - &row.ReplyToMessage, - &hasAttachments, - &pinned, - ); err != nil { - return MemberProfile{}, err - } - row.CreatedAt = parseTime(created) - row.HasAttachments = hasAttachments == 1 - row.Pinned = pinned == 1 - profile.RecentMessages = append(profile.RecentMessages, row) - } - if err := recentRows.Err(); err != nil { - return MemberProfile{}, err + for _, recent := range recentRows { + profile.RecentMessages = append(profile.RecentMessages, MessageRow{ + MessageID: recent.MessageID, + GuildID: recent.GuildID, + ChannelID: recent.ChannelID, + ChannelName: recent.ChannelName, + AuthorID: recent.AuthorID, + AuthorName: recent.AuthorName, + Content: recent.Content, + CreatedAt: parseTime(recent.CreatedAt), + ReplyToMessage: recent.ReplyToMessage, + HasAttachments: recent.HasAttachments == 1, + Pinned: recent.Pinned == 1, + }) } return profile, nil } diff --git a/internal/store/sqlc/queries.sql b/internal/store/sqlc/queries.sql index 673fbf5..5e3a4d4 100644 --- a/internal/store/sqlc/queries.sql +++ b/internal/store/sqlc/queries.sql @@ -123,6 +123,44 @@ from members where user_id = ? order by guild_id, username; +-- name: MemberMessageStats :one +select count(*) as message_count, + cast(coalesce(min(created_at), '') as text) as first_message_at, + cast(coalesce(max(created_at), '') as text) as last_message_at +from messages +where guild_id = ? 
and author_id = ?; + +-- name: ListRecentMemberMessages :many +select + m.id as message_id, + m.guild_id, + m.channel_id, + coalesce(c.name, '') as channel_name, + coalesce(m.author_id, '') as author_id, + cast(coalesce( + nullif(mem.display_name, ''), + nullif(mem.nick, ''), + nullif(mem.global_name, ''), + nullif(mem.username, ''), + nullif(json_extract(m.raw_json, '$.author.global_name'), ''), + nullif(json_extract(m.raw_json, '$.author.username'), ''), + '' + ) as text) as author_name, + cast(case + when trim(coalesce(m.content, '')) <> '' then m.content + else m.normalized_content + end as text) as content, + m.created_at, + coalesce(m.reply_to_message_id, '') as reply_to_message, + m.has_attachments, + m.pinned +from messages m +left join channels c on c.id = m.channel_id +left join members mem on mem.guild_id = m.guild_id and mem.user_id = m.author_id +where m.guild_id = ? and m.author_id = ? +order by m.created_at desc, m.id desc +limit ?; + -- name: ListChannels :many select id, guild_id, coalesce(parent_id, '') as parent_id, kind, name, coalesce(topic, '') as topic, position, is_nsfw, is_archived, diff --git a/internal/store/storedb/queries.sql.go b/internal/store/storedb/queries.sql.go index 6ffe09e..adaf1bf 100644 --- a/internal/store/storedb/queries.sql.go +++ b/internal/store/storedb/queries.sql.go @@ -1084,6 +1084,93 @@ func (q *Queries) ListPendingEmbeddingJobs(ctx context.Context, arg ListPendingE return items, nil } +const listRecentMemberMessages = `-- name: ListRecentMemberMessages :many +select + m.id as message_id, + m.guild_id, + m.channel_id, + coalesce(c.name, '') as channel_name, + coalesce(m.author_id, '') as author_id, + cast(coalesce( + nullif(mem.display_name, ''), + nullif(mem.nick, ''), + nullif(mem.global_name, ''), + nullif(mem.username, ''), + nullif(json_extract(m.raw_json, '$.author.global_name'), ''), + nullif(json_extract(m.raw_json, '$.author.username'), ''), + '' + ) as text) as author_name, + cast(case + when 
trim(coalesce(m.content, '')) <> '' then m.content + else m.normalized_content + end as text) as content, + m.created_at, + coalesce(m.reply_to_message_id, '') as reply_to_message, + m.has_attachments, + m.pinned +from messages m +left join channels c on c.id = m.channel_id +left join members mem on mem.guild_id = m.guild_id and mem.user_id = m.author_id +where m.guild_id = ? and m.author_id = ? +order by m.created_at desc, m.id desc +limit ? +` + +type ListRecentMemberMessagesParams struct { + GuildID string + AuthorID sql.NullString + Limit int64 +} + +type ListRecentMemberMessagesRow struct { + MessageID string + GuildID string + ChannelID string + ChannelName string + AuthorID string + AuthorName string + Content string + CreatedAt string + ReplyToMessage string + HasAttachments int64 + Pinned int64 +} + +func (q *Queries) ListRecentMemberMessages(ctx context.Context, arg ListRecentMemberMessagesParams) ([]ListRecentMemberMessagesRow, error) { + rows, err := q.db.QueryContext(ctx, listRecentMemberMessages, arg.GuildID, arg.AuthorID, arg.Limit) + if err != nil { + return nil, err + } + defer rows.Close() + var items []ListRecentMemberMessagesRow + for rows.Next() { + var i ListRecentMemberMessagesRow + if err := rows.Scan( + &i.MessageID, + &i.GuildID, + &i.ChannelID, + &i.ChannelName, + &i.AuthorID, + &i.AuthorName, + &i.Content, + &i.CreatedAt, + &i.ReplyToMessage, + &i.HasAttachments, + &i.Pinned, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const lockEmbeddingJob = `-- name: LockEmbeddingJob :execrows update embedding_jobs set locked_at = ?1, updated_at = ?2 @@ -1263,6 +1350,32 @@ func (q *Queries) MarkMessageDeleted(ctx context.Context, arg MarkMessageDeleted return err } +const memberMessageStats = `-- name: MemberMessageStats :one +select count(*) as message_count, + 
cast(coalesce(min(created_at), '') as text) as first_message_at, + cast(coalesce(max(created_at), '') as text) as last_message_at +from messages +where guild_id = ? and author_id = ? +` + +type MemberMessageStatsParams struct { + GuildID string + AuthorID sql.NullString +} + +type MemberMessageStatsRow struct { + MessageCount int64 + FirstMessageAt string + LastMessageAt string +} + +func (q *Queries) MemberMessageStats(ctx context.Context, arg MemberMessageStatsParams) (MemberMessageStatsRow, error) { + row := q.db.QueryRowContext(ctx, memberMessageStats, arg.GuildID, arg.AuthorID) + var i MemberMessageStatsRow + err := row.Scan(&i.MessageCount, &i.FirstMessageAt, &i.LastMessageAt) + return i, err +} + const requeueAllEmbeddingJobs = `-- name: RequeueAllEmbeddingJobs :execrows update embedding_jobs set state = 'pending', From 2f740bea15fb9320e292969dd85f16e768dda0b1 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 10:53:17 +0100 Subject: [PATCH 6/8] test: guard sqlc schema drift --- internal/store/sqlc/schema.sql | 8 ---- internal/store/sqlc_schema_test.go | 73 ++++++++++++++++++++++++++++++ internal/store/storedb/models.go | 34 ++++++-------- 3 files changed, 86 insertions(+), 29 deletions(-) create mode 100644 internal/store/sqlc_schema_test.go diff --git a/internal/store/sqlc/schema.sql b/internal/store/sqlc/schema.sql index 8f07184..98d3035 100644 --- a/internal/store/sqlc/schema.sql +++ b/internal/store/sqlc/schema.sql @@ -38,14 +38,6 @@ create table members ( role_ids_json text not null, raw_json text not null, updated_at text not null, - bio text not null default '', - pronouns text not null default '', - location text not null default '', - website text not null default '', - x_handle text not null default '', - github_login text not null default '', - urls_json text not null default '[]', - profile_updated_at text, primary key (guild_id, user_id) ); diff --git a/internal/store/sqlc_schema_test.go b/internal/store/sqlc_schema_test.go 
new file mode 100644 index 0000000..60b8e3a --- /dev/null +++ b/internal/store/sqlc_schema_test.go @@ -0,0 +1,73 @@ +package store + +import ( + "context" + "database/sql" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestSQLCSchemaMirrorsRuntimeTables(t *testing.T) { + t.Parallel() + + ctx := context.Background() + runtimeStore, err := Open(ctx, filepath.Join(t.TempDir(), "runtime.db")) + require.NoError(t, err) + defer func() { _ = runtimeStore.Close() }() + + schemaDB, err := sql.Open("sqlite", filepath.Join(t.TempDir(), "sqlc.db")) + require.NoError(t, err) + defer func() { _ = schemaDB.Close() }() + + body, err := os.ReadFile(filepath.Join("sqlc", "schema.sql")) + require.NoError(t, err) + for _, stmt := range strings.Split(string(body), ";") { + stmt = strings.TrimSpace(stmt) + if stmt == "" || strings.HasPrefix(stmt, "--") { + continue + } + _, err := schemaDB.ExecContext(ctx, stmt) + require.NoError(t, err, stmt) + } + + for _, table := range []string{ + "guilds", + "channels", + "members", + "messages", + "message_events", + "message_attachments", + "mention_events", + "sync_state", + "embedding_jobs", + "message_embeddings", + } { + require.Equal(t, tableColumns(t, runtimeStore.DB(), table), tableColumns(t, schemaDB, table), table) + } +} + +func tableColumns(t *testing.T, db *sql.DB, table string) []string { + t.Helper() + + rows, err := db.Query(`pragma table_info(` + table + `)`) + require.NoError(t, err) + defer func() { _ = rows.Close() }() + + var columns []string + for rows.Next() { + var cid int + var name string + var columnType string + var notNull int + var defaultValue sql.NullString + var pk int + require.NoError(t, rows.Scan(&cid, &name, &columnType, &notNull, &defaultValue, &pk)) + columns = append(columns, name) + } + require.NoError(t, rows.Err()) + return columns +} diff --git a/internal/store/storedb/models.go b/internal/store/storedb/models.go index dd10739..eb28919 100644 --- 
a/internal/store/storedb/models.go +++ b/internal/store/storedb/models.go @@ -47,27 +47,19 @@ type Guild struct { } type Member struct { - GuildID string - UserID string - Username string - GlobalName sql.NullString - DisplayName sql.NullString - Nick sql.NullString - Discriminator sql.NullString - Avatar sql.NullString - Bot int64 - JoinedAt sql.NullString - RoleIdsJson string - RawJson string - UpdatedAt string - Bio string - Pronouns string - Location string - Website string - XHandle string - GithubLogin string - UrlsJson string - ProfileUpdatedAt sql.NullString + GuildID string + UserID string + Username string + GlobalName sql.NullString + DisplayName sql.NullString + Nick sql.NullString + Discriminator sql.NullString + Avatar sql.NullString + Bot int64 + JoinedAt sql.NullString + RoleIdsJson string + RawJson string + UpdatedAt string } type MemberFt struct { From d3062d3d972d7d3784ee7c37e10e4f9ccc7c0986 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 10:58:53 +0100 Subject: [PATCH 7/8] fix: preserve member enrichment in sqlc mappers --- internal/store/query.go | 18 ++++++++++++------ internal/store/store_test.go | 8 ++++++-- 2 files changed, 18 insertions(+), 8 deletions(-) diff --git a/internal/store/query.go b/internal/store/query.go index 63be6ed..1772d12 100644 --- a/internal/store/query.go +++ b/internal/store/query.go @@ -495,7 +495,7 @@ func (s *Store) Members(ctx context.Context, guildID, query string, limit int) ( } out = make([]MemberRow, 0, len(rows)) for _, row := range rows { - out = append(out, MemberRow{ + member := MemberRow{ GuildID: row.GuildID, UserID: row.UserID, Username: row.Username, @@ -508,7 +508,9 @@ func (s *Store) Members(ctx context.Context, guildID, query string, limit int) ( Bot: row.Bot == 1, JoinedAt: parseTime(row.JoinedAt), RawJSON: row.RawJson, - }) + } + enrichMemberRow(&member) + out = append(out, member) } return out, nil } @@ -518,7 +520,7 @@ func (s *Store) Members(ctx context.Context, guildID, 
query string, limit int) ( } out = make([]MemberRow, 0, len(rows)) for _, row := range rows { - out = append(out, MemberRow{ + member := MemberRow{ GuildID: row.GuildID, UserID: row.UserID, Username: row.Username, @@ -531,7 +533,9 @@ func (s *Store) Members(ctx context.Context, guildID, query string, limit int) ( Bot: row.Bot == 1, JoinedAt: parseTime(row.JoinedAt), RawJSON: row.RawJson, - }) + } + enrichMemberRow(&member) + out = append(out, member) } return out, nil } @@ -543,7 +547,7 @@ func (s *Store) MemberByID(ctx context.Context, userID string) ([]MemberRow, err } out := make([]MemberRow, 0, len(rows)) for _, row := range rows { - out = append(out, MemberRow{ + member := MemberRow{ GuildID: row.GuildID, UserID: row.UserID, Username: row.Username, @@ -556,7 +560,9 @@ func (s *Store) MemberByID(ctx context.Context, userID string) ([]MemberRow, err Bot: row.Bot == 1, JoinedAt: parseTime(row.JoinedAt), RawJSON: row.RawJson, - }) + } + enrichMemberRow(&member) + out = append(out, member) } return out, nil } diff --git a/internal/store/store_test.go b/internal/store/store_test.go index f8dc7c1..010ec1d 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -1413,11 +1413,14 @@ func TestUpsertAndDeleteMember(t *testing.T) { Username: "peter", DisplayName: "Peter", RoleIDsJSON: `[]`, - RawJSON: `{}`, + RawJSON: `{"bio":"Builds tools","github":"steipete","url":"https://steipete.me"}`, })) rows, err := s.MemberByID(ctx, "u1") require.NoError(t, err) require.Len(t, rows, 1) + require.Equal(t, "Builds tools", rows[0].Bio) + require.Equal(t, "steipete", rows[0].GitHubLogin) + require.Equal(t, "https://steipete.me", rows[0].Website) require.NoError(t, s.DeleteMember(ctx, "g1", "u1")) rows, err = s.MemberByID(ctx, "u1") @@ -1429,12 +1432,13 @@ func TestUpsertAndDeleteMember(t *testing.T) { UserID: "u2", Username: "other", RoleIDsJSON: `[]`, - RawJSON: `{}`, + RawJSON: `{"bio":"Other bio"}`, }})) rows, err = s.Members(ctx, "g1", "", 10) 
require.NoError(t, err) require.Len(t, rows, 1) require.Equal(t, "u2", rows[0].UserID) + require.Equal(t, "Other bio", rows[0].Bio) } func TestOpenTightensDBFilePerms(t *testing.T) { From 5b5afe96dcb99744156f90c539bec809cda067e7 Mon Sep 17 00:00:00 2001 From: Peter Steinberger Date: Fri, 15 May 2026 11:11:31 +0100 Subject: [PATCH 8/8] ci: keep sqlc refactor checks green --- .github/workflows/ci.yml | 5 ++++- internal/store/attachments_test.go | 1 + internal/store/query.go | 8 ++++---- internal/store/sqlc_schema_test.go | 8 ++++---- internal/store/store_test.go | 14 ++++++++++++++ internal/store/write.go | 5 +++-- 6 files changed, 30 insertions(+), 11 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index df9585a..d319b8e 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -72,12 +72,15 @@ jobs: - name: Test with coverage run: go test -count=1 ./... -coverprofile=coverage.out + - name: Exclude generated code from coverage + run: grep -v '^github.com/openclaw/discrawl/internal/store/storedb/' coverage.out > coverage.filtered.out + - name: Test with race detector run: go test -count=1 -race ./... 
- name: Enforce coverage floor run: | - total="$(go tool cover -func=coverage.out | awk '/^total:/ { sub(/%$/, "", $3); print $3 }')" + total="$(go tool cover -func=coverage.filtered.out | awk '/^total:/ { sub(/%$/, "", $3); print $3 }')" awk -v total="$total" 'BEGIN { if (total == "") { print "missing coverage total" diff --git a/internal/store/attachments_test.go b/internal/store/attachments_test.go index 6183cdf..7bbb545 100644 --- a/internal/store/attachments_test.go +++ b/internal/store/attachments_test.go @@ -56,6 +56,7 @@ func TestAttachmentMediaUpdatesAndFilters(t *testing.T) { FetchStatus: "fetched", })) require.NoError(t, s.UpdateAttachmentFetchStatus(ctx, "a2", "2026-05-15T12:06:00Z", "failed", "boom")) + require.NoError(t, seedAttachmentForGuild(ctx, s, "g1", "c1", "m1", "a1")) rows, err := s.ListAttachments(ctx, AttachmentListOptions{MissingOnly: true}) require.NoError(t, err) diff --git a/internal/store/query.go b/internal/store/query.go index 1772d12..e24c343 100644 --- a/internal/store/query.go +++ b/internal/store/query.go @@ -45,7 +45,7 @@ type SemanticSearchOptions struct { func (s *Store) GetSyncState(ctx context.Context, scope string) (string, error) { cursor, err := s.q.GetSyncState(ctx, scope) if err != nil { - if err == sql.ErrNoRows { + if errors.Is(err, sql.ErrNoRows) { return "", nil } return "", err @@ -670,18 +670,18 @@ func (s *Store) Status(ctx context.Context, dbPath, defaultGuildID string) (Stat status.ThreadCount = int(threadCount) lastSync, err := s.q.GetSyncUpdatedAt(ctx, "sync:last_success") - if err != nil && err != sql.ErrNoRows { + if err != nil && !errors.Is(err, sql.ErrNoRows) { return Status{}, err } status.LastSyncAt = parseTime(lastSync) lastTail, err := s.q.GetSyncUpdatedAt(ctx, "tail:last_event") - if err != nil && err != sql.ErrNoRows { + if err != nil && !errors.Is(err, sql.ErrNoRows) { return Status{}, err } status.LastTailEventAt = parseTime(lastTail) if defaultGuildID != "" { name, err := s.q.GetGuildName(ctx, 
defaultGuildID) - if err != nil && err != sql.ErrNoRows { + if err != nil && !errors.Is(err, sql.ErrNoRows) { return Status{}, err } status.DefaultGuildName = name diff --git a/internal/store/sqlc_schema_test.go b/internal/store/sqlc_schema_test.go index 60b8e3a..c80c8db 100644 --- a/internal/store/sqlc_schema_test.go +++ b/internal/store/sqlc_schema_test.go @@ -25,7 +25,7 @@ func TestSQLCSchemaMirrorsRuntimeTables(t *testing.T) { body, err := os.ReadFile(filepath.Join("sqlc", "schema.sql")) require.NoError(t, err) - for _, stmt := range strings.Split(string(body), ";") { + for stmt := range strings.SplitSeq(string(body), ";") { stmt = strings.TrimSpace(stmt) if stmt == "" || strings.HasPrefix(stmt, "--") { continue @@ -46,14 +46,14 @@ func TestSQLCSchemaMirrorsRuntimeTables(t *testing.T) { "embedding_jobs", "message_embeddings", } { - require.Equal(t, tableColumns(t, runtimeStore.DB(), table), tableColumns(t, schemaDB, table), table) + require.Equal(t, tableColumns(ctx, t, runtimeStore.DB(), table), tableColumns(ctx, t, schemaDB, table), table) } } -func tableColumns(t *testing.T, db *sql.DB, table string) []string { +func tableColumns(ctx context.Context, t *testing.T, db *sql.DB, table string) []string { t.Helper() - rows, err := db.Query(`pragma table_info(` + table + `)`) + rows, err := db.QueryContext(ctx, `pragma table_info(`+table+`)`) require.NoError(t, err) defer func() { _ = rows.Close() }() diff --git a/internal/store/store_test.go b/internal/store/store_test.go index 010ec1d..1e6b7aa 100644 --- a/internal/store/store_test.go +++ b/internal/store/store_test.go @@ -153,6 +153,20 @@ func TestStoreMaintenanceHelpers(t *testing.T) { require.NoError(t, err) require.Equal(t, os.FileMode(0o600), info.Mode().Perm()) require.NoError(t, s.RebuildSearchIndexes(ctx)) + messageVersion, err := s.GetSyncState(ctx, "schema:message_fts_rowid_version") + require.NoError(t, err) + require.Equal(t, messageFTSVersion, messageVersion) + memberVersion, err := 
s.GetSyncState(ctx, "schema:member_fts_rowid_version") + require.NoError(t, err) + require.Equal(t, memberFTSVersion, memberVersion) + require.NoError(t, s.RebuildMessageSearchIndex(ctx)) + messageVersion, err = s.GetSyncState(ctx, "schema:message_fts_rowid_version") + require.NoError(t, err) + require.Equal(t, messageFTSVersion, messageVersion) + require.NoError(t, s.RebuildMemberSearchIndex(ctx)) + memberVersion, err = s.GetSyncState(ctx, "schema:member_fts_rowid_version") + require.NoError(t, err) + require.Equal(t, memberFTSVersion, memberVersion) version, err := s.schemaVersion(ctx) require.NoError(t, err) require.Equal(t, storeSchemaVersion, version) diff --git a/internal/store/write.go b/internal/store/write.go index f32ed9a..f174897 100644 --- a/internal/store/write.go +++ b/internal/store/write.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "encoding/json" + "errors" "time" "github.com/openclaw/discrawl/internal/store/storedb" @@ -295,7 +296,7 @@ func upsertMessageTx(ctx context.Context, tx *sql.Tx, qtx *storedb.Queries, mess if opts.EnqueueEmbedding { normalized, err := qtx.GetMessageNormalizedContent(ctx, message.ID) previousErr = err - if previousErr != nil && previousErr != sql.ErrNoRows { + if previousErr != nil && !errors.Is(previousErr, sql.ErrNoRows) { return previousErr } if previousErr == nil { @@ -321,7 +322,7 @@ func upsertMessageTx(ctx context.Context, tx *sql.Tx, qtx *storedb.Queries, mess return err } } - queueEmbedding := opts.EnqueueEmbedding && (previousErr == sql.ErrNoRows || previousNormalized.String != message.NormalizedContent || !jobExists) + queueEmbedding := opts.EnqueueEmbedding && (errors.Is(previousErr, sql.ErrNoRows) || previousNormalized.String != message.NormalizedContent || !jobExists) if queueEmbedding { if err := qtx.UpsertEmbeddingJobPending(ctx, storedb.UpsertEmbeddingJobPendingParams{MessageID: message.ID, UpdatedAt: now}); err != nil { return err