fix(api): resolve socketsJoin issue

This commit is contained in:
yassinedorbozgithub 2025-05-09 14:37:06 +01:00
parent 0db40680dc
commit 2638e845e6
5 changed files with 48 additions and 9 deletions

View File

@ -25,6 +25,7 @@ import {
} from '@/websocket/decorators/socket-method.decorator'; } from '@/websocket/decorators/socket-method.decorator';
import { SocketReq } from '@/websocket/decorators/socket-req.decorator'; import { SocketReq } from '@/websocket/decorators/socket-req.decorator';
import { SocketRes } from '@/websocket/decorators/socket-res.decorator'; import { SocketRes } from '@/websocket/decorators/socket-res.decorator';
import { IOOutgoingSubscribeMessage } from '@/websocket/pipes/io-message.pipe';
import { Room } from '@/websocket/types'; import { Room } from '@/websocket/types';
import { SocketRequest } from '@/websocket/utils/socket-request'; import { SocketRequest } from '@/websocket/utils/socket-request';
import { SocketResponse } from '@/websocket/utils/socket-response'; import { SocketResponse } from '@/websocket/utils/socket-response';
@ -60,11 +61,16 @@ export class MessageService extends BaseService<
*/ */
@SocketGet('/message/subscribe/') @SocketGet('/message/subscribe/')
@SocketPost('/message/subscribe/') @SocketPost('/message/subscribe/')
subscribe(@SocketReq() req: SocketRequest, @SocketRes() res: SocketResponse) { async subscribe(
@SocketReq() req: SocketRequest,
@SocketRes() res: SocketResponse,
): Promise<IOOutgoingSubscribeMessage> {
try { try {
this.gateway.io.socketsJoin(Room.MESSAGE); await this.gateway.joinNotificationSockets(req.sessionID, Room.MESSAGE);
return res.status(200).json({ return res.status(200).json({
success: true, success: true,
subscribe: Room.MESSAGE,
}); });
} catch (e) { } catch (e) {
this.logger.error('Websocket subscription', e); this.logger.error('Websocket subscription', e);

View File

@ -22,6 +22,7 @@ import {
} from '@/websocket/decorators/socket-method.decorator'; } from '@/websocket/decorators/socket-method.decorator';
import { SocketReq } from '@/websocket/decorators/socket-req.decorator'; import { SocketReq } from '@/websocket/decorators/socket-req.decorator';
import { SocketRes } from '@/websocket/decorators/socket-res.decorator'; import { SocketRes } from '@/websocket/decorators/socket-res.decorator';
import { IOOutgoingSubscribeMessage } from '@/websocket/pipes/io-message.pipe';
import { Room } from '@/websocket/types'; import { Room } from '@/websocket/types';
import { SocketRequest } from '@/websocket/utils/socket-request'; import { SocketRequest } from '@/websocket/utils/socket-request';
import { SocketResponse } from '@/websocket/utils/socket-response'; import { SocketResponse } from '@/websocket/utils/socket-response';
@ -63,9 +64,16 @@ export class SubscriberService extends BaseService<
*/ */
@SocketGet('/subscriber/subscribe/') @SocketGet('/subscriber/subscribe/')
@SocketPost('/subscriber/subscribe/') @SocketPost('/subscriber/subscribe/')
subscribe(@SocketReq() req: SocketRequest, @SocketRes() res: SocketResponse) { async subscribe(
@SocketReq() req: SocketRequest,
@SocketRes() res: SocketResponse,
): Promise<IOOutgoingSubscribeMessage> {
try { try {
this.gateway.io.socketsJoin(Room.SUBSCRIBER); await this.gateway.joinNotificationSockets(
req.sessionID,
Room.SUBSCRIBER,
);
return res.json({ return res.json({
success: true, success: true,
subscribe: Room.SUBSCRIBER, subscribe: Room.SUBSCRIBER,

View File

@ -1,5 +1,5 @@
/* /*
* Copyright © 2024 Hexastack. All rights reserved. * Copyright © 2025 Hexastack. All rights reserved.
* *
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms: * Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission. * 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
@ -15,6 +15,8 @@ import {
import { config } from '@/config'; import { config } from '@/config';
import { Room } from '../types';
export interface IOOutgoingMessage { export interface IOOutgoingMessage {
statusCode: number; statusCode: number;
body: any; body: any;
@ -29,6 +31,11 @@ export interface IOIncomingMessage {
url: string; url: string;
} }
export interface IOOutgoingSubscribeMessage {
success: boolean;
subscribe: Room;
}
@Injectable() @Injectable()
export class IOMessagePipe implements PipeTransform<string, IOIncomingMessage> { export class IOMessagePipe implements PipeTransform<string, IOIncomingMessage> {
transform(value: string, _metadata: ArgumentMetadata): IOIncomingMessage { transform(value: string, _metadata: ArgumentMetadata): IOIncomingMessage {

View File

@ -1,5 +1,5 @@
/* /*
* Copyright © 2024 Hexastack. All rights reserved. * Copyright © 2025 Hexastack. All rights reserved.
* *
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms: * Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission. * 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
@ -70,9 +70,9 @@ export class SocketResponse {
return response; return response;
} }
json(data: any) { json<T = Partial<IOOutgoingMessage>>(data: T): T {
this.set('Content-Type', 'application/json'); this.set('Content-Type', 'application/json');
return this.send(data); return this.send(data) as T;
} }
public getPromise() { public getPromise() {

View File

@ -21,7 +21,8 @@ import cookie from 'cookie';
import * as cookieParser from 'cookie-parser'; import * as cookieParser from 'cookie-parser';
import signature from 'cookie-signature'; import signature from 'cookie-signature';
import { Session as ExpressSession, SessionData } from 'express-session'; import { Session as ExpressSession, SessionData } from 'express-session';
import { Server, Socket } from 'socket.io'; import { RemoteSocket, Server, Socket } from 'socket.io';
import { DefaultEventsMap } from 'socket.io/dist/typed-events';
import { sync as uid } from 'uid-safe'; import { sync as uid } from 'uid-safe';
import { MessageFull } from '@/chat/schemas/message.schema'; import { MessageFull } from '@/chat/schemas/message.schema';
@ -405,4 +406,21 @@ export class WebsocketGateway
); );
return response.getPromise(); return response.getPromise();
} }
async getNotificationSockets(
sessionId: string,
): Promise<RemoteSocket<DefaultEventsMap, any>[]> {
return (await this.io.fetchSockets()).filter(
({ handshake, data }) =>
!handshake.query.channel && data.sessionID === sessionId,
);
}
async joinNotificationSockets(sessionID: string, room: Room): Promise<void> {
const notificationSockets = await this.getNotificationSockets(sessionID);
notificationSockets.forEach((notificationSocket) =>
notificationSocket.join(room),
);
}
} }