@@ -180,7 +180,7 @@ export class DuplicateNotificationError extends Error {
180180 *
181181 * Note: Requires MongoDB replica set for transactions to work.
182182 */
183- export const process_notifications = async ( notifications : notification [ ] ) : Promise < { notification_ids : mongoose . Types . ObjectId [ ] , created_count : number , duplicate_count : number , duplicate_keys : { request_id : string ; channel : string } [ ] | undefined } > => {
183+ export const process_notifications = async ( notifications : notification [ ] ) : Promise < { notification_ids : mongoose . Types . ObjectId [ ] , created_count : number , duplicate_count : number , duplicate_keys : { request_id : string ; channel : string } [ ] | undefined } > => {
184184 const session = await mongoose . startSession ( ) ;
185185
186186 try {
@@ -218,6 +218,17 @@ export const process_notifications = async (notifications: notification[]): Prom
218218 outbox_entries . push ( outbox_entry ) ;
219219 }
220220
221+ // Check if ALL notifications are duplicates BEFORE committing
222+ // This must happen before commit to avoid throwing after commitTransaction
223+ if ( duplicate_keys . length > 0 && duplicate_keys . length === notifications . length ) {
224+ // No new notifications to save, abort the transaction
225+ await session . abortTransaction ( ) ;
226+ throw new DuplicateNotificationError (
227+ 'All notifications are duplicates' ,
228+ duplicate_keys
229+ ) ;
230+ }
231+
221232 // Insert outbox entries in bulk within transaction
222233 if ( outbox_entries . length > 0 ) {
223234 await outbox_model . insertMany ( outbox_entries , { session } ) ;
@@ -226,14 +237,8 @@ export const process_notifications = async (notifications: notification[]): Prom
226237 // Commit transaction - both succeed or both fail
227238 await session . commitTransaction ( ) ;
228239
229- // Handle duplicates after successful commit
240+ // Log partial duplicates (some succeeded, some were duplicates)
230241 if ( duplicate_keys . length > 0 ) {
231- if ( duplicate_keys . length === notifications . length ) {
232- throw new DuplicateNotificationError (
233- 'All notifications are duplicates' ,
234- duplicate_keys
235- ) ;
236- }
237242 logger . warn ( `Skipped ${ duplicate_keys . length } duplicate notifications` ) ;
238243 }
239244
@@ -245,8 +250,10 @@ export const process_notifications = async (notifications: notification[]): Prom
245250 } ;
246251
247252 } catch ( error ) {
248- // Abort transaction on any error - rolls back all changes
249- await session . abortTransaction ( ) ;
253+ // Only abort if transaction is still in progress (not already committed or aborted)
254+ if ( session . inTransaction ( ) ) {
255+ await session . abortTransaction ( ) ;
256+ }
250257 throw error ;
251258 } finally {
252259 // Always end the session
0 commit comments