Merge pull request #1117 from Hexastack/refactor/handle-ongoing-convo-msg

Refactor/handle ongoing convo msg
This commit is contained in:
Med Marrouchi 2025-06-12 11:23:57 +01:00 committed by GitHub
commit 4e1d461ddb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 436 additions and 181 deletions

View File

@ -51,6 +51,8 @@ import { SettingService } from '@/setting/services/setting.service';
import { installBlockFixtures } from '@/utils/test/fixtures/block';
import { installContentFixtures } from '@/utils/test/fixtures/content';
import { installSubscriberFixtures } from '@/utils/test/fixtures/subscriber';
import { mockWebChannelData, textBlock } from '@/utils/test/mocks/block';
import { conversationGetStarted } from '@/utils/test/mocks/conversation';
import {
closeInMongodConnection,
rootMongooseTestModule,
@ -88,6 +90,7 @@ import { SubscriberService } from './subscriber.service';
describe('BotService', () => {
let blockService: BlockService;
let subscriberService: SubscriberService;
let conversationService: ConversationService;
let botService: BotService;
let handler: WebChannelHandler;
let eventEmitter: EventEmitter2;
@ -192,163 +195,379 @@ describe('BotService', () => {
},
],
});
[subscriberService, botService, blockService, eventEmitter, handler] =
await getMocks([
SubscriberService,
BotService,
BlockService,
EventEmitter2,
WebChannelHandler,
]);
[
subscriberService,
conversationService,
botService,
blockService,
eventEmitter,
handler,
] = await getMocks([
SubscriberService,
ConversationService,
BotService,
BlockService,
EventEmitter2,
WebChannelHandler,
]);
});
afterEach(jest.clearAllMocks);
afterAll(closeInMongodConnection);
it('should start a conversation', async () => {
const triggeredEvents: any[] = [];
eventEmitter.on('hook:stats:entry', (...args) => {
triggeredEvents.push(args);
describe('startConversation', () => {
afterAll(() => {
jest.restoreAllMocks();
});
const event = new WebEventWrapper(handler, webEventText, {
isSocket: false,
ipAddress: '1.1.1.1',
agent: 'Chromium',
});
it('should start a conversation', async () => {
const triggeredEvents: any[] = [];
const [block] = await blockService.findAndPopulate({ patterns: ['Hi'] });
const webSubscriber = (await subscriberService.findOne({
foreign_id: 'foreign-id-web-1',
}))!;
eventEmitter.on('hook:stats:entry', (...args) => {
triggeredEvents.push(args);
});
event.setSender(webSubscriber);
let hasBotSpoken = false;
const clearMock = jest
.spyOn(botService, 'triggerBlock')
.mockImplementation(
(
actualEvent: WebEventWrapper<typeof WEB_CHANNEL_NAME>,
actualConversation: Conversation,
actualBlock: BlockFull,
isFallback: boolean,
) => {
expect(actualConversation).toEqualPayload({
sender: webSubscriber.id,
active: true,
next: [],
context: {
user: {
first_name: webSubscriber.first_name,
last_name: webSubscriber.last_name,
language: 'en',
id: webSubscriber.id,
},
user_location: {
lat: 0,
lon: 0,
},
skip: {},
vars: {},
nlp: null,
payload: null,
attempt: 0,
channel: 'web-channel',
text: webEventText.data.text,
},
});
expect(actualEvent).toEqual(event);
expect(actualBlock).toEqual(block);
expect(isFallback).toEqual(false);
hasBotSpoken = true;
},
const event = new WebEventWrapper(
handler,
webEventText,
mockWebChannelData,
);
await botService.startConversation(event, block);
expect(hasBotSpoken).toEqual(true);
expect(triggeredEvents).toEqual([
['popular', 'hasNextBlocks'],
['new_conversations', 'New conversations'],
]);
clearMock.mockClear();
});
const [block] = await blockService.findAndPopulate({ patterns: ['Hi'] });
const webSubscriber = (await subscriberService.findOne({
foreign_id: 'foreign-id-web-1',
}))!;
it('should capture a conversation', async () => {
const triggeredEvents: any[] = [];
event.setSender(webSubscriber);
eventEmitter.on('hook:stats:entry', (...args) => {
triggeredEvents.push(args);
});
const event = new WebEventWrapper(handler, webEventText, {
isSocket: false,
ipAddress: '1.1.1.1',
agent: 'Chromium',
});
const webSubscriber = (await subscriberService.findOne({
foreign_id: 'foreign-id-web-1',
}))!;
event.setSender(webSubscriber);
const clearMock = jest
.spyOn(botService, 'handleOngoingConversationMessage')
.mockImplementation(
async (
actualConversation: ConversationFull,
event: WebEventWrapper<typeof WEB_CHANNEL_NAME>,
) => {
expect(actualConversation).toEqualPayload({
next: [],
sender: webSubscriber,
active: true,
context: {
user: {
first_name: webSubscriber.first_name,
last_name: webSubscriber.last_name,
language: 'en',
id: webSubscriber.id,
let hasBotSpoken = false;
const clearMock = jest
.spyOn(botService, 'triggerBlock')
.mockImplementation(
(
actualEvent: WebEventWrapper<typeof WEB_CHANNEL_NAME>,
actualConversation: Conversation,
actualBlock: BlockFull,
isFallback: boolean,
) => {
expect(actualConversation).toEqualPayload({
sender: webSubscriber.id,
active: true,
next: [],
context: {
user: {
first_name: webSubscriber.first_name,
last_name: webSubscriber.last_name,
language: 'en',
id: webSubscriber.id,
},
user_location: {
lat: 0,
lon: 0,
},
skip: {},
vars: {},
nlp: null,
payload: null,
attempt: 0,
channel: 'web-channel',
text: webEventText.data.text,
},
user_location: { lat: 0, lon: 0 },
vars: {},
skip: {},
nlp: null,
payload: null,
attempt: 0,
channel: 'web-channel',
text: webEventText.data.text,
},
});
expect(event).toEqual(event);
return true;
},
);
const captured = await botService.processConversationMessage(event);
expect(captured).toBe(true);
expect(triggeredEvents).toEqual([
['existing_conversations', 'Existing conversations'],
]);
clearMock.mockClear();
});
expect(actualEvent).toEqual(event);
expect(actualBlock).toEqual(block);
expect(isFallback).toEqual(false);
hasBotSpoken = true;
},
);
await botService.startConversation(event, block);
expect(hasBotSpoken).toEqual(true);
expect(triggeredEvents).toEqual([
['popular', 'hasNextBlocks'],
['new_conversations', 'New conversations'],
]);
clearMock.mockClear();
});
});
it('has no active conversation', async () => {
const triggeredEvents: any[] = [];
eventEmitter.on('hook:stats:entry', (...args) => {
triggeredEvents.push(args);
describe('processConversationMessage', () => {
afterAll(() => {
jest.restoreAllMocks();
});
const event = new WebEventWrapper(handler, webEventText, {
isSocket: false,
ipAddress: '1.1.1.1',
agent: 'Chromium',
});
const webSubscriber = (await subscriberService.findOne({
foreign_id: 'foreign-id-web-2',
}))!;
event.setSender(webSubscriber);
const captured = await botService.processConversationMessage(event);
expect(captured).toBe(false);
expect(triggeredEvents).toEqual([]);
it('has no active conversation', async () => {
const triggeredEvents: any[] = [];
eventEmitter.on('hook:stats:entry', (...args) => {
triggeredEvents.push(args);
});
const event = new WebEventWrapper(
handler,
webEventText,
mockWebChannelData,
);
const webSubscriber = (await subscriberService.findOne({
foreign_id: 'foreign-id-web-2',
}))!;
event.setSender(webSubscriber);
const captured = await botService.processConversationMessage(event);
expect(captured).toBe(false);
expect(triggeredEvents).toEqual([]);
});
it('should capture a conversation', async () => {
const triggeredEvents: any[] = [];
eventEmitter.on('hook:stats:entry', (...args) => {
triggeredEvents.push(args);
});
const event = new WebEventWrapper(
handler,
webEventText,
mockWebChannelData,
);
const webSubscriber = (await subscriberService.findOne({
foreign_id: 'foreign-id-web-1',
}))!;
event.setSender(webSubscriber);
jest
.spyOn(botService, 'handleOngoingConversationMessage')
.mockImplementation(() => Promise.resolve(true));
const captured = await botService.processConversationMessage(event);
expect(captured).toBe(true);
expect(triggeredEvents).toEqual([
['existing_conversations', 'Existing conversations'],
]);
});
});
describe('proceedToNextBlock', () => {
const mockEvent = new WebEventWrapper(
handler,
webEventText,
mockWebChannelData,
);
afterAll(() => {
jest.restoreAllMocks();
});
it('should emit stats and call triggerBlock, returning true on success and reset attempt if not fallback', async () => {
const mockConvo = {
...conversationGetStarted,
id: 'convo1',
context: { attempt: 2 },
next: [],
sender: 'user1',
active: true,
} as unknown as ConversationFull;
const next = { id: 'block1', name: 'Block 1' } as BlockFull;
const fallback = false;
jest
.spyOn(conversationService, 'storeContextData')
.mockImplementation(() => {
return Promise.resolve(mockConvo as unknown as Conversation);
});
jest.spyOn(botService, 'triggerBlock').mockResolvedValue(undefined);
const emitSpy = jest.spyOn(eventEmitter, 'emit');
const result = await botService.proceedToNextBlock(
mockConvo,
next,
mockEvent,
fallback,
);
expect(emitSpy).toHaveBeenCalledWith(
'hook:stats:entry',
'popular',
next.name,
);
expect(botService.triggerBlock).toHaveBeenCalledWith(
mockEvent,
expect.objectContaining({ id: 'convo1' }),
next,
fallback,
);
expect(result).toBe(true);
expect(mockConvo.context.attempt).toBe(0);
});
it('should increment attempt if fallback is true', async () => {
const mockConvo = {
...conversationGetStarted,
id: 'convo2',
context: { attempt: 1 },
next: [],
sender: 'user2',
active: true,
} as unknown as ConversationFull;
const next = { id: 'block2', name: 'Block 2' } as any;
const fallback = true;
const result = await botService.proceedToNextBlock(
mockConvo,
next,
mockEvent,
fallback,
);
expect(mockConvo.context.attempt).toBe(2);
expect(result).toBe(true);
});
it('should handle errors and emit conversation:end, returning false', async () => {
const mockConvo = {
...conversationGetStarted,
id: 'convo3',
context: { attempt: 1 },
next: [],
sender: 'user3',
active: true,
} as unknown as ConversationFull;
const next = { id: 'block3', name: 'Block 3' } as any;
const fallback = false;
jest
.spyOn(conversationService, 'storeContextData')
.mockRejectedValue(new Error('fail'));
const emitSpy = jest.spyOn(eventEmitter, 'emit');
const result = await botService.proceedToNextBlock(
mockConvo,
next,
mockEvent,
fallback,
);
expect(emitSpy).toHaveBeenCalledWith('hook:conversation:end', mockConvo);
expect(result).toBe(false);
});
});
describe('handleOngoingConversationMessage', () => {
const mockConvo = {
...conversationGetStarted,
id: 'convo1',
context: { ...conversationGetStarted.context, attempt: 0 },
next: [{ id: 'block1' }],
current: {
...conversationGetStarted.current,
id: 'block0',
options: {
...conversationGetStarted.current.options,
fallback: {
active: true,
max_attempts: 2,
message: [],
},
},
},
} as unknown as ConversationFull;
const mockEvent = new WebEventWrapper(
handler,
webEventText,
mockWebChannelData,
);
beforeAll(() => {
jest.clearAllMocks();
});
afterAll(() => {
jest.clearAllMocks();
});
it('should proceed to the matched next block', async () => {
const matchedBlock = {
...textBlock,
id: 'block1',
name: 'Block 1',
} as BlockFull;
jest
.spyOn(blockService, 'findAndPopulate')
.mockResolvedValue([matchedBlock]);
jest.spyOn(blockService, 'match').mockResolvedValue(matchedBlock);
jest.spyOn(botService, 'proceedToNextBlock').mockResolvedValue(true);
const result = await botService.handleOngoingConversationMessage(
mockConvo,
mockEvent,
);
expect(blockService.findAndPopulate).toHaveBeenCalled();
expect(blockService.match).toHaveBeenCalled();
expect(botService.proceedToNextBlock).toHaveBeenCalled();
expect(result).toBe(true);
});
it('should proceed to fallback block if no match and fallback is allowed', async () => {
jest.spyOn(blockService, 'findAndPopulate').mockResolvedValue([]);
jest.spyOn(blockService, 'match').mockResolvedValue(undefined);
const proceedSpy = jest
.spyOn(botService, 'proceedToNextBlock')
.mockResolvedValue(true);
const result = await botService.handleOngoingConversationMessage(
mockConvo,
mockEvent,
);
expect(proceedSpy).toHaveBeenCalledWith(
mockConvo,
expect.objectContaining({ id: 'block0', nextBlocks: mockConvo.next }),
mockEvent,
true,
);
expect(result).toBe(true);
});
it('should end conversation and return false if no match and fallback not allowed', async () => {
const mockConvoWithoutFallback = {
...mockConvo,
current: {
...mockConvo.current,
options: {
...mockConvo.current.options,
fallback: {
active: false,
max_attempts: 2,
message: [],
},
},
},
} as unknown as ConversationFull;
jest.spyOn(blockService, 'findAndPopulate').mockResolvedValue([]);
jest.spyOn(blockService, 'match').mockResolvedValue(undefined);
const emitSpy = jest.spyOn(eventEmitter, 'emit');
const result = await botService.handleOngoingConversationMessage(
mockConvoWithoutFallback,
mockEvent,
);
expect(emitSpy).toHaveBeenCalledWith(
'hook:conversation:end',
mockConvoWithoutFallback,
);
expect(result).toBe(false);
});
it('should end conversation and throw if an error occurs', async () => {
jest
.spyOn(blockService, 'findAndPopulate')
.mockRejectedValue(new Error('fail'));
const emitSpy = jest.spyOn(eventEmitter, 'emit');
await expect(
botService.handleOngoingConversationMessage(mockConvo, mockEvent),
).rejects.toThrow('fail');
expect(emitSpy).toHaveBeenCalledWith('hook:conversation:end', mockConvo);
});
});
});

View File

@ -27,6 +27,7 @@ import {
OutgoingMessageFormat,
StdOutgoingMessageEnvelope,
} from '../schemas/types/message';
import { BlockOptions } from '../schemas/types/options';
import { BlockService } from './block.service';
import { ConversationService } from './conversation.service';
@ -243,6 +244,70 @@ export class BotService {
}
}
/**
* Handles advancing the conversation to the specified *next* block.
*
* 1. Updates popular blocks stats.
* 2. Persists the updated conversation context.
* 3. Triggers the next block.
* 4. Ends the conversation if an unrecoverable error occurs.
*/
async proceedToNextBlock(
convo: ConversationFull,
next: BlockFull,
event: EventWrapper<any, any>,
fallback: boolean,
): Promise<boolean> {
// Increment stats about popular blocks
this.eventEmitter.emit('hook:stats:entry', BotStatsType.popular, next.name);
this.logger.debug(
'Proceeding to next block ',
next.id,
' for conversation ',
convo.id,
);
try {
convo.context.attempt = fallback ? convo.context.attempt + 1 : 0;
const updatedConversation =
await this.conversationService.storeContextData(
convo,
next,
event,
// If this is a local fallback then we dont capture vars.
!fallback,
);
await this.triggerBlock(event, updatedConversation, next, fallback);
return true;
} catch (err) {
this.logger.error('Unable to proceed to the next block!', err);
this.eventEmitter.emit('hook:conversation:end', convo);
return false;
}
}
/**
* Determines if a fallback should be attempted based on the event type, fallback options, and conversation context.
*
* @param event - The incoming event that triggered the conversation flow.
* @param fallbackOptions - The options for fallback behavior defined in the block.
* @param convo - The current conversation object containing context and state.
*
* @returns A boolean indicating whether a fallback should be attempted.
*/
private shouldAttemptLocalFallback(
event: EventWrapper<any, any>,
fallbackOptions: BlockOptions['fallback'],
convo: ConversationFull,
): boolean {
return (
event.getMessageType() === IncomingMessageType.message &&
!!fallbackOptions?.active &&
convo.context.attempt < (fallbackOptions?.max_attempts ?? 0)
);
}
/**
* Processes and responds to an incoming message within an ongoing conversation flow.
* Determines the next block in the conversation, attempts to match the message with available blocks,
@ -257,25 +322,25 @@ export class BotService {
convo: ConversationFull,
event: EventWrapper<any, any>,
) {
const nextIds = convo.next.map(({ id }) => id);
// Reload blocks in order to populate his nextBlocks
// nextBlocks & trigger/assign _labels
try {
const nextBlocks = await this.blockService.findAndPopulate({
_id: { $in: nextIds },
});
let fallback = false;
const fallbackOptions = convo.current?.options?.fallback
const currentBlock = convo.current;
const fallbackOptions: BlockOptions['fallback'] = convo.current?.options
?.fallback
? convo.current.options.fallback
: {
active: false,
max_attempts: 0,
message: [],
};
// We will avoid having multiple matches when we are not at the start of a conversation
// and only if local fallback is enabled
const canHaveMultipleMatches = !fallbackOptions.active;
// Find the next block that matches
const nextBlocks = await this.blockService.findAndPopulate({
_id: { $in: convo.next.map(({ id }) => id) },
});
const matchedBlock = await this.blockService.match(
nextBlocks,
event,
@ -283,16 +348,13 @@ export class BotService {
);
// If there is no match in next block then loopback (current fallback)
// This applies only to text messages + there's a max attempt to be specified
let fallbackBlock: BlockFull | undefined;
let fallbackBlock: BlockFull | undefined = undefined;
if (
!matchedBlock &&
event.getMessageType() === IncomingMessageType.message &&
fallbackOptions.active &&
convo.context.attempt < fallbackOptions.max_attempts
this.shouldAttemptLocalFallback(event, fallbackOptions, convo)
) {
// Trigger block fallback
// NOTE : current is not populated, this may cause some anomaly
const currentBlock = convo.current;
fallbackBlock = {
...currentBlock,
nextBlocks: convo.next,
@ -303,11 +365,7 @@ export class BotService {
category: null,
previousBlocks: [],
};
convo.context.attempt++;
fallback = true;
} else {
convo.context.attempt = 0;
fallbackBlock = undefined;
}
const next = matchedBlock || fallbackBlock;
@ -315,30 +373,8 @@ export class BotService {
this.logger.debug('Responding ...', convo.id);
if (next) {
// Increment stats about popular blocks
this.eventEmitter.emit(
'hook:stats:entry',
BotStatsType.popular,
next.name,
);
// Go next!
this.logger.debug('Respond to nested conversion! Go next ', next.id);
try {
const updatedConversation =
await this.conversationService.storeContextData(
convo,
next,
event,
// If this is a local fallback then we don't capture vars
// Otherwise, old captured const value may be replaced by another const value
!fallback,
);
await this.triggerBlock(event, updatedConversation, next, fallback);
} catch (err) {
this.logger.error('Unable to store context data!', err);
return this.eventEmitter.emit('hook:conversation:end', convo);
}
return true;
// Proceed to the execution of the next block
return await this.proceedToNextBlock(convo, next, event, fallback);
} else {
// Conversation is still active, but there's no matching block to call next
// We'll end the conversation but this message is probably lost in time and space.