All files / async / pool.ts

100.00% Branches 31/31
100.00% Functions 2/2
100.00% Lines 67/67
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
 
 
 
 
x8
x8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x8
x8
x8
x8
 
x10
x5
x5
 
x5
x5
x5
x5
 
x23
x23
x20
x23
x3
x3
x3
x3
x1
x1
x3
x23
x5
 
x5
x5
x5
x5
x5
x22
 
 
 
 
 
x22
x22
x22
 
x22
x22
x15
x14
x22
 
x4
x4
x5
x1
x1
x3
x2
x2
x3
x1
x1
x1
x1
x5
 
x10
x10
x10
x10
x1
x1
x4
x4
x3
x3
x1
x10
x10



















































































































// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.

/**
 * Message of the `AggregateError` that `pooledMap` writes into its output
 * stream once mapping has failed. The stream's transformer compares against
 * this exact message to tell that aggregated error (which must fail the
 * returned iterator) apart from the rejection of an individual item (which it
 * drops).
 */
const ERROR_WHILE_MAPPING_MESSAGE =
  "Cannot complete the mapping as an error was thrown from an item";

/**
 * pooledMap transforms values from an (async) iterable into another async
 * iterable. The transforms are done concurrently, with a max concurrency
 * defined by the poolLimit. Results are yielded in the order of the input
 * items, regardless of the order in which the transformations settle.
 *
 * If an error is thrown from `iteratorFn`, no new transformations will begin.
 * All currently executing transformations are allowed to finish and still
 * yielded on success. After that, the rejections among them are gathered and
 * thrown by the iterator in an `AggregateError`.
 *
 * If the consumer stops iterating early (for example with `break`), the
 * underlying stream is cancelled; values produced afterwards are discarded.
 *
 * @example Usage
 * ```ts
 * import { pooledMap } from "@std/async/pool";
 * import { assertEquals } from "@std/assert";
 *
 * const results = pooledMap(
 *   2,
 *   [1, 2, 3],
 *   (i) => new Promise((r) => setTimeout(() => r(i), 1000)),
 * );
 *
 * assertEquals(await Array.fromAsync(results), [1, 2, 3]);
 * ```
 *
 * @typeParam T the input type.
 * @typeParam R the output type.
 * @param poolLimit The maximum count of items being processed concurrently.
 * Must be a positive integer.
 * @param array The input (sync or async) iterable for mapping.
 * @param iteratorFn The function to call for every item of the iterable.
 * @returns The async iterator with the transformed values.
 * @throws {RangeError} If `poolLimit` is not a positive integer.
 */
export function pooledMap<T, R>(
  poolLimit: number,
  array: Iterable<T> | AsyncIterable<T>,
  iteratorFn: (data: T) => Promise<R>,
): AsyncIterableIterator<R> {
  if (!Number.isInteger(poolLimit) || poolLimit < 1) {
    throw new RangeError("'poolLimit' must be a positive integer");
  }

  // Used to mark promises as handled whose rejection is either reported
  // through another channel or is irrelevant (e.g. writes to a stream that the
  // consumer has already cancelled). Without it such rejections would surface
  // as unhandled rejections and could terminate the process.
  const noop = () => {};

  // The stream that backs the returned iterator. Chunks are the *pending*
  // promises; awaiting them one by one here is what keeps the output in input
  // order.
  const res = new TransformStream<Promise<R>, R>({
    async transform(
      p: Promise<R>,
      controller: TransformStreamDefaultController<R>,
    ) {
      try {
        const s = await p;
        controller.enqueue(s);
      } catch (e) {
        // Individual item rejections are dropped here; only the aggregated
        // error written by the producer below fails the stream.
        if (
          e instanceof AggregateError &&
          e.message === ERROR_WHILE_MAPPING_MESSAGE
        ) {
          controller.error(e as unknown);
        }
      }
    },
  });
  // Start processing items from the iterator
  (async () => {
    const writer = res.writable.getWriter();
    const executing = new Set<Promise<unknown>>();
    try {
      for await (const item of array) {
        const p = Promise.resolve().then(() => iteratorFn(item));
        // Write the pending promise right away so results keep input order.
        // A rejection of `p` does not end the iteration here (the transformer
        // drops it); instead it fails the race/all below, taking us to the
        // catch block where all currently executing jobs are allowed to finish
        // and all rejections among them can be reported together.
        // The write itself only rejects when the stream has been errored or
        // cancelled by the consumer, which is not an error for the producer.
        writer.write(p).catch(noop);
        const e: Promise<unknown> = p.then(() => {
          executing.delete(e);
        });
        // `e` may reject while nobody is awaiting it yet (pool not full and
        // the source iterable is slow). Mark it as handled; the rejection is
        // still observed by `Promise.race`/`Promise.all` below.
        e.catch(noop);
        executing.add(e);
        if (executing.size >= poolLimit) {
          await Promise.race(executing);
        }
      }
      // Wait until all ongoing events have processed, then close the writer.
      await Promise.all(executing);
      writer.close().catch(noop);
    } catch {
      const errors = [];
      for (const result of await Promise.allSettled(executing)) {
        if (result.status === "rejected") {
          errors.push(result.reason);
        }
      }
      const failure = Promise.reject(
        new AggregateError(errors, ERROR_WHILE_MAPPING_MESSAGE),
      );
      // The chunk may wait in the write queue until the consumer has read all
      // earlier values; mark it as handled in the meantime. The transformer
      // still sees the rejection when it awaits the chunk.
      failure.catch(noop);
      writer.write(failure).catch(noop);
    }
  })();
  // Feature test until browser coverage is adequate
  return Symbol.asyncIterator in res.readable &&
      typeof res.readable[Symbol.asyncIterator] === "function"
    ? (res.readable[Symbol.asyncIterator] as () => AsyncIterableIterator<R>)()
    : (async function* () {
      const reader = res.readable.getReader();
      let exhausted = false;
      try {
        while (true) {
          const { done, value } = await reader.read();
          if (done) {
            exhausted = true;
            break;
          }
          yield value;
        }
      } finally {
        // Mirror the native async iterator: leaving the loop early (consumer
        // `break`/`return`, or a stream error) cancels the stream, and the
        // reader lock is always released.
        if (!exhausted) {
          await reader.cancel().catch(noop);
        }
        reader.releaseLock();
      }
    })();
}