mirror of
https://github.com/hexastack/hexabot
synced 2025-06-26 18:27:28 +00:00
feat: initial commit
This commit is contained in:
405
api/src/websocket/websocket.gateway.ts
Normal file
405
api/src/websocket/websocket.gateway.ts
Normal file
@@ -0,0 +1,405 @@
|
||||
/*
|
||||
* Copyright © 2024 Hexastack. All rights reserved.
|
||||
*
|
||||
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
|
||||
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
|
||||
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
|
||||
* 3. SaaS Restriction: This software, or any derivative of it, may not be used to offer a competing product or service (SaaS) without prior written consent from Hexastack. Offering the software as a service or using it in a commercial cloud environment without express permission is strictly prohibited.
|
||||
*/
|
||||
|
||||
import { EventEmitter2 } from '@nestjs/event-emitter';
|
||||
import {
|
||||
ConnectedSocket,
|
||||
MessageBody,
|
||||
OnGatewayConnection,
|
||||
OnGatewayDisconnect,
|
||||
OnGatewayInit,
|
||||
SubscribeMessage,
|
||||
WebSocketGateway,
|
||||
WebSocketServer,
|
||||
} from '@nestjs/websockets';
|
||||
import cookie from 'cookie';
|
||||
import * as cookieParser from 'cookie-parser';
|
||||
import signature from 'cookie-signature';
|
||||
import { SessionData } from 'express-session';
|
||||
import { Server, Socket } from 'socket.io';
|
||||
import { sync as uid } from 'uid-safe';
|
||||
|
||||
import { MessageFull } from '@/chat/schemas/message.schema';
|
||||
import {
|
||||
Subscriber,
|
||||
SubscriberFull,
|
||||
SubscriberStub,
|
||||
} from '@/chat/schemas/subscriber.schema';
|
||||
import { OutgoingMessage, StdEventType } from '@/chat/schemas/types/message';
|
||||
import { config } from '@/config';
|
||||
import { LoggerService } from '@/logger/logger.service';
|
||||
import { sessionStore } from '@/utils/constants/session-store';
|
||||
|
||||
import { IOIncomingMessage, IOMessagePipe } from './pipes/io-message.pipe';
|
||||
import { SocketEventDispatcherService } from './services/socket-event-dispatcher.service';
|
||||
import { Room } from './types';
|
||||
import { buildWebSocketGatewayOptions } from './utils/gateway-options';
|
||||
import { SocketRequest } from './utils/socket-request';
|
||||
import { SocketResponse } from './utils/socket-response';
|
||||
|
||||
@WebSocketGateway(buildWebSocketGatewayOptions())
|
||||
export class WebsocketGateway
|
||||
implements OnGatewayInit, OnGatewayConnection, OnGatewayDisconnect
|
||||
{
|
||||
constructor(
|
||||
private readonly logger: LoggerService,
|
||||
private readonly eventEmitter: EventEmitter2,
|
||||
private readonly socketEventDispatcherService: SocketEventDispatcherService,
|
||||
) {}
|
||||
|
||||
@WebSocketServer() io: Server;
|
||||
|
||||
broadcastMessageSent(message: OutgoingMessage): void {
|
||||
this.io.to(Room.MESSAGE).emit('message', {
|
||||
op: 'messageSent',
|
||||
speakerId: message.recipient,
|
||||
msg: message,
|
||||
});
|
||||
}
|
||||
|
||||
broadcastMessageReceived(
|
||||
message: MessageFull,
|
||||
subscriber: Subscriber | SubscriberFull,
|
||||
): void {
|
||||
this.io.to(Room.MESSAGE).emit('message', {
|
||||
op: 'messageReceived',
|
||||
speakerId: subscriber.id,
|
||||
msg: message,
|
||||
});
|
||||
}
|
||||
|
||||
broadcastMessageDelivered(
|
||||
deliveredMessages: string[],
|
||||
subscriber: Subscriber | SubscriberFull,
|
||||
): void {
|
||||
this.io.to(Room.MESSAGE).emit('message', {
|
||||
op: 'messageDelivered',
|
||||
speakerId: subscriber.id,
|
||||
mids: deliveredMessages,
|
||||
});
|
||||
}
|
||||
|
||||
broadcastMessageRead(
|
||||
watermark: number,
|
||||
subscriber: Subscriber | SubscriberFull,
|
||||
): void {
|
||||
this.io.to(Room.MESSAGE).emit('message', {
|
||||
op: 'messageRead',
|
||||
speakerId: subscriber.id,
|
||||
watermark,
|
||||
});
|
||||
}
|
||||
|
||||
broadcastSubscriberNew(subscriber: Subscriber | SubscriberFull) {
|
||||
this.io.to(Room.SUBSCRIBER).emit('subscriber', {
|
||||
op: 'newSubscriber',
|
||||
profile: subscriber,
|
||||
});
|
||||
}
|
||||
|
||||
broadcastSubscriberUpdate(subscriber: Subscriber | SubscriberFull): void {
|
||||
this.io.to(Room.SUBSCRIBER).emit('subscriber', {
|
||||
op: 'updateSubscriber',
|
||||
profile: subscriber,
|
||||
});
|
||||
}
|
||||
|
||||
broadcast(subscriber: Subscriber, type: StdEventType, content: any) {
|
||||
this.io.to(subscriber.foreign_id).emit(type, content);
|
||||
}
|
||||
|
||||
createAndStoreSession(client: Socket, next: (err?: Error) => void): void {
|
||||
const sid = uid(24); // Sign the sessionID before sending
|
||||
const signedSid = 's:' + signature.sign(sid, config.session.secret);
|
||||
// Send session ID to client to set cookie
|
||||
const cookies = cookie.serialize(
|
||||
config.session.name,
|
||||
signedSid,
|
||||
config.session.cookie,
|
||||
);
|
||||
const newSession: SessionData<SubscriberStub> = {
|
||||
cookie: {
|
||||
// Prevent access from client-side javascript
|
||||
httpOnly: true,
|
||||
|
||||
// Restrict to path
|
||||
path: '/',
|
||||
|
||||
originalMaxAge: config.session.cookie.maxAge,
|
||||
},
|
||||
passport: { user: {} },
|
||||
}; // Initialize your session object as needed
|
||||
sessionStore.set(sid, newSession, (err) => {
|
||||
if (err) {
|
||||
this.logger.error('Error saving session:', err);
|
||||
return next(new Error('Unable to establish a new socket session'));
|
||||
}
|
||||
|
||||
client.emit('set-cookie', cookies);
|
||||
// Optionally set the cookie on the client's handshake object if needed
|
||||
client.handshake.headers.cookie = cookies;
|
||||
client.data.session = newSession;
|
||||
this.logger.verbose(`
|
||||
Could not fetch session, since connecting socket has no cookie in its handshake.
|
||||
Generated a one-time-use cookie:
|
||||
${client.handshake.headers.cookie}
|
||||
and saved it on the socket handshake.
|
||||
|
||||
> This means the socket started off with an empty session, i.e. (req.session === {})
|
||||
> That "anonymous" session will only last until the socket is disconnected. To work around this,
|
||||
> make sure the socket sends a 'cookie' header or query param when it initially connects.
|
||||
> (This usually arises due to using a non-browser client such as a native iOS/Android app,
|
||||
> React Native, a Node.js script, or some other connected device. It can also arise when
|
||||
> attempting to connect a cross-origin socket in the browser, particularly for Safari users.
|
||||
> To work around this, either supply a cookie manually, or ignore this message and use an
|
||||
> approach other than sessions-- e.g. an auth token.)
|
||||
`);
|
||||
return next();
|
||||
});
|
||||
}
|
||||
|
||||
saveSession(client: Socket): void {
|
||||
const { sessionID, session } = client.data;
|
||||
if (!sessionID || !session) {
|
||||
this.logger.warn('No socket session found ...');
|
||||
return;
|
||||
}
|
||||
|
||||
// On disconnect we may want to update the session, but
|
||||
// it shouldn't save it if the user logged out (session destroyed)
|
||||
this.loadSession(sessionID, (err, oldSession) => {
|
||||
if (err || !oldSession) {
|
||||
this.logger.debug(
|
||||
'Unable to save websocket session, probably the user logged out ...',
|
||||
);
|
||||
return;
|
||||
}
|
||||
sessionStore.set(sessionID, session, (err) => {
|
||||
if (err) {
|
||||
this.logger.error(
|
||||
'Error saving session in `config.sockets.afterDisconnect`:',
|
||||
err,
|
||||
);
|
||||
throw err;
|
||||
}
|
||||
});
|
||||
});
|
||||
}
|
||||
|
||||
loadSession(
|
||||
sessionID: string,
|
||||
next: (err: Error, session: any) => void,
|
||||
): void {
|
||||
sessionStore.get(sessionID, (err, session) => {
|
||||
this.logger.verbose('Retrieved socket session', err || session);
|
||||
return next(err, session);
|
||||
});
|
||||
}
|
||||
|
||||
afterInit(): void {
|
||||
this.logger.log('Initialized websocket gateway');
|
||||
|
||||
// Handle session
|
||||
this.io.use((client, next) => {
|
||||
this.logger.verbose('Client connected, attempting to load session.');
|
||||
if (client.request.headers.cookie) {
|
||||
const cookies = cookie.parse(client.request.headers.cookie);
|
||||
if (cookies && config.session.name in cookies) {
|
||||
const sessionID = cookieParser.signedCookie(
|
||||
cookies[config.session.name],
|
||||
config.session.secret,
|
||||
);
|
||||
if (sessionID) {
|
||||
return this.loadSession(sessionID, (err, session) => {
|
||||
if (err) {
|
||||
this.logger.warn(
|
||||
'Unable to load session, creating a new one ...',
|
||||
err,
|
||||
);
|
||||
return this.createAndStoreSession(client, next);
|
||||
}
|
||||
client.data.session = session;
|
||||
client.data.sessionID = sessionID;
|
||||
next();
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return this.createAndStoreSession(client, next);
|
||||
});
|
||||
}
|
||||
|
||||
handleConnection(client: Socket, ..._args: any[]): void {
|
||||
const { sockets } = this.io.sockets;
|
||||
const handshake = client.handshake;
|
||||
const { channel } = handshake.query;
|
||||
this.logger.log(`Client id: ${client.id} connected`);
|
||||
this.logger.debug(`Number of connected clients: ${sockets?.size}`);
|
||||
|
||||
this.eventEmitter.emit(`hook:websocket:connection`, client);
|
||||
// @TODO : Revisit once we don't use anymore in frontend
|
||||
if (!channel) {
|
||||
const response = new SocketResponse();
|
||||
client.send(
|
||||
response
|
||||
.setHeaders({
|
||||
'access-control-allow-origin':
|
||||
config.security.cors.allowOrigins.join(','),
|
||||
vary: 'Origin',
|
||||
'access-control-allow-credentials':
|
||||
config.security.cors.allowCredentials.toString(),
|
||||
})
|
||||
.status(200)
|
||||
.json({
|
||||
success: true,
|
||||
}),
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async handleDisconnect(client: Socket): Promise<void> {
|
||||
this.logger.log(`Client id:${client.id} disconnected`);
|
||||
// Configurable custom afterDisconnect logic here
|
||||
// (default: do nothing)
|
||||
if (!config.sockets.afterDisconnect) {
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
// Check if the afterDisconnect logic is an asynchronous function
|
||||
await config.sockets.afterDisconnect(client);
|
||||
|
||||
this.saveSession(client);
|
||||
} catch (e) {
|
||||
// Catch synchronous errors
|
||||
this.logger.error(
|
||||
'Error in `config.sockets.afterDisconnect` lifecycle callback:',
|
||||
e,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@SubscribeMessage('healthcheck')
|
||||
handleHealthCheck() {
|
||||
return { event: 'event', data: 'OK' };
|
||||
}
|
||||
|
||||
@SubscribeMessage('get')
|
||||
handleGet(
|
||||
@MessageBody(new IOMessagePipe()) payload: IOIncomingMessage,
|
||||
@ConnectedSocket() client: Socket,
|
||||
) {
|
||||
const request = new SocketRequest(client, 'get', payload);
|
||||
const response = new SocketResponse();
|
||||
this.socketEventDispatcherService.handleEvent(
|
||||
'get',
|
||||
payload.url,
|
||||
request,
|
||||
response,
|
||||
);
|
||||
return response.getPromise();
|
||||
}
|
||||
|
||||
@SubscribeMessage('post')
|
||||
handlePost(
|
||||
@MessageBody(new IOMessagePipe()) payload: IOIncomingMessage,
|
||||
@ConnectedSocket() client: Socket,
|
||||
) {
|
||||
const request = new SocketRequest(client, 'post', payload);
|
||||
const response = new SocketResponse();
|
||||
this.socketEventDispatcherService.handleEvent(
|
||||
'post',
|
||||
payload.url,
|
||||
request,
|
||||
response,
|
||||
);
|
||||
return response.getPromise();
|
||||
}
|
||||
|
||||
@SubscribeMessage('put')
|
||||
handlePut(
|
||||
@MessageBody(new IOMessagePipe()) payload: IOIncomingMessage,
|
||||
@ConnectedSocket() client: Socket,
|
||||
) {
|
||||
const request = new SocketRequest(client, 'put', payload);
|
||||
const response = new SocketResponse();
|
||||
this.socketEventDispatcherService.handleEvent(
|
||||
'put',
|
||||
payload.url,
|
||||
request,
|
||||
response,
|
||||
);
|
||||
return response.getPromise();
|
||||
}
|
||||
|
||||
@SubscribeMessage('patch')
|
||||
handlePatch(
|
||||
@MessageBody(new IOMessagePipe()) payload: IOIncomingMessage,
|
||||
@ConnectedSocket() client: Socket,
|
||||
) {
|
||||
const request = new SocketRequest(client, 'patch', payload);
|
||||
const response = new SocketResponse();
|
||||
this.socketEventDispatcherService.handleEvent(
|
||||
'patch',
|
||||
payload.url,
|
||||
request,
|
||||
response,
|
||||
);
|
||||
return response.getPromise();
|
||||
}
|
||||
|
||||
@SubscribeMessage('delete')
|
||||
handleDelete(
|
||||
@MessageBody(new IOMessagePipe()) payload: IOIncomingMessage,
|
||||
@ConnectedSocket() client: Socket,
|
||||
) {
|
||||
const request = new SocketRequest(client, 'delete', payload);
|
||||
const response = new SocketResponse();
|
||||
this.socketEventDispatcherService.handleEvent(
|
||||
'delete',
|
||||
payload.url,
|
||||
request,
|
||||
response,
|
||||
);
|
||||
return response.getPromise();
|
||||
}
|
||||
|
||||
@SubscribeMessage('options')
|
||||
handleOptions(
|
||||
@MessageBody(new IOMessagePipe()) payload: IOIncomingMessage,
|
||||
@ConnectedSocket() client: Socket,
|
||||
) {
|
||||
const request = new SocketRequest(client, 'options', payload);
|
||||
const response = new SocketResponse();
|
||||
this.socketEventDispatcherService.handleEvent(
|
||||
'options',
|
||||
payload.url,
|
||||
request,
|
||||
response,
|
||||
);
|
||||
return response.getPromise();
|
||||
}
|
||||
|
||||
@SubscribeMessage('head')
|
||||
handleHead(
|
||||
@MessageBody(new IOMessagePipe()) payload: IOIncomingMessage,
|
||||
@ConnectedSocket() client: Socket,
|
||||
) {
|
||||
const request = new SocketRequest(client, 'head', payload);
|
||||
const response = new SocketResponse();
|
||||
this.socketEventDispatcherService.handleEvent(
|
||||
'head',
|
||||
payload.url,
|
||||
request,
|
||||
response,
|
||||
);
|
||||
return response.getPromise();
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user