All files / streams / unstable_to_byte_stream.ts

100.00% Branches 13/13
100.00% Lines 43/43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x49
x49
 
x2305
x6915
x2305
x2305
x2305
x3533
x3533
x3533
x3533
x3533
x171777
x340021
x508270
x508270
x842512
x508270
x171777
 
x171777
x508451
x508451
x508451
 
x674241
x674241
x674241
x674241
 
x674241
x340018
x340335
x340335
x340335
x171777
x338477
x338477
x338477
x3533
x3533
x3534
x3534
x3533
x3533
x2305












































































// Copyright 2018-2025 the Deno authors. MIT license.
// This module is browser compatible.

import type { Uint8Array_ } from "./_types.ts";

/**
 * The function takes a `ReadableStream<Uint8Array>` and wraps it in a BYOB
 * stream if it doesn't already support it.
 *
 * @experimental **UNSTABLE**: New API, yet to be vetted.
 *
 * @example Usage
 * ```ts
 * import { assertEquals } from "@std/assert";
 * import { toByteStream } from "@std/streams/unstable-to-byte-stream";
 *
 * const reader = toByteStream(ReadableStream.from([new Uint8Array(100)]))
 *   .getReader({ mode: "byob" });
 *
 * while (true) {
 *   const { done, value } = await reader.read(new Uint8Array(10), { min: 10 });
 *   if (done) break;
 *   assertEquals(value.length, 10);
 * }
 *
 * reader.releaseLock();
 * ```
 *
 * @param readable The ReadableStream to be wrapped if needed.
 * @returns A BYOB ReadableStream.
 */
export function toByteStream(
  readable: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
  try {
    const reader = readable.getReader({ mode: "byob" });
    reader.releaseLock();
    return readable;
  } catch {
    const reader = readable.getReader();
    return new ReadableStream({
      type: "bytes",
      autoAllocateChunkSize: 1024,
      async pull(controller) {
        const value = await async function () {
          while (true) {
            const { done, value } = await reader.read();
            if (done) return undefined;
            if (value.length) return value;
          }
        }();

        if (value == undefined) {
          controller.close();
          return controller.byobRequest!.respond(0);
        }

        const buffer = new Uint8Array(
          controller.byobRequest!.view!.buffer,
          controller.byobRequest!.view!.byteOffset,
          controller.byobRequest!.view!.byteLength,
        );
        const size = buffer.length;
        if (value.length > size) {
          buffer.set(value.subarray(0, size));
          controller.byobRequest!.respond(size);
          controller.enqueue(value.subarray(size) as Uint8Array_);
        } else {
          buffer.set(value);
          controller.byobRequest!.respond(value.length);
        }
      },
      async cancel(reason) {
        await reader.cancel(reason);
      },
    });
  }
}