From 50f70e00b8ff2d6877f12bad1e65d22ab954f365 Mon Sep 17 00:00:00 2001 From: Mohamed Marrouchi Date: Fri, 30 May 2025 08:52:22 +0100 Subject: [PATCH] feat: add mutex to prevent ws racing conditions --- api/package-lock.json | 17 +++++++++++++---- api/package.json | 1 + api/src/channel/channel.service.ts | 6 +++--- .../services/socket-event-dispatcher.service.ts | 15 ++++++++++++++- 4 files changed, 31 insertions(+), 8 deletions(-) diff --git a/api/package-lock.json b/api/package-lock.json index 92f81953..f0e04971 100644 --- a/api/package-lock.json +++ b/api/package-lock.json @@ -27,6 +27,7 @@ "@resvg/resvg-js": "^2.6.2", "@socket.io/redis-adapter": "^8.3.0", "@tekuconcept/nestjs-csrf": "^1.1.0", + "async-mutex": "^0.5.0", "bcryptjs": "^2.4.3", "cache-manager": "^5.3.2", "cache-manager-redis-yet": "^4.1.2", @@ -7411,10 +7412,9 @@ "optional": true }, "node_modules/async-mutex": { - "version": "0.4.1", - "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.4.1.tgz", - "integrity": "sha512-WfoBo4E/TbCX1G95XTjbWTE3X2XLG0m1Xbv2cwOtuPdyH9CZvnaA5nCt1ucjaKEgW2A5IF71hxrRhr83Je5xjA==", - "dev": true, + "version": "0.5.0", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz", + "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==", "dependencies": { "tslib": "^2.4.0" } @@ -15003,6 +15003,15 @@ "node": ">=14.20.1" } }, + "node_modules/mongodb-memory-server-core/node_modules/async-mutex": { + "version": "0.4.1", + "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.4.1.tgz", + "integrity": "sha512-WfoBo4E/TbCX1G95XTjbWTE3X2XLG0m1Xbv2cwOtuPdyH9CZvnaA5nCt1ucjaKEgW2A5IF71hxrRhr83Je5xjA==", + "dev": true, + "dependencies": { + "tslib": "^2.4.0" + } + }, "node_modules/mongodb-memory-server-core/node_modules/bson": { "version": "5.5.1", "resolved": "https://registry.npmjs.org/bson/-/bson-5.5.1.tgz", diff --git a/api/package.json b/api/package.json index 86bb8c8e..2372f76e 100644 --- a/api/package.json +++ b/api/package.json @@ -62,6 +62,7 @@ "@resvg/resvg-js": "^2.6.2", "@socket.io/redis-adapter": "^8.3.0", "@tekuconcept/nestjs-csrf": "^1.1.0", + "async-mutex": "^0.5.0", "bcryptjs": "^2.4.3", "cache-manager": "^5.3.2", "cache-manager-redis-yet": "^4.1.2", diff --git a/api/src/channel/channel.service.ts b/api/src/channel/channel.service.ts index 00c987c0..acab95ea 100644 --- a/api/src/channel/channel.service.ts +++ b/api/src/channel/channel.service.ts @@ -120,13 +120,13 @@ export class ChannelService { */ @SocketGet(`/webhook/${WEB_CHANNEL_NAME}/`) @SocketPost(`/webhook/${WEB_CHANNEL_NAME}/`) - handleWebsocketForWebChannel( + async handleWebsocketForWebChannel( @SocketReq() req: SocketRequest, @SocketRes() res: SocketResponse, ) { this.logger.log('Channel notification (Web Socket) : ', req.method); const handler = this.getChannelHandler(WEB_CHANNEL_NAME); - return handler.handle(req, res); + return await Promise.resolve(handler.handle(req, res)); } /** @@ -195,6 +195,6 @@ export class ChannelService { } const handler = this.getChannelHandler(CONSOLE_CHANNEL_NAME); - return handler.handle(req, res); + return await Promise.resolve(handler.handle(req, res)); } } diff --git a/api/src/websocket/services/socket-event-dispatcher.service.ts b/api/src/websocket/services/socket-event-dispatcher.service.ts index 326bb3e9..1abb58f1 100644 --- a/api/src/websocket/services/socket-event-dispatcher.service.ts +++ b/api/src/websocket/services/socket-event-dispatcher.service.ts @@ -14,7 +14,9 @@ import { } from '@nestjs/common'; import { ModulesContainer } from '@nestjs/core'; import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; -import { EventEmitter2 } from '@nestjs/event-emitter'; +import { EventEmitter2, OnEvent } from '@nestjs/event-emitter'; +import { Mutex } from 'async-mutex'; +import { Socket } from 'socket.io'; import { SocketEventMetadataStorage } from '../storage/socket-event-metadata.storage'; import { SocketRequest } from '../utils/socket-request'; @@ -39,12 +41,21 @@ export class SocketEventDispatcherService implements OnModuleInit { private readonly modulesContainer: ModulesContainer, ) {} + @OnEvent('hook:websocket:connection') + handleConnection(client: Socket) { + client.data.mutex = new Mutex(); + } + async handleEvent( socketMethod: SocketMethod, path: string, req: SocketRequest, res: SocketResponse, ) { + // Prevent racing conditions from the same socket + const socketData = req.socket.data; + const release = await socketData.mutex.acquire(); + try { const handlers = this.routeHandlers[socketMethod]; const foundHandler = Array.from(handlers.entries()).find(([key, _]) => { @@ -62,6 +73,8 @@ export class SocketEventDispatcherService implements OnModuleInit { return await handler(req, res); } catch (error) { return this.handleException(error, res); + } finally { + release(); } }