All files / cbor / text_encoder_stream.ts

85.71% Branches 6/7
95.12% Lines 39/41
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
 
 
x41
x41
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
 
x41
x41
 
 
 
x41
x59
x59
x231
x77
x77
x107
x357
x119
x508
x119
x500
x125
x500
 
 
x107
x107
x231
x59
x59
x59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
x41
x41
x54
x54
x54
x54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
x59
x59
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x41
x59
x59
x41

























































I
















































































































// Copyright 2018-2025 the Deno authors. MIT license.

import { toByteStream } from "@std/streams/unstable-to-byte-stream";
import { numberToArray } from "./_common.ts";

/**
 * A {@link TransformStream} that encodes a {@link ReadableStream<string>} into
 * CBOR "Indefinite Length Text String".
 * [RFC 8949 - Concise Binary Object Representation (CBOR)](https://datatracker.ietf.org/doc/html/rfc8949)
 *
 * **Notice:** Each chunk of the {@link ReadableStream<string>} is encoded as
 * its own "Definite Length Text String", meaning space can be saved if large
 * chunks are piped through instead of small chunks.
 *
 * @example Usage
 * ```ts
 * import { assert, assertEquals } from "@std/assert";
 * import {
 *   CborSequenceDecoderStream,
 *   CborTextDecodedStream,
 *   CborTextEncoderStream,
 * } from "@std/cbor";
 *
 * const rawMessage = "a".repeat(100);
 *
 * for await (
 *   const value of ReadableStream.from([rawMessage])
 *     .pipeThrough(new CborTextEncoderStream())
 *     .pipeThrough(new CborSequenceDecoderStream())
 * ) {
 *   assert(typeof value === "string" || value instanceof CborTextDecodedStream);
 *   if (value instanceof CborTextDecodedStream) {
 *     assertEquals((await Array.fromAsync(value)).join(""), rawMessage);
 *   } else assertEquals(value, rawMessage);
 * }
 * ```
 */
export class CborTextEncoderStream
  implements TransformStream<string, Uint8Array> {
  #readable: ReadableStream<Uint8Array>;
  #writable: WritableStream<string>;
  /**
   * Constructs a new instance.
   *
   * The readable side emits, in order: the "Indefinite Length Text String"
   * start byte, then for every string written a length header followed by the
   * UTF-8 bytes of that string, and finally the "break" byte once the writable
   * side is closed.
   */
  constructor() {
    const { readable, writable } = new TransformStream<string, string>();
    this.#readable = toByteStream(ReadableStream.from(async function* () {
      // Major type 3 (text string) with additional info 31: indefinite length.
      yield new Uint8Array([0b011_11111]);
      const textEncoder = new TextEncoder();
      for await (const chunk of readable) {
        const bytes = textEncoder.encode(chunk);
        // Each chunk becomes its own definite-length text string. The header
        // is sized by the encoded byte length, not by the string's length.
        if (bytes.length < 24) {
          // Lengths below 24 are stored directly in the header byte.
          yield new Uint8Array([0b011_00000 + bytes.length]);
        } else if (bytes.length < 2 ** 8) {
          // Additional info 24: length follows in 1 byte.
          yield new Uint8Array([0b011_11000, bytes.length]);
        } else if (bytes.length < 2 ** 16) {
          // Additional info 25: length follows in 2 bytes.
          yield new Uint8Array([
            0b011_11001,
            ...numberToArray(2, bytes.length),
          ]);
        } else if (bytes.length < 2 ** 32) {
          // Additional info 26: length follows in 4 bytes.
          yield new Uint8Array([
            0b011_11010,
            ...numberToArray(4, bytes.length),
          ]);
        } else {
          // Additional info 27: length follows in 8 bytes.
          // Can safely assume `bytes.length < 2 ** 64` as JavaScript doesn't
          // support a `Uint8Array` being that large.
          yield new Uint8Array([
            0b011_11011,
            ...numberToArray(8, bytes.length),
          ]);
        }
        yield bytes;
      }
      // "Break" stop code terminating the indefinite length text string.
      yield new Uint8Array([0b111_11111]);
    }()));
    this.#writable = writable;
  }

  /**
   * Creates a {@link CborTextEncoderStream} instance from an iterable of
   * {@link string} chunks.
   *
   * The iterable is piped into the instance in the background. If the iterable
   * throws, or the consumer cancels {@link CborTextEncoderStream.readable}
   * early, the failure is reported through the streams themselves rather than
   * as an unhandled promise rejection.
   *
   * @example Usage
   * ```ts
   * import { assert, assertEquals } from "@std/assert";
   * import {
   *   CborSequenceDecoderStream,
   *   CborTextDecodedStream,
   *   CborTextEncoderStream,
   * } from "@std/cbor";
   *
   * const rawMessage = "a".repeat(100);
   *
   * for await (
   *   const value of CborTextEncoderStream.from([rawMessage])
   *     .readable
   *     .pipeThrough(new CborSequenceDecoderStream())
   * ) {
   *   assert(typeof value === "string" || value instanceof CborTextDecodedStream);
   *   if (value instanceof CborTextDecodedStream) {
   *     assertEquals((await Array.fromAsync(value)).join(""), rawMessage);
   *   } else assertEquals(value, rawMessage);
   * }
   * ```
   *
   * @param asyncIterable The value to encode of type
   * {@link AsyncIterable<string>} or {@link Iterable<string>}
   * @returns A {@link CborTextEncoderStream} instance of the encoded data.
   */
  static from(
    asyncIterable: AsyncIterable<string> | Iterable<string>,
  ): CborTextEncoderStream {
    const encoder = new CborTextEncoderStream();
    // The pipe runs in the background and its promise is intentionally not
    // awaited. Per the Streams Standard, a failing source aborts `writable`
    // (which errors `readable`), and a cancelled `readable` errors `writable`
    // (which cancels the source), so the failure is already delivered through
    // the stream pair. The no-op handler only prevents the same failure from
    // additionally surfacing as an unhandled promise rejection.
    ReadableStream.from(asyncIterable)
      .pipeTo(encoder.writable)
      .catch(() => {});
    return encoder;
  }

  /**
   * The {@link ReadableStream<Uint8Array>} associated with the instance, which
   * provides the encoded CBOR data as {@link Uint8Array} chunks.
   *
   * @example Usage
   * ```ts
   * import { assert, assertEquals } from "@std/assert";
   * import {
   *   CborSequenceDecoderStream,
   *   CborTextDecodedStream,
   *   CborTextEncoderStream,
   * } from "@std/cbor";
   *
   * const rawMessage = "a".repeat(100);
   *
   * for await (
   *   const value of ReadableStream.from([rawMessage])
   *     .pipeThrough(new CborTextEncoderStream())
   *     .pipeThrough(new CborSequenceDecoderStream())
   * ) {
   *   assert(typeof value === "string" || value instanceof CborTextDecodedStream);
   *   if (value instanceof CborTextDecodedStream) {
   *     assertEquals((await Array.fromAsync(value)).join(""), rawMessage);
   *   } else assertEquals(value, rawMessage);
   * }
   * ```
   *
   * @returns A {@link ReadableStream<Uint8Array>}.
   */
  get readable(): ReadableStream<Uint8Array> {
    return this.#readable;
  }

  /**
   * The {@link WritableStream<string>} associated with the instance, which
   * accepts {@link string} chunks to be encoded into CBOR format.
   *
   * @example Usage
   * ```ts
   * import { assert, assertEquals } from "@std/assert";
   * import {
   *   CborSequenceDecoderStream,
   *   CborTextDecodedStream,
   *   CborTextEncoderStream,
   * } from "@std/cbor";
   *
   * const rawMessage = "a".repeat(100);
   *
   * for await (
   *   const value of ReadableStream.from([rawMessage])
   *     .pipeThrough(new CborTextEncoderStream())
   *     .pipeThrough(new CborSequenceDecoderStream())
   * ) {
   *   assert(typeof value === "string" || value instanceof CborTextDecodedStream);
   *   if (value instanceof CborTextDecodedStream) {
   *     assertEquals((await Array.fromAsync(value)).join(""), rawMessage);
   *   } else assertEquals(value, rawMessage);
   * }
   * ```
   *
   * @returns A {@link WritableStream<string>}.
   */
  get writable(): WritableStream<string> {
    return this.#writable;
  }
}