All files / streams / unstable_abort_stream.ts

100.00% Branches 2/2
100.00% Lines 10/10
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x2
 
 
 
 
 
x2
x4
x4
x7
x10
x7
x4
x4
x2














































// Copyright 2018-2025 the Deno authors. MIT license.
// This module is browser compatible.

/**
 * A transform stream that accepts a {@linkcode AbortSignal} to easily abort a
 * stream pipeThrough.
 *
 * Chunks are forwarded unchanged while the signal is not aborted. As soon as
 * the signal aborts — including when it is already aborted at construction
 * time, or when it aborts while no chunk is in flight — both sides of the
 * stream are errored with the signal's `reason`.
 *
 * @experimental **UNSTABLE**: New API, yet to be vetted.
 *
 * @typeparam T The type of the chunks passing through the AbortStream.
 *
 * @example Usage
 * ```ts
 * import { AbortStream } from "@std/streams/unstable-abort-stream";
 * import { assertRejects } from "@std/assert/rejects";
 *
 * const controller = new AbortController();
 * controller.abort(new Error("STOP"));
 *
 * await assertRejects(
 *   async function () {
 *     await new Response(
 *       (await Deno.open("./deno.json"))
 *         .readable
 *         .pipeThrough(new AbortStream(controller.signal)),
 *     )
 *       .bytes();
 *   },
 *   Error,
 *   "STOP",
 * );
 * ```
 */
export class AbortStream<T> extends TransformStream<T, T> {
  /**
   * Constructs a new instance.
   *
   * @param signal The {@linkcode AbortSignal}. When it aborts, the stream is
   * errored with `signal.reason`.
   */
  constructor(signal: AbortSignal) {
    // Shared between `start` and `flush` so the listener can be detached once
    // the stream finishes normally. Declared before `super()` because the
    // transformer callbacks close over it; `this` is not touched here.
    let onAbort: (() => void) | undefined;

    super({
      start(controller) {
        // An already-aborted signal would otherwise go unnoticed until the
        // first chunk arrived (or forever, for an empty upstream).
        if (signal.aborted) {
          controller.error(signal.reason);
          return;
        }
        // React to the abort itself rather than waiting for the next chunk,
        // so a stalled upstream still gets torn down.
        onAbort = () => controller.error(signal.reason);
        signal.addEventListener("abort", onAbort, { once: true });
      },
      transform(chunk, controller) {
        // Defensive re-check; keeps the original per-chunk guarantee.
        if (signal.aborted) controller.error(signal.reason);
        else controller.enqueue(chunk);
      },
      flush() {
        // Upstream closed normally: stop listening so a long-lived signal
        // does not keep this stream's controller reachable.
        if (onAbort !== undefined) {
          signal.removeEventListener("abort", onAbort);
          onAbort = undefined;
        }
      },
    });
  }
}