From b319a4a16066087b6ac066c9c4f4b8c0b4d8cd04 Mon Sep 17 00:00:00 2001 From: razbroc Date: Wed, 11 Feb 2026 16:35:20 +0200 Subject: [PATCH 1/3] fix: handle ConflictError in JobHandler and add corresponding tests --- src/job/models/jobHandler.ts | 15 ++++++++--- tests/unit/job/jobHnadler/jobHandler.spec.ts | 16 ++++++++++++ .../tileMergeTaskManager.spec.ts | 26 +++++++++++++++++++ 3 files changed, 53 insertions(+), 4 deletions(-) diff --git a/src/job/models/jobHandler.ts b/src/job/models/jobHandler.ts index f5655f5..f00250b 100644 --- a/src/job/models/jobHandler.ts +++ b/src/job/models/jobHandler.ts @@ -5,6 +5,7 @@ import type { LayerNameFormats } from '@map-colonies/raster-shared'; import { TaskHandler as QueueClient } from '@map-colonies/mc-priority-queue'; import { SpanStatusCode } from '@opentelemetry/api'; import type { Logger } from '@map-colonies/js-logger'; +import { ConflictError } from '@map-colonies/error-types'; import type { IConfig, JobAndTaskTelemetry, PollingConfig, StepKey } from '../../common/interfaces'; import { JobTrackerClient } from '../../httpClients/jobTrackerClient'; @@ -71,16 +72,22 @@ export class JobHandler { logger.info({ msg: 'task attempts', taskAttempts, maxAttempts: this.pollingConfig.maxTaskAttempts }); const reachedMaxAttempts = taskAttempts >= this.pollingConfig.maxTaskAttempts; const isRecoverable = !(error instanceof ZodError) && !reachedMaxAttempts; - - await this.queueClient.reject(job.id, task.id, isRecoverable, reason); - - logger.error({ msg, reason, error, reachedMaxAttempts, isRecoverable }); + const isAborted = error instanceof ConflictError; taskTracker?.failure(errName); tracingSpan?.setStatus({ code: SpanStatusCode.ERROR, message: reason }); tracingSpan?.recordException(error instanceof Error ? error : new Error(reason)); + if (isAborted) { + logger.warn({ msg: 'Task was aborted due to a conflict. Returning to polling loop.', reason, error }); + return; + } + + logger.error({ msg, reason, error, reachedMaxAttempts, isRecoverable }); + await this.queueClient.reject(job.id, task.id, isRecoverable, reason); + if (!isRecoverable) { await this.jobTrackerClient.notify(task); } + } } diff --git a/tests/unit/job/jobHnadler/jobHandler.spec.ts b/tests/unit/job/jobHnadler/jobHandler.spec.ts index 041ba38..3d95d6e 100644 --- a/tests/unit/job/jobHnadler/jobHandler.spec.ts +++ b/tests/unit/job/jobHnadler/jobHandler.spec.ts @@ -1,4 +1,5 @@ import { ZodError } from 'zod'; +import { ConflictError } from '@map-colonies/error-types'; import { JobAndTaskTelemetry } from '../../../../src/common/interfaces'; import { registerDefaultConfig } from '../../mocks/configMock'; import { ingestionNewJob } from '../../mocks/jobsMockData'; @@ -39,5 +40,20 @@ describe('JobHandler', () => { expect(jobTrackerClientMock.notify).toHaveBeenCalledWith(task); /* eslint-enable @typescript-eslint/unbound-method */ }); + + it('should handle ConflictError without rejecting or notifying', async () => { + const { newJobHandler, queueClientMock, jobTrackerClientMock } = setupJobHandlerTest(); + const job = ingestionNewJob; + const task = createTasksTaskForIngestionNew; + const telemetry: JobAndTaskTelemetry = { taskTracker: undefined, tracingSpan: undefined }; + const error = new ConflictError('Job already aborted'); + + await newJobHandler['handleError'](error, job, task, telemetry); + + /* eslint-disable @typescript-eslint/unbound-method */ + expect(queueClientMock.reject).not.toHaveBeenCalled(); + expect(jobTrackerClientMock.notify).not.toHaveBeenCalled(); + /* eslint-enable @typescript-eslint/unbound-method */ + }); }); }); diff --git a/tests/unit/task/tileMergeTaskManager/tileMergeTaskManager.spec.ts b/tests/unit/task/tileMergeTaskManager/tileMergeTaskManager.spec.ts index 99003c2..876f8e6 100644 --- a/tests/unit/task/tileMergeTaskManager/tileMergeTaskManager.spec.ts +++ b/tests/unit/task/tileMergeTaskManager/tileMergeTaskManager.spec.ts @@ -2,6 +2,7 @@ import { randomUUID } from 'crypto'; import nock from 'nock'; import { TileOutputFormat } from '@map-colonies/raster-shared'; +import { ConflictError } from '@map-colonies/error-types'; import { configMock, registerDefaultConfig } from '../../mocks/configMock'; import { ingestionNewJob } from '../../mocks/jobsMockData'; import type { MergeTaskParameters, JobResumeState, MergeTilesTaskParams } from '../../../../src/common/interfaces'; @@ -477,5 +478,30 @@ describe('tileMergeTaskManager', () => { // Should reject when HTTP requests fail await expect(tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks)).rejects.toThrow(); }); + + it('should handle ConflictError during batch publishing and stop immediately', async () => { + const { tileMergeTaskManager } = testContext; + + const jobId = randomUUID(); + + // Setup HTTP mock to return 409 Conflict + const jobManagerBaseUrl = configMock.get('jobManagement.config.jobManagerBaseUrl'); + const createTasksPath = `/jobs/${jobId}/tasks`; + + nock(jobManagerBaseUrl).post(createTasksPath).reply(409, { message: 'Job already aborted' }).persist(); + + const tasks = tileMergeTaskManager.buildTasks(buildTasksParams, mockInitTask); + + // Spy on enqueueTasks to verify it's only called once (stops after conflict) + const enqueueTasksSpy = jest.spyOn(tileMergeTaskManager as unknown as { enqueueTasks: jest.Func }, 'enqueueTasks'); + + // Should throw ConflictError when job manager returns 409 + await expect(tileMergeTaskManager.pushTasks(mockInitTask, jobId, ingestionNewJob.type, tasks)).rejects.toThrow(ConflictError); + + // Verify that enqueueTasks was called (at least once before the conflict) + expect(enqueueTasksSpy).toHaveBeenCalled(); + + enqueueTasksSpy.mockRestore(); + }); }); }); From 13a379d2439848435be411417509c8317620677c Mon Sep 17 00:00:00 2001 From: razbroc Date: Wed, 11 Feb 2026 16:40:16 +0200 Subject: [PATCH 2/3] style: remove unnecessary newline in JobHandler class --- src/job/models/jobHandler.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/job/models/jobHandler.ts b/src/job/models/jobHandler.ts index f00250b..2af3b54 100644 --- a/src/job/models/jobHandler.ts +++ b/src/job/models/jobHandler.ts @@ -88,6 +88,5 @@ export class JobHandler { if (!isRecoverable) { await this.jobTrackerClient.notify(task); } - } } From c4c0011c683ab2058c63dd7405a287406f298c7b Mon Sep 17 00:00:00 2001 From: razbroc Date: Mon, 16 Feb 2026 17:40:45 +0200 Subject: [PATCH 3/3] fix: update handleError to treat ConflictError as unrecoverable and adjust tests accordingly --- src/job/models/jobHandler.ts | 8 +------- tests/unit/job/jobHnadler/jobHandler.spec.ts | 6 +++--- 2 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/job/models/jobHandler.ts b/src/job/models/jobHandler.ts index 2af3b54..634dc92 100644 --- a/src/job/models/jobHandler.ts +++ b/src/job/models/jobHandler.ts @@ -71,17 +71,11 @@ export class JobHandler { const taskAttempts = task.attempts + 1; // rejecting the task increments the attempts logger.info({ msg: 'task attempts', taskAttempts, maxAttempts: this.pollingConfig.maxTaskAttempts }); const reachedMaxAttempts = taskAttempts >= this.pollingConfig.maxTaskAttempts; - const isRecoverable = !(error instanceof ZodError) && !reachedMaxAttempts; - const isAborted = error instanceof ConflictError; + const isRecoverable = !(error instanceof ZodError) && !(error instanceof ConflictError) && !reachedMaxAttempts; taskTracker?.failure(errName); tracingSpan?.setStatus({ code: SpanStatusCode.ERROR, message: reason }); tracingSpan?.recordException(error instanceof Error ? error : new Error(reason)); - if (isAborted) { - logger.warn({ msg: 'Task was aborted due to a conflict. Returning to polling loop.', reason, error }); - return; - } - logger.error({ msg, reason, error, reachedMaxAttempts, isRecoverable }); await this.queueClient.reject(job.id, task.id, isRecoverable, reason); diff --git a/tests/unit/job/jobHnadler/jobHandler.spec.ts b/tests/unit/job/jobHnadler/jobHandler.spec.ts index 3d95d6e..d8ef5a4 100644 --- a/tests/unit/job/jobHnadler/jobHandler.spec.ts +++ b/tests/unit/job/jobHnadler/jobHandler.spec.ts @@ -41,7 +41,7 @@ describe('JobHandler', () => { /* eslint-enable @typescript-eslint/unbound-method */ }); - it('should handle ConflictError without rejecting or notifying', async () => { + it('should handle ConflictError as unrecoverable error', async () => { const { newJobHandler, queueClientMock, jobTrackerClientMock } = setupJobHandlerTest(); const job = ingestionNewJob; const task = createTasksTaskForIngestionNew; @@ -51,8 +51,8 @@ describe('JobHandler', () => { await newJobHandler['handleError'](error, job, task, telemetry); /* eslint-disable @typescript-eslint/unbound-method */ - expect(queueClientMock.reject).not.toHaveBeenCalled(); - expect(jobTrackerClientMock.notify).not.toHaveBeenCalled(); + expect(queueClientMock.reject).toHaveBeenCalledWith(job.id, task.id, false, error.message); + expect(jobTrackerClientMock.notify).toHaveBeenCalledWith(task); /* eslint-enable @typescript-eslint/unbound-method */ }); });