All files / streams / unstable_to_byte_stream.ts

100.00% Branches 13/13
100.00% Lines 43/43
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x49
x49
 
x2305
x6915
x2305
x2305
x2305
x3533
x3533
x3533
x3533
x3533
x160108
x316683
x473260
x473260
x784152
x473260
x160108
 
x160108
x473444
x473444
x473444
 
x627565
x627565
x627565
x627565
 
x627565
x316680
x316996
x316996
x316996
x160108
x315140
x315140
x315140
x3533
x3533
x3534
x3534
x3533
x3533
x2305












































































// Copyright 2018-2025 the Deno authors. MIT license.
// This module is browser compatible.

import type { Uint8Array_ } from "./_types.ts";

/**
 * The function takes a `ReadableStream<Uint8Array>` and wraps it in a BYOB
 * stream if it doesn't already support it.
 *
 * @experimental **UNSTABLE**: New API, yet to be vetted.
 *
 * @example Usage
 * ```ts
 * import { assertEquals } from "@std/assert";
 * import { toByteStream } from "@std/streams/unstable-to-byte-stream";
 *
 * const reader = toByteStream(ReadableStream.from([new Uint8Array(100)]))
 *   .getReader({ mode: "byob" });
 *
 * while (true) {
 *   const { done, value } = await reader.read(new Uint8Array(10), { min: 10 });
 *   if (done) break;
 *   assertEquals(value.length, 10);
 * }
 *
 * reader.releaseLock();
 * ```
 *
 * @param readable The ReadableStream to be wrapped if needed.
 * @returns A BYOB ReadableStream.
 */
export function toByteStream(
  readable: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
  try {
    const reader = readable.getReader({ mode: "byob" });
    reader.releaseLock();
    return readable;
  } catch {
    const reader = readable.getReader();
    return new ReadableStream({
      type: "bytes",
      autoAllocateChunkSize: 1024,
      async pull(controller) {
        const value = await async function () {
          while (true) {
            const { done, value } = await reader.read();
            if (done) return undefined;
            if (value.length) return value;
          }
        }();

        if (value == undefined) {
          controller.close();
          return controller.byobRequest!.respond(0);
        }

        const buffer = new Uint8Array(
          controller.byobRequest!.view!.buffer,
          controller.byobRequest!.view!.byteOffset,
          controller.byobRequest!.view!.byteLength,
        );
        const size = buffer.length;
        if (value.length > size) {
          buffer.set(value.subarray(0, size));
          controller.byobRequest!.respond(size);
          controller.enqueue(value.subarray(size) as Uint8Array_);
        } else {
          buffer.set(value);
          controller.byobRequest!.respond(value.length);
        }
      },
      async cancel(reason) {
        await reader.cancel(reason);
      },
    });
  }
}