JavaScript/TypeScriptのforEach内の非同期処理をセマフォで同期させる
publication: 2023/07/23
update:2024/02/20
セマフォの必要性
Node.jsで非同期の通信プログラムを作るような場合、最大並列数を指定して実行したいことがあります。最大並列数が必要な理由としては、実行ごとに同期を取る順次処理だと速度が遅くなってしまい、かといって全てを並列にするとDoS攻撃状態になってしまうからです。
ということで、簡単にセマフォを実現できるライブラリを作りました。
semaphore()でインスタンスを作成し、acquire()でロック、release()でロック解除、all()で全てのロックが解除されるまで待ちます。
並列数1の場合
いか、並列数1で実行するサンプルです。セマフォで同期させているので、forEachでも非同期関数の順次処理が行われます。時々、forEachでは非同期が使えないという間違った情報が流れることがありますが、なんの問題もなく使えます。
1import { semaphore } from "@node-libraries/semaphore";23const f = (value: string) =>4 new Promise<void>((resolve) => {5 console.timeLog("debug", value);6 setTimeout(resolve, 1000);7 });89const main = async () => {10 console.time("debug");11 const s = semaphore();12 ["A", "B", "C", "D", "E"].forEach(async (v) => {13 await s.acquire();14 await f(v);15 s.release();16 });17 await s.all(); // Wait for everything to be finished.18 console.timeLog("debug", "end");19};20main();2122/* Result23debug: 0.197ms A24debug: 1.014s B25debug: 2.027s C26debug: 3.039s D27debug: 4.040s E28debug: 5.050s end29*/
並列数2の場合
semaphore(2)で並列数を2にしたサンプルです。AB,CD,Eという組み合わせて、同じようなタイミングで実行されているのが確認できます。
1import { semaphore } from "@node-libraries/semaphore";23const f = (value: string) =>4 new Promise<void>((resolve) => {5 console.timeLog("debug", value);6 setTimeout(resolve, 1000);7 });89const main = async () => {10 console.time("debug");11 const s = semaphore(2);12 ["A", "B", "C", "D", "E"].forEach(async (v) => {13 await s.acquire();14 await f(v);15 s.release();16 });17 await s.all(); // Wait for everything to be finished.18 console.timeLog("debug", "end");19};20main();2122/* Result23debug: 0.19ms A24debug: 1.826ms B25debug: 1.005s C26debug: 1.005s D27debug: 2.012s E28debug: 3.028s end29*/
ライブラリのコード
コード量はこれだけです。ちなみにセマフォを実現するのにsetInterval系で並列インスタンスのチェックを定期的に行っているようなライブラリがありますが、そんな書き方をしなくてもカウント制御は普通にできるので、タイマー分だけ冗長になるだけです。
1export const semaphore = (2 limit = 1,3 count = 0,4 rs = new Array<() => void>(),5 all?: () => void6) => ({7 acquire: () =>8 ++count > limit && new Promise<void>((resolve) => rs.push(resolve)),9 release: () => (--count ? rs.shift()?.() : all?.()),10 all: () => count && new Promise<void>((resolve) => (all = resolve)),11});
まとめ
Node.jsの非同期はシングルスレッドなので、実際のところは似非セマフォです。