1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189 |
x2
x2
x2
x2
x14
x14
x5
x5
x5
x9
x9
x9
x9
x9
x9
x30
x30
x2
x2
x2
x28
x30
x9
x9
x9
x9
x9
x9
x17
x2
x2
x2
x2
x2
x2
x2
x17
x9
x28
x28
x28
x28
x25
x25
x28
x28
x28
x9
x9
x9
x28
x28
x28
x28
x28
x28
x28
x17
x16
x28
x6
x6
x9
x3
x3
x2
x3
x1
x1
x3
x9
x14
x14
x14
x14
x1
x1
x4
x4
x3
x3
x1
x14
x14 |
|
// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.
/** Options for {@linkcode pooledMapSettled}. */
export interface PooledMapSettledOptions {
/**
* The maximum count of items being processed concurrently. Must be a
* positive integer.
*/
poolLimit: number;
/**
* An AbortSignal to cancel the pooled mapping operation.
*
* If the signal is aborted, no new items will begin processing. All currently
* executing items are allowed to finish and their settled results are yielded.
* The iterator then rejects with the signal's reason.
*
* @default {undefined}
*/
signal?: AbortSignal;
}
/**
* Like {@linkcode pooledMap}, but does not fail fast. Every item is processed
* regardless of earlier failures. Results are yielded as
* {@linkcode PromiseSettledResult} objects in input order.
*
* The relationship to `pooledMap` mirrors `Promise.allSettled` vs `Promise.all`.
*
* If the input iterable itself throws, all currently executing items are
* allowed to finish and their settled results are yielded, then the iterator
* closes. The error from the input iterable is not propagated to the consumer.
*
* @experimental **UNSTABLE**: New API, yet to be vetted.
*
* @example Usage
* ```ts
* import { pooledMapSettled } from "@std/async/unstable-pool-settled";
* import { assertEquals } from "@std/assert";
*
* const results = pooledMapSettled(
* [1, 2, 3],
* (i) => {
* if (i === 2) throw new Error("bad");
* return Promise.resolve(i);
* },
* { poolLimit: 2 },
* );
*
* const settled = await Array.fromAsync(results);
* assertEquals(settled.length, 3);
* assertEquals(settled[0], { status: "fulfilled", value: 1 });
* assertEquals(settled[1]!.status, "rejected");
* assertEquals(settled[2], { status: "fulfilled", value: 3 });
* ```
*
* @example With AbortSignal
* ```ts no-assert ignore
* import { pooledMapSettled } from "@std/async/unstable-pool-settled";
*
* const results = pooledMapSettled([1, 2, 3], async (i) => {
* await new Promise((r) => setTimeout(r, 1000));
* return i;
* }, { poolLimit: 2, signal: AbortSignal.timeout(5_000) });
*
* for await (const result of results) {
* console.log(result);
* }
* ```
*
* @typeParam T the input type.
* @typeParam R the output type.
* @param array The input iterable.
* @param iteratorFn The transform function (sync or async).
* @param options Configuration for concurrency and cancellation.
* @returns An async iterator yielding `PromiseSettledResult<R>` for each item,
* in the order items were yielded from the input.
* @throws {RangeError} If `poolLimit` is not a positive integer.
*/
export function pooledMapSettled<T, R>(
array: Iterable<T> | AsyncIterable<T>,
iteratorFn: (data: T) => R | Promise<R>,
options: PooledMapSettledOptions,
): AsyncIterableIterator<PromiseSettledResult<R>> {
const { poolLimit, signal } = options;
if (!Number.isInteger(poolLimit) || poolLimit < 1) {
throw new RangeError(
`Cannot pool as 'poolLimit' must be a positive integer: received ${poolLimit}`,
);
}
type Settled = PromiseSettledResult<R>;
const ABORT_SENTINEL = Symbol();
const res = new TransformStream<
Promise<Settled | typeof ABORT_SENTINEL>,
Settled
>({
async transform(
p: Promise<Settled | typeof ABORT_SENTINEL>,
controller: TransformStreamDefaultController<Settled>,
) {
const result = await p;
if (result === ABORT_SENTINEL) {
controller.error(signal?.reason);
return;
}
controller.enqueue(result);
},
});
(async () => {
const writer = res.writable.getWriter();
const executing: Array<Promise<unknown>> = [];
function raceWithSignal(
promises: Array<Promise<unknown>>,
): Promise<unknown> {
if (!signal) return Promise.race(promises);
const { promise, resolve, reject } = Promise.withResolvers<never>();
const onAbort = () => reject(signal.reason);
signal.addEventListener("abort", onAbort, { once: true });
return Promise.race([...promises, promise]).finally(() => {
signal.removeEventListener("abort", onAbort);
resolve(undefined as never);
});
}
function settle(fn: () => R | Promise<R>): Promise<Settled> {
return Promise.resolve()
.then(fn)
.then(
(value): PromiseFulfilledResult<R> => ({
status: "fulfilled",
value,
}),
(reason): PromiseRejectedResult => ({ status: "rejected", reason }),
);
}
try {
signal?.throwIfAborted();
for await (const item of array) {
signal?.throwIfAborted();
const p = settle(() => iteratorFn(item));
writer.write(p);
const e: Promise<unknown> = p.then(() =>
executing.splice(executing.indexOf(e), 1)
);
executing.push(e);
if (executing.length >= poolLimit) {
await raceWithSignal(executing);
}
}
await Promise.all(executing);
writer.close();
} catch {
// Wait for in-flight work so their settled results are still yielded in
// order, then write a sentinel that causes the stream to error with the
// abort reason.
await Promise.all(executing).catch(() => {});
if (signal?.aborted) {
writer.write(Promise.resolve(ABORT_SENTINEL)).catch(() => {});
} else {
writer.close();
}
}
})();
// Feature test until browser coverage is adequate
return Symbol.asyncIterator in res.readable &&
typeof res.readable[Symbol.asyncIterator] === "function"
? (res.readable[Symbol.asyncIterator] as () => AsyncIterableIterator<
Settled
>)()
: (async function* () {
const reader = res.readable.getReader();
while (true) {
const { done, value } = await reader.read();
if (done) break;
yield value;
}
reader.releaseLock();
})();
}
|