From 2638e845e6474694917f2620bb25711a76bf15cb Mon Sep 17 00:00:00 2001 From: yassinedorbozgithub Date: Fri, 9 May 2025 14:37:06 +0100 Subject: [PATCH] fix(api): resolve socketsJoin issue --- api/src/chat/services/message.service.ts | 10 ++++++++-- api/src/chat/services/subscriber.service.ts | 12 ++++++++++-- api/src/websocket/pipes/io-message.pipe.ts | 9 ++++++++- api/src/websocket/utils/socket-response.ts | 6 +++--- api/src/websocket/websocket.gateway.ts | 20 +++++++++++++++++++- 5 files changed, 48 insertions(+), 9 deletions(-) diff --git a/api/src/chat/services/message.service.ts b/api/src/chat/services/message.service.ts index d6b210ea..68629358 100644 --- a/api/src/chat/services/message.service.ts +++ b/api/src/chat/services/message.service.ts @@ -25,6 +25,7 @@ import { } from '@/websocket/decorators/socket-method.decorator'; import { SocketReq } from '@/websocket/decorators/socket-req.decorator'; import { SocketRes } from '@/websocket/decorators/socket-res.decorator'; +import { IOOutgoingSubscribeMessage } from '@/websocket/pipes/io-message.pipe'; import { Room } from '@/websocket/types'; import { SocketRequest } from '@/websocket/utils/socket-request'; import { SocketResponse } from '@/websocket/utils/socket-response'; @@ -60,11 +61,16 @@ export class MessageService extends BaseService< */ @SocketGet('/message/subscribe/') @SocketPost('/message/subscribe/') - subscribe(@SocketReq() req: SocketRequest, @SocketRes() res: SocketResponse) { + async subscribe( + @SocketReq() req: SocketRequest, + @SocketRes() res: SocketResponse, + ): Promise { try { - this.gateway.io.socketsJoin(Room.MESSAGE); + await this.gateway.joinNotificationSockets(req.sessionID, Room.MESSAGE); + return res.status(200).json({ success: true, + subscribe: Room.MESSAGE, }); } catch (e) { this.logger.error('Websocket subscription', e); diff --git a/api/src/chat/services/subscriber.service.ts b/api/src/chat/services/subscriber.service.ts index faa02ffc..e5a394ee 100644 --- a/api/src/chat/services/subscriber.service.ts +++ b/api/src/chat/services/subscriber.service.ts @@ -22,6 +22,7 @@ import { } from '@/websocket/decorators/socket-method.decorator'; import { SocketReq } from '@/websocket/decorators/socket-req.decorator'; import { SocketRes } from '@/websocket/decorators/socket-res.decorator'; +import { IOOutgoingSubscribeMessage } from '@/websocket/pipes/io-message.pipe'; import { Room } from '@/websocket/types'; import { SocketRequest } from '@/websocket/utils/socket-request'; import { SocketResponse } from '@/websocket/utils/socket-response'; @@ -63,9 +64,16 @@ export class SubscriberService extends BaseService< */ @SocketGet('/subscriber/subscribe/') @SocketPost('/subscriber/subscribe/') - subscribe(@SocketReq() req: SocketRequest, @SocketRes() res: SocketResponse) { + async subscribe( + @SocketReq() req: SocketRequest, + @SocketRes() res: SocketResponse, + ): Promise { try { - this.gateway.io.socketsJoin(Room.SUBSCRIBER); + await this.gateway.joinNotificationSockets( + req.sessionID, + Room.SUBSCRIBER, + ); + return res.json({ success: true, subscribe: Room.SUBSCRIBER, diff --git a/api/src/websocket/pipes/io-message.pipe.ts b/api/src/websocket/pipes/io-message.pipe.ts index 102247a0..c82c56db 100644 --- a/api/src/websocket/pipes/io-message.pipe.ts +++ b/api/src/websocket/pipes/io-message.pipe.ts @@ -1,5 +1,5 @@ /* - * Copyright © 2024 Hexastack. All rights reserved. + * Copyright © 2025 Hexastack. All rights reserved. * * Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms: * 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission. @@ -15,6 +15,8 @@ import { import { config } from '@/config'; +import { Room } from '../types'; + export interface IOOutgoingMessage { statusCode: number; body: any; @@ -29,6 +31,11 @@ export interface IOIncomingMessage { url: string; } +export interface IOOutgoingSubscribeMessage { + success: boolean; + subscribe: Room; +} + @Injectable() export class IOMessagePipe implements PipeTransform { transform(value: string, _metadata: ArgumentMetadata): IOIncomingMessage { diff --git a/api/src/websocket/utils/socket-response.ts b/api/src/websocket/utils/socket-response.ts index 18756904..48e8cf39 100644 --- a/api/src/websocket/utils/socket-response.ts +++ b/api/src/websocket/utils/socket-response.ts @@ -1,5 +1,5 @@ /* - * Copyright © 2024 Hexastack. All rights reserved. + * Copyright © 2025 Hexastack. All rights reserved. * * Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms: * 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission. @@ -70,9 +70,9 @@ export class SocketResponse { return response; } - json(data: any) { + json>(data: T): T { this.set('Content-Type', 'application/json'); - return this.send(data); + return this.send(data) as T; } public getPromise() { diff --git a/api/src/websocket/websocket.gateway.ts b/api/src/websocket/websocket.gateway.ts index f6e1b0f6..7ea1a812 100644 --- a/api/src/websocket/websocket.gateway.ts +++ b/api/src/websocket/websocket.gateway.ts @@ -21,7 +21,8 @@ import cookie from 'cookie'; import * as cookieParser from 'cookie-parser'; import signature from 'cookie-signature'; import { Session as ExpressSession, SessionData } from 'express-session'; -import { Server, Socket } from 'socket.io'; +import { RemoteSocket, Server, Socket } from 'socket.io'; +import { DefaultEventsMap } from 'socket.io/dist/typed-events'; import { sync as uid } from 'uid-safe'; import { MessageFull } from '@/chat/schemas/message.schema'; @@ -405,4 +406,21 @@ export class WebsocketGateway ); return response.getPromise(); } + + async getNotificationSockets( + sessionId: string, + ): Promise[]> { + return (await this.io.fetchSockets()).filter( + ({ handshake, data }) => + !handshake.query.channel && data.sessionID === sessionId, + ); + } + + async joinNotificationSockets(sessionID: string, room: Room): Promise { + const notificationSockets = await this.getNotificationSockets(sessionID); + + notificationSockets.forEach((notificationSocket) => + notificationSocket.join(room), + ); + } }