All files / streams / unstable_to_byte_stream.ts

100.00% Branches 16/16
100.00% Lines 42/42
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x47
x47
 
x240
x720
x240
x240
x240
x240
x240
x240
x240
x88483
x176726
x264972
x264972
x441274
x264972
x88483
 
x88483
x176915
x176915
x176915
 
x88483
x89035
x89035
x89035
x89035
x89283
x89283
x89283
x89035
x89339
x89339
x89339
x88483
x240
x240
x241
x241
x240
x240
x240








































































// Copyright 2018-2025 the Deno authors. MIT license.
// This module is browser compatible.

/**
 * The function takes a `ReadableStream<Uint8Array>` and wraps it in a BYOB
 * stream if it doesn't already support it.
 *
 * @experimental **UNSTABLE**: New API, yet to be vetted.
 *
 * @example Usage
 * ```ts
 * import { assertEquals } from "@std/assert";
 * import { toByteStream } from "@std/streams/unstable-to-byte-stream";
 *
 * const reader = toByteStream(ReadableStream.from([new Uint8Array(100)]))
 *   .getReader({ mode: "byob" });
 *
 * while (true) {
 *   const { done, value } = await reader.read(new Uint8Array(10), { min: 10 });
 *   if (done) break;
 *   assertEquals(value.length, 10);
 * }
 *
 * reader.releaseLock();
 * ```
 *
 * @param readable The ReadableStream to be wrapped if needed.
 * @returns A BYOB ReadableStream.
 */
export function toByteStream(
  readable: ReadableStream<Uint8Array>,
): ReadableStream<Uint8Array> {
  try {
    const reader = readable.getReader({ mode: "byob" });
    reader.releaseLock();
    return readable;
  } catch {
    const reader = readable.getReader();
    return new ReadableStream({
      type: "bytes",
      async pull(controller) {
        const value = await async function () {
          while (true) {
            const { done, value } = await reader.read();
            if (done) return undefined;
            if (value.length) return value;
          }
        }();

        if (value == undefined) {
          controller.close();
          return controller.byobRequest?.respond(0);
        }

        if (controller.byobRequest?.view) {
          const buffer = new Uint8Array(controller.byobRequest.view.buffer);
          const offset = controller.byobRequest.view.byteOffset;
          const size = buffer.length - offset;
          if (value.length > size) {
            buffer.set(value.slice(0, size), offset);
            controller.byobRequest.respond(size);
            controller.enqueue(value.slice(size));
          } else {
            buffer.set(value, offset);
            controller.byobRequest.respond(value.length);
          }
        } else controller.enqueue(value);
      },
      async cancel(reason) {
        await reader.cancel(reason);
      },
    });
  }
}