All files / streams / unstable_to_byte_stream.ts

100.00% Branches 22/22
100.00% Functions 3/3
100.00% Lines 43/43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x64
x64
 
x2292
x2292
x2292
x2292
x2292
x1255
x1255
x1255
x1255
x1255
x133609
x133609
x133611
x133611
x132360
x133611
x133609
 
x133609
x1253
x1253
x1253
 
x132355
x132355
x132355
x132355
 
x132355
x133601
x321
x321
x321
x133609
x132034
x132034
x132034
x1255
x1255
x1
x1
x1255
x1255
x2292












































































// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.

import type { Uint8Array_ } from "./_types.ts";

/**
 * The function takes a `ReadableStream<Uint8Array>` and wraps it in a BYOB
 * stream if it doesn't already support it.
 *
 * @experimental **UNSTABLE**: New API, yet to be vetted.
 *
 * @example Usage
 * ```ts
 * import { assertEquals } from "@std/assert";
 * import { toByteStream } from "@std/streams/unstable-to-byte-stream";
 *
 * const reader = toByteStream(ReadableStream.from([new Uint8Array(100)]))
 *   .getReader({ mode: "byob" });
 *
 * while (true) {
 *   const { done, value } = await reader.read(new Uint8Array(10), { min: 10 });
 *   if (done) break;
 *   assertEquals(value.length, 10);
 * }
 *
 * reader.releaseLock();
 * ```
 *
 * @param readable The ReadableStream to be wrapped if needed.
 * @returns A BYOB ReadableStream.
 */
export function toByteStream(
  readable: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
  try {
    const reader = readable.getReader({ mode: "byob" });
    reader.releaseLock();
    return readable;
  } catch {
    const reader = readable.getReader();
    return new ReadableStream({
      type: "bytes",
      autoAllocateChunkSize: 1024,
      async pull(controller) {
        const value = await async function () {
          while (true) {
            const { done, value } = await reader.read();
            if (done) return undefined;
            if (value.length) return value;
          }
        }();

        if (value == undefined) {
          controller.close();
          return controller.byobRequest!.respond(0);
        }

        const buffer = new Uint8Array(
          controller.byobRequest!.view!.buffer,
          controller.byobRequest!.view!.byteOffset,
          controller.byobRequest!.view!.byteLength,
        );
        const size = buffer.length;
        if (value.length > size) {
          buffer.set(value.subarray(0, size));
          controller.byobRequest!.respond(size);
          controller.enqueue(value.subarray(size) as Uint8Array_);
        } else {
          buffer.set(value);
          controller.byobRequest!.respond(value.length);
        }
      },
      async cancel(reason) {
        await reader.cancel(reason);
      },
    });
  }
}