From 4bcb2b11a175da2558ee132b4854bbdf50fdfcc7 Mon Sep 17 00:00:00 2001 From: Jianong Date: Fri, 26 Jun 2026 20:08:16 +0800 Subject: [PATCH 1/2] fix: reuse MySQLRecordManager DataSource --- .../MySQLrecordManager.test.ts | 138 ++++++++++++++++++ .../MySQLRecordManager/MySQLrecordManager.ts | 128 +++++++++------- packages/components/src/indexing.test.ts | 80 ++++++++++ packages/components/src/indexing.ts | 11 ++ .../src/services/documentstore/index.ts | 6 +- 5 files changed, 313 insertions(+), 50 deletions(-) create mode 100644 packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts create mode 100644 packages/components/src/indexing.test.ts diff --git a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts new file mode 100644 index 00000000000..6e2c3e4487e --- /dev/null +++ b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts @@ -0,0 +1,138 @@ +const mockDataSourceInstances: Array<{ + isInitialized: boolean + initialize: jest.Mock + destroy: jest.Mock + createQueryRunner: jest.Mock +}> = [] + +const mockQueryRunners: Array<{ + isReleased: boolean + manager: { + query: jest.Mock + } + release: jest.Mock +}> = [] + +jest.mock('typeorm', () => { + class MockDataSource { + options: unknown + isInitialized = false + initialize = jest.fn(async () => { + this.isInitialized = true + return this + }) + destroy = jest.fn(async () => { + this.isInitialized = false + }) + createQueryRunner = jest.fn(() => { + const queryRunner = { + isReleased: false, + manager: { + query: jest.fn(async (query: string) => { + if (query.includes('UNIX_TIMESTAMP')) { + return [{ epoch: '123' }] + } + + if (query.includes('SELECT `key`')) { + return [{ key: 'a' }] + } + + return [] + }) + }, + release: jest.fn(async () => { + queryRunner.isReleased = true + }) + } + + mockQueryRunners.push(queryRunner) + return queryRunner + }) + + constructor(options: unknown) { + this.options = options + mockDataSourceInstances.push(this) + } + } + + return { + DataSource: MockDataSource + } +}) + +const { MySQLRecordManager } = require('./MySQLrecordManager') + +const createManager = () => + new MySQLRecordManager('test_namespace', { + mysqlOptions: { + type: 'mysql', + host: 'localhost', + port: 3306, + username: 'user', + password: 'password', + database: 'flowise' + }, + tableName: 'upsertion_records' + }) + +describe('MySQLRecordManager connection lifecycle', () => { + beforeEach(() => { + jest.clearAllMocks() + mockDataSourceInstances.length = 0 + mockQueryRunners.length = 0 + }) + + it('reuses one initialized DataSource across operations and destroys it only on close', async () => { + const manager = createManager() + + await expect(manager.exists(['a'])).resolves.toEqual([true]) + await expect(manager.listKeys()).resolves.toEqual(['a']) + await expect(manager.getTime()).resolves.toBe(123) + + expect(mockDataSourceInstances).toHaveLength(1) + expect(mockDataSourceInstances[0].initialize).toHaveBeenCalledTimes(1) + expect(mockDataSourceInstances[0].destroy).not.toHaveBeenCalled() + expect(mockDataSourceInstances[0].createQueryRunner).toHaveBeenCalledTimes(3) + expect(mockQueryRunners).toHaveLength(3) + mockQueryRunners.forEach((queryRunner) => { + expect(queryRunner.release).toHaveBeenCalledTimes(1) + expect(queryRunner.isReleased).toBe(true) + }) + + await manager.close() + + expect(mockDataSourceInstances[0].destroy).toHaveBeenCalledTimes(1) + expect(mockDataSourceInstances[0].isInitialized).toBe(false) + }) + + it('shares the pending DataSource initialization across concurrent operations', async () => { + const manager = createManager() + + await expect(Promise.all([manager.exists(['a']), manager.listKeys()])).resolves.toEqual([[true], ['a']]) + + expect(mockDataSourceInstances).toHaveLength(1) + expect(mockDataSourceInstances[0].initialize).toHaveBeenCalledTimes(1) + expect(mockDataSourceInstances[0].destroy).not.toHaveBeenCalled() + expect(mockDataSourceInstances[0].createQueryRunner).toHaveBeenCalledTimes(2) + + await manager.close() + }) + + it('releases query runners when update validation fails', async () => { + const consoleErrorSpy = jest.spyOn(console, 'error').mockImplementation(() => undefined) + const manager = createManager() + + await expect(manager.update(['a'], { timeAtLeast: 999 })).rejects.toThrow('Time sync issue with database 123 < 999') + + expect(mockDataSourceInstances).toHaveLength(1) + expect(mockDataSourceInstances[0].destroy).not.toHaveBeenCalled() + expect(mockQueryRunners).toHaveLength(2) + mockQueryRunners.forEach((queryRunner) => { + expect(queryRunner.release).toHaveBeenCalledTimes(1) + expect(queryRunner.isReleased).toBe(true) + }) + + await manager.close() + consoleErrorSpy.mockRestore() + }) +}) diff --git a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts index 37793f38aa1..efa5770240d 100644 --- a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts +++ b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts @@ -2,7 +2,7 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Inter import { getBaseClasses, getCredentialData, getCredentialParam } from '../../../src/utils' import { sanitizeDataSourceOptions } from '../../../src/sanitizeDataSourceOptions' import { ListKeyOptions, RecordManagerInterface, UpdateOptions } from '@langchain/community/indexes/base' -import { DataSource } from 'typeorm' +import { DataSource, QueryRunner } from 'typeorm' class MySQLRecordManager_RecordManager implements INode { label: string @@ -173,6 +173,8 @@ class MySQLRecordManager implements RecordManagerInterface { config: MySQLRecordManagerOptions tableName: string namespace: string + private dataSource?: DataSource + private dataSourcePromise?: Promise constructor(namespace: string, config: MySQLRecordManagerOptions) { const { tableName } = config @@ -202,15 +204,56 @@ class MySQLRecordManager implements RecordManagerInterface { if (mysqlOptions.port === 5432) { throw new Error('Invalid port number') } - const dataSource = new DataSource(mysqlOptions) - await dataSource.initialize() - return dataSource + + if (this.dataSource?.isInitialized) { + return this.dataSource + } + + if (this.dataSourcePromise) { + return this.dataSourcePromise + } + + this.dataSourcePromise = (async () => { + const dataSource = new DataSource(mysqlOptions) + await dataSource.initialize() + this.dataSource = dataSource + return dataSource + })() + + try { + return await this.dataSourcePromise + } catch (error) { + this.dataSource = undefined + throw error + } finally { + this.dataSourcePromise = undefined + } } - async createSchema(): Promise { + private async getQueryRunner(): Promise { const dataSource = await this.getDataSource() + return dataSource.createQueryRunner() + } + + async close(): Promise { + const dataSource = this.dataSourcePromise ? await this.dataSourcePromise.catch(() => undefined) : this.dataSource + this.dataSourcePromise = undefined + this.dataSource = undefined + + if (dataSource?.isInitialized) { + await dataSource.destroy() + } + } + + private async releaseQueryRunner(queryRunner: QueryRunner): Promise { + if (!queryRunner.isReleased) { + await queryRunner.release() + } + } + + async createSchema(): Promise { + const queryRunner = await this.getQueryRunner() try { - const queryRunner = dataSource.createQueryRunner() const tableName = this.sanitizeTableName(this.tableName) await queryRunner.manager.query(`create table if not exists \`${this.sanitizeTableName(tableName)}\` ( @@ -259,8 +302,6 @@ class MySQLRecordManager implements RecordManagerInterface { } } } - - await queryRunner.release() } catch (e: any) { // This error indicates that the table already exists // Due to asynchronous nature of the code, it is possible that @@ -271,22 +312,20 @@ class MySQLRecordManager implements RecordManagerInterface { } throw e } finally { - await dataSource.destroy() + await this.releaseQueryRunner(queryRunner) } } async getTime(): Promise { - const dataSource = await this.getDataSource() + const queryRunner = await this.getQueryRunner() try { - const queryRunner = dataSource.createQueryRunner() const res = await queryRunner.manager.query(`SELECT UNIX_TIMESTAMP(NOW()) AS epoch`) - await queryRunner.release() return Number.parseFloat(res[0].epoch) } catch (error) { console.error('Error getting time in MySQLRecordManager:') throw error } finally { - await dataSource.destroy() + await this.releaseQueryRunner(queryRunner) } } @@ -295,48 +334,45 @@ class MySQLRecordManager implements RecordManagerInterface { return } - const dataSource = await this.getDataSource() - const queryRunner = dataSource.createQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) + const queryRunner = await this.getQueryRunner() + try { + const tableName = this.sanitizeTableName(this.tableName) - const updatedAt = await this.getTime() - const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {} + const updatedAt = await this.getTime() + const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {} - if (timeAtLeast && updatedAt < timeAtLeast) { - throw new Error(`Time sync issue with database ${updatedAt} < ${timeAtLeast}`) - } + if (timeAtLeast && updatedAt < timeAtLeast) { + throw new Error(`Time sync issue with database ${updatedAt} < ${timeAtLeast}`) + } - // Handle both new format (objects with uid and docId) and old format (strings) - const isNewFormat = keys.length > 0 && typeof keys[0] === 'object' && 'uid' in keys[0] - const keyStrings = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.uid) : (keys as string[]) - const docIds = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.docId) : keys.map(() => null) + // Handle both new format (objects with uid and docId) and old format (strings) + const isNewFormat = keys.length > 0 && typeof keys[0] === 'object' && 'uid' in keys[0] + const keyStrings = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.uid) : (keys as string[]) + const docIds = isNewFormat ? (keys as Array<{ uid: string; docId: string }>).map((k) => k.docId) : keys.map(() => null) - const groupIds = _groupIds ?? keyStrings.map(() => null) + const groupIds = _groupIds ?? keyStrings.map(() => null) - if (groupIds.length !== keyStrings.length) { - throw new Error(`Number of keys (${keyStrings.length}) does not match number of group_ids (${groupIds.length})`) - } + if (groupIds.length !== keyStrings.length) { + throw new Error(`Number of keys (${keyStrings.length}) does not match number of group_ids (${groupIds.length})`) + } - const recordsToUpsert = keyStrings.map((key, i) => [key, this.namespace, updatedAt, groupIds[i] ?? null, docIds[i] ?? null]) + const recordsToUpsert = keyStrings.map((key, i) => [key, this.namespace, updatedAt, groupIds[i] ?? null, docIds[i] ?? null]) - const query = ` + const query = ` INSERT INTO \`${tableName}\` (\`key\`, \`namespace\`, \`updated_at\`, \`group_id\`, \`doc_id\`) VALUES (?, ?, ?, ?, ?) ON DUPLICATE KEY UPDATE \`updated_at\` = VALUES(\`updated_at\`), \`doc_id\` = VALUES(\`doc_id\`)` - // To handle multiple files upsert - try { + // To handle multiple files upsert for (const record of recordsToUpsert) { // Consider using a transaction for batch operations await queryRunner.manager.query(query, record.flat()) } - - await queryRunner.release() } catch (error) { console.error('Error updating in MySQLRecordManager:') throw error } finally { - await dataSource.destroy() + await this.releaseQueryRunner(queryRunner) } } @@ -345,8 +381,7 @@ class MySQLRecordManager implements RecordManagerInterface { return [] } - const dataSource = await this.getDataSource() - const queryRunner = dataSource.createQueryRunner() + const queryRunner = await this.getQueryRunner() const tableName = this.sanitizeTableName(this.tableName) // Prepare the placeholders and the query @@ -368,19 +403,17 @@ class MySQLRecordManager implements RecordManagerInterface { keys.forEach((key, index) => { existsArray[index] = existingKeysSet.has(key) }) - await queryRunner.release() return existsArray } catch (error) { console.error('Error checking existence of keys') throw error } finally { - await dataSource.destroy() + await this.releaseQueryRunner(queryRunner) } } async listKeys(options?: ListKeyOptions & { docId?: string }): Promise { - const dataSource = await this.getDataSource() - const queryRunner = dataSource.createQueryRunner() + const queryRunner = await this.getQueryRunner() const tableName = this.sanitizeTableName(this.tableName) try { @@ -420,13 +453,12 @@ class MySQLRecordManager implements RecordManagerInterface { // Directly using try/catch with async/await for cleaner flow const result = await queryRunner.manager.query(query, values) - await queryRunner.release() return result.map((row: { key: string }) => row.key) } catch (error) { console.error('MySQLRecordManager listKeys Error: ') throw error } finally { - await dataSource.destroy() + await this.releaseQueryRunner(queryRunner) } } @@ -435,8 +467,7 @@ class MySQLRecordManager implements RecordManagerInterface { return } - const dataSource = await this.getDataSource() - const queryRunner = dataSource.createQueryRunner() + const queryRunner = await this.getQueryRunner() const tableName = this.sanitizeTableName(this.tableName) const placeholders = keys.map(() => '?').join(', ') @@ -446,14 +477,13 @@ class MySQLRecordManager implements RecordManagerInterface { // Directly using try/catch with async/await for cleaner flow try { await queryRunner.manager.query(query, values) - await queryRunner.release() } catch (error) { console.error('Error deleting keys') throw error } finally { - await dataSource.destroy() + await this.releaseQueryRunner(queryRunner) } } } -module.exports = { nodeClass: MySQLRecordManager_RecordManager } +module.exports = { nodeClass: MySQLRecordManager_RecordManager, MySQLRecordManager } diff --git a/packages/components/src/indexing.test.ts b/packages/components/src/indexing.test.ts new file mode 100644 index 00000000000..f126f93bd1f --- /dev/null +++ b/packages/components/src/indexing.test.ts @@ -0,0 +1,80 @@ +import { Document } from '@langchain/core/documents' +import { index } from './indexing' + +const createRecordManager = () => ({ + getTime: jest.fn().mockResolvedValue(100), + exists: jest.fn().mockResolvedValue([false]), + update: jest.fn().mockResolvedValue(undefined), + listKeys: jest.fn().mockResolvedValue(['stored-key']), + deleteKeys: jest.fn().mockResolvedValue(undefined), + close: jest.fn().mockResolvedValue(undefined) +}) + +const createDocs = () => [ + new Document({ + pageContent: 'hello', + metadata: { + docId: 'doc-1', + source: 'source-1' + } + }) +] + +describe('index record manager lifecycle', () => { + it('closes record managers that expose a close method after successful indexing', async () => { + const recordManager = createRecordManager() + const vectorStore = { + addDocuments: jest.fn().mockResolvedValue(undefined), + delete: jest.fn().mockResolvedValue(undefined) + } + + await expect( + index({ + docsSource: createDocs(), + recordManager: recordManager as any, + vectorStore: vectorStore as any, + options: { + sourceIdKey: 'source' + } + }) + ).resolves.toEqual({ + numAdded: 1, + numDeleted: 0, + numUpdated: 0, + numSkipped: 0, + totalKeys: 1, + addedDocs: [ + { + pageContent: 'hello', + metadata: { + docId: 'doc-1', + source: 'source-1' + } + } + ] + }) + + expect(recordManager.close).toHaveBeenCalledTimes(1) + }) + + it('closes record managers that expose a close method when indexing fails', async () => { + const recordManager = createRecordManager() + const vectorStore = { + addDocuments: jest.fn().mockRejectedValue(new Error('add failed')), + delete: jest.fn().mockResolvedValue(undefined) + } + + await expect( + index({ + docsSource: createDocs(), + recordManager: recordManager as any, + vectorStore: vectorStore as any, + options: { + sourceIdKey: 'source' + } + }) + ).rejects.toThrow('add failed') + + expect(recordManager.close).toHaveBeenCalledTimes(1) + }) +}) diff --git a/packages/components/src/indexing.ts b/packages/components/src/indexing.ts index 44c22469625..b870f70c064 100644 --- a/packages/components/src/indexing.ts +++ b/packages/components/src/indexing.ts @@ -9,6 +9,7 @@ type Metadata = Record export interface ExtendedRecordManagerInterface extends RecordManagerInterface { update(keys: Array<{ uid: string; docId: string }> | string[], updateOptions?: Record): Promise + close?(): Promise } type StringOrDocFunc = string | ((doc: DocumentInterface) => string) @@ -255,6 +256,16 @@ interface IndexArgs { * @returns {Promise} */ export async function index(args: IndexArgs): Promise { + const { recordManager } = args + + try { + return await indexWithRecordManager(args) + } finally { + await recordManager.close?.() + } +} + +async function indexWithRecordManager(args: IndexArgs): Promise { const { docsSource, recordManager, vectorStore, options } = args const { batchSize = 100, cleanup, sourceIdKey, cleanupBatchSize = 1000, forceUpdate = false, vectorStoreName } = options ?? {} diff --git a/packages/server/src/services/documentstore/index.ts b/packages/server/src/services/documentstore/index.ts index a7dc444f1d6..6edac1bf914 100644 --- a/packages/server/src/services/documentstore/index.ts +++ b/packages/server/src/services/documentstore/index.ts @@ -397,6 +397,8 @@ const deleteDocumentStoreFileChunk = async (storeId: string, docId: string, chun } const deleteVectorStoreFromStore = async (storeId: string, workspaceId: string, docId?: string) => { + let recordManagerObj: any + try { const appServer = getRunningExpressApp() const componentNodes = appServer.nodesPool.componentNodes @@ -433,7 +435,7 @@ const deleteVectorStoreFromStore = async (storeId: string, workspaceId: string, // Get Record Manager Instance const recordManagerConfig = JSON.parse(entity.recordManagerConfig) - const recordManagerObj = await _createRecordManagerObject( + recordManagerObj = await _createRecordManagerObject( componentNodes, { recordManagerName: recordManagerConfig.name, recordManagerConfig: recordManagerConfig.config }, options @@ -473,6 +475,8 @@ const deleteVectorStoreFromStore = async (storeId: string, workspaceId: string, StatusCodes.INTERNAL_SERVER_ERROR, `Error: documentStoreServices.deleteVectorStoreFromStore - ${getErrorMessage(error)}` ) + } finally { + await recordManagerObj?.close?.() } } From c3383d116160182c6d5ff6906e37ab2203c3d3d3 Mon Sep 17 00:00:00 2001 From: Jianong Date: Fri, 26 Jun 2026 20:29:10 +0800 Subject: [PATCH 2/2] fix: address MySQLRecordManager lifecycle review --- .../MySQLrecordManager.test.ts | 44 ++++++++++-- .../MySQLRecordManager/MySQLrecordManager.ts | 69 ++++++++++++------- 2 files changed, 84 insertions(+), 29 deletions(-) diff --git a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts index 6e2c3e4487e..892bcc51734 100644 --- a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts +++ b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.test.ts @@ -62,7 +62,7 @@ jest.mock('typeorm', () => { const { MySQLRecordManager } = require('./MySQLrecordManager') -const createManager = () => +const createManager = (tableName = 'upsertion_records') => new MySQLRecordManager('test_namespace', { mysqlOptions: { type: 'mysql', @@ -72,7 +72,7 @@ const createManager = () => password: 'password', database: 'flowise' }, - tableName: 'upsertion_records' + tableName }) describe('MySQLRecordManager connection lifecycle', () => { @@ -118,7 +118,18 @@ describe('MySQLRecordManager connection lifecycle', () => { await manager.close() }) - it('releases query runners when update validation fails', async () => { + it('destroys the cached DataSource once when close is called concurrently', async () => { + const manager = createManager() + + await expect(manager.getTime()).resolves.toBe(123) + await Promise.all([manager.close(), manager.close()]) + + expect(mockDataSourceInstances).toHaveLength(1) + expect(mockDataSourceInstances[0].destroy).toHaveBeenCalledTimes(1) + expect(mockDataSourceInstances[0].isInitialized).toBe(false) + }) + + it('uses the active query runner for update time checks', async () => { const consoleErrorSpy = jest.spyOn(console, 'error').mockImplementation(() => undefined) const manager = createManager() @@ -126,8 +137,33 @@ describe('MySQLRecordManager connection lifecycle', () => { expect(mockDataSourceInstances).toHaveLength(1) expect(mockDataSourceInstances[0].destroy).not.toHaveBeenCalled() - expect(mockQueryRunners).toHaveLength(2) + expect(mockDataSourceInstances[0].createQueryRunner).toHaveBeenCalledTimes(1) + expect(mockQueryRunners).toHaveLength(1) + expect(mockQueryRunners[0].manager.query).toHaveBeenCalledTimes(1) + expect(mockQueryRunners[0].manager.query).toHaveBeenCalledWith('SELECT UNIX_TIMESTAMP(NOW()) AS epoch') + mockQueryRunners.forEach((queryRunner) => { + expect(queryRunner.release).toHaveBeenCalledTimes(1) + expect(queryRunner.isReleased).toBe(true) + }) + + await manager.close() + consoleErrorSpy.mockRestore() + }) + + it('releases query runners when table name validation fails', async () => { + const consoleErrorSpy = jest.spyOn(console, 'error').mockImplementation(() => undefined) + const manager = createManager('invalid-table-name') + + await expect(manager.exists(['a'])).rejects.toThrow('Invalid table name') + await expect(manager.listKeys()).rejects.toThrow('Invalid table name') + await expect(manager.deleteKeys(['a'])).rejects.toThrow('Invalid table name') + + expect(mockDataSourceInstances).toHaveLength(1) + expect(mockDataSourceInstances[0].destroy).not.toHaveBeenCalled() + expect(mockDataSourceInstances[0].createQueryRunner).toHaveBeenCalledTimes(3) + expect(mockQueryRunners).toHaveLength(3) mockQueryRunners.forEach((queryRunner) => { + expect(queryRunner.manager.query).not.toHaveBeenCalled() expect(queryRunner.release).toHaveBeenCalledTimes(1) expect(queryRunner.isReleased).toBe(true) }) diff --git a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts index efa5770240d..4896734b4f3 100644 --- a/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts +++ b/packages/components/nodes/recordmanager/MySQLRecordManager/MySQLrecordManager.ts @@ -175,6 +175,7 @@ class MySQLRecordManager implements RecordManagerInterface { namespace: string private dataSource?: DataSource private dataSourcePromise?: Promise + private dataSourceInitToken?: symbol constructor(namespace: string, config: MySQLRecordManagerOptions) { const { tableName } = config @@ -213,20 +214,31 @@ class MySQLRecordManager implements RecordManagerInterface { return this.dataSourcePromise } - this.dataSourcePromise = (async () => { + const dataSourceInitToken = Symbol('dataSourceInit') + this.dataSourceInitToken = dataSourceInitToken + + const dataSourcePromise = (async () => { const dataSource = new DataSource(mysqlOptions) await dataSource.initialize() - this.dataSource = dataSource + if (this.dataSourceInitToken === dataSourceInitToken) { + this.dataSource = dataSource + } return dataSource })() + this.dataSourcePromise = dataSourcePromise try { - return await this.dataSourcePromise + return await dataSourcePromise } catch (error) { - this.dataSource = undefined + if (this.dataSourceInitToken === dataSourceInitToken) { + this.dataSource = undefined + this.dataSourceInitToken = undefined + } throw error } finally { - this.dataSourcePromise = undefined + if (this.dataSourcePromise === dataSourcePromise) { + this.dataSourcePromise = undefined + } } } @@ -236,12 +248,15 @@ class MySQLRecordManager implements RecordManagerInterface { } async close(): Promise { - const dataSource = this.dataSourcePromise ? await this.dataSourcePromise.catch(() => undefined) : this.dataSource + const dataSourcePromise = this.dataSourcePromise + const dataSource = this.dataSource this.dataSourcePromise = undefined this.dataSource = undefined + this.dataSourceInitToken = undefined - if (dataSource?.isInitialized) { - await dataSource.destroy() + const ds = dataSourcePromise ? await dataSourcePromise.catch(() => undefined) : dataSource + if (ds?.isInitialized) { + await ds.destroy() } } @@ -319,8 +334,7 @@ class MySQLRecordManager implements RecordManagerInterface { async getTime(): Promise { const queryRunner = await this.getQueryRunner() try { - const res = await queryRunner.manager.query(`SELECT UNIX_TIMESTAMP(NOW()) AS epoch`) - return Number.parseFloat(res[0].epoch) + return await this.getTimeWithQueryRunner(queryRunner) } catch (error) { console.error('Error getting time in MySQLRecordManager:') throw error @@ -329,6 +343,11 @@ class MySQLRecordManager implements RecordManagerInterface { } } + private async getTimeWithQueryRunner(queryRunner: QueryRunner): Promise { + const res = await queryRunner.manager.query(`SELECT UNIX_TIMESTAMP(NOW()) AS epoch`) + return Number.parseFloat(res[0].epoch) + } + async update(keys: Array<{ uid: string; docId: string }> | string[], updateOptions?: UpdateOptions): Promise { if (keys.length === 0) { return @@ -338,7 +357,7 @@ class MySQLRecordManager implements RecordManagerInterface { try { const tableName = this.sanitizeTableName(this.tableName) - const updatedAt = await this.getTime() + const updatedAt = await this.getTimeWithQueryRunner(queryRunner) const { timeAtLeast, groupIds: _groupIds } = updateOptions ?? {} if (timeAtLeast && updatedAt < timeAtLeast) { @@ -382,19 +401,18 @@ class MySQLRecordManager implements RecordManagerInterface { } const queryRunner = await this.getQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) + try { + const tableName = this.sanitizeTableName(this.tableName) - // Prepare the placeholders and the query - const placeholders = keys.map(() => `?`).join(', ') - const query = ` + // Prepare the placeholders and the query + const placeholders = keys.map(() => `?`).join(', ') + const query = ` SELECT \`key\` FROM \`${tableName}\` WHERE \`namespace\` = ? AND \`key\` IN (${placeholders})` - // Initialize an array to fill with the existence checks - const existsArray = new Array(keys.length).fill(false) - - try { + // Initialize an array to fill with the existence checks + const existsArray = new Array(keys.length).fill(false) // Execute the query const rows = await queryRunner.manager.query(query, [this.namespace, ...keys.flat()]) // Create a set of existing keys for faster lookup @@ -414,9 +432,9 @@ class MySQLRecordManager implements RecordManagerInterface { async listKeys(options?: ListKeyOptions & { docId?: string }): Promise { const queryRunner = await this.getQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) try { + const tableName = this.sanitizeTableName(this.tableName) const { before, after, limit, groupIds, docId } = options ?? {} let query = `SELECT \`key\` FROM \`${tableName}\` WHERE \`namespace\` = ?` const values: (string | number | string[])[] = [this.namespace] @@ -468,14 +486,15 @@ class MySQLRecordManager implements RecordManagerInterface { } const queryRunner = await this.getQueryRunner() - const tableName = this.sanitizeTableName(this.tableName) - - const placeholders = keys.map(() => '?').join(', ') - const query = `DELETE FROM \`${tableName}\` WHERE \`namespace\` = ? AND \`key\` IN (${placeholders});` - const values = [this.namespace, ...keys].map((v) => (typeof v !== 'string' ? `${v}` : v)) // Directly using try/catch with async/await for cleaner flow try { + const tableName = this.sanitizeTableName(this.tableName) + + const placeholders = keys.map(() => '?').join(', ') + const query = `DELETE FROM \`${tableName}\` WHERE \`namespace\` = ? AND \`key\` IN (${placeholders});` + const values = [this.namespace, ...keys].map((v) => (typeof v !== 'string' ? `${v}` : v)) + await queryRunner.manager.query(query, values) } catch (error) { console.error('Error deleting keys')