All files / streams / unstable_to_byte_stream.ts

100.00% Branches 22/22
100.00% Functions 3/3
100.00% Lines 43/43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x64
x64
 
x2292
x2292
x2292
x2292
x2292
x1255
x1255
x1255
x1255
x1255
x160138
x160138
x160141
x160141
x158892
x160141
x160138
 
x160138
x1253
x1253
x1253
 
x158884
x158884
x158884
x158884
 
x158884
x160130
x329
x329
x329
x160138
x158555
x158555
x158555
x1255
x1255
x1
x1
x1255
x1255
x2292












































































// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.

import type { Uint8Array_ } from "./_types.ts";

/**
 * The function takes a `ReadableStream<Uint8Array>` and wraps it in a BYOB
 * stream if it doesn't already support it.
 *
 * If `readable` already hands out a BYOB reader it is returned unchanged.
 * Otherwise `readable` is locked by an internal default reader and a new byte
 * stream is returned that pulls from it on demand. Bytes are always copied out
 * of the source chunks; the source's buffers are never transferred, so chunks
 * that are views over a shared `ArrayBuffer` are handled safely.
 *
 * @experimental **UNSTABLE**: New API, yet to be vetted.
 *
 * @example Usage
 * ```ts
 * import { assertEquals } from "@std/assert";
 * import { toByteStream } from "@std/streams/unstable-to-byte-stream";
 *
 * const reader = toByteStream(ReadableStream.from([new Uint8Array(100)]))
 *   .getReader({ mode: "byob" });
 *
 * while (true) {
 *   const { done, value } = await reader.read(new Uint8Array(10), { min: 10 });
 *   if (done) break;
 *   assertEquals(value.length, 10);
 * }
 *
 * reader.releaseLock();
 * ```
 *
 * @param readable The ReadableStream to be wrapped if needed.
 * @returns A BYOB ReadableStream.
 */
export function toByteStream(
  readable: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
  try {
    // Probe for BYOB support: acquiring a BYOB reader throws when the stream
    // is not a byte stream. The lock is released again straight away.
    const reader = readable.getReader({ mode: "byob" });
    reader.releaseLock();
    return readable;
  } catch {
    const reader = readable.getReader();
    return new ReadableStream({
      type: "bytes",
      // Gives default (non-BYOB) readers a buffer too, so `byobRequest` is
      // available in `pull` for both reader kinds.
      autoAllocateChunkSize: 1024,
      async pull(controller) {
        // Read until a non-empty chunk arrives or the source is exhausted;
        // empty chunks carry no data to respond with and are skipped.
        let chunk: Uint8Array | undefined;
        while (chunk === undefined) {
          const { done, value } = await reader.read();
          if (done) break;
          if (value.length) chunk = value;
        }

        if (chunk === undefined) {
          // Source finished: close, then respond with zero bytes so the
          // outstanding BYOB request is settled.
          controller.close();
          return controller.byobRequest!.respond(0);
        }

        const view = controller.byobRequest!.view!;
        const buffer = new Uint8Array(
          view.buffer,
          view.byteOffset,
          view.byteLength,
        );
        const size = buffer.length;
        if (chunk.length > size) {
          buffer.set(chunk.subarray(0, size));
          controller.byobRequest!.respond(size);
          // Enqueue a *copy* of the leftover bytes. `enqueue` transfers the
          // chunk's whole underlying ArrayBuffer; passing a subarray of the
          // source chunk would detach memory still owned by the producer and
          // empty any other source chunks that are views over the same buffer.
          controller.enqueue(chunk.slice(size) as Uint8Array_);
        } else {
          buffer.set(chunk);
          controller.byobRequest!.respond(chunk.length);
        }
      },
      async cancel(reason) {
        // Forward cancellation to the wrapped source.
        await reader.cancel(reason);
      },
    });
  }
}