All files / async / tee.ts

100.00% Branches 11/11
100.00% Functions 5/5
100.00% Lines 48/48
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x8
x8
x5
x5
 
x8
 
x8
x5
x5
x5
x5
x5
x5
x5
x5
 
x8
x20
x20
x15
x15
x15
x15
x15
x15
x20
x5
x5
x20
x8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x8
x8
x8
 
x5
 
x5
x12
x12
x64
x33
x33
x58
x11
x31
x20
x20
x64
x12
 
x5
x5
 
 
 
 
x5
















































































































// Copyright 2018-2026 the Deno authors. MIT license.
// This module is browser compatible.

/**
 * Utility for representing n-tuple. Used in {@linkcode tee}.
 *
 * @internal
 */
export type Tuple<T, N extends number> = N extends N
  ? number extends N ? T[] : TupleOf<T, N, []>
  : never;

/**
 * Utility for representing n-tuple of. Used in {@linkcode Tuple}.
 *
 * @internal
 */
export type TupleOf<T, N extends number, R extends unknown[]> =
  R["length"] extends N ? R
    : TupleOf<T, N, [T, ...R]>;

interface QueueNode<T> {
  value: T;
  next: QueueNode<T> | undefined;
}

class Queue<T> {
  #source: AsyncIterator<T>;
  #queue: QueueNode<T>;
  head: QueueNode<T>;

  done: boolean;

  constructor(iterable: AsyncIterable<T>) {
    this.#source = iterable[Symbol.asyncIterator]();
    this.#queue = {
      value: undefined!,
      next: undefined,
    };
    this.head = this.#queue;
    this.done = false;
  }

  async next() {
    const result = await this.#source.next();
    if (!result.done) {
      const nextNode: QueueNode<T> = {
        value: result.value,
        next: undefined,
      };
      this.#queue.next = nextNode;
      this.#queue = nextNode;
    } else {
      this.done = true;
    }
  }
}

/**
 * Branches the given async iterable into the `n` branches.
 *
 * @example Usage
 * ```ts
 * import { tee } from "@std/async/tee";
 * import { assertEquals } from "@std/assert";
 *
 * const gen = async function* gen() {
 *   yield 1;
 *   yield 2;
 *   yield 3;
 * };
 *
 * const [branch1, branch2] = tee(gen());
 *
 * const result1 = await Array.fromAsync(branch1);
 * assertEquals(result1, [1, 2, 3]);
 *
 * const result2 = await Array.fromAsync(branch2);
 * assertEquals(result2, [1, 2, 3]);
 * ```
 *
 * @typeParam T The type of the provided async iterable and the returned async iterables.
 * @typeParam N The amount of branches to tee into.
 * @param iterable The iterable to tee.
 * @param n The amount of branches to tee into.
 * @returns The tuple where each element is an async iterable.
 */
export function tee<T, N extends number = 2>(
  iterable: AsyncIterable<T>,
  n: N = 2 as N,
): Tuple<AsyncIterable<T>, N> {
  const queue = new Queue<T>(iterable);

  async function* generator(): AsyncGenerator<T> {
    let buffer = queue.head;
    while (true) {
      if (buffer.next) {
        buffer = buffer.next;
        yield buffer.value;
      } else if (queue.done) {
        return;
      } else {
        await queue.next();
      }
    }
  }

  return Array.from({ length: n }).map(
    () => generator(),
  ) as Tuple<
    AsyncIterable<T>,
    N
  >;
}