All files / async / mux_async_iterator.ts

93.75% Branches 15/16
100.00% Functions 4/4
97.67% Lines 42/43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x13
x13
x12
 
x12
x13
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x13
x18
x18
x18
 
x13
x13
 
x67
x67
x67
x15
x67
x50
x50
x67
x2
x2
x67
x67
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x13
x14
 
x34
 
 
x34
x49
x49
x49
 
x34
x3
x3
x3
 
 
x31
x31
x31
x14
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x13
x12
x12
x13


































































































































I

































// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.

interface TaggedYieldedValue<T> {
  iterator: AsyncIterator<T>;
  value: T;
}

/**
 * Multiplexes multiple async iterators into a single stream. It currently
 * makes an assumption that the final result (the value returned and not
 * yielded from the iterator) does not matter; if there is any result, it is
 * discarded.
 *
 * @example Usage
 * ```ts
 * import { MuxAsyncIterator } from "@std/async/mux-async-iterator";
 * import { assertEquals } from "@std/assert";
 *
 * async function* gen123(): AsyncIterableIterator<number> {
 *   yield 1;
 *   yield 2;
 *   yield 3;
 * }
 *
 * async function* gen456(): AsyncIterableIterator<number> {
 *   yield 4;
 *   yield 5;
 *   yield 6;
 * }
 *
 * const mux = new MuxAsyncIterator<number>();
 * mux.add(gen123());
 * mux.add(gen456());
 *
 * const result = await Array.fromAsync(mux);
 *
 * assertEquals(result, [1, 4, 2, 5, 3, 6]);
 * ```
 *
 * @typeParam T The type of the provided async iterables and generated async iterable.
 */
export class MuxAsyncIterator<T> implements AsyncIterable<T> {
  #iteratorCount = 0;
  #yields: Array<TaggedYieldedValue<T>> = [];
  // deno-lint-ignore no-explicit-any
  #throws: any[] = [];
  #signal = Promise.withResolvers<void>();

  /**
   * Add an async iterable to the stream.
   *
   * @param iterable The async iterable to add.
   *
   * @example Usage
   * ```ts
   * import { MuxAsyncIterator } from "@std/async/mux-async-iterator";
   * import { assertEquals } from "@std/assert";
   *
   * async function* gen123(): AsyncIterableIterator<number> {
   *   yield 1;
   *   yield 2;
   *   yield 3;
   * }
   *
   * const mux = new MuxAsyncIterator<number>();
   * mux.add(gen123());
   *
   * const result = await Array.fromAsync(mux.iterate());
   *
   * assertEquals(result, [1, 2, 3]);
   * ```
   */
  add(iterable: AsyncIterable<T>) {
    ++this.#iteratorCount;
    this.#callIteratorNext(iterable[Symbol.asyncIterator]());
  }

  async #callIteratorNext(
    iterator: AsyncIterator<T>,
  ) {
    try {
      const { value, done } = await iterator.next();
      if (done) {
        --this.#iteratorCount;
      } else {
        this.#yields.push({ iterator, value });
      }
    } catch (e) {
      this.#throws.push(e);
    }
    this.#signal.resolve();
  }

  /**
   * Returns an async iterator of the stream.
   * @returns the async iterator for all the added async iterables.
   *
   * @example Usage
   * ```ts
   * import { MuxAsyncIterator } from "@std/async/mux-async-iterator";
   * import { assertEquals } from "@std/assert";
   *
   * async function* gen123(): AsyncIterableIterator<number> {
   *   yield 1;
   *   yield 2;
   *   yield 3;
   * }
   *
   * const mux = new MuxAsyncIterator<number>();
   * mux.add(gen123());
   *
   * const result = await Array.fromAsync(mux.iterate());
   *
   * assertEquals(result, [1, 2, 3]);
   * ```
   */
  async *iterate(): AsyncIterableIterator<T> {
    while (this.#iteratorCount > 0) {
      // Sleep until any of the wrapped iterators yields.
      await this.#signal.promise;

      // Note that while we're looping over `yields`, new items may be added.
      for (const { iterator, value } of this.#yields) {
        yield value;
        this.#callIteratorNext(iterator);
      }

      if (this.#throws.length) {
        for (const e of this.#throws) {
          throw e;
        }
      }
      // Clear the `yields` list and reset the `signal` promise.
      this.#yields.length = 0;
      this.#signal = Promise.withResolvers<void>();
    }
  }

  /**
   * Implements an async iterator for the stream.
   * @returns the async iterator for all the added async iterables.
   *
   * @example Usage
   * ```ts
   * import { MuxAsyncIterator } from "@std/async/mux-async-iterator";
   * import { assertEquals } from "@std/assert";
   *
   * async function* gen123(): AsyncIterableIterator<number> {
   *   yield 1;
   *   yield 2;
   *   yield 3;
   * }
   *
   * const mux = new MuxAsyncIterator<number>();
   * mux.add(gen123());
   *
   * const result = await Array.fromAsync(mux);
   *
   * assertEquals(result, [1, 2, 3]);
   * ```
   */
  [Symbol.asyncIterator](): AsyncIterator<T> {
    return this.iterate();
  }
}