All files / streams / to_transform_stream.ts

100.00% Branches 23/23
100.00% Functions 3/3
100.00% Lines 46/46
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x19
x19
x19
x19
 
x74
x74
x74
x74
 
x74
x74
x74
x74
x74
x3
x3
 
x3
x71
x71
x71
x71
x212
x212
x212
x212
 
 
x6
x6
x6
x6
x212
x62
x62
x62
x144
x71
x71
 
x3
x3
x3
x3
 
x2
x3
x3
x3
x71
x71
x74




























































































































// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.

/**
 * Convert the generator function into a {@linkcode TransformStream}.
 *
 * @typeparam I The type of the chunks in the source stream.
 * @typeparam O The type of the chunks in the transformed stream.
 * @param transformer A function to transform. Must return an iterable or async iterable.
 * @param writableStrategy An object that optionally defines a queuing strategy
 * for the stream's internal buffer between source and transformer.
 * @param readableStrategy An object that optionally defines a queuing strategy
 * for the stream's output buffer.
 * @returns A {@linkcode TransformStream} that transforms the source stream as defined by the provided transformer.
 *
 * @throws {TypeError} If `transformer` does not return an iterable or async iterable.
 *
 * When the output stream is cancelled, the cancellation is propagated to both
 * the iterator (via `throw()`) and the source readable stream.
 *
 * When the iterator throws an error, the error is propagated to both the output
 * readable stream and the source readable stream (via `cancel()`).
 *
 * @example Build a transform stream that multiplies each value by 100
 * ```ts
 * import { toTransformStream } from "@std/streams/to-transform-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const stream = ReadableStream.from([0, 1, 2])
 *   .pipeThrough(toTransformStream(async function* (src) {
 *     for await (const chunk of src) {
 *       yield chunk * 100;
 *     }
 *   }));
 *
 * assertEquals(
 *   await Array.fromAsync(stream),
 *   [0, 100, 200],
 * );
 * ```
 *
 * @example JSON Lines
 * ```ts
 * import { TextLineStream } from "@std/streams/text-line-stream";
 * import { toTransformStream } from "@std/streams/to-transform-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const stream = ReadableStream.from([
 *   '{"name": "Alice", "age": ',
 *   '30}\n{"name": "Bob", "age"',
 *   ": 25}\n",
 * ]);
 *
 * type Person = { name: string; age: number };
 *
 * // Split the stream by newline and parse each line as a JSON object
 * const jsonStream = stream.pipeThrough(new TextLineStream())
 *   .pipeThrough(toTransformStream(async function* (src) {
 *     for await (const chunk of src) {
 *       if (chunk.trim().length === 0) {
 *         continue;
 *       }
 *       yield JSON.parse(chunk) as Person;
 *     }
 *   }));
 *
 * assertEquals(
 *   await Array.fromAsync(jsonStream),
 *   [{ "name": "Alice", "age": 30 }, { "name": "Bob", "age": 25 }],
 * );
 * ```
 */
// deno-lint-ignore deno-style-guide/exported-function-args-maximum
export function toTransformStream<I, O>(
  transformer: (src: ReadableStream<I>) => Iterable<O> | AsyncIterable<O>,
  writableStrategy?: QueuingStrategy<I>,
  readableStrategy?: QueuingStrategy<O>,
): TransformStream<I, O> {
  const {
    writable,
    readable,
  } = new TransformStream<I, I>(undefined, writableStrategy);

  const iterable = transformer(readable);
  const iterator: Iterator<O> | AsyncIterator<O> | undefined =
    (iterable as AsyncIterable<O> | null)?.[Symbol.asyncIterator]?.() ??
      (iterable as Iterable<O> | null)?.[Symbol.iterator]?.();
  if (!iterator) {
    throw new TypeError(
      "Transformer must return an iterable or async iterable",
    );
  }
  return {
    writable,
    readable: new ReadableStream<O>({
      async pull(controller) {
        let result: IteratorResult<O>;
        try {
          result = await iterator.next();
        } catch (error) {
          // Propagate error to stream from iterator
          // If the stream status is "errored", it will be thrown, but ignore.
          await readable.cancel(error).catch(() => {});
          controller.error(error);
          return;
        }
        if (result.done) {
          controller.close();
          return;
        }
        controller.enqueue(result.value);
      },
      async cancel(reason) {
        // Propagate cancellation to readable and iterator
        if (typeof iterator.throw === "function") {
          try {
            await iterator.throw(reason);
          } catch {
            /* `iterator.throw()` always throws on site. We catch it. */
          }
        }
        await readable.cancel(reason);
      },
    }, readableStrategy),
  };
}