All files / streams / concat_readable_streams.ts

100.00% Branches 5/5
100.00% Lines 22/22
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x12
x12
 
x18
x18
x18
x62
x62
x62
x75
x79
x79
x84
x84
x92
x92
x18
x18
x19
x19
x19
x18
x18

















































// Copyright 2018-2025 the Deno authors. MIT license.
// This module is browser compatible.

/**
 * Concatenates multiple `ReadableStream`s into a single ordered
 * `ReadableStream`.
 *
 * Cancelling the resulting stream will cancel all the input streams.
 *
 * @typeParam T The type of the chunks in the streams.
 * @param streams An iterable of `ReadableStream`s to concat.
 * @returns A `ReadableStream` that will emit the concatenated chunks.
 *
 * @example Usage
 * ```ts
 * import { concatReadableStreams } from "@std/streams/concat-readable-streams";
 * import { assertEquals } from "@std/assert";
 *
 * const stream1 = ReadableStream.from([1, 2, 3]);
 * const stream2 = ReadableStream.from([4, 5, 6]);
 * const stream3 = ReadableStream.from([7, 8, 9]);
 *
 * assertEquals(
 *   await Array.fromAsync(concatReadableStreams(stream1, stream2, stream3)),
 *   [1, 2, 3, 4, 5, 6, 7, 8, 9],
 * );
 * ```
 */
export function concatReadableStreams<T>(
  ...streams: ReadableStream<T>[]
): ReadableStream<T> {
  let i = 0;
  return new ReadableStream<T>({
    async pull(controller) {
      const reader = streams[i]!.getReader();
      const { done, value } = await reader.read();
      if (done) {
        if (streams.length === ++i) {
          return controller.close();
        }
        return await this.pull!(controller);
      }
      controller.enqueue(value);
      reader.releaseLock();
    },
    async cancel(reason) {
      const promises = streams.map((stream) => stream.cancel(reason));
      await Promise.allSettled(promises);
    },
  });
}