diff --git a/api/src/chat/services/block.service.ts b/api/src/chat/services/block.service.ts index 06dfb3c9..77136830 100644 --- a/api/src/chat/services/block.service.ts +++ b/api/src/chat/services/block.service.ts @@ -6,7 +6,11 @@ * 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file). */ -import { Injectable } from '@nestjs/common'; +import { + Injectable, + InternalServerErrorException, + Optional, +} from '@nestjs/common'; import { OnEvent } from '@nestjs/event-emitter'; import EventWrapper from '@/channel/lib/EventWrapper'; @@ -21,6 +25,15 @@ import { PluginType } from '@/plugins/types'; import { SettingService } from '@/setting/services/setting.service'; import { BaseService } from '@/utils/generics/base-service'; import { getRandomElement } from '@/utils/helpers/safeRandom'; +import { + SocketGet, + SocketPost, +} from '@/websocket/decorators/socket-method.decorator'; +import { SocketReq } from '@/websocket/decorators/socket-req.decorator'; +import { SocketRes } from '@/websocket/decorators/socket-res.decorator'; +import { SocketRequest } from '@/websocket/utils/socket-request'; +import { SocketResponse } from '@/websocket/utils/socket-response'; +import { WebsocketGateway } from '@/websocket/websocket.gateway'; import { BlockDto } from '../dto/block.dto'; import { EnvelopeFactory } from '../helpers/envelope-factory'; @@ -46,6 +59,8 @@ export class BlockService extends BaseService< BlockFull, BlockDto > { + private readonly gateway: WebsocketGateway; + constructor( readonly repository: BlockRepository, private readonly contentService: ContentService, @@ -53,8 +68,31 @@ export class BlockService extends BaseService< private readonly pluginService: PluginService, protected readonly i18n: I18nService, protected readonly languageService: LanguageService, + @Optional() gateway?: WebsocketGateway, ) { super(repository); + if (gateway) { + this.gateway = gateway; + } + } + + @SocketGet('/block/subscribe/') + @SocketPost('/block/subscribe/') + subscribe(@SocketReq() req: SocketRequest, @SocketRes() res: SocketResponse) { + debugger; + try { + if (req.session.web?.profile?.id) { + this.gateway.io.socketsJoin(`blocks:${req.session.web.profile.id}`); + return res.status(200).json({ + success: true, + }); + } else { + throw new Error('Unable to join highlight blocks room'); + } + } catch (e) { + this.logger.error('Websocket subscription', e); + throw new InternalServerErrorException(e); + } } /** @@ -577,7 +615,7 @@ export class BlockService extends BaseService< this.logger.log('triggered: hook:highlight:error'); this.eventEmitter.emit('hook:highlight:error', { flowId, - userId: recipient.foreign_id, + userId: recipient.id, blockId: block.id, }); } else { @@ -641,7 +679,7 @@ export class BlockService extends BaseService< this.logger.log('triggered: hook:highlight:error'); this.eventEmitter.emit('hook:highlight:error', { flowId, - userId: recipient.foreign_id, + userId: recipient.id, blockId: block.id, }); } else { @@ -678,7 +716,7 @@ export class BlockService extends BaseService< this.logger.log('triggered: hook:highlight:error'); this.eventEmitter.emit('hook:highlight:error', { flowId, - userId: recipient.foreign_id, + userId: recipient.id, blockId: block.id, }); } else { diff --git a/api/src/chat/services/bot.service.ts b/api/src/chat/services/bot.service.ts index 392a074e..8acbe11f 100644 --- a/api/src/chat/services/bot.service.ts +++ b/api/src/chat/services/bot.service.ts @@ -154,7 +154,7 @@ export class BotService { this.eventEmitter.emit('hook:highlight:block', { flowId: block.category!.id, blockId: block.id, - userId: recipient.foreign_id, + userId: recipient.id, }); if (envelope.format !== OutgoingMessageFormat.system) { await this.sendMessageToSubscriber( @@ -330,7 +330,7 @@ export class BotService { if (next && next.id !== fallbackBlock?.id) { this.eventEmitter.emit('hook:highlight:error', { flowId: matchedBlock!.category!.id, - userId: convo.sender.foreign_id, + userId: convo.sender.id, blockId: next.id!, }); } diff --git a/api/src/websocket/websocket.gateway.ts b/api/src/websocket/websocket.gateway.ts index 9141e441..51b61fae 100644 --- a/api/src/websocket/websocket.gateway.ts +++ b/api/src/websocket/websocket.gateway.ts @@ -413,12 +413,7 @@ export class WebsocketGateway async handleHighlightBlock( payload: IHookOperationMap['highlight']['operations']['block'], ) { - this.logger.log( - 'broadcasting event highlight:flow through socketio ', - payload, - ); - // todo: fix emit event to subscriber - this.io.emit('highlight:flow', payload); + this.io.to(`blocks:${payload.userId}`).emit('highlight:block', payload); } @OnEvent('hook:highlight:error') @@ -426,7 +421,6 @@ export class WebsocketGateway payload: IHookOperationMap['highlight']['operations']['error'], ) { this.logger.warn('hook:highlight:error ', payload); - // todo: fix emit event to subscriber - this.io.emit('highlight:error', payload); + this.io.to(`blocks:${payload.userId}`).emit('highlight:error', payload); } } diff --git a/frontend/src/components/visual-editor/v2/Diagrams.tsx b/frontend/src/components/visual-editor/v2/Diagrams.tsx index e0588e49..49b8fdb6 100644 --- a/frontend/src/components/visual-editor/v2/Diagrams.tsx +++ b/frontend/src/components/visual-editor/v2/Diagrams.tsx @@ -52,6 +52,7 @@ import { useTranslate } from "@/hooks/useTranslate"; import { EntityType, Format, QueryType, RouterType } from "@/services/types"; import { IBlock } from "@/types/block.types"; import { BlockPorts } from "@/types/visual-editor.types"; +import { useSocketGetQuery } from "@/websocket/socket-hooks"; import { BlockEditFormDialog } from "../BlockEditFormDialog"; import { ZOOM_LEVEL } from "../constants"; @@ -161,6 +162,9 @@ const Diagrams = () => { const getBlockFromCache = useGetFromCache(EntityType.BLOCK); const updateCachedBlock = useUpdateCache(EntityType.BLOCK); const deleteCachedBlock = useDeleteFromCache(EntityType.BLOCK); + + useSocketGetQuery("/block/subscribe/"); + const onCategoryChange = (targetCategory: number) => { if (categories) { const { id } = categories[targetCategory]; diff --git a/frontend/src/websocket/socket-hooks.tsx b/frontend/src/websocket/socket-hooks.tsx index 1ea78df4..b56a453f 100644 --- a/frontend/src/websocket/socket-hooks.tsx +++ b/frontend/src/websocket/socket-hooks.tsx @@ -41,8 +41,10 @@ export const SocketProvider = (props: PropsWithChildren) => { const [connected, setConnected] = useState(false); const { toast } = useToast(); const { user } = useAuth(); - // todo: fix we aren't sending auth token - const socket = useMemo(() => new SocketIoClient(apiUrl), [apiUrl]); + const socket = useMemo( + () => new SocketIoClient(apiUrl, { auth: user }), + [apiUrl], + ); useEffect(() => { if (user && apiUrl)