1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124 |
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x15
x15
x15
 
 
 
 
 
 
 
x15
x15
x73
x29
x29
x29
x75
x84
x94
x90
x95
x95
x75
x112
x112
x112
x75
x29
x29
x15 |
|
// Copyright 2018-2025 the Deno authors. MIT license.
// This module is browser compatible.
/**
 * Configuration accepted by the {@linkcode LimitedBytesTransformStream}
 * constructor.
 */
export interface LimitedBytesTransformStreamOptions {
  /**
   * Selects what happens when enqueueing the current chunk would push the
   * running byte total past the size limit: when `true`, a
   * {@linkcode RangeError} is thrown; otherwise the stream is terminated
   * without an error.
   *
   * @default {false}
   */
  error?: boolean;
}
/**
 * A {@linkcode TransformStream} that forwards chunks unchanged while keeping a
 * running total of the bytes it has enqueued. The first chunk whose byte
 * length would push that total above `size` is NOT enqueued: if
 * `options.error` is `true` a {@linkcode RangeError} is thrown, otherwise the
 * stream is terminated.
 *
 * @example `size` is equal to the total byte length of the chunks
 * ```ts
 * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const stream = ReadableStream.from(["1234", "5678"]);
 * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
 *   new LimitedBytesTransformStream(8),
 * ).pipeThrough(new TextDecoderStream());
 *
 * assertEquals(
 *   await Array.fromAsync(transformed),
 *   ["1234", "5678"],
 * );
 * ```
 *
 * @example `size` is less than the total byte length of the chunks, and at the
 * boundary of the chunks
 * ```ts
 * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const stream = ReadableStream.from(["1234", "5678"]);
 * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
 *   // `4` is the boundary of the chunks
 *   new LimitedBytesTransformStream(4),
 * ).pipeThrough(new TextDecoderStream());
 *
 * assertEquals(
 *   await Array.fromAsync(transformed),
 *   // The first chunk was read, but the second chunk was not
 *   ["1234"],
 * );
 * ```
 *
 * @example `size` is less than the total byte length of the chunks, and not at
 * the boundary of the chunks
 * ```ts
 * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream";
 * import { assertEquals } from "@std/assert";
 *
 * const stream = ReadableStream.from(["1234", "5678"]);
 * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
 *   // `5` is not the boundary of the chunks
 *   new LimitedBytesTransformStream(5),
 * ).pipeThrough(new TextDecoderStream());
 *
 * assertEquals(
 *   await Array.fromAsync(transformed),
 *   // The second chunk was not read because it would exceed the specified size
 *   ["1234"],
 * );
 * ```
 *
 * @example Throw error when the total byte length of the chunks exceeds the
 * specified size
 *
 * To do so, set `options.error` to `true`.
 *
 * ```ts
 * import { LimitedBytesTransformStream } from "@std/streams/limited-bytes-transform-stream";
 * import { assertRejects } from "@std/assert";
 *
 * const stream = ReadableStream.from(["1234", "5678"]);
 * const transformed = stream.pipeThrough(new TextEncoderStream()).pipeThrough(
 *   new LimitedBytesTransformStream(5, { error: true }),
 * ).pipeThrough(new TextDecoderStream());
 *
 * await assertRejects(async () => {
 *   await Array.fromAsync(transformed);
 * }, RangeError);
 * ```
 */
export class LimitedBytesTransformStream
  extends TransformStream<Uint8Array, Uint8Array> {
  /** Sum of the byte lengths of every chunk enqueued so far. */
  #enqueuedBytes = 0;

  /**
   * Creates the stream with the given byte budget.
   *
   * @param size Maximum total number of bytes that may be enqueued.
   * @param options Controls whether exceeding `size` throws or terminates.
   */
  constructor(
    size: number,
    options: LimitedBytesTransformStreamOptions = { error: false },
  ) {
    super({
      transform: (chunk, controller) => {
        const nextTotal = this.#enqueuedBytes + chunk.byteLength;

        // Over budget: reject the chunk, either loudly or by ending the stream.
        if (nextTotal > size) {
          if (options.error) {
            throw new RangeError(`Exceeded byte size limit of '${size}'`);
          }
          controller.terminate();
          return;
        }

        // Within budget: record the new total, then pass the chunk along.
        this.#enqueuedBytes = nextTotal;
        controller.enqueue(chunk);
      },
    });
  }
}
|