feat: add mutex to prevent ws racing conditions

This commit is contained in:
Mohamed Marrouchi 2025-05-30 08:52:22 +01:00
parent a5ad31bd2e
commit 50f70e00b8
4 changed files with 31 additions and 8 deletions

17
api/package-lock.json generated
View File

@ -27,6 +27,7 @@
"@resvg/resvg-js": "^2.6.2", "@resvg/resvg-js": "^2.6.2",
"@socket.io/redis-adapter": "^8.3.0", "@socket.io/redis-adapter": "^8.3.0",
"@tekuconcept/nestjs-csrf": "^1.1.0", "@tekuconcept/nestjs-csrf": "^1.1.0",
"async-mutex": "^0.5.0",
"bcryptjs": "^2.4.3", "bcryptjs": "^2.4.3",
"cache-manager": "^5.3.2", "cache-manager": "^5.3.2",
"cache-manager-redis-yet": "^4.1.2", "cache-manager-redis-yet": "^4.1.2",
@ -7411,10 +7412,9 @@
"optional": true "optional": true
}, },
"node_modules/async-mutex": { "node_modules/async-mutex": {
"version": "0.4.1", "version": "0.5.0",
"resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.4.1.tgz", "resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.5.0.tgz",
"integrity": "sha512-WfoBo4E/TbCX1G95XTjbWTE3X2XLG0m1Xbv2cwOtuPdyH9CZvnaA5nCt1ucjaKEgW2A5IF71hxrRhr83Je5xjA==", "integrity": "sha512-1A94B18jkJ3DYq284ohPxoXbfTA5HsQ7/Mf4DEhcyLx3Bz27Rh59iScbB6EPiP+B+joue6YCxcMXSbFC1tZKwA==",
"dev": true,
"dependencies": { "dependencies": {
"tslib": "^2.4.0" "tslib": "^2.4.0"
} }
@ -15003,6 +15003,15 @@
"node": ">=14.20.1" "node": ">=14.20.1"
} }
}, },
"node_modules/mongodb-memory-server-core/node_modules/async-mutex": {
"version": "0.4.1",
"resolved": "https://registry.npmjs.org/async-mutex/-/async-mutex-0.4.1.tgz",
"integrity": "sha512-WfoBo4E/TbCX1G95XTjbWTE3X2XLG0m1Xbv2cwOtuPdyH9CZvnaA5nCt1ucjaKEgW2A5IF71hxrRhr83Je5xjA==",
"dev": true,
"dependencies": {
"tslib": "^2.4.0"
}
},
"node_modules/mongodb-memory-server-core/node_modules/bson": { "node_modules/mongodb-memory-server-core/node_modules/bson": {
"version": "5.5.1", "version": "5.5.1",
"resolved": "https://registry.npmjs.org/bson/-/bson-5.5.1.tgz", "resolved": "https://registry.npmjs.org/bson/-/bson-5.5.1.tgz",

View File

@ -62,6 +62,7 @@
"@resvg/resvg-js": "^2.6.2", "@resvg/resvg-js": "^2.6.2",
"@socket.io/redis-adapter": "^8.3.0", "@socket.io/redis-adapter": "^8.3.0",
"@tekuconcept/nestjs-csrf": "^1.1.0", "@tekuconcept/nestjs-csrf": "^1.1.0",
"async-mutex": "^0.5.0",
"bcryptjs": "^2.4.3", "bcryptjs": "^2.4.3",
"cache-manager": "^5.3.2", "cache-manager": "^5.3.2",
"cache-manager-redis-yet": "^4.1.2", "cache-manager-redis-yet": "^4.1.2",

View File

@ -120,13 +120,13 @@ export class ChannelService {
*/ */
@SocketGet(`/webhook/${WEB_CHANNEL_NAME}/`) @SocketGet(`/webhook/${WEB_CHANNEL_NAME}/`)
@SocketPost(`/webhook/${WEB_CHANNEL_NAME}/`) @SocketPost(`/webhook/${WEB_CHANNEL_NAME}/`)
handleWebsocketForWebChannel( async handleWebsocketForWebChannel(
@SocketReq() req: SocketRequest, @SocketReq() req: SocketRequest,
@SocketRes() res: SocketResponse, @SocketRes() res: SocketResponse,
) { ) {
this.logger.log('Channel notification (Web Socket) : ', req.method); this.logger.log('Channel notification (Web Socket) : ', req.method);
const handler = this.getChannelHandler(WEB_CHANNEL_NAME); const handler = this.getChannelHandler(WEB_CHANNEL_NAME);
return handler.handle(req, res); return await Promise.resolve(handler.handle(req, res));
} }
/** /**
@ -195,6 +195,6 @@ export class ChannelService {
} }
const handler = this.getChannelHandler(CONSOLE_CHANNEL_NAME); const handler = this.getChannelHandler(CONSOLE_CHANNEL_NAME);
return handler.handle(req, res); return await Promise.resolve(handler.handle(req, res));
} }
} }

View File

@ -14,7 +14,9 @@ import {
} from '@nestjs/common'; } from '@nestjs/common';
import { ModulesContainer } from '@nestjs/core'; import { ModulesContainer } from '@nestjs/core';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper'; import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { EventEmitter2 } from '@nestjs/event-emitter'; import { EventEmitter2, OnEvent } from '@nestjs/event-emitter';
import { Mutex } from 'async-mutex';
import { Socket } from 'socket.io';
import { SocketEventMetadataStorage } from '../storage/socket-event-metadata.storage'; import { SocketEventMetadataStorage } from '../storage/socket-event-metadata.storage';
import { SocketRequest } from '../utils/socket-request'; import { SocketRequest } from '../utils/socket-request';
@ -39,12 +41,21 @@ export class SocketEventDispatcherService implements OnModuleInit {
private readonly modulesContainer: ModulesContainer, private readonly modulesContainer: ModulesContainer,
) {} ) {}
@OnEvent('hook:websocket:connection')
handleConnection(client: Socket) {
client.data.mutex = new Mutex();
}
async handleEvent( async handleEvent(
socketMethod: SocketMethod, socketMethod: SocketMethod,
path: string, path: string,
req: SocketRequest, req: SocketRequest,
res: SocketResponse, res: SocketResponse,
) { ) {
// Prevent racing conditions from the same socket
const socketData = req.socket.data;
const release = await socketData.mutex.acquire();
try { try {
const handlers = this.routeHandlers[socketMethod]; const handlers = this.routeHandlers[socketMethod];
const foundHandler = Array.from(handlers.entries()).find(([key, _]) => { const foundHandler = Array.from(handlers.entries()).find(([key, _]) => {
@ -62,6 +73,8 @@ export class SocketEventDispatcherService implements OnModuleInit {
return await handler(req, res); return await handler(req, res);
} catch (error) { } catch (error) {
return this.handleException(error, res); return this.handleException(error, res);
} finally {
release();
} }
} }