From e23fda1f7dd46537e1e96d221d84e8362e2b090a Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 17:10:35 +0530 Subject: [PATCH 1/8] feat: add event import service with batch persistence --- src/services/event-import-service.ts | 450 +++++++++++++++++++++++++++ 1 file changed, 450 insertions(+) create mode 100644 src/services/event-import-service.ts diff --git a/src/services/event-import-service.ts b/src/services/event-import-service.ts new file mode 100644 index 00000000..7c366cb1 --- /dev/null +++ b/src/services/event-import-service.ts @@ -0,0 +1,450 @@ +import fs from 'fs' +import readline from 'readline' + +import { Knex } from 'knex' + +import { DatabaseClient, EventId } from '../@types/base' +import { + getEventExpiration, + isDeleteEvent, + isEphemeralEvent, + isEventIdValid, + isEventSignatureValid, + isParameterizedReplaceableEvent, + isReplaceableEvent, +} from '../utils/event' +import { toBuffer, toJSON } from '../utils/transform' +import { attemptValidation } from '../utils/validation' + +import { Event } from '../@types/event' +import { eventSchema } from '../schemas/event-schema' +import { EventTags } from '../constants/base' + +const DEFAULT_BATCH_SIZE = 1000 + +const REPLACEABLE_EVENT_CONFLICT_TARGET = + '(event_pubkey, event_kind, event_deduplication) ' + + 'WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 ' + + 'OR (event_kind >= 10000 AND event_kind < 20000)) ' + + 'OR (event_kind >= 30000 AND event_kind < 40000)' + +interface ImportEventRow { + deleted_at: null + event_content: string + event_created_at: number + event_deduplication: string | null + event_id: Buffer + event_kind: number + event_pubkey: Buffer + event_signature: Buffer + event_tags: string + expires_at: number | null +} + +export interface EventImportStats { + errors: number + inserted: number + processed: number + skipped: number +} + +export interface EventImportLineError { + lineNumber: number + reason: string +} + +export interface EventImportOptions { + batchSize?: number + onLineError?: (lineError: EventImportLineError) => void + onProgress?: (stats: EventImportStats) => void +} + +const getErrorMessage = (error: unknown): string => { + if (error instanceof Error) { + return error.message + } + + return String(error) +} + +const getAffectedRowCount = (result: unknown): number => { + if (Array.isArray(result)) { + return result.length + } + + if ( + typeof result === 'object' + && result !== null + && 'rowCount' in result + && typeof (result as { rowCount: unknown }).rowCount === 'number' + ) { + return Number((result as { rowCount: number }).rowCount) + } + + return 0 +} + +const isEventIdUniqueViolation = (error: unknown): boolean => { + if (typeof error !== 'object' || error === null) { + return false + } + + const dbError = error as { + code?: string + constraint?: string + message?: string + } + + return dbError.code === '23505' + && ( + dbError.constraint === 'events_event_id_unique' + || dbError.message?.includes('events_event_id_unique') === true + ) +} + +const isValidDeleteTag = (tag: string[]): boolean => { + return tag.length >= 2 + && tag[0] === EventTags.Event + && /^[0-9a-f]{64}$/.test(tag[1]) +} + +const getDeleteTargetEventIds = (event: Event): EventId[] => { + return event.tags.reduce((eventIds, tag) => { + if (isValidDeleteTag(tag)) { + eventIds.push(tag[1]) + } + + return eventIds + }, [] as EventId[]) +} + +const isEventReplaceableForStorage = (event: Event): boolean => { + return isReplaceableEvent(event) || isParameterizedReplaceableEvent(event) +} + +const getReplaceableEventDeduplication = (event: Event): string => { + if (isParameterizedReplaceableEvent(event)) { + const [, ...deduplication] = event.tags.find( + (tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication, + ) ?? [null, ''] + + return toJSON(deduplication) + } + + return toJSON([event.pubkey, event.kind]) +} + +const getReplaceableEventKey = (event: Event): string => { + return `${event.pubkey}:${event.kind}:${getReplaceableEventDeduplication(event)}` +} + +const toImportEventRow = (event: Event): ImportEventRow => { + const expiresAt = getEventExpiration(event) + + return { + deleted_at: null, + event_content: event.content, + event_created_at: event.created_at, + event_deduplication: ( + isReplaceableEvent(event) || isParameterizedReplaceableEvent(event) + ? getReplaceableEventDeduplication(event) + : null + ), + event_id: toBuffer(event.id), + event_kind: event.kind, + event_pubkey: toBuffer(event.pubkey), + event_signature: toBuffer(event.sig), + event_tags: toJSON(event.tags), + expires_at: typeof expiresAt === 'number' ? expiresAt : null, + } +} + +const applyDeleteEvents = async ( + transaction: Knex.Transaction, + deleteEvent: Event, +): Promise => { + const eventIds = getDeleteTargetEventIds(deleteEvent) + if (!eventIds.length) { + return + } + + await transaction('events') + .where('event_pubkey', toBuffer(deleteEvent.pubkey)) + .whereIn('event_id', eventIds.map(toBuffer)) + .whereNull('deleted_at') + .update({ + deleted_at: transaction.raw('now()'), + }) +} + +const insertRegularEvents = async ( + transaction: Knex.Transaction, + events: Event[], +): Promise => { + if (!events.length) { + return 0 + } + + const rows = events.map(toImportEventRow) + + const result = await transaction('events') + .insert(rows) + .onConflict() + .ignore() + .returning('event_id') + + return getAffectedRowCount(result) +} + +const filterOutExistingEventIds = async ( + transaction: Knex.Transaction, + events: Event[], +): Promise => { + if (!events.length) { + return [] + } + + const existingRows = await transaction('events') + .select('event_id') + .whereIn('event_id', events.map((event) => toBuffer(event.id))) as Array<{ event_id: Buffer }> + + const existingEventIds = new Set(existingRows.map((row) => row.event_id.toString('hex'))) + + return events.filter((event) => !existingEventIds.has(event.id)) +} + +const upsertReplaceableEvents = async ( + transaction: Knex.Transaction, + events: Event[], +): Promise => { + if (!events.length) { + return 0 + } + + let pendingEvents = events + + while (pendingEvents.length) { + const deduplicatedByEventId = new Map() + for (const event of pendingEvents) { + deduplicatedByEventId.set(event.id, event) + } + + pendingEvents = Array.from(deduplicatedByEventId.values()) + + const rows = pendingEvents.map(toImportEventRow) + + try { + const result = await transaction('events') + .insert(rows) + .onConflict(transaction.raw(REPLACEABLE_EVENT_CONFLICT_TARGET)) + .merge([ + 'deleted_at', + 'event_content', + 'event_created_at', + 'event_id', + 'event_signature', + 'event_tags', + 'expires_at', + ]) + .whereRaw('"events"."event_created_at" < "excluded"."event_created_at"') + .returning('event_id') + + return getAffectedRowCount(result) + } catch (error) { + if (!isEventIdUniqueViolation(error)) { + throw error + } + + const filteredEvents = await filterOutExistingEventIds(transaction, pendingEvents) + + if (filteredEvents.length === pendingEvents.length) { + throw error + } + + pendingEvents = filteredEvents + } + } + + return 0 +} + +export const createEventBatchPersister = + (dbClient: DatabaseClient) => + async (events: Event[]): Promise => { + if (!events.length) { + return 0 + } + + return dbClient.transaction(async (transaction) => { + let inserted = 0 + + let nonDeleteSegment: Event[] = [] + + const flushNonDeleteSegment = async () => { + if (!nonDeleteSegment.length) { + return + } + + const regularEvents: Event[] = [] + const replaceableEventsByKey = new Map() + + for (const event of nonDeleteSegment) { + if (isEventReplaceableForStorage(event)) { + const deduplicationKey = getReplaceableEventKey(event) + const existingEvent = replaceableEventsByKey.get(deduplicationKey) + + if (!existingEvent || existingEvent.created_at < event.created_at) { + replaceableEventsByKey.set(deduplicationKey, event) + } + + continue + } + + regularEvents.push(event) + } + + inserted += await insertRegularEvents(transaction, regularEvents) + + const upsertEvents = await filterOutExistingEventIds( + transaction, + Array.from(replaceableEventsByKey.values()), + ) + + inserted += await upsertReplaceableEvents(transaction, upsertEvents) + + nonDeleteSegment = [] + } + + for (const event of events) { + if (isEphemeralEvent(event)) { + continue + } + + if (isDeleteEvent(event)) { + await flushNonDeleteSegment() + + await applyDeleteEvents(transaction, event) + + inserted += await insertRegularEvents(transaction, [event]) + + continue + } + + nonDeleteSegment.push(event) + } + + await flushNonDeleteSegment() + + return inserted + }) + } + +export class EventImportService { + public constructor( + private readonly persistBatch: (events: Event[]) => Promise, + ) {} + + public async importFromJsonl( + filePath: string, + options: EventImportOptions = {}, + ): Promise { + const batchSize = ( + typeof options.batchSize === 'number' + && Number.isInteger(options.batchSize) + && options.batchSize > 0 + ) ? options.batchSize : DEFAULT_BATCH_SIZE + + const onLineError = options.onLineError ?? (() => undefined) + const onProgress = options.onProgress ?? (() => undefined) + + const validateEventSchema = attemptValidation(eventSchema) + + const batch: Event[] = [] + const stats: EventImportStats = { + errors: 0, + inserted: 0, + processed: 0, + skipped: 0, + } + + let lineNumber = 0 + + const flushBatch = async () => { + if (!batch.length) { + return + } + + const batchSize = batch.length + const inserted = await this.persistBatch(batch) + + if (!Number.isInteger(inserted) || inserted < 0 || inserted > batchSize) { + throw new Error( + `Invalid insert count (${inserted}) for batch size ${batchSize}`, + ) + } + + stats.inserted += inserted + stats.skipped += batchSize - inserted + batch.length = 0 + + onProgress({ ...stats }) + } + + const stream = fs.createReadStream(filePath, { + encoding: 'utf-8', + }) + + const lineReader = readline.createInterface({ + crlfDelay: Infinity, + input: stream, + }) + + try { + for await (const line of lineReader) { + lineNumber += 1 + + const trimmedLine = line.trim() + if (!trimmedLine.length) { + continue + } + + stats.processed += 1 + + let event: Event + try { + event = validateEventSchema(JSON.parse(trimmedLine)) as Event + + if (!await isEventIdValid(event)) { + throw new Error('invalid: event id does not match') + } + + if (!await isEventSignatureValid(event)) { + throw new Error('invalid: event signature verification failed') + } + } catch (error) { + stats.errors += 1 + onLineError({ + lineNumber, + reason: getErrorMessage(error), + }) + + continue + } + + batch.push(event) + + if (batch.length >= batchSize) { + await flushBatch() + } + } + + await flushBatch() + onProgress({ ...stats }) + + return stats + } finally { + lineReader.close() + stream.destroy() + } + } +} From 38c7adf347beef7b48038c150ce0000c4a865a3f Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 17:10:47 +0530 Subject: [PATCH 2/8] feat: add CLI entrypoint for jsonl import --- src/import-events.ts | 181 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 181 insertions(+) create mode 100644 src/import-events.ts diff --git a/src/import-events.ts b/src/import-events.ts new file mode 100644 index 00000000..579deef0 --- /dev/null +++ b/src/import-events.ts @@ -0,0 +1,181 @@ +import { extname, resolve } from 'path' + +import fs from 'fs' + +import dotenv from 'dotenv' + +dotenv.config() + +import { + createEventBatchPersister, + EventImportLineError, + EventImportService, + EventImportStats, +} from './services/event-import-service' +import { getMasterDbClient } from './database/client' + +interface CliOptions { + batchSize: number + filePath: string + showHelp: boolean +} + +const DEFAULT_BATCH_SIZE = 1000 +const MAX_ERROR_LOGS = 20 + +const formatNumber = (value: number): string => value.toLocaleString('en-US') + +const formatProgress = (stats: EventImportStats): string => { + return `[Processed: ${formatNumber(stats.processed)} | Inserted: ${formatNumber(stats.inserted)} | Skipped: ${formatNumber(stats.skipped)} | Errors: ${formatNumber(stats.errors)}]` +} + +const printUsage = (): void => { + console.log('Usage: npm run import -- [--batch-size ]') + console.log('Example: npm run import -- ./events.jsonl --batch-size 1000') +} + +const parseBatchSize = (value: string): number => { + const parsedValue = Number(value) + + if (!Number.isInteger(parsedValue) || parsedValue <= 0) { + throw new Error(`Invalid --batch-size value: ${value}`) + } + + return parsedValue +} + +const parseCliArgs = (args: string[]): CliOptions => { + let batchSize = DEFAULT_BATCH_SIZE + let filePath: string | undefined + + if (args.includes('--help') || args.includes('-h')) { + return { + batchSize, + filePath: '', + showHelp: true, + } + } + + for (let i = 0; i < args.length; i++) { + const arg = args[i] + + if (arg === '--batch-size') { + const nextArg = args[i + 1] + if (typeof nextArg !== 'string') { + throw new Error('Missing value for --batch-size') + } + + batchSize = parseBatchSize(nextArg) + i += 1 + continue + } + + if (arg.startsWith('--batch-size=')) { + batchSize = parseBatchSize(arg.split('=', 2)[1]) + continue + } + + if (arg.startsWith('--')) { + throw new Error(`Unknown option: ${arg}`) + } + + if (filePath) { + throw new Error(`Unexpected extra argument: ${arg}`) + } + + filePath = arg + } + + if (!filePath) { + throw new Error('Missing path to .jsonl file') + } + + return { + batchSize, + filePath, + showHelp: false, + } +} + +const ensureValidInputFile = (filePath: string): string => { + const absolutePath = resolve(process.cwd(), filePath) + + if (extname(absolutePath).toLowerCase() !== '.jsonl') { + throw new Error('Input file must have a .jsonl extension') + } + + if (!fs.existsSync(absolutePath)) { + throw new Error(`Input file does not exist: ${absolutePath}`) + } + + const stats = fs.statSync(absolutePath) + if (!stats.isFile()) { + throw new Error(`Input path is not a file: ${absolutePath}`) + } + + return absolutePath +} + +const run = async (): Promise => { + const options = parseCliArgs(process.argv.slice(2)) + + if (options.showHelp) { + printUsage() + return + } + + const absoluteFilePath = ensureValidInputFile(options.filePath) + + const dbClient = getMasterDbClient() + const importer = new EventImportService(createEventBatchPersister(dbClient)) + + let loggedErrors = 0 + let suppressedErrors = 0 + + const onLineError = ({ lineNumber, reason }: EventImportLineError) => { + if (loggedErrors < MAX_ERROR_LOGS) { + console.warn(`[line ${lineNumber}] ${reason}`) + loggedErrors += 1 + return + } + + suppressedErrors += 1 + } + + const onProgress = (stats: EventImportStats) => { + console.log(formatProgress(stats)) + } + + const startedAt = Date.now() + + try { + const stats = await importer.importFromJsonl(absoluteFilePath, { + batchSize: options.batchSize, + onLineError, + onProgress, + }) + + if (suppressedErrors > 0) { + console.warn(`Suppressed ${formatNumber(suppressedErrors)} additional line errors`) + } + + const elapsedSeconds = ((Date.now() - startedAt) / 1000).toFixed(2) + + console.log(`Import completed in ${elapsedSeconds}s`) + console.log(formatProgress(stats)) + } finally { + await dbClient.destroy() + } +} + +if (require.main === module) { + run().catch((error: unknown) => { + if (error instanceof Error) { + console.error(`Import failed: ${error.message}`) + } else { + console.error('Import failed with unknown error') + } + + process.exit(1) + }) +} From 7b078fdf522720dbbfae69942fd38d001b42c7ac Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 17:11:04 +0530 Subject: [PATCH 3/8] test: add unit tests for event import service --- .../services/event-import-service.spec.ts | 179 ++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 test/unit/services/event-import-service.spec.ts diff --git a/test/unit/services/event-import-service.spec.ts b/test/unit/services/event-import-service.spec.ts new file mode 100644 index 00000000..a7e35411 --- /dev/null +++ b/test/unit/services/event-import-service.spec.ts @@ -0,0 +1,179 @@ +import { join } from 'path' + +import fs from 'fs' +import os from 'os' + +import { + EventImportLineError, + EventImportService, + EventImportStats, +} from '../../../src/services/event-import-service' +import { Event } from '../../../src/@types/event' +import { expect } from 'chai' +import { getEvents } from '../data/events' + +describe('EventImportService', () => { + const tmpDirs: string[] = [] + + const createJsonlFile = (lines: string[]): string => { + const tmpDir = fs.mkdtempSync(join(os.tmpdir(), 'nostream-import-')) + tmpDirs.push(tmpDir) + + const filePath = join(tmpDir, 'events.jsonl') + + fs.writeFileSync(filePath, lines.join('\n'), { + encoding: 'utf-8', + }) + + return filePath + } + + afterEach(() => { + for (const tmpDir of tmpDirs.splice(0)) { + fs.rmSync(tmpDir, { + force: true, + recursive: true, + }) + } + }) + + it('imports valid events in batches and tracks skipped duplicates', async () => { + const [event] = getEvents() + const filePath = createJsonlFile([ + JSON.stringify(event), + JSON.stringify(event), + JSON.stringify(event), + ]) + + const batchCalls: Event[][] = [] + const persistBatch = async (events: Event[]): Promise => { + batchCalls.push([...events]) + + if (batchCalls.length === 1) { + return 2 + } + + return 0 + } + + const progressUpdates: EventImportStats[] = [] + + const importer = new EventImportService(persistBatch) + + const stats = await importer.importFromJsonl(filePath, { + batchSize: 2, + onProgress: (progress) => { + progressUpdates.push(progress) + }, + }) + + expect(stats).to.deep.equal({ + errors: 0, + inserted: 2, + processed: 3, + skipped: 1, + }) + + expect(batchCalls.length).to.equal(2) + + const [firstBatch, secondBatch] = batchCalls + + expect(firstBatch.map(({ id }) => id)).to.deep.equal([event.id, event.id]) + expect(secondBatch.map(({ id }) => id)).to.deep.equal([event.id]) + + const finalProgress = progressUpdates[progressUpdates.length - 1] + + expect(finalProgress).to.deep.equal(stats) + }) + + it('counts malformed and invalid events as errors and keeps importing', async () => { + const [event] = getEvents() + + const invalidIdEvent: Event = { + ...event, + content: `${event.content} changed`, + } + + const invalidSignatureEvent: Event = { + ...event, + sig: 'f'.repeat(128), + } + + const filePath = createJsonlFile([ + JSON.stringify(event), + '{not-json}', + JSON.stringify(invalidIdEvent), + JSON.stringify(invalidSignatureEvent), + ]) + + const batchCalls: Event[][] = [] + const persistBatch = async (events: Event[]): Promise => { + batchCalls.push([...events]) + + return 1 + } + + const lineErrors: EventImportLineError[] = [] + + const importer = new EventImportService(persistBatch) + + const stats = await importer.importFromJsonl(filePath, { + batchSize: 10, + onLineError: (lineError) => { + lineErrors.push(lineError) + }, + }) + + expect(stats).to.deep.equal({ + errors: 3, + inserted: 1, + processed: 4, + skipped: 0, + }) + + expect(batchCalls.length).to.equal(1) + expect(batchCalls[0].length).to.equal(1) + expect(lineErrors.length).to.equal(3) + }) + + it('rejects when persistence returns an invalid insert count', async () => { + const [event] = getEvents() + const filePath = createJsonlFile([JSON.stringify(event)]) + + const persistBatch = async (): Promise => 2 + + const importer = new EventImportService(persistBatch) + + try { + await importer.importFromJsonl(filePath) + expect.fail('Expected import to reject when persistence returns invalid insert count') + } catch (error) { + expect((error as Error).message).to.include('Invalid insert count') + } + }) + + it('propagates persistence failures as import failures', async () => { + const [event] = getEvents() + const filePath = createJsonlFile([JSON.stringify(event)]) + + const persistBatch = async (): Promise => { + throw new Error('database unavailable') + } + + const lineErrors: EventImportLineError[] = [] + + const importer = new EventImportService(persistBatch) + + try { + await importer.importFromJsonl(filePath, { + onLineError: (lineError) => { + lineErrors.push(lineError) + }, + }) + expect.fail('Expected import to reject when persistence fails') + } catch (error) { + expect((error as Error).message).to.equal('database unavailable') + expect(lineErrors.length).to.equal(0) + } + }) +}) From ae18173f094016f4a1c9c94f93608bb3e146477f Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 17:13:47 +0530 Subject: [PATCH 4/8] feat: add npm run import script --- package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package.json b/package.json index 59cf9877..975cfa9d 100644 --- a/package.json +++ b/package.json @@ -31,6 +31,7 @@ "lint": "eslint --ext .ts ./src ./test", "lint:report": "eslint -o .lint-reports/eslint.json -f json --ext .ts ./src ./test", "lint:fix": "npm run lint -- --fix", + "import": "node -r ts-node/register src/import-events.ts", "db:migrate": "knex migrate:latest", "db:migrate:rollback": "knex migrate:rollback", "db:seed": "knex seed:run", From 21a85d6d7949bae2cbe29edd10673d9354f3bb36 Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 17:14:01 +0530 Subject: [PATCH 5/8] docs: add jsonl import usage to README --- README.md | 23 +++++++++++++++++++++++ 1 file changed, 23 insertions(+) diff --git a/README.md b/README.md index 7ddc0ca7..9f809b3c 100644 --- a/README.md +++ b/README.md @@ -220,6 +220,29 @@ Print the Tor hostname: ./scripts/print_tor_hostname ``` +### Importing events from JSON Lines + +You can import NIP-01 events from a `.jsonl` file directly into the relay database. + +Basic import: + ``` + npm run import -- ./events.jsonl + ``` + +Set a custom batch size (default: `1000`): + ``` + npm run import -- ./events.jsonl --batch-size 500 + ``` + +The importer: + +- Processes the file line-by-line to keep memory usage bounded. +- Validates NIP-01 schema, event id hash, and Schnorr signature before insertion. +- Inserts in database transactions per batch. +- Skips duplicates without failing the whole import. +- Prints progress in the format: + `[Processed: 50,000 | Inserted: 45,000 | Skipped: 5,000 | Errors: 0]` + ### Running as a Service By default this server will run continuously until you stop it with Ctrl+C or until the system restarts. From 791c01f58ba12d75faa0826752615331ee9c8d02 Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 20:30:29 +0530 Subject: [PATCH 6/8] refactor: reuse EventRepository instead of raw queries in import service --- src/import-events.ts | 4 +- src/services/event-import-service.ts | 305 +++------------------------ 2 files changed, 30 insertions(+), 279 deletions(-) diff --git a/src/import-events.ts b/src/import-events.ts index 579deef0..52fefa58 100644 --- a/src/import-events.ts +++ b/src/import-events.ts @@ -13,6 +13,7 @@ import { EventImportStats, } from './services/event-import-service' import { getMasterDbClient } from './database/client' +import { EventRepository } from './repositories/event-repository' interface CliOptions { batchSize: number @@ -127,7 +128,8 @@ const run = async (): Promise => { const absoluteFilePath = ensureValidInputFile(options.filePath) const dbClient = getMasterDbClient() - const importer = new EventImportService(createEventBatchPersister(dbClient)) + const eventRepository = new EventRepository(dbClient, dbClient) + const importer = new EventImportService(createEventBatchPersister(eventRepository)) let loggedErrors = 0 let suppressedErrors = 0 diff --git a/src/services/event-import-service.ts b/src/services/event-import-service.ts index 7c366cb1..3945ed27 100644 --- a/src/services/event-import-service.ts +++ b/src/services/event-import-service.ts @@ -1,11 +1,7 @@ import fs from 'fs' import readline from 'readline' -import { Knex } from 'knex' - -import { DatabaseClient, EventId } from '../@types/base' import { - getEventExpiration, isDeleteEvent, isEphemeralEvent, isEventIdValid, @@ -13,34 +9,15 @@ import { isParameterizedReplaceableEvent, isReplaceableEvent, } from '../utils/event' -import { toBuffer, toJSON } from '../utils/transform' import { attemptValidation } from '../utils/validation' import { Event } from '../@types/event' import { eventSchema } from '../schemas/event-schema' import { EventTags } from '../constants/base' +import { IEventRepository } from '../@types/repositories' const DEFAULT_BATCH_SIZE = 1000 -const REPLACEABLE_EVENT_CONFLICT_TARGET = - '(event_pubkey, event_kind, event_deduplication) ' - + 'WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 ' - + 'OR (event_kind >= 10000 AND event_kind < 20000)) ' - + 'OR (event_kind >= 30000 AND event_kind < 40000)' - -interface ImportEventRow { - deleted_at: null - event_content: string - event_created_at: number - event_deduplication: string | null - event_id: Buffer - event_kind: number - event_pubkey: Buffer - event_signature: Buffer - event_tags: string - expires_at: number | null -} - export interface EventImportStats { errors: number inserted: number @@ -67,276 +44,48 @@ const getErrorMessage = (error: unknown): string => { return String(error) } -const getAffectedRowCount = (result: unknown): number => { - if (Array.isArray(result)) { - return result.length - } - - if ( - typeof result === 'object' - && result !== null - && 'rowCount' in result - && typeof (result as { rowCount: unknown }).rowCount === 'number' - ) { - return Number((result as { rowCount: number }).rowCount) - } - - return 0 -} - -const isEventIdUniqueViolation = (error: unknown): boolean => { - if (typeof error !== 'object' || error === null) { - return false - } - - const dbError = error as { - code?: string - constraint?: string - message?: string - } - - return dbError.code === '23505' - && ( - dbError.constraint === 'events_event_id_unique' - || dbError.message?.includes('events_event_id_unique') === true - ) -} - -const isValidDeleteTag = (tag: string[]): boolean => { - return tag.length >= 2 - && tag[0] === EventTags.Event - && /^[0-9a-f]{64}$/.test(tag[1]) -} - -const getDeleteTargetEventIds = (event: Event): EventId[] => { - return event.tags.reduce((eventIds, tag) => { - if (isValidDeleteTag(tag)) { - eventIds.push(tag[1]) - } - - return eventIds - }, [] as EventId[]) -} - -const isEventReplaceableForStorage = (event: Event): boolean => { - return isReplaceableEvent(event) || isParameterizedReplaceableEvent(event) -} - -const getReplaceableEventDeduplication = (event: Event): string => { - if (isParameterizedReplaceableEvent(event)) { - const [, ...deduplication] = event.tags.find( - (tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication, - ) ?? [null, ''] - - return toJSON(deduplication) - } - - return toJSON([event.pubkey, event.kind]) -} - -const getReplaceableEventKey = (event: Event): string => { - return `${event.pubkey}:${event.kind}:${getReplaceableEventDeduplication(event)}` -} - -const toImportEventRow = (event: Event): ImportEventRow => { - const expiresAt = getEventExpiration(event) - - return { - deleted_at: null, - event_content: event.content, - event_created_at: event.created_at, - event_deduplication: ( - isReplaceableEvent(event) || isParameterizedReplaceableEvent(event) - ? getReplaceableEventDeduplication(event) - : null - ), - event_id: toBuffer(event.id), - event_kind: event.kind, - event_pubkey: toBuffer(event.pubkey), - event_signature: toBuffer(event.sig), - event_tags: toJSON(event.tags), - expires_at: typeof expiresAt === 'number' ? expiresAt : null, - } -} - -const applyDeleteEvents = async ( - transaction: Knex.Transaction, - deleteEvent: Event, -): Promise => { - const eventIds = getDeleteTargetEventIds(deleteEvent) - if (!eventIds.length) { - return - } - - await transaction('events') - .where('event_pubkey', toBuffer(deleteEvent.pubkey)) - .whereIn('event_id', eventIds.map(toBuffer)) - .whereNull('deleted_at') - .update({ - deleted_at: transaction.raw('now()'), - }) -} - -const insertRegularEvents = async ( - transaction: Knex.Transaction, - events: Event[], -): Promise => { - if (!events.length) { - return 0 - } - - const rows = events.map(toImportEventRow) - - const result = await transaction('events') - .insert(rows) - .onConflict() - .ignore() - .returning('event_id') - - return getAffectedRowCount(result) -} - -const filterOutExistingEventIds = async ( - transaction: Knex.Transaction, - events: Event[], -): Promise => { - if (!events.length) { - return [] - } - - const existingRows = await transaction('events') - .select('event_id') - .whereIn('event_id', events.map((event) => toBuffer(event.id))) as Array<{ event_id: Buffer }> - - const existingEventIds = new Set(existingRows.map((row) => row.event_id.toString('hex'))) - - return events.filter((event) => !existingEventIds.has(event.id)) -} - -const upsertReplaceableEvents = async ( - transaction: Knex.Transaction, - events: Event[], -): Promise => { - if (!events.length) { - return 0 - } - - let pendingEvents = events - - while (pendingEvents.length) { - const deduplicatedByEventId = new Map() - for (const event of pendingEvents) { - deduplicatedByEventId.set(event.id, event) - } - - pendingEvents = Array.from(deduplicatedByEventId.values()) - - const rows = pendingEvents.map(toImportEventRow) - - try { - const result = await transaction('events') - .insert(rows) - .onConflict(transaction.raw(REPLACEABLE_EVENT_CONFLICT_TARGET)) - .merge([ - 'deleted_at', - 'event_content', - 'event_created_at', - 'event_id', - 'event_signature', - 'event_tags', - 'expires_at', - ]) - .whereRaw('"events"."event_created_at" < "excluded"."event_created_at"') - .returning('event_id') - - return getAffectedRowCount(result) - } catch (error) { - if (!isEventIdUniqueViolation(error)) { - throw error - } - - const filteredEvents = await filterOutExistingEventIds(transaction, pendingEvents) - - if (filteredEvents.length === pendingEvents.length) { - throw error - } - - pendingEvents = filteredEvents - } - } - - return 0 -} - export const createEventBatchPersister = - (dbClient: DatabaseClient) => + (eventRepository: IEventRepository) => async (events: Event[]): Promise => { if (!events.length) { return 0 } - return dbClient.transaction(async (transaction) => { - let inserted = 0 + let inserted = 0 - let nonDeleteSegment: Event[] = [] - - const flushNonDeleteSegment = async () => { - if (!nonDeleteSegment.length) { - return - } - - const regularEvents: Event[] = [] - const replaceableEventsByKey = new Map() - - for (const event of nonDeleteSegment) { - if (isEventReplaceableForStorage(event)) { - const deduplicationKey = getReplaceableEventKey(event) - const existingEvent = replaceableEventsByKey.get(deduplicationKey) - - if (!existingEvent || existingEvent.created_at < event.created_at) { - replaceableEventsByKey.set(deduplicationKey, event) - } - - continue - } - - regularEvents.push(event) - } - - inserted += await insertRegularEvents(transaction, regularEvents) + for (const event of events) { + if (isEphemeralEvent(event)) { + continue + } - const upsertEvents = await filterOutExistingEventIds( - transaction, - Array.from(replaceableEventsByKey.values()), + if (isDeleteEvent(event)) { + const eventIdsToDelete = event.tags.reduce( + (ids, tag) => + tag.length >= 2 + && tag[0] === EventTags.Event + && /^[0-9a-f]{64}$/.test(tag[1]) + ? [...ids, tag[1]] + : ids, + [] as string[] ) - inserted += await upsertReplaceableEvents(transaction, upsertEvents) - - nonDeleteSegment = [] - } - - for (const event of events) { - if (isEphemeralEvent(event)) { - continue + if (eventIdsToDelete.length) { + await eventRepository.deleteByPubkeyAndIds(event.pubkey, eventIdsToDelete) } - if (isDeleteEvent(event)) { - await flushNonDeleteSegment() - - await applyDeleteEvents(transaction, event) - - inserted += await insertRegularEvents(transaction, [event]) - - continue - } + inserted += await eventRepository.create(event) + continue + } - nonDeleteSegment.push(event) + if (isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)) { + inserted += await eventRepository.upsert(event) + continue } - await flushNonDeleteSegment() + inserted += await eventRepository.create(event) + } - return inserted - }) + return inserted } export class EventImportService { From 2dd115a606aabcc3fe9754b3e28b91fcb7922022 Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 22:20:51 +0530 Subject: [PATCH 7/8] fix: enrich event metadata before persistence in import service --- src/@types/repositories.ts | 94 ++--- src/import-events.ts | 2 +- src/repositories/event-repository.ts | 549 ++++++++++++++------------- src/services/event-import-service.ts | 40 +- 4 files changed, 380 insertions(+), 305 deletions(-) diff --git a/src/@types/repositories.ts b/src/@types/repositories.ts index 671dfaee..88c756b7 100644 --- a/src/@types/repositories.ts +++ b/src/@types/repositories.ts @@ -1,46 +1,48 @@ -import { PassThrough } from 'stream' - -import { DatabaseClient, EventId, Pubkey } from './base' -import { DBEvent, Event } from './event' -import { Invoice } from './invoice' -import { SubscriptionFilter } from './subscription' -import { User } from './user' - -export type ExposedPromiseKeys = 'then' | 'catch' | 'finally' - -export interface IQueryResult extends Pick, keyof Promise & ExposedPromiseKeys> { - stream(options?: Record): PassThrough & AsyncIterable -} - -export interface IEventRepository { - create(event: Event): Promise - upsert(event: Event): Promise - findByFilters(filters: SubscriptionFilter[]): IQueryResult - deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise -} - -export interface IInvoiceRepository { - findById(id: string, client?: DatabaseClient): Promise - upsert(invoice: Partial, client?: DatabaseClient): Promise - updateStatus( - invoice: Pick, - client?: DatabaseClient, - ): Promise - confirmInvoice( - invoiceId: string, - amountReceived: bigint, - confirmedAt: Date, - client?: DatabaseClient, - ): Promise - findPendingInvoices( - offset?: number, - limit?: number, - client?: DatabaseClient, - ): Promise -} - -export interface IUserRepository { - findByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise - upsert(user: Partial, client?: DatabaseClient): Promise - getBalanceByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise -} +import { PassThrough } from 'stream' + +import { DatabaseClient, EventId, Pubkey } from './base' +import { DBEvent, Event } from './event' +import { Invoice } from './invoice' +import { SubscriptionFilter } from './subscription' +import { User } from './user' + +export type ExposedPromiseKeys = 'then' | 'catch' | 'finally' + +export interface IQueryResult extends Pick, keyof Promise & ExposedPromiseKeys> { + stream(options?: Record): PassThrough & AsyncIterable +} + +export interface IEventRepository { + create(event: Event): Promise + createMany(events: Event[]): Promise + upsert(event: Event): Promise + upsertMany(events: Event[]): Promise + findByFilters(filters: SubscriptionFilter[]): IQueryResult + deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise +} + +export interface IInvoiceRepository { + findById(id: string, client?: DatabaseClient): Promise + upsert(invoice: Partial, client?: DatabaseClient): Promise + updateStatus( + invoice: Pick, + client?: DatabaseClient, + ): Promise + confirmInvoice( + invoiceId: string, + amountReceived: bigint, + confirmedAt: Date, + client?: DatabaseClient, + ): Promise + findPendingInvoices( + offset?: number, + limit?: number, + client?: DatabaseClient, + ): Promise +} + +export interface IUserRepository { + findByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise + upsert(user: Partial, client?: DatabaseClient): Promise + getBalanceByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise +} diff --git a/src/import-events.ts b/src/import-events.ts index 52fefa58..e6392a75 100644 --- a/src/import-events.ts +++ b/src/import-events.ts @@ -12,8 +12,8 @@ import { EventImportService, EventImportStats, } from './services/event-import-service' -import { getMasterDbClient } from './database/client' import { EventRepository } from './repositories/event-repository' +import { getMasterDbClient } from './database/client' interface CliOptions { batchSize: number diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index 9c645b8a..8ef5d3d6 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -1,254 +1,295 @@ -import { - __, - always, - applySpec, - complement, - cond, - equals, - evolve, - filter, - forEach, - forEachObjIndexed, - groupBy, - ifElse, - invoker, - is, - isEmpty, - isNil, - map, - modulo, - nth, - omit, - path, - paths, - pipe, - prop, - propSatisfies, - T, - toPairs, -} from 'ramda' - -import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey } from '../constants/base' -import { DatabaseClient, EventId } from '../@types/base' -import { DBEvent, Event } from '../@types/event' -import { IEventRepository, IQueryResult } from '../@types/repositories' -import { toBuffer, toJSON } from '../utils/transform' -import { createLogger } from '../factories/logger-factory' -import { isGenericTagQuery } from '../utils/filter' -import { SubscriptionFilter } from '../@types/subscription' - -const even = pipe(modulo(__, 2), equals(0)) - -const groupByLengthSpec = groupBy( - pipe( - prop('length'), - cond([ - [equals(64), always('exact')], - [even, always('even')], - [T, always('odd')], - ]) - ) -) - -const debug = createLogger('event-repository') - -export class EventRepository implements IEventRepository { - public constructor( - private readonly masterDbClient: DatabaseClient, - private readonly readReplicaDbClient: DatabaseClient, - ) { } - - public findByFilters(filters: SubscriptionFilter[]): IQueryResult { - debug('querying for %o', filters) - if (!Array.isArray(filters) || !filters.length) { - throw new Error('Filters cannot be empty') - } - const queries = filters.map((currentFilter) => { - const builder = this.readReplicaDbClient('events') - - forEachObjIndexed((tableFields: string[], filterName: string | number) => { - builder.andWhere((bd) => { - cond([ - [isEmpty, () => void bd.whereRaw('1 = 0')], - [ - complement(isNil), - pipe( - groupByLengthSpec, - evolve({ - exact: (pubkeys: string[]) => - tableFields.forEach((tableField) => - bd.orWhereIn(tableField, pubkeys.map(toBuffer)) - ), - even: forEach((prefix: string) => - tableFields.forEach((tableField) => - bd.orWhereRaw( - `substring("${tableField}" from 1 for ?) = ?`, - [prefix.length >> 1, toBuffer(prefix)] - ) - ) - ), - odd: forEach((prefix: string) => - tableFields.forEach((tableField) => - bd.orWhereRaw( - `substring("${tableField}" from 1 for ?) BETWEEN ? AND ?`, - [ - (prefix.length >> 1) + 1, - `\\x${prefix}0`, - `\\x${prefix}f`, - ], - ) - ) - ), - } as any), - ), - ], - ])(currentFilter[filterName] as string[]) - }) - })({ - authors: ['event_pubkey'], - ids: ['event_id'], - }) - - if (Array.isArray(currentFilter.kinds)) { - builder.whereIn('event_kind', currentFilter.kinds) - } - - if (typeof currentFilter.since === 'number') { - builder.where('event_created_at', '>=', currentFilter.since) - } - - if (typeof currentFilter.until === 'number') { - builder.where('event_created_at', '<=', currentFilter.until) - } - - if (typeof currentFilter.limit === 'number') { - builder.limit(currentFilter.limit).orderBy('event_created_at', 'DESC') - } else { - builder.limit(500).orderBy('event_created_at', 'asc') - } - - const andWhereRaw = invoker(1, 'andWhereRaw') - const orWhereRaw = invoker(2, 'orWhereRaw') - - let isTagQuery = false - pipe( - toPairs, - filter(pipe(nth(0) as () => string, isGenericTagQuery)) as any, - forEach(([filterName, criteria]: [string, string[]]) => { - isTagQuery = true - builder.andWhere((bd) => { - ifElse( - isEmpty, - () => andWhereRaw('1 = 0', bd), - forEach((criterion: string) => void orWhereRaw( - 'event_tags.tag_name = ? AND event_tags.tag_value = ?', - [filterName[1], criterion], - bd, - )), - )(criteria) - }) - }), - )(currentFilter as any) - - if (isTagQuery) { - builder.leftJoin('event_tags', 'events.event_id', 'event_tags.event_id') - .select('events.*') - } - - return builder - }) - - const [query, ...subqueries] = queries - if (subqueries.length) { - query.union(subqueries, true) - } - - return query - } - - public async create(event: Event): Promise { - return this.insert(event).then(prop('rowCount') as () => number, () => 0) - } - - private insert(event: Event) { - debug('inserting event: %o', event) - const row = applySpec({ - event_id: pipe(prop('id'), toBuffer), - event_pubkey: pipe(prop('pubkey'), toBuffer), - event_created_at: prop('created_at'), - event_kind: prop('kind'), - event_tags: pipe(prop('tags'), toJSON), - event_content: prop('content'), - event_signature: pipe(prop('sig'), toBuffer), - remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), - expires_at: ifElse( - propSatisfies(is(Number), EventExpirationTimeMetadataKey), - prop(EventExpirationTimeMetadataKey as any), - always(null), - ), - })(event) - - return this.masterDbClient('events') - .insert(row) - .onConflict() - .ignore() - } - - public upsert(event: Event): Promise { - debug('upserting event: %o', event) - - const toJSON = (input: any) => JSON.stringify(input) - - const row = applySpec({ - event_id: pipe(prop('id'), toBuffer), - event_pubkey: pipe(prop('pubkey'), toBuffer), - event_created_at: prop('created_at'), - event_kind: prop('kind'), - event_tags: pipe(prop('tags'), toJSON), - event_content: prop('content'), - event_signature: pipe(prop('sig'), toBuffer), - event_deduplication: ifElse( - propSatisfies(isNil, EventDeduplicationMetadataKey), - pipe(paths([['pubkey'], ['kind']]), toJSON), - pipe(prop(EventDeduplicationMetadataKey as any), toJSON), - ), - remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), - expires_at: ifElse( - propSatisfies(is(Number), EventExpirationTimeMetadataKey), - prop(EventExpirationTimeMetadataKey as any), - always(null), - ), - deleted_at: always(null), - })(event) - - const query = this.masterDbClient('events') - .insert(row) - // NIP-16: Replaceable Events - // NIP-33: Parameterized Replaceable Events - .onConflict( - this.masterDbClient.raw( - '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)' - ) - ) - .merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row)) - .where('events.event_created_at', '<', row.event_created_at) - - return { - then: (onfulfilled: (value: number) => T1 | PromiseLike, onrejected: (reason: any) => T2 | PromiseLike) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected), - catch: (onrejected: (reason: any) => T | PromiseLike) => query.catch(onrejected), - toString: (): string => query.toString(), - } as Promise - } - - public deleteByPubkeyAndIds(pubkey: string, eventIdsToDelete: EventId[]): Promise { - debug('deleting events from %s: %o', pubkey, eventIdsToDelete) - - return this.masterDbClient('events') - .where('event_pubkey', toBuffer(pubkey)) - .whereIn('event_id', map(toBuffer)(eventIdsToDelete)) - .whereNull('deleted_at') - .update({ - deleted_at: this.masterDbClient.raw('now()'), - }) - } -} +import { + __, + always, + applySpec, + complement, + cond, + equals, + evolve, + filter, + forEach, + forEachObjIndexed, + groupBy, + ifElse, + invoker, + is, + isEmpty, + isNil, + map, + modulo, + nth, + omit, + path, + paths, + pipe, + prop, + propSatisfies, + T, + toPairs, +} from 'ramda' + +import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey } from '../constants/base' +import { DatabaseClient, EventId } from '../@types/base' +import { DBEvent, Event } from '../@types/event' +import { IEventRepository, IQueryResult } from '../@types/repositories' +import { toBuffer, toJSON } from '../utils/transform' +import { createLogger } from '../factories/logger-factory' +import { isGenericTagQuery } from '../utils/filter' +import { SubscriptionFilter } from '../@types/subscription' + +const even = pipe(modulo(__, 2), equals(0)) + +const groupByLengthSpec = groupBy( + pipe( + prop('length'), + cond([ + [equals(64), always('exact')], + [even, always('even')], + [T, always('odd')], + ]) + ) +) + +const debug = createLogger('event-repository') + +export class EventRepository implements IEventRepository { + public constructor( + private readonly masterDbClient: DatabaseClient, + private readonly readReplicaDbClient: DatabaseClient, + ) { } + + public findByFilters(filters: SubscriptionFilter[]): IQueryResult { + debug('querying for %o', filters) + if (!Array.isArray(filters) || !filters.length) { + throw new Error('Filters cannot be empty') + } + const queries = filters.map((currentFilter) => { + const builder = this.readReplicaDbClient('events') + + forEachObjIndexed((tableFields: string[], filterName: string | number) => { + builder.andWhere((bd) => { + cond([ + [isEmpty, () => void bd.whereRaw('1 = 0')], + [ + complement(isNil), + pipe( + groupByLengthSpec, + evolve({ + exact: (pubkeys: string[]) => + tableFields.forEach((tableField) => + bd.orWhereIn(tableField, pubkeys.map(toBuffer)) + ), + even: forEach((prefix: string) => + tableFields.forEach((tableField) => + bd.orWhereRaw( + `substring("${tableField}" from 1 for ?) = ?`, + [prefix.length >> 1, toBuffer(prefix)] + ) + ) + ), + odd: forEach((prefix: string) => + tableFields.forEach((tableField) => + bd.orWhereRaw( + `substring("${tableField}" from 1 for ?) BETWEEN ? AND ?`, + [ + (prefix.length >> 1) + 1, + `\\x${prefix}0`, + `\\x${prefix}f`, + ], + ) + ) + ), + } as any), + ), + ], + ])(currentFilter[filterName] as string[]) + }) + })({ + authors: ['event_pubkey'], + ids: ['event_id'], + }) + + if (Array.isArray(currentFilter.kinds)) { + builder.whereIn('event_kind', currentFilter.kinds) + } + + if (typeof currentFilter.since === 'number') { + builder.where('event_created_at', '>=', currentFilter.since) + } + + if (typeof currentFilter.until === 'number') { + builder.where('event_created_at', '<=', currentFilter.until) + } + + if (typeof currentFilter.limit === 'number') { + builder.limit(currentFilter.limit).orderBy('event_created_at', 'DESC') + } else { + builder.limit(500).orderBy('event_created_at', 'asc') + } + + const andWhereRaw = invoker(1, 'andWhereRaw') + const orWhereRaw = invoker(2, 'orWhereRaw') + + let isTagQuery = false + pipe( + toPairs, + filter(pipe(nth(0) as () => string, isGenericTagQuery)) as any, + forEach(([filterName, criteria]: [string, string[]]) => { + isTagQuery = true + builder.andWhere((bd) => { + ifElse( + isEmpty, + () => andWhereRaw('1 = 0', bd), + forEach((criterion: string) => void orWhereRaw( + 'event_tags.tag_name = ? AND event_tags.tag_value = ?', + [filterName[1], criterion], + bd, + )), + )(criteria) + }) + }), + )(currentFilter as any) + + if (isTagQuery) { + builder.leftJoin('event_tags', 'events.event_id', 'event_tags.event_id') + .select('events.*') + } + + return builder + }) + + const [query, ...subqueries] = queries + if (subqueries.length) { + query.union(subqueries, true) + } + + return query + } + + public async create(event: Event): Promise { + return this.insert(event).then(prop('rowCount') as () => number, () => 0) + } + + public async createMany(events: Event[]): Promise { + if (!events.length) { + return 0 + } + + const rows = events.map((event) => this.toInsertRow(event)) + + return this.masterDbClient('events') + .insert(rows) + .onConflict() + .ignore() + .then(prop('rowCount') as () => number, () => 0) + } + + private toInsertRow(event: Event) { + return applySpec({ + event_id: pipe(prop('id'), toBuffer), + event_pubkey: pipe(prop('pubkey'), toBuffer), + event_created_at: prop('created_at'), + event_kind: prop('kind'), + event_tags: pipe(prop('tags'), toJSON), + event_content: prop('content'), + event_signature: pipe(prop('sig'), toBuffer), + remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), + expires_at: ifElse( + propSatisfies(is(Number), EventExpirationTimeMetadataKey), + prop(EventExpirationTimeMetadataKey as any), + always(null), + ), + })(event) + } + + private insert(event: Event) { + debug('inserting event: %o', event) + const row = this.toInsertRow(event) + + return this.masterDbClient('events') + .insert(row) + .onConflict() + .ignore() + } + + public upsert(event: Event): Promise { + debug('upserting event: %o', event) + + const row = this.toUpsertRow(event) + + const query = this.masterDbClient('events') + .insert(row) + // NIP-16: Replaceable Events + // NIP-33: Parameterized Replaceable Events + .onConflict( + this.masterDbClient.raw( + '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)' + ) + ) + .merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row)) + .where('events.event_created_at', '<', row.event_created_at) + + return { + then: (onfulfilled: (value: number) => T1 | PromiseLike, onrejected: (reason: any) => T2 | PromiseLike) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected), + catch: (onrejected: (reason: any) => T | PromiseLike) => query.catch(onrejected), + toString: (): string => query.toString(), + } as Promise + } + + public async upsertMany(events: Event[]): Promise { + if (!events.length) { + return 0 + } + + const rows = events.map((event) => this.toUpsertRow(event)) + + return this.masterDbClient('events') + .insert(rows) + .onConflict( + this.masterDbClient.raw( + '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)' + ) + ) + .merge(['deleted_at', 'event_content', 'event_created_at', 'event_id', 'event_signature', 'event_tags', 'expires_at']) + .whereRaw('"events"."event_created_at" < "excluded"."event_created_at"') + .then(prop('rowCount') as () => number, () => 0) + } + + private toUpsertRow(event: Event) { + const toJSON = (input: any) => JSON.stringify(input) + + return applySpec({ + event_id: pipe(prop('id'), toBuffer), + event_pubkey: pipe(prop('pubkey'), toBuffer), + event_created_at: prop('created_at'), + event_kind: prop('kind'), + event_tags: pipe(prop('tags'), toJSON), + event_content: prop('content'), + event_signature: pipe(prop('sig'), toBuffer), + event_deduplication: ifElse( + propSatisfies(isNil, EventDeduplicationMetadataKey), + pipe(paths([['pubkey'], ['kind']]), toJSON), + pipe(prop(EventDeduplicationMetadataKey as any), toJSON), + ), + remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), + expires_at: ifElse( + propSatisfies(is(Number), EventExpirationTimeMetadataKey), + prop(EventExpirationTimeMetadataKey as any), + always(null), + ), + deleted_at: always(null), + })(event) + } + + public deleteByPubkeyAndIds(pubkey: string, eventIdsToDelete: EventId[]): Promise { + debug('deleting events from %s: %o', pubkey, eventIdsToDelete) + + return this.masterDbClient('events') + .where('event_pubkey', toBuffer(pubkey)) + .whereIn('event_id', map(toBuffer)(eventIdsToDelete)) + .whereNull('deleted_at') + .update({ + deleted_at: this.masterDbClient.raw('now()'), + }) + } +} diff --git a/src/services/event-import-service.ts b/src/services/event-import-service.ts index 3945ed27..2b8590e3 100644 --- a/src/services/event-import-service.ts +++ b/src/services/event-import-service.ts @@ -2,6 +2,7 @@ import fs from 'fs' import readline from 'readline' import { + getEventExpiration, isDeleteEvent, isEphemeralEvent, isEventIdValid, @@ -11,11 +12,29 @@ import { } from '../utils/event' import { attemptValidation } from '../utils/validation' +import { EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey, EventTags } from '../constants/base' import { Event } from '../@types/event' import { eventSchema } from '../schemas/event-schema' -import { EventTags } from '../constants/base' import { IEventRepository } from '../@types/repositories' +const enrichEventMetadata = (event: Event): Event => { + let enriched: any = event + + const expiration = getEventExpiration(event) + if (expiration) { + enriched = { ...enriched, [EventExpirationTimeMetadataKey]: expiration } + } + + if (isParameterizedReplaceableEvent(event)) { + const [, ...deduplication] = event.tags.find( + (tag) => tag.length >= 2 && tag[0] === EventTags.Deduplication, + ) ?? [null, ''] + enriched = { ...enriched, [EventDeduplicationMetadataKey]: deduplication } + } + + return enriched as Event +} + const DEFAULT_BATCH_SIZE = 1000 export interface EventImportStats { @@ -53,12 +72,19 @@ export const createEventBatchPersister = let inserted = 0 + const regularEvents: Event[] = [] + const replaceableEvents: Event[] = [] + for (const event of events) { if (isEphemeralEvent(event)) { continue } if (isDeleteEvent(event)) { + // flush pending batches before applying deletes + inserted += await eventRepository.createMany(regularEvents.splice(0)) + inserted += await eventRepository.upsertMany(replaceableEvents.splice(0)) + const eventIdsToDelete = event.tags.reduce( (ids, tag) => tag.length >= 2 @@ -73,18 +99,24 @@ export const createEventBatchPersister = await eventRepository.deleteByPubkeyAndIds(event.pubkey, eventIdsToDelete) } - inserted += await eventRepository.create(event) + inserted += await eventRepository.create(enrichEventMetadata(event)) continue } + const enrichedEvent = enrichEventMetadata(event) + if (isReplaceableEvent(event) || isParameterizedReplaceableEvent(event)) { - inserted += await eventRepository.upsert(event) + replaceableEvents.push(enrichedEvent) continue } - inserted += await eventRepository.create(event) + regularEvents.push(enrichedEvent) } + // flush remaining + inserted += await eventRepository.createMany(regularEvents) + inserted += await eventRepository.upsertMany(replaceableEvents) + return inserted } From c076a6afff4b40bf29853a38dfb18a9386ce5626 Mon Sep 17 00:00:00 2001 From: Anshuman Singh Date: Thu, 9 Apr 2026 22:38:58 +0530 Subject: [PATCH 8/8] fix: remove double onProgress call at end of import --- src/@types/repositories.ts | 96 +-- src/repositories/event-repository.ts | 590 +++++++++--------- src/services/event-import-service.ts | 1 - .../services/event-import-service.spec.ts | 2 + 4 files changed, 345 insertions(+), 344 deletions(-) diff --git a/src/@types/repositories.ts b/src/@types/repositories.ts index 88c756b7..86c91b4b 100644 --- a/src/@types/repositories.ts +++ b/src/@types/repositories.ts @@ -1,48 +1,48 @@ -import { PassThrough } from 'stream' - -import { DatabaseClient, EventId, Pubkey } from './base' -import { DBEvent, Event } from './event' -import { Invoice } from './invoice' -import { SubscriptionFilter } from './subscription' -import { User } from './user' - -export type ExposedPromiseKeys = 'then' | 'catch' | 'finally' - -export interface IQueryResult extends Pick, keyof Promise & ExposedPromiseKeys> { - stream(options?: Record): PassThrough & AsyncIterable -} - -export interface IEventRepository { - create(event: Event): Promise - createMany(events: Event[]): Promise - upsert(event: Event): Promise - upsertMany(events: Event[]): Promise - findByFilters(filters: SubscriptionFilter[]): IQueryResult - deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise -} - -export interface IInvoiceRepository { - findById(id: string, client?: DatabaseClient): Promise - upsert(invoice: Partial, client?: DatabaseClient): Promise - updateStatus( - invoice: Pick, - client?: DatabaseClient, - ): Promise - confirmInvoice( - invoiceId: string, - amountReceived: bigint, - confirmedAt: Date, - client?: DatabaseClient, - ): Promise - findPendingInvoices( - offset?: number, - limit?: number, - client?: DatabaseClient, - ): Promise -} - -export interface IUserRepository { - findByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise - upsert(user: Partial, client?: DatabaseClient): Promise - getBalanceByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise -} +import { PassThrough } from 'stream' + +import { DatabaseClient, EventId, Pubkey } from './base' +import { DBEvent, Event } from './event' +import { Invoice } from './invoice' +import { SubscriptionFilter } from './subscription' +import { User } from './user' + +export type ExposedPromiseKeys = 'then' | 'catch' | 'finally' + +export interface IQueryResult extends Pick, keyof Promise & ExposedPromiseKeys> { + stream(options?: Record): PassThrough & AsyncIterable +} + +export interface IEventRepository { + create(event: Event): Promise + createMany(events: Event[]): Promise + upsert(event: Event): Promise + upsertMany(events: Event[]): Promise + findByFilters(filters: SubscriptionFilter[]): IQueryResult + deleteByPubkeyAndIds(pubkey: Pubkey, ids: EventId[]): Promise +} + +export interface IInvoiceRepository { + findById(id: string, client?: DatabaseClient): Promise + upsert(invoice: Partial, client?: DatabaseClient): Promise + updateStatus( + invoice: Pick, + client?: DatabaseClient, + ): Promise + confirmInvoice( + invoiceId: string, + amountReceived: bigint, + confirmedAt: Date, + client?: DatabaseClient, + ): Promise + findPendingInvoices( + offset?: number, + limit?: number, + client?: DatabaseClient, + ): Promise +} + +export interface IUserRepository { + findByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise + upsert(user: Partial, client?: DatabaseClient): Promise + getBalanceByPubkey(pubkey: Pubkey, client?: DatabaseClient): Promise +} diff --git a/src/repositories/event-repository.ts b/src/repositories/event-repository.ts index 8ef5d3d6..6e848178 100644 --- a/src/repositories/event-repository.ts +++ b/src/repositories/event-repository.ts @@ -1,295 +1,295 @@ -import { - __, - always, - applySpec, - complement, - cond, - equals, - evolve, - filter, - forEach, - forEachObjIndexed, - groupBy, - ifElse, - invoker, - is, - isEmpty, - isNil, - map, - modulo, - nth, - omit, - path, - paths, - pipe, - prop, - propSatisfies, - T, - toPairs, -} from 'ramda' - -import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey } from '../constants/base' -import { DatabaseClient, EventId } from '../@types/base' -import { DBEvent, Event } from '../@types/event' -import { IEventRepository, IQueryResult } from '../@types/repositories' -import { toBuffer, toJSON } from '../utils/transform' -import { createLogger } from '../factories/logger-factory' -import { isGenericTagQuery } from '../utils/filter' -import { SubscriptionFilter } from '../@types/subscription' - -const even = pipe(modulo(__, 2), equals(0)) - -const groupByLengthSpec = groupBy( - pipe( - prop('length'), - cond([ - [equals(64), always('exact')], - [even, always('even')], - [T, always('odd')], - ]) - ) -) - -const debug = createLogger('event-repository') - -export class EventRepository implements IEventRepository { - public constructor( - private readonly masterDbClient: DatabaseClient, - private readonly readReplicaDbClient: DatabaseClient, - ) { } - - public findByFilters(filters: SubscriptionFilter[]): IQueryResult { - debug('querying for %o', filters) - if (!Array.isArray(filters) || !filters.length) { - throw new Error('Filters cannot be empty') - } - const queries = filters.map((currentFilter) => { - const builder = this.readReplicaDbClient('events') - - forEachObjIndexed((tableFields: string[], filterName: string | number) => { - builder.andWhere((bd) => { - cond([ - [isEmpty, () => void bd.whereRaw('1 = 0')], - [ - complement(isNil), - pipe( - groupByLengthSpec, - evolve({ - exact: (pubkeys: string[]) => - tableFields.forEach((tableField) => - bd.orWhereIn(tableField, pubkeys.map(toBuffer)) - ), - even: forEach((prefix: string) => - tableFields.forEach((tableField) => - bd.orWhereRaw( - `substring("${tableField}" from 1 for ?) = ?`, - [prefix.length >> 1, toBuffer(prefix)] - ) - ) - ), - odd: forEach((prefix: string) => - tableFields.forEach((tableField) => - bd.orWhereRaw( - `substring("${tableField}" from 1 for ?) BETWEEN ? AND ?`, - [ - (prefix.length >> 1) + 1, - `\\x${prefix}0`, - `\\x${prefix}f`, - ], - ) - ) - ), - } as any), - ), - ], - ])(currentFilter[filterName] as string[]) - }) - })({ - authors: ['event_pubkey'], - ids: ['event_id'], - }) - - if (Array.isArray(currentFilter.kinds)) { - builder.whereIn('event_kind', currentFilter.kinds) - } - - if (typeof currentFilter.since === 'number') { - builder.where('event_created_at', '>=', currentFilter.since) - } - - if (typeof currentFilter.until === 'number') { - builder.where('event_created_at', '<=', currentFilter.until) - } - - if (typeof currentFilter.limit === 'number') { - builder.limit(currentFilter.limit).orderBy('event_created_at', 'DESC') - } else { - builder.limit(500).orderBy('event_created_at', 'asc') - } - - const andWhereRaw = invoker(1, 'andWhereRaw') - const orWhereRaw = invoker(2, 'orWhereRaw') - - let isTagQuery = false - pipe( - toPairs, - filter(pipe(nth(0) as () => string, isGenericTagQuery)) as any, - forEach(([filterName, criteria]: [string, string[]]) => { - isTagQuery = true - builder.andWhere((bd) => { - ifElse( - isEmpty, - () => andWhereRaw('1 = 0', bd), - forEach((criterion: string) => void orWhereRaw( - 'event_tags.tag_name = ? AND event_tags.tag_value = ?', - [filterName[1], criterion], - bd, - )), - )(criteria) - }) - }), - )(currentFilter as any) - - if (isTagQuery) { - builder.leftJoin('event_tags', 'events.event_id', 'event_tags.event_id') - .select('events.*') - } - - return builder - }) - - const [query, ...subqueries] = queries - if (subqueries.length) { - query.union(subqueries, true) - } - - return query - } - - public async create(event: Event): Promise { - return this.insert(event).then(prop('rowCount') as () => number, () => 0) - } - - public async createMany(events: Event[]): Promise { - if (!events.length) { - return 0 - } - - const rows = events.map((event) => this.toInsertRow(event)) - - return this.masterDbClient('events') - .insert(rows) - .onConflict() - .ignore() - .then(prop('rowCount') as () => number, () => 0) - } - - private toInsertRow(event: Event) { - return applySpec({ - event_id: pipe(prop('id'), toBuffer), - event_pubkey: pipe(prop('pubkey'), toBuffer), - event_created_at: prop('created_at'), - event_kind: prop('kind'), - event_tags: pipe(prop('tags'), toJSON), - event_content: prop('content'), - event_signature: pipe(prop('sig'), toBuffer), - remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), - expires_at: ifElse( - propSatisfies(is(Number), EventExpirationTimeMetadataKey), - prop(EventExpirationTimeMetadataKey as any), - always(null), - ), - })(event) - } - - private insert(event: Event) { - debug('inserting event: %o', event) - const row = this.toInsertRow(event) - - return this.masterDbClient('events') - .insert(row) - .onConflict() - .ignore() - } - - public upsert(event: Event): Promise { - debug('upserting event: %o', event) - - const row = this.toUpsertRow(event) - - const query = this.masterDbClient('events') - .insert(row) - // NIP-16: Replaceable Events - // NIP-33: Parameterized Replaceable Events - .onConflict( - this.masterDbClient.raw( - '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)' - ) - ) - .merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row)) - .where('events.event_created_at', '<', row.event_created_at) - - return { - then: (onfulfilled: (value: number) => T1 | PromiseLike, onrejected: (reason: any) => T2 | PromiseLike) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected), - catch: (onrejected: (reason: any) => T | PromiseLike) => query.catch(onrejected), - toString: (): string => query.toString(), - } as Promise - } - - public async upsertMany(events: Event[]): Promise { - if (!events.length) { - return 0 - } - - const rows = events.map((event) => this.toUpsertRow(event)) - - return this.masterDbClient('events') - .insert(rows) - .onConflict( - this.masterDbClient.raw( - '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)' - ) - ) - .merge(['deleted_at', 'event_content', 'event_created_at', 'event_id', 'event_signature', 'event_tags', 'expires_at']) - .whereRaw('"events"."event_created_at" < "excluded"."event_created_at"') - .then(prop('rowCount') as () => number, () => 0) - } - - private toUpsertRow(event: Event) { - const toJSON = (input: any) => JSON.stringify(input) - - return applySpec({ - event_id: pipe(prop('id'), toBuffer), - event_pubkey: pipe(prop('pubkey'), toBuffer), - event_created_at: prop('created_at'), - event_kind: prop('kind'), - event_tags: pipe(prop('tags'), toJSON), - event_content: prop('content'), - event_signature: pipe(prop('sig'), toBuffer), - event_deduplication: ifElse( - propSatisfies(isNil, EventDeduplicationMetadataKey), - pipe(paths([['pubkey'], ['kind']]), toJSON), - pipe(prop(EventDeduplicationMetadataKey as any), toJSON), - ), - remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), - expires_at: ifElse( - propSatisfies(is(Number), EventExpirationTimeMetadataKey), - prop(EventExpirationTimeMetadataKey as any), - always(null), - ), - deleted_at: always(null), - })(event) - } - - public deleteByPubkeyAndIds(pubkey: string, eventIdsToDelete: EventId[]): Promise { - debug('deleting events from %s: %o', pubkey, eventIdsToDelete) - - return this.masterDbClient('events') - .where('event_pubkey', toBuffer(pubkey)) - .whereIn('event_id', map(toBuffer)(eventIdsToDelete)) - .whereNull('deleted_at') - .update({ - deleted_at: this.masterDbClient.raw('now()'), - }) - } -} +import { + __, + always, + applySpec, + complement, + cond, + equals, + evolve, + filter, + forEach, + forEachObjIndexed, + groupBy, + ifElse, + invoker, + is, + isEmpty, + isNil, + map, + modulo, + nth, + omit, + path, + paths, + pipe, + prop, + propSatisfies, + T, + toPairs, +} from 'ramda' + +import { ContextMetadataKey, EventDeduplicationMetadataKey, EventExpirationTimeMetadataKey } from '../constants/base' +import { DatabaseClient, EventId } from '../@types/base' +import { DBEvent, Event } from '../@types/event' +import { IEventRepository, IQueryResult } from '../@types/repositories' +import { toBuffer, toJSON } from '../utils/transform' +import { createLogger } from '../factories/logger-factory' +import { isGenericTagQuery } from '../utils/filter' +import { SubscriptionFilter } from '../@types/subscription' + +const even = pipe(modulo(__, 2), equals(0)) + +const groupByLengthSpec = groupBy( + pipe( + prop('length'), + cond([ + [equals(64), always('exact')], + [even, always('even')], + [T, always('odd')], + ]) + ) +) + +const debug = createLogger('event-repository') + +export class EventRepository implements IEventRepository { + public constructor( + private readonly masterDbClient: DatabaseClient, + private readonly readReplicaDbClient: DatabaseClient, + ) { } + + public findByFilters(filters: SubscriptionFilter[]): IQueryResult { + debug('querying for %o', filters) + if (!Array.isArray(filters) || !filters.length) { + throw new Error('Filters cannot be empty') + } + const queries = filters.map((currentFilter) => { + const builder = this.readReplicaDbClient('events') + + forEachObjIndexed((tableFields: string[], filterName: string | number) => { + builder.andWhere((bd) => { + cond([ + [isEmpty, () => void bd.whereRaw('1 = 0')], + [ + complement(isNil), + pipe( + groupByLengthSpec, + evolve({ + exact: (pubkeys: string[]) => + tableFields.forEach((tableField) => + bd.orWhereIn(tableField, pubkeys.map(toBuffer)) + ), + even: forEach((prefix: string) => + tableFields.forEach((tableField) => + bd.orWhereRaw( + `substring("${tableField}" from 1 for ?) = ?`, + [prefix.length >> 1, toBuffer(prefix)] + ) + ) + ), + odd: forEach((prefix: string) => + tableFields.forEach((tableField) => + bd.orWhereRaw( + `substring("${tableField}" from 1 for ?) BETWEEN ? AND ?`, + [ + (prefix.length >> 1) + 1, + `\\x${prefix}0`, + `\\x${prefix}f`, + ], + ) + ) + ), + } as any), + ), + ], + ])(currentFilter[filterName] as string[]) + }) + })({ + authors: ['event_pubkey'], + ids: ['event_id'], + }) + + if (Array.isArray(currentFilter.kinds)) { + builder.whereIn('event_kind', currentFilter.kinds) + } + + if (typeof currentFilter.since === 'number') { + builder.where('event_created_at', '>=', currentFilter.since) + } + + if (typeof currentFilter.until === 'number') { + builder.where('event_created_at', '<=', currentFilter.until) + } + + if (typeof currentFilter.limit === 'number') { + builder.limit(currentFilter.limit).orderBy('event_created_at', 'DESC') + } else { + builder.limit(500).orderBy('event_created_at', 'asc') + } + + const andWhereRaw = invoker(1, 'andWhereRaw') + const orWhereRaw = invoker(2, 'orWhereRaw') + + let isTagQuery = false + pipe( + toPairs, + filter(pipe(nth(0) as () => string, isGenericTagQuery)) as any, + forEach(([filterName, criteria]: [string, string[]]) => { + isTagQuery = true + builder.andWhere((bd) => { + ifElse( + isEmpty, + () => andWhereRaw('1 = 0', bd), + forEach((criterion: string) => void orWhereRaw( + 'event_tags.tag_name = ? AND event_tags.tag_value = ?', + [filterName[1], criterion], + bd, + )), + )(criteria) + }) + }), + )(currentFilter as any) + + if (isTagQuery) { + builder.leftJoin('event_tags', 'events.event_id', 'event_tags.event_id') + .select('events.*') + } + + return builder + }) + + const [query, ...subqueries] = queries + if (subqueries.length) { + query.union(subqueries, true) + } + + return query + } + + public async create(event: Event): Promise { + return this.insert(event).then(prop('rowCount') as () => number, () => 0) + } + + public async createMany(events: Event[]): Promise { + if (!events.length) { + return 0 + } + + const rows = events.map((event) => this.toInsertRow(event)) + + return this.masterDbClient('events') + .insert(rows) + .onConflict() + .ignore() + .then(prop('rowCount') as () => number, () => 0) + } + + private toInsertRow(event: Event) { + return applySpec({ + event_id: pipe(prop('id'), toBuffer), + event_pubkey: pipe(prop('pubkey'), toBuffer), + event_created_at: prop('created_at'), + event_kind: prop('kind'), + event_tags: pipe(prop('tags'), toJSON), + event_content: prop('content'), + event_signature: pipe(prop('sig'), toBuffer), + remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), + expires_at: ifElse( + propSatisfies(is(Number), EventExpirationTimeMetadataKey), + prop(EventExpirationTimeMetadataKey as any), + always(null), + ), + })(event) + } + + private insert(event: Event) { + debug('inserting event: %o', event) + const row = this.toInsertRow(event) + + return this.masterDbClient('events') + .insert(row) + .onConflict() + .ignore() + } + + public upsert(event: Event): Promise { + debug('upserting event: %o', event) + + const row = this.toUpsertRow(event) + + const query = this.masterDbClient('events') + .insert(row) + // NIP-16: Replaceable Events + // NIP-33: Parameterized Replaceable Events + .onConflict( + this.masterDbClient.raw( + '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)' + ) + ) + .merge(omit(['event_pubkey', 'event_kind', 'event_deduplication'])(row)) + .where('events.event_created_at', '<', row.event_created_at) + + return { + then: (onfulfilled: (value: number) => T1 | PromiseLike, onrejected: (reason: any) => T2 | PromiseLike) => query.then(prop('rowCount') as () => number).then(onfulfilled, onrejected), + catch: (onrejected: (reason: any) => T | PromiseLike) => query.catch(onrejected), + toString: (): string => query.toString(), + } as Promise + } + + public async upsertMany(events: Event[]): Promise { + if (!events.length) { + return 0 + } + + const rows = events.map((event) => this.toUpsertRow(event)) + + return this.masterDbClient('events') + .insert(rows) + .onConflict( + this.masterDbClient.raw( + '(event_pubkey, event_kind, event_deduplication) WHERE (event_kind = 0 OR event_kind = 3 OR event_kind = 41 OR (event_kind >= 10000 AND event_kind < 20000)) OR (event_kind >= 30000 AND event_kind < 40000)' + ) + ) + .merge(['deleted_at', 'event_content', 'event_created_at', 'event_id', 'event_signature', 'event_tags', 'expires_at']) + .whereRaw('"events"."event_created_at" < "excluded"."event_created_at"') + .then(prop('rowCount') as () => number, () => 0) + } + + private toUpsertRow(event: Event) { + const toJSON = (input: any) => JSON.stringify(input) + + return applySpec({ + event_id: pipe(prop('id'), toBuffer), + event_pubkey: pipe(prop('pubkey'), toBuffer), + event_created_at: prop('created_at'), + event_kind: prop('kind'), + event_tags: pipe(prop('tags'), toJSON), + event_content: prop('content'), + event_signature: pipe(prop('sig'), toBuffer), + event_deduplication: ifElse( + propSatisfies(isNil, EventDeduplicationMetadataKey), + pipe(paths([['pubkey'], ['kind']]), toJSON), + pipe(prop(EventDeduplicationMetadataKey as any), toJSON), + ), + remote_address: path([ContextMetadataKey as any, 'remoteAddress', 'address']), + expires_at: ifElse( + propSatisfies(is(Number), EventExpirationTimeMetadataKey), + prop(EventExpirationTimeMetadataKey as any), + always(null), + ), + deleted_at: always(null), + })(event) + } + + public deleteByPubkeyAndIds(pubkey: string, eventIdsToDelete: EventId[]): Promise { + debug('deleting events from %s: %o', pubkey, eventIdsToDelete) + + return this.masterDbClient('events') + .where('event_pubkey', toBuffer(pubkey)) + .whereIn('event_id', map(toBuffer)(eventIdsToDelete)) + .whereNull('deleted_at') + .update({ + deleted_at: this.masterDbClient.raw('now()'), + }) + } +} diff --git a/src/services/event-import-service.ts b/src/services/event-import-service.ts index 2b8590e3..83855a93 100644 --- a/src/services/event-import-service.ts +++ b/src/services/event-import-service.ts @@ -220,7 +220,6 @@ export class EventImportService { } await flushBatch() - onProgress({ ...stats }) return stats } finally { diff --git a/test/unit/services/event-import-service.spec.ts b/test/unit/services/event-import-service.spec.ts index a7e35411..f6b5572c 100644 --- a/test/unit/services/event-import-service.spec.ts +++ b/test/unit/services/event-import-service.spec.ts @@ -81,6 +81,8 @@ describe('EventImportService', () => { expect(firstBatch.map(({ id }) => id)).to.deep.equal([event.id, event.id]) expect(secondBatch.map(({ id }) => id)).to.deep.equal([event.id]) + expect(progressUpdates.length).to.equal(2) + const finalProgress = progressUpdates[progressUpdates.length - 1] expect(finalProgress).to.deep.equal(stats)