feat: broadcast to a specific room instead of broadcasting

This commit is contained in:
abdou6666
2025-04-07 18:43:28 +01:00
parent 6e16308db1
commit 60e72dbf0e
5 changed files with 54 additions and 16 deletions

View File

@@ -6,7 +6,11 @@
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
import { Injectable } from '@nestjs/common';
import {
Injectable,
InternalServerErrorException,
Optional,
} from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import EventWrapper from '@/channel/lib/EventWrapper';
@@ -21,6 +25,15 @@ import { PluginType } from '@/plugins/types';
import { SettingService } from '@/setting/services/setting.service';
import { BaseService } from '@/utils/generics/base-service';
import { getRandomElement } from '@/utils/helpers/safeRandom';
import {
SocketGet,
SocketPost,
} from '@/websocket/decorators/socket-method.decorator';
import { SocketReq } from '@/websocket/decorators/socket-req.decorator';
import { SocketRes } from '@/websocket/decorators/socket-res.decorator';
import { SocketRequest } from '@/websocket/utils/socket-request';
import { SocketResponse } from '@/websocket/utils/socket-response';
import { WebsocketGateway } from '@/websocket/websocket.gateway';
import { BlockDto } from '../dto/block.dto';
import { EnvelopeFactory } from '../helpers/envelope-factory';
@@ -46,6 +59,8 @@ export class BlockService extends BaseService<
BlockFull,
BlockDto
> {
private readonly gateway: WebsocketGateway;
constructor(
readonly repository: BlockRepository,
private readonly contentService: ContentService,
@@ -53,8 +68,31 @@ export class BlockService extends BaseService<
private readonly pluginService: PluginService,
protected readonly i18n: I18nService,
protected readonly languageService: LanguageService,
@Optional() gateway?: WebsocketGateway,
) {
super(repository);
if (gateway) {
this.gateway = gateway;
}
}
@SocketGet('/block/subscribe/')
@SocketPost('/block/subscribe/')
subscribe(@SocketReq() req: SocketRequest, @SocketRes() res: SocketResponse) {
debugger;
try {
if (req.session.web?.profile?.id) {
this.gateway.io.socketsJoin(`blocks:${req.session.web.profile.id}`);
return res.status(200).json({
success: true,
});
} else {
throw new Error('Unable to join highlight blocks room');
}
} catch (e) {
this.logger.error('Websocket subscription', e);
throw new InternalServerErrorException(e);
}
}
/**
@@ -577,7 +615,7 @@ export class BlockService extends BaseService<
this.logger.log('triggered: hook:highlight:error');
this.eventEmitter.emit('hook:highlight:error', {
flowId,
userId: recipient.foreign_id,
userId: recipient.id,
blockId: block.id,
});
} else {
@@ -641,7 +679,7 @@ export class BlockService extends BaseService<
this.logger.log('triggered: hook:highlight:error');
this.eventEmitter.emit('hook:highlight:error', {
flowId,
userId: recipient.foreign_id,
userId: recipient.id,
blockId: block.id,
});
} else {
@@ -678,7 +716,7 @@ export class BlockService extends BaseService<
this.logger.log('triggered: hook:highlight:error');
this.eventEmitter.emit('hook:highlight:error', {
flowId,
userId: recipient.foreign_id,
userId: recipient.id,
blockId: block.id,
});
} else {

View File

@@ -154,7 +154,7 @@ export class BotService {
this.eventEmitter.emit('hook:highlight:block', {
flowId: block.category!.id,
blockId: block.id,
userId: recipient.foreign_id,
userId: recipient.id,
});
if (envelope.format !== OutgoingMessageFormat.system) {
await this.sendMessageToSubscriber(
@@ -330,7 +330,7 @@ export class BotService {
if (next && next.id !== fallbackBlock?.id) {
this.eventEmitter.emit('hook:highlight:error', {
flowId: matchedBlock!.category!.id,
userId: convo.sender.foreign_id,
userId: convo.sender.id,
blockId: next.id!,
});
}

View File

@@ -413,12 +413,7 @@ export class WebsocketGateway
async handleHighlightBlock(
payload: IHookOperationMap['highlight']['operations']['block'],
) {
this.logger.log(
'broadcasting event highlight:flow through socketio ',
payload,
);
// todo: fix emit event to subscriber
this.io.emit('highlight:flow', payload);
this.io.to(`blocks:${payload.userId}`).emit('highlight:block', payload);
}
@OnEvent('hook:highlight:error')
@@ -426,7 +421,6 @@ export class WebsocketGateway
payload: IHookOperationMap['highlight']['operations']['error'],
) {
this.logger.warn('hook:highlight:error ', payload);
// todo: fix emit event to subscriber
this.io.emit('highlight:error', payload);
this.io.to(`blocks:${payload.userId}`).emit('highlight:error', payload);
}
}

View File

@@ -52,6 +52,7 @@ import { useTranslate } from "@/hooks/useTranslate";
import { EntityType, Format, QueryType, RouterType } from "@/services/types";
import { IBlock } from "@/types/block.types";
import { BlockPorts } from "@/types/visual-editor.types";
import { useSocketGetQuery } from "@/websocket/socket-hooks";
import { BlockEditFormDialog } from "../BlockEditFormDialog";
import { ZOOM_LEVEL } from "../constants";
@@ -161,6 +162,9 @@ const Diagrams = () => {
const getBlockFromCache = useGetFromCache(EntityType.BLOCK);
const updateCachedBlock = useUpdateCache(EntityType.BLOCK);
const deleteCachedBlock = useDeleteFromCache(EntityType.BLOCK);
useSocketGetQuery("/block/subscribe/");
const onCategoryChange = (targetCategory: number) => {
if (categories) {
const { id } = categories[targetCategory];

View File

@@ -41,8 +41,10 @@ export const SocketProvider = (props: PropsWithChildren) => {
const [connected, setConnected] = useState(false);
const { toast } = useToast();
const { user } = useAuth();
// todo: fix we aren't sending auth token
const socket = useMemo(() => new SocketIoClient(apiUrl), [apiUrl]);
const socket = useMemo(
() => new SocketIoClient(apiUrl, { auth: user }),
[apiUrl],
);
useEffect(() => {
if (user && apiUrl)