空雲 Blog

JavaScript/TypeScriptのforEach内の非同期処理をセマフォで同期させる

publication: 2023/07/23
update:2024/02/20

セマフォの必要性

Node.jsで非同期の通信プログラムを作るような場合、最大並列数を指定して実行したいことがあります。最大並列数が必要な理由としては、実行ごとに同期を取る順次処理だと速度が遅くなってしまい、かといって全てを並列にするとDoS攻撃状態になってしまうからです。

ということで、簡単にセマフォを実現できるライブラリを作りました。

@node-libraries/semaphore

semaphore()でインスタンスを作成し、acquire()でロック、release()でロック解除、all()で全てのロックが解除されるまで待ちます。

並列数1の場合

いか、並列数1で実行するサンプルです。セマフォで同期させているので、forEachでも非同期関数の順次処理が行われます。時々、forEachでは非同期が使えないという間違った情報が流れることがありますが、なんの問題もなく使えます。

1import { semaphore } from "@node-libraries/semaphore";
2
3const f = (value: string) =>
4 new Promise<void>((resolve) => {
5 console.timeLog("debug", value);
6 setTimeout(resolve, 1000);
7 });
8
9const main = async () => {
10 console.time("debug");
11 const s = semaphore();
12 ["A", "B", "C", "D", "E"].forEach(async (v) => {
13 await s.acquire();
14 await f(v);
15 s.release();
16 });
17 await s.all(); // Wait for everything to be finished.
18 console.timeLog("debug", "end");
19};
20main();
21
22/* Result
23debug: 0.197ms A
24debug: 1.014s B
25debug: 2.027s C
26debug: 3.039s D
27debug: 4.040s E
28debug: 5.050s end
29*/

並列数2の場合

semaphore(2)で並列数を2にしたサンプルです。AB,CD,Eという組み合わせて、同じようなタイミングで実行されているのが確認できます。

1import { semaphore } from "@node-libraries/semaphore";
2
3const f = (value: string) =>
4 new Promise<void>((resolve) => {
5 console.timeLog("debug", value);
6 setTimeout(resolve, 1000);
7 });
8
9const main = async () => {
10 console.time("debug");
11 const s = semaphore(2);
12 ["A", "B", "C", "D", "E"].forEach(async (v) => {
13 await s.acquire();
14 await f(v);
15 s.release();
16 });
17 await s.all(); // Wait for everything to be finished.
18 console.timeLog("debug", "end");
19};
20main();
21
22/* Result
23debug: 0.19ms A
24debug: 1.826ms B
25debug: 1.005s C
26debug: 1.005s D
27debug: 2.012s E
28debug: 3.028s end
29*/

ライブラリのコード

コード量はこれだけです。ちなみにセマフォを実現するのにsetInterval系で並列インスタンスのチェックを定期的に行っているようなライブラリがありますが、そんな書き方をしなくてもカウント制御は普通にできるので、タイマー分だけ冗長になるだけです。

1export const semaphore = (
2 limit = 1,
3 count = 0,
4 rs = new Array<() => void>(),
5 all?: () => void
6) => ({
7 acquire: () =>
8 ++count > limit && new Promise<void>((resolve) => rs.push(resolve)),
9 release: () => (--count ? rs.shift()?.() : all?.()),
10 all: () => count && new Promise<void>((resolve) => (all = resolve)),
11});

まとめ

Node.jsの非同期はシングルスレッドなので、実際のところは似非セマフォです。