Merge pull request #1125 from Hexastack/feat/flow-escape-mechanism

Feat/flow escape mechanism
This commit is contained in:
Med Marrouchi 2025-06-18 15:44:24 +01:00 committed by GitHub
commit 4cae55405c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 113 additions and 54 deletions

View File

@ -217,7 +217,7 @@ describe('BotService', () => {
]); ]);
}); });
afterEach(jest.clearAllMocks); afterEach(jest.resetAllMocks);
afterAll(closeInMongodConnection); afterAll(closeInMongodConnection);
describe('startConversation', () => { describe('startConversation', () => {
afterAll(() => { afterAll(() => {

View File

@ -11,6 +11,8 @@ import { EventEmitter2 } from '@nestjs/event-emitter';
import { BotStatsType } from '@/analytics/schemas/bot-stats.schema'; import { BotStatsType } from '@/analytics/schemas/bot-stats.schema';
import EventWrapper from '@/channel/lib/EventWrapper'; import EventWrapper from '@/channel/lib/EventWrapper';
import { HelperService } from '@/helper/helper.service';
import { FlowEscape, HelperType } from '@/helper/types';
import { LoggerService } from '@/logger/logger.service'; import { LoggerService } from '@/logger/logger.service';
import { SettingService } from '@/setting/services/setting.service'; import { SettingService } from '@/setting/services/setting.service';
@ -24,7 +26,7 @@ import {
OutgoingMessageFormat, OutgoingMessageFormat,
StdOutgoingMessageEnvelope, StdOutgoingMessageEnvelope,
} from '../schemas/types/message'; } from '../schemas/types/message';
import { BlockOptions, FallbackOptions } from '../schemas/types/options'; import { FallbackOptions } from '../schemas/types/options';
import { BlockService } from './block.service'; import { BlockService } from './block.service';
import { ConversationService } from './conversation.service'; import { ConversationService } from './conversation.service';
@ -39,6 +41,7 @@ export class BotService {
private readonly conversationService: ConversationService, private readonly conversationService: ConversationService,
private readonly subscriberService: SubscriberService, private readonly subscriberService: SubscriberService,
private readonly settingService: SettingService, private readonly settingService: SettingService,
private readonly helperService: HelperService,
) {} ) {}
/** /**
@ -244,10 +247,12 @@ export class BotService {
/** /**
* Handles advancing the conversation to the specified *next* block. * Handles advancing the conversation to the specified *next* block.
* *
* 1. Updates popular blocks stats. * @param convo - The current conversation object containing context and state.
* 2. Persists the updated conversation context. * @param next - The next block to proceed to in the conversation flow.
* 3. Triggers the next block. * @param event - The incoming event that triggered the conversation flow.
* 4. Ends the conversation if an unrecoverable error occurs. * @param fallback - Boolean indicating if this is a fallback response in case no appropriate reply was found.
*
* @returns A promise that resolves to a boolean indicating whether the next block was successfully triggered.
*/ */
async proceedToNextBlock( async proceedToNextBlock(
convo: ConversationFull, convo: ConversationFull,
@ -258,10 +263,7 @@ export class BotService {
// Increment stats about popular blocks // Increment stats about popular blocks
this.eventEmitter.emit('hook:stats:entry', BotStatsType.popular, next.name); this.eventEmitter.emit('hook:stats:entry', BotStatsType.popular, next.name);
this.logger.debug( this.logger.debug(
'Proceeding to next block ', `Proceeding to next block ${next.id} for conversation ${convo.id}`,
next.id,
' for conversation ',
convo.id,
); );
try { try {
@ -348,45 +350,16 @@ export class BotService {
) { ) {
try { try {
let fallback = false; let fallback = false;
const currentBlock = convo.current; this.logger.debug('Handling ongoing conversation message ...', convo.id);
const fallbackOptions: BlockOptions['fallback'] = convo.current?.options const matchedBlock = await this.findNextMatchingBlock(convo, event);
?.fallback
? convo.current.options.fallback
: {
active: false,
max_attempts: 0,
message: [],
};
// We will avoid having multiple matches when we are not at the start of a conversation
// and only if local fallback is enabled
const canHaveMultipleMatches = !fallbackOptions.active;
// Find the next block that matches
const nextBlocks = await this.blockService.findAndPopulate({
_id: { $in: convo.next.map(({ id }) => id) },
});
const matchedBlock = await this.blockService.match(
nextBlocks,
event,
canHaveMultipleMatches,
);
// If there is no match in next block then loopback (current fallback)
// This applies only to text messages + there's a max attempt to be specified
let fallbackBlock: BlockFull | undefined = undefined; let fallbackBlock: BlockFull | undefined = undefined;
if (!matchedBlock && this.shouldAttemptLocalFallback(convo, event)) { if (!matchedBlock && this.shouldAttemptLocalFallback(convo, event)) {
// Trigger block fallback const fallbackResult = await this.handleFlowEscapeFallback(
// NOTE : current is not populated, this may cause some anomaly convo,
fallbackBlock = { event,
...currentBlock, );
nextBlocks: convo.next, fallbackBlock = fallbackResult.nextBlock;
// If there's labels, they should be already have been assigned fallback = fallbackResult.fallback;
assign_labels: [],
trigger_labels: [],
attachedBlock: null,
category: null,
previousBlocks: [],
};
fallback = true;
} }
const next = matchedBlock || fallbackBlock; const next = matchedBlock || fallbackBlock;
@ -410,6 +383,91 @@ export class BotService {
} }
} }
/**
* Handles the flow escape fallback logic for a conversation.
*
* This method adjudicates the flow escape event and helps determine the next block to execute based on the helper's response.
* It can coerce the event to a specific next block, create a new context, or reprompt the user with a fallback message.
* If the helper cannot handle the flow escape, it returns a fallback block with the current conversation's state.
*
* @param convo - The current conversation object.
* @param event - The incoming event that triggered the fallback.
*
* @returns An object containing the next block to execute (if any) and a flag indicating if a fallback should occur.
*/
async handleFlowEscapeFallback(
convo: ConversationFull,
event: EventWrapper<any, any>,
): Promise<{ nextBlock?: BlockFull; fallback: boolean }> {
const currentBlock = convo.current;
const fallbackOptions: FallbackOptions =
this.blockService.getFallbackOptions(currentBlock);
const fallbackBlock: BlockFull = {
...currentBlock,
nextBlocks: convo.next,
assign_labels: [],
trigger_labels: [],
attachedBlock: null,
category: null,
previousBlocks: [],
};
try {
const helper = await this.helperService.getDefaultHelper(
HelperType.FLOW_ESCAPE,
);
if (!helper.canHandleFlowEscape(currentBlock)) {
return { nextBlock: fallbackBlock, fallback: true };
}
// Adjudicate the flow escape event
this.logger.debug(
`Adjudicating flow escape for block '${currentBlock.id}' in conversation '${convo.id}'.`,
);
const result = await helper.adjudicate(event, currentBlock);
switch (result.action) {
case FlowEscape.Action.COERCE: {
// Coerce the option to the next block
this.logger.debug(`Coercing option to the next block ...`, convo.id);
const proxiedEvent = new Proxy(event, {
get(target, prop, receiver) {
if (prop === 'getText') {
return () => result.coercedOption + '';
}
return Reflect.get(target, prop, receiver);
},
});
const matchedBlock = await this.findNextMatchingBlock(
convo,
proxiedEvent,
);
return { nextBlock: matchedBlock, fallback: false };
}
case FlowEscape.Action.NEW_CTX:
return { nextBlock: undefined, fallback: false };
case FlowEscape.Action.REPROMPT:
default:
if (result.repromptMessage) {
fallbackBlock.options.fallback = {
...fallbackOptions,
message: [result.repromptMessage],
};
}
return { nextBlock: fallbackBlock, fallback: true };
}
} catch (err) {
this.logger.warn(
'Unable to handle flow escape, using default local fallback ...',
err,
);
return { nextBlock: fallbackBlock, fallback: true };
}
}
/** /**
* Determines if the incoming message belongs to an active conversation and processes it accordingly. * Determines if the incoming message belongs to an active conversation and processes it accordingly.
* If an active conversation is found, the message is handled as part of that conversation. * If an active conversation is found, the message is handled as part of that conversation.

View File

@ -1,5 +1,5 @@
/* /*
* Copyright © 2024 Hexastack. All rights reserved. * Copyright © 2025 Hexastack. All rights reserved.
* *
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms: * Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission. * 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
@ -10,6 +10,7 @@ import { HttpModule } from '@nestjs/axios';
import { Global, Module } from '@nestjs/common'; import { Global, Module } from '@nestjs/common';
import { InjectDynamicProviders } from 'nestjs-dynamic-providers'; import { InjectDynamicProviders } from 'nestjs-dynamic-providers';
import { CmsModule } from '@/cms/cms.module';
import { NlpModule } from '@/nlp/nlp.module'; import { NlpModule } from '@/nlp/nlp.module';
import { HelperController } from './helper.controller'; import { HelperController } from './helper.controller';
@ -25,7 +26,7 @@ import { HelperService } from './helper.service';
'dist/.hexabot/custom/extensions/helpers/**/*.helper.js', 'dist/.hexabot/custom/extensions/helpers/**/*.helper.js',
) )
@Module({ @Module({
imports: [HttpModule, NlpModule], imports: [HttpModule, NlpModule, CmsModule],
controllers: [HelperController], controllers: [HelperController],
providers: [HelperService], providers: [HelperService],
exports: [HelperService], exports: [HelperService],

View File

@ -36,7 +36,7 @@ export default abstract class BaseFlowEscapeHelper<
* @param _blockMessage - The block message to check. * @param _blockMessage - The block message to check.
* @returns - Whether the helper can handle the flow escape for the given block message. * @returns - Whether the helper can handle the flow escape for the given block message.
*/ */
abstract canHandleFlowEscape<T extends BlockStub>(_blockMessage: T): boolean; abstract canHandleFlowEscape<T extends BlockStub>(block: T): boolean;
/** /**
* Adjudicates the flow escape event. * Adjudicates the flow escape event.
@ -46,7 +46,7 @@ export default abstract class BaseFlowEscapeHelper<
* @returns - A promise that resolves to a FlowEscape.AdjudicationResult. * @returns - A promise that resolves to a FlowEscape.AdjudicationResult.
*/ */
abstract adjudicate<T extends BlockStub>( abstract adjudicate<T extends BlockStub>(
_event: EventWrapper<any, any>, event: EventWrapper<any, any>,
_block: T, block: T,
): Promise<FlowEscape.AdjudicationResult>; ): Promise<FlowEscape.AdjudicationResult>;
} }

View File

@ -81,7 +81,7 @@ const position = {
y: 0, y: 0,
}; };
export const baseBlockInstance = { export const baseBlockInstance: Partial<BlockFull> = {
trigger_labels: [labelMock], trigger_labels: [labelMock],
assign_labels: [labelMock], assign_labels: [labelMock],
options: blockOptions, options: blockOptions,
@ -90,7 +90,7 @@ export const baseBlockInstance = {
position, position,
builtin: true, builtin: true,
attachedBlock: null, attachedBlock: null,
category: undefined, category: null,
previousBlocks: [], previousBlocks: [],
trigger_channels: [], trigger_channels: [],
nextBlocks: [], nextBlocks: [],