All files / async / unstable_semaphore.ts

100.00% Branches 11/11
100.00% Lines 44/44
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x6
x6
 
x20
 
x20
 
x6
 
 
 
 
 
 
x6
x20
x22
x22
 
x22
x32
x20
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x6
x78
x26
x38
x38
x38
x46
x216
x54
x56
x54
x60
x60
x46
x26
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x6
x9
x11
x33
x11
x12
x9
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
x6
x22
x30
x30
x30
x30
x35
x35
x22
x6
















































































































































// Copyright 2018-2025 the Deno authors. MIT license.
// This module is browser compatible.

/** Internal node for the FIFO waiting queue. */
interface Node {
  res: () => void;
  next: Node | undefined;
}

/**
 * A counting semaphore for limiting concurrent access to a resource.
 *
 * @experimental **UNSTABLE**: New API, yet to be vetted.
 *
 * @example Usage
 * ```ts
 * import { Semaphore } from "@std/async/unstable-semaphore";
 *
 * const sem = new Semaphore(2);
 * {
 *   using _permit = await sem.acquire();
 *   // critical section
 * } // permit is automatically released when exiting the block
 * ```
 */
export class Semaphore {
  #max: number;
  /** Current number of available permits. */
  #count: number;
  /** Head of the waiting queue. */
  #head: Node | undefined;
  /** Tail of the waiting queue. */
  #tail: Node | undefined;

  /**
   * Creates a new semaphore with the specified number of permits.
   *
   * @param max Maximum concurrent permits. Defaults to 1 (mutex).
   */
  constructor(max: number = 1) {
    if (max < 1) {
      throw new TypeError(
        `Cannot create semaphore as 'max' must be at least 1: current value is ${max}`,
      );
    }
    this.#count = this.#max = max;
  }

  /**
   * Acquires a permit, waiting if none are available.
   *
   * @example Usage
   * ```ts no-assert
   * import { Semaphore } from "@std/async/unstable-semaphore";
   *
   * const sem = new Semaphore(1);
   * await sem.acquire();
   * try {
   *   // critical section
   * } finally {
   *   sem.release();
   * }
   * ```
   *
   * @example Using `using` statement
   * ```ts no-assert
   * import { Semaphore } from "@std/async/unstable-semaphore";
   *
   * const sem = new Semaphore(1);
   * {
   *   using _permit = await sem.acquire();
   *   // critical section
   * } // permit is automatically released when exiting the block
   * ```
   *
   * @returns A promise that resolves to a {@linkcode Disposable} when a permit is acquired.
   */
  acquire(): Promise<Disposable> {
    const disposable: Disposable = { [Symbol.dispose]: () => this.release() };
    if (this.#count > 0) {
      this.#count--;
      return Promise.resolve(disposable);
    }
    return new Promise((res) => {
      const node: Node = { res: () => res(disposable), next: undefined };
      if (this.#tail) {
        this.#tail = this.#tail.next = node;
      } else {
        this.#head = this.#tail = node;
      }
    });
  }

  /**
   * Tries to acquire a permit without waiting.
   *
   * @example Usage
   * ```ts no-assert
   * import { Semaphore } from "@std/async/unstable-semaphore";
   *
   * const sem = new Semaphore(1);
   * const permit = sem.tryAcquire();
   * if (permit) {
   *   using _ = permit;
   *   // critical section
   * } else {
   *   // resource is busy
   * }
   * ```
   *
   * @returns A {@linkcode Disposable} if a permit was acquired, `undefined` otherwise.
   */
  tryAcquire(): Disposable | undefined {
    if (this.#count > 0) {
      this.#count--;
      return { [Symbol.dispose]: () => this.release() };
    }
    return undefined;
  }

  /**
   * Releases a permit, allowing the next waiter to proceed.
   *
   * @example Usage
   * ```ts no-assert
   * import { Semaphore } from "@std/async/unstable-semaphore";
   *
   * const sem = new Semaphore(1);
   * await sem.acquire();
   * try {
   *   // critical section
   * } finally {
   *   sem.release();
   * }
   * ```
   */
  release(): void {
    if (this.#head) {
      this.#head.res();
      this.#head = this.#head.next;
      if (!this.#head) this.#tail = undefined;
    } else if (this.#count < this.#max) {
      this.#count++;
    }
  }
}