All files / cbor / byte_encoder_stream.ts

85.71% Branches 6/7
94.87% Lines 37/39
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
 
 
x41
x41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
 
x41
x41
 
 
 
x41
x66
 
 
 
x66
x273
x91
x432
x141
x600
x141
x588
x147
x588
 
 
x128
x128
x273
x66
x66
x66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
x41
x41
x61
x61
x61
x61
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
x66
x66
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
x66
x66
x41



























































I



















































































































// Copyright 2018-2025 the Deno authors. MIT license.

import { toByteStream } from "@std/streams/unstable-to-byte-stream";
import { numberToArray } from "./_common.ts";

/**
 * A {@link TransformStream} that encodes a {@link ReadableStream<Uint8Array>}
 * into CBOR "Indefinite Length Byte String".
 * [RFC 8949 - Concise Binary Object Representation (CBOR)](https://datatracker.ietf.org/doc/html/rfc8949)
 *
 * **Notice:** Each chunk of the {@link ReadableStream<Uint8Array>} is encoded
 * as its own "Definite Length Byte String" meaning space can be saved if large
 * chunks are piped through instead of small chunks.
 *
 * @example Usage
 * ```ts
 * import { assert, assertEquals } from "@std/assert";
 * import { concat } from "@std/bytes";
 * import {
 *   CborByteDecodedStream,
 *   CborByteEncoderStream,
 *   CborSequenceDecoderStream,
 * } from "@std/cbor";
 *
 * const rawMessage = new Uint8Array(100);
 *
 * for await (
 *   const value of ReadableStream.from([rawMessage])
 *     .pipeThrough(new CborByteEncoderStream())
 *     .pipeThrough(new CborSequenceDecoderStream())
 * ) {
 *   assert(value instanceof Uint8Array || value instanceof CborByteDecodedStream);
 *   if (value instanceof CborByteDecodedStream) {
 *     assertEquals(concat(await Array.fromAsync(value)), new Uint8Array(100));
 *   } else assertEquals(value, new Uint8Array(100));
 * }
 * ```
 */
export class CborByteEncoderStream
  implements TransformStream<Uint8Array, Uint8Array> {
  #readable: ReadableStream<Uint8Array>;
  #writable: WritableStream<Uint8Array>;
  /**
   * Constructs a new instance.
   */
  constructor() {
    // An identity TransformStream is used purely as a pipe: callers write raw
    // chunks to `writable`, and the generator below pulls them from `readable`.
    const { readable, writable } = new TransformStream<
      Uint8Array,
      Uint8Array
    >();
    this.#readable = toByteStream(ReadableStream.from(async function* () {
      // Major type 2 (byte string) with additional information 31: opens an
      // indefinite-length byte string.
      yield new Uint8Array([0b010_11111]);
      for await (const x of readable) {
        // Each chunk is emitted as a definite-length byte string: a header
        // carrying the length in the smallest argument width that fits it,
        // followed by the chunk itself.
        if (x.length < 24) yield new Uint8Array([0b010_00000 + x.length]);
        else if (x.length < 2 ** 8) {
          yield new Uint8Array([0b010_11000, x.length]);
        } else if (x.length < 2 ** 16) {
          // NOTE(review): `numberToArray(n, value)` presumably returns `value`
          // as an n-byte big-endian sequence — confirm in `_common.ts`.
          yield new Uint8Array([0b010_11001, ...numberToArray(2, x.length)]);
        } else if (x.length < 2 ** 32) {
          yield new Uint8Array([0b010_11010, ...numberToArray(4, x.length)]);
        } // Can safely assume `x.length < 2 ** 64` as JavaScript doesn't support a `Uint8Array` being that large.
        else yield new Uint8Array([0b010_11011, ...numberToArray(8, x.length)]);
        yield x;
      }
      // "break" stop code: closes the indefinite-length byte string.
      yield new Uint8Array([0b111_11111]);
    }()));
    this.#writable = writable;
  }

  /**
   * Creates a {@link CborByteEncoderStream} instance from an iterable of
   * {@link Uint8Array} chunks.
   *
   * The iterable is piped into the instance's writable side in the
   * background. If the iterable throws, the failure is surfaced to whoever
   * reads {@link CborByteEncoderStream.readable}, rather than as an unhandled
   * promise rejection.
   *
   * @example Usage
   * ```ts
   * import { assert, assertEquals } from "@std/assert";
   * import { concat } from "@std/bytes";
   * import {
   *   CborByteDecodedStream,
   *   CborByteEncoderStream,
   *   CborSequenceDecoderStream,
   * } from "@std/cbor";
   *
   * const rawMessage = new Uint8Array(100);
   *
   * for await (
   *   const value of CborByteEncoderStream.from([rawMessage])
   *     .readable
   *     .pipeThrough(new CborSequenceDecoderStream())
   * ) {
   *   assert(value instanceof Uint8Array || value instanceof CborByteDecodedStream);
   *   if (value instanceof CborByteDecodedStream) {
   *     assertEquals(concat(await Array.fromAsync(value)), new Uint8Array(100));
   *   } else assertEquals(value, new Uint8Array(100));
   * }
   * ```
   *
   * @param asyncIterable The value to encode of type
   * {@link AsyncIterable<Uint8Array>} or {@link Iterable<Uint8Array>}.
   * @returns A {@link CborByteEncoderStream} instance of the encoded data.
   */
  static from(
    asyncIterable: AsyncIterable<Uint8Array> | Iterable<Uint8Array>,
  ): CborByteEncoderStream {
    const encoder = new CborByteEncoderStream();
    // The pipe runs in the background and its promise is intentionally not
    // awaited. With default `pipeTo` options a source failure aborts the
    // writable, which in turn errors the readable side, so the consumer still
    // observes the error. The no-op handler only prevents the same failure (or
    // an early cancel by the consumer) from also being reported as an
    // unhandled rejection.
    ReadableStream.from(asyncIterable)
      .pipeTo(encoder.writable)
      .catch(() => {});
    return encoder;
  }

  /**
   * The {@link ReadableStream<Uint8Array>} associated with the instance, which
   * provides the encoded CBOR data as {@link Uint8Array} chunks.
   *
   * @example Usage
   * ```ts
   * import { assert, assertEquals } from "@std/assert";
   * import { concat } from "@std/bytes";
   * import {
   *   CborByteDecodedStream,
   *   CborByteEncoderStream,
   *   CborSequenceDecoderStream,
   * } from "@std/cbor";
   *
   * const rawMessage = new Uint8Array(100);
   *
   * for await (
   *   const value of ReadableStream.from([rawMessage])
   *     .pipeThrough(new CborByteEncoderStream())
   *     .pipeThrough(new CborSequenceDecoderStream())
   * ) {
   *   assert(value instanceof Uint8Array || value instanceof CborByteDecodedStream);
   *   if (value instanceof CborByteDecodedStream) {
   *     assertEquals(concat(await Array.fromAsync(value)), new Uint8Array(100));
   *   } else assertEquals(value, new Uint8Array(100));
   * }
   * ```
   *
   * @returns A {@link ReadableStream<Uint8Array>}.
   */
  get readable(): ReadableStream<Uint8Array> {
    return this.#readable;
  }

  /**
   * The {@link WritableStream<Uint8Array>} associated with the instance, which
   * accepts {@link Uint8Array} chunks to be encoded into CBOR format.
   *
   * @example Usage
   * ```ts
   * import { assert, assertEquals } from "@std/assert";
   * import { concat } from "@std/bytes";
   * import {
   *   CborByteDecodedStream,
   *   CborByteEncoderStream,
   *   CborSequenceDecoderStream,
   * } from "@std/cbor";
   *
   * const rawMessage = new Uint8Array(100);
   *
   * for await (
   *   const value of ReadableStream.from([rawMessage])
   *     .pipeThrough(new CborByteEncoderStream())
   *     .pipeThrough(new CborSequenceDecoderStream())
   * ) {
   *   assert(value instanceof Uint8Array || value instanceof CborByteDecodedStream);
   *   if (value instanceof CborByteDecodedStream) {
   *     assertEquals(concat(await Array.fromAsync(value)), new Uint8Array(100));
   *   } else assertEquals(value, new Uint8Array(100));
   * }
   * ```
   *
   * @returns A {@link WritableStream<Uint8Array>}.
   */
  get writable(): WritableStream<Uint8Array> {
    return this.#writable;
  }
}