diff --git a/async/pool.ts b/async/pool.ts index 95ca516b4c4b..2d136e9e865c 100644 --- a/async/pool.ts +++ b/async/pool.ts @@ -89,13 +89,20 @@ export function pooledMap( // Wait until all ongoing events have processed, then close the writer. await Promise.all(executing); writer.close(); - } catch { + } catch (iterError) { const errors = []; for (const result of await Promise.allSettled(executing)) { if (result.status === "rejected") { errors.push(result.reason); } } + // The catch fires both when a transformation rejects and when the + // input iterable itself throws. When it's a transformation rejection + // the same reason is already in `executing`, but when the iterable + // throws the original error would otherwise be swallowed, leaving + // callers with an empty AggregateError (see #6716). Add it only if + // it isn't already accounted for. + if (!errors.includes(iterError)) errors.push(iterError); writer.write(Promise.reject( new AggregateError(errors, ERROR_WHILE_MAPPING_MESSAGE), )).catch(() => {}); diff --git a/async/pool_test.ts b/async/pool_test.ts index feeefe5290b8..06eeba300a8b 100644 --- a/async/pool_test.ts +++ b/async/pool_test.ts @@ -2,6 +2,7 @@ import { delay } from "./delay.ts"; import { pooledMap } from "./pool.ts"; import { + assert, assertEquals, assertGreaterOrEqual, assertLess, @@ -78,6 +79,40 @@ Deno.test("pooledMap() returns ordered items", async () => { assertEquals(returned, [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]); }); +Deno.test( + "pooledMap() surfaces errors thrown by the input iterable (#6716)", + async () => { + const sentinel = new Error("Iterator failed on first step!"); + // deno-lint-ignore require-yield + async function* errorThrowing(): AsyncGenerator { + throw sentinel; + } + const results = pooledMap( + 2, + errorThrowing(), + (i: number) => Promise.resolve(i), + ); + let caught: unknown; + try { + for await (const _ of results) { + // drain + } + } catch (e) { + caught = e; + } + assert(caught instanceof AggregateError); + const ag = caught as AggregateError; + assertEquals( + ag.message, + "Cannot complete the mapping as an error was thrown from an item", + ); + // The previous behavior left `errors` empty; the iterable's error was + // swallowed. Now it must be present so callers can introspect it. + assertEquals(ag.errors.length, 1); + assertEquals(ag.errors[0], sentinel); + }, +); + Deno.test("pooledMap() checks browser compat", async () => { // Simulates the environment where Symbol.asyncIterator is not available const asyncIterFunc = ReadableStream.prototype[Symbol.asyncIterator];