feat: remove core nlu engine

hexastack 2025-01-06 10:30:47 +01:00
parent 06e06a03b9
commit f45e61604e
44 changed files with 0 additions and 2926 deletions

View File

@ -1,50 +0,0 @@
name: Build and Push Docker NLU Image
on:
push:
branches:
- 'main'
tags:
- 'v*'
jobs:
build-and-push:
runs-on: ubuntu-latest
steps:
- name: Check out repository code
uses: actions/checkout@v4
- name: Docker meta
id: meta
uses: docker/metadata-action@v5
with:
images: hexastack/hexabot-nlu
tags: |
type=ref,event=branch
type=ref,event=pr
type=semver,pattern={{version}}
type=semver,pattern={{major}}.{{minor}}
- name: Set up QEMU
uses: docker/setup-qemu-action@v3
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Login to Docker Hub
if: github.event_name != 'pull_request'
id: docker_login
uses: docker/login-action@v2
with:
username: ${{ secrets.DOCKERHUB_USERNAME }}
password: ${{ secrets.DOCKERHUB_TOKEN }}
- name: Build and push NLU Docker image
uses: docker/build-push-action@v6
with:
context: ./nlu/
file: ./nlu/Dockerfile
platforms: linux/amd64,linux/arm64
push: ${{ github.event_name != 'pull_request' }}
tags: ${{ steps.meta.outputs.tags }}
labels: ${{ steps.meta.outputs.labels }}

View File

@ -1,124 +0,0 @@
/*
* Copyright © 2024 Hexastack. All rights reserved.
*
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
import { NlpEntityFull } from '@/nlp/schemas/nlp-entity.schema';
import { NlpSampleFull } from '@/nlp/schemas/nlp-sample.schema';
import { NlpSampleState } from '@/nlp/schemas/types';
export const modelInstance = {
id: '1',
createdAt: new Date(),
updatedAt: new Date(),
};
export const baseNlpValue = {
...modelInstance,
expressions: [],
builtin: true,
};
export const baseNlpEntity = {
...modelInstance,
doc: '',
builtin: true,
};
export const baseLanguage = {
...modelInstance,
title: 'English',
code: 'en',
isDefault: true,
};
export const entitiesMock: NlpEntityFull[] = [
{
...baseNlpEntity,
id: 'entity-1',
name: 'intent',
lookups: ['trait'],
values: [
{
...baseNlpValue,
id: 'value-1',
entity: 'entity-1',
value: 'greeting',
},
{
...baseNlpValue,
id: 'value-2',
entity: 'entity-1',
value: 'order',
},
],
},
{
...baseNlpEntity,
id: 'entity-2',
name: 'product',
lookups: ['keywords'],
doc: '',
values: [
{
...baseNlpValue,
id: 'value-3',
entity: 'entity-2',
value: 'pizza',
expressions: ['piza', 'pizzza'],
},
{
...baseNlpValue,
id: 'value-4',
entity: 'entity-2',
value: 'sandwich',
},
],
},
];
export const samplesMock: NlpSampleFull[] = [
{
...modelInstance,
id: 'sample-1',
text: 'Hello',
entities: [
{
...baseNlpEntity,
sample: 'sample-1',
entity: 'entity-1',
value: 'value-1',
},
],
trained: false,
type: NlpSampleState.train,
language: baseLanguage,
},
{
...modelInstance,
id: 'sample-2',
text: 'i want to order a pizza',
entities: [
{
...baseNlpEntity,
sample: 'sample-2',
entity: 'entity-1',
value: 'value-2',
},
{
...baseNlpEntity,
sample: 'sample-2',
entity: 'entity-2',
value: 'value-3',
start: 19,
end: 23,
},
],
trained: false,
type: NlpSampleState.train,
language: baseLanguage,
},
];

View File

@ -1,118 +0,0 @@
/*
* Copyright © 2024 Hexastack. All rights reserved.
*
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
import { NLU } from '@/helper/types';
import { NlpParseResultType, RasaNlu } from '../types';
export const nlpEmptyFormated: RasaNlu.Dataset = {
common_examples: [],
regex_features: [],
lookup_tables: [
{
name: 'intent',
elements: ['greeting', 'order'],
},
{
name: 'product',
elements: ['pizza', 'sandwich'],
},
{
elements: ['en', 'fr'],
name: 'language',
},
],
entity_synonyms: [
{
value: 'pizza',
synonyms: ['piza', 'pizzza'],
},
],
};
export const nlpFormatted: RasaNlu.Dataset = {
common_examples: [
{
text: 'Hello',
intent: 'greeting',
entities: [
{
entity: 'language',
value: 'en',
},
],
},
{
text: 'i want to order a pizza',
intent: 'order',
entities: [
{ entity: 'product', value: 'pizza', start: 19, end: 23 },
{
entity: 'language',
value: 'en',
},
],
},
],
regex_features: [],
lookup_tables: [
{ name: 'intent', elements: ['greeting', 'order'] },
{ name: 'product', elements: ['pizza', 'sandwich'] },
{ name: 'language', elements: ['en', 'fr'] },
],
entity_synonyms: [
{
value: 'pizza',
synonyms: ['piza', 'pizzza'],
},
],
};
export const nlpParseResult: NlpParseResultType = {
entities: [
{
start: 5,
end: 7,
value: 'Joe',
entity: 'person',
confidence: 0.4081958281101719,
},
],
intent: {
confidence: 0.6081958281101719,
name: 'greeting',
},
intent_ranking: [
{
confidence: 0.6081958281101719,
name: 'greeting',
},
{
confidence: 0.3918041718898281,
name: 'goodbye',
},
],
text: 'Hello Joe',
};
export const nlpBestGuess: NLU.ParseEntities = {
entities: [
{
start: 5,
end: 7,
value: 'Joe',
entity: 'person',
confidence: 0.4081958281101719,
},
{
entity: 'intent',
value: 'greeting',
confidence: 0.6081958281101719,
},
],
};

View File

@ -1,134 +0,0 @@
/*
* Copyright © 2024 Hexastack. All rights reserved.
*
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
import { HttpModule } from '@nestjs/axios';
import { CACHE_MANAGER } from '@nestjs/cache-manager';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { MongooseModule } from '@nestjs/mongoose';
import { Test, TestingModule } from '@nestjs/testing';
import { HelperService } from '@/helper/helper.service';
import { LanguageRepository } from '@/i18n/repositories/language.repository';
import { LanguageModel } from '@/i18n/schemas/language.schema';
import { LanguageService } from '@/i18n/services/language.service';
import { LoggerService } from '@/logger/logger.service';
import { SettingService } from '@/setting/services/setting.service';
import { installLanguageFixtures } from '@/utils/test/fixtures/language';
import {
closeInMongodConnection,
rootMongooseTestModule,
} from '@/utils/test/test';
import CoreNluHelper from '../index.helper';
import { entitiesMock, samplesMock } from './__mock__/base.mock';
import {
nlpBestGuess,
nlpEmptyFormated,
nlpFormatted,
nlpParseResult,
} from './index.mock';
describe('Core NLU Helper', () => {
let settingService: SettingService;
let defaultNlpHelper: CoreNluHelper;
beforeAll(async () => {
const module: TestingModule = await Test.createTestingModule({
imports: [
rootMongooseTestModule(async () => {
await installLanguageFixtures();
}),
MongooseModule.forFeature([LanguageModel]),
HttpModule,
],
providers: [
LanguageService,
LanguageRepository,
EventEmitter2,
HelperService,
CoreNluHelper,
LoggerService,
{
provide: SettingService,
useValue: {
getSettings: jest.fn(() => ({
core_nlu_helper: {
endpoint: 'path',
token: 'token',
threshold: '0.5',
},
})),
},
},
{
provide: CACHE_MANAGER,
useValue: {
del: jest.fn(),
get: jest.fn(),
set: jest.fn(),
},
},
],
}).compile();
settingService = module.get<SettingService>(SettingService);
defaultNlpHelper = module.get<CoreNluHelper>(CoreNluHelper);
});
afterAll(closeInMongodConnection);
it('should format empty training set properly', async () => {
const results = await defaultNlpHelper.format([], entitiesMock);
expect(results).toEqual(nlpEmptyFormated);
});
it('should format training set properly', async () => {
const results = await defaultNlpHelper.format(samplesMock, entitiesMock);
expect(results).toEqual(nlpFormatted);
});
it('should return best guess from empty parse results', async () => {
const results = await defaultNlpHelper.filterEntitiesByConfidence(
{
entities: [],
intent: { name: 'greeting', confidence: 0 },
intent_ranking: [],
text: 'test',
},
false,
);
expect(results).toEqual({
entities: [{ entity: 'intent', value: 'greeting', confidence: 0 }],
});
});
it('should return best guess from parse results', async () => {
const results = await defaultNlpHelper.filterEntitiesByConfidence(
nlpParseResult,
false,
);
expect(results).toEqual(nlpBestGuess);
});
it('should return best guess from parse results with threshold', async () => {
const results = await defaultNlpHelper.filterEntitiesByConfidence(
nlpParseResult,
true,
);
const settings = await settingService.getSettings();
const threshold = settings.core_nlu_helper.threshold;
const thresholdGuess = {
entities: nlpBestGuess.entities.filter(
(g) =>
g.confidence >
(typeof threshold === 'string' ? parseFloat(threshold) : threshold),
),
};
expect(results).toEqual(thresholdGuess);
});
});

View File

@ -1,5 +0,0 @@
{
"endpoint": "Enter the endpoint URL for the Core NLU API where requests will be sent.",
"token": "Provide the API token for authenticating requests to the Core NLU API.",
"threshold": "Set the minimum confidence score for predictions to be considered valid."
}

View File

@ -1,5 +0,0 @@
{
"endpoint": "Core NLU API",
"token": "API Token",
"threshold": "Confidence Threshold"
}

View File

@ -1,3 +0,0 @@
{
"core_nlu_helper": "Core NLU Engine"
}

View File

@ -1,5 +0,0 @@
{
"endpoint": "Entrez l'URL de point de terminaison pour l'API NLU Core où les requêtes seront envoyées.",
"token": "Fournissez le jeton d'API pour authentifier les requêtes à l'API NLU Core.",
"threshold": "Définissez le score de confiance minimum pour que les prédictions soient considérées comme valides."
}

View File

@ -1,5 +0,0 @@
{
"endpoint": "API NLU Core",
"token": "Jeton d'API",
"threshold": "Seuil de Confiance"
}

View File

@ -1,3 +0,0 @@
{
"core_nlu_helper": "Core NLU Engine"
}

View File

@ -1,24 +0,0 @@
/*
* Copyright © 2024 Hexastack. All rights reserved.
*
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
import CORE_NLU_HELPER_SETTINGS, {
CORE_NLU_HELPER_NAMESPACE,
} from './settings';
declare global {
interface Settings extends SettingTree<typeof CORE_NLU_HELPER_SETTINGS> {}
}
declare module '@nestjs/event-emitter' {
interface IHookExtensionsOperationMap {
[CORE_NLU_HELPER_NAMESPACE]: TDefinition<
object,
SettingMapByType<typeof CORE_NLU_HELPER_SETTINGS>
>;
}
}

View File

@ -1,281 +0,0 @@
/*
* Copyright © 2024 Hexastack. All rights reserved.
*
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
import { HttpService } from '@nestjs/axios';
import { Injectable } from '@nestjs/common';
import { HelperService } from '@/helper/helper.service';
import BaseNlpHelper from '@/helper/lib/base-nlp-helper';
import { NLU } from '@/helper/types';
import { LanguageService } from '@/i18n/services/language.service';
import { LoggerService } from '@/logger/logger.service';
import { NlpEntity, NlpEntityFull } from '@/nlp/schemas/nlp-entity.schema';
import { NlpSampleFull } from '@/nlp/schemas/nlp-sample.schema';
import { NlpValue } from '@/nlp/schemas/nlp-value.schema';
import { SettingService } from '@/setting/services/setting.service';
import { buildURL } from '@/utils/helpers/URL';
import { CORE_NLU_HELPER_NAME } from './settings';
import { NlpParseResultType, RasaNlu } from './types';
@Injectable()
export default class CoreNluHelper extends BaseNlpHelper<
typeof CORE_NLU_HELPER_NAME
> {
constructor(
settingService: SettingService,
helperService: HelperService,
logger: LoggerService,
private readonly httpService: HttpService,
private readonly languageService: LanguageService,
) {
super(CORE_NLU_HELPER_NAME, settingService, helperService, logger);
}
getPath() {
return __dirname;
}
/**
* Formats a set of NLP samples into the Rasa NLU-compatible training dataset format.
*
* @param samples - The NLP samples to format.
* @param entities - The NLP entities available in the dataset.
*
* @returns The formatted Rasa NLU training dataset.
*/
async format(
samples: NlpSampleFull[],
entities: NlpEntityFull[],
): Promise<RasaNlu.Dataset> {
const entityMap = NlpEntity.getEntityMap(entities);
const valueMap = NlpValue.getValueMap(
NlpValue.getValuesFromEntities(entities),
);
const common_examples: RasaNlu.CommonExample[] = samples
.filter((s) => s.entities.length > 0)
.map((s) => {
const intent = s.entities.find(
(e) => entityMap[e.entity].name === 'intent',
);
if (!intent) {
throw new Error('Unable to find the `intent` nlp entity.');
}
const sampleEntities: RasaNlu.ExampleEntity[] = s.entities
.filter((e) => entityMap[<string>e.entity].name !== 'intent')
.map((e) => {
const res: RasaNlu.ExampleEntity = {
entity: entityMap[<string>e.entity].name,
value: valueMap[<string>e.value].value,
};
if ('start' in e && 'end' in e) {
Object.assign(res, {
start: e.start,
end: e.end,
});
}
return res;
})
// TODO : place language at the same level as the intent
.concat({
entity: 'language',
value: s.language.code,
});
return {
text: s.text,
intent: valueMap[intent.value].value,
entities: sampleEntities,
};
});
const languages = await this.languageService.getLanguages();
const lookup_tables: RasaNlu.LookupTable[] = entities
.map((e) => {
return {
name: e.name,
elements: e.values.map((v) => {
return v.value;
}),
};
})
.concat({
name: 'language',
elements: Object.keys(languages),
});
const entity_synonyms = entities
.reduce((acc, e) => {
const synonyms = e.values.map((v) => {
return {
value: v.value,
synonyms: v.expressions,
};
});
return acc.concat(synonyms);
}, [] as RasaNlu.EntitySynonym[])
.filter((s) => {
return s.synonyms.length > 0;
});
return {
common_examples,
regex_features: [],
lookup_tables,
entity_synonyms,
};
}
/**
* Perform a training request
*
* @param samples - Samples to train
* @param entities - All available entities
* @returns The training result
*/
async train(
samples: NlpSampleFull[],
entities: NlpEntityFull[],
): Promise<any> {
const nluData: RasaNlu.Dataset = await this.format(samples, entities);
const settings = await this.getSettings();
// Train samples
return await this.httpService.axiosRef.post(
buildURL(settings.endpoint, `/train`),
nluData,
{
params: {
token: settings.token,
},
},
);
}
/**
* Perform evaluation request
*
* @param samples - Samples to evaluate
* @param entities - All available entities
* @returns Evaluation results
*/
async evaluate(
samples: NlpSampleFull[],
entities: NlpEntityFull[],
): Promise<any> {
const settings = await this.getSettings();
const nluTestData: RasaNlu.Dataset = await this.format(samples, entities);
// Evaluate model with test samples
return await this.httpService.axiosRef.post(
buildURL(settings.endpoint, `/evaluate`),
nluTestData,
{
params: {
token: settings.token,
},
},
);
}
/**
* Returns only the entities with strong confidence (greater than the threshold); may return an empty result.
*
* @param nlp - The NLP parse result
* @param threshold - Whether to apply the threshold filter or not
*
* @returns The parsed entities
*/
async filterEntitiesByConfidence(
nlp: NlpParseResultType,
threshold: boolean,
): Promise<NLU.ParseEntities> {
try {
let minConfidence = 0;
const guess: NLU.ParseEntities = {
entities: nlp.entities.slice(),
};
if (threshold) {
const settings = await this.getSettings();
const threshold = settings.threshold;
minConfidence =
typeof threshold === 'string'
? Number.parseFloat(threshold)
: threshold;
guess.entities = guess.entities
.map((e) => {
e.confidence =
typeof e.confidence === 'string'
? Number.parseFloat(e.confidence)
: e.confidence;
return e;
})
.filter((e) => e.confidence >= minConfidence);
// Get past threshold and the highest confidence for the same entity
// .filter((e, idx, self) => {
// const sameEntities = self.filter((s) => s.entity === e.entity);
// const max = Math.max.apply(Math, sameEntities.map((e) => { return e.confidence; }));
// return e.confidence === max;
// });
}
['intent', 'language'].forEach((trait) => {
if (trait in nlp && (nlp as any)[trait].confidence >= minConfidence) {
guess.entities.push({
entity: trait,
value: (nlp as any)[trait].name,
confidence: (nlp as any)[trait].confidence,
});
}
});
return guess;
} catch (e) {
this.logger.error(
'Core NLU Helper : Unable to parse nlp result to extract best guess!',
e,
);
return {
entities: [],
};
}
}
/**
* Sends the text to the NLU API for parsing and returns only the entities with strong confidence (greater than the threshold); may return an empty result.
*
* @param text - The text to parse
* @param threshold - Whether to apply the threshold filter or not
* @param project - The project (model) to use for parsing, defaults to `current`
*
* @returns The prediction
*/
async predict(
text: string,
threshold: boolean,
project: string = 'current',
): Promise<NLU.ParseEntities> {
try {
const settings = await this.getSettings();
const { data: nlp } =
await this.httpService.axiosRef.post<NlpParseResultType>(
buildURL(settings.endpoint, '/parse'),
{
q: text,
project,
},
{
params: {
token: settings.token,
},
},
);
return await this.filterEntitiesByConfidence(nlp, threshold);
} catch (err) {
this.logger.error('Core NLU Helper : Unable to parse nlp', err);
throw err;
}
}
}

View File

@ -1,8 +0,0 @@
{
"name": "hexabot-helper-core-nlu",
"version": "2.0.0",
"description": "The Core NLU Helper Extension for Hexabot Chatbot / Agent Builder to enable the Intent Classification and Language Detection",
"dependencies": {},
"author": "Hexastack",
"license": "AGPL-3.0-only"
}

View File

@ -1,40 +0,0 @@
/*
* Copyright © 2024 Hexastack. All rights reserved.
*
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
import { HelperSetting } from '@/helper/types';
import { SettingType } from '@/setting/schemas/types';
export const CORE_NLU_HELPER_NAME = 'core-nlu-helper';
export const CORE_NLU_HELPER_NAMESPACE = 'core_nlu_helper';
export default [
{
group: CORE_NLU_HELPER_NAMESPACE,
label: 'endpoint',
value: 'http://nlu-api:5000/',
type: SettingType.text,
},
{
group: CORE_NLU_HELPER_NAMESPACE,
label: 'token',
value: 'token123',
type: SettingType.text,
},
{
group: CORE_NLU_HELPER_NAMESPACE,
label: 'threshold',
value: 0.1,
type: SettingType.number,
config: {
min: 0,
max: 1,
step: 0.01,
},
},
] as const satisfies HelperSetting<typeof CORE_NLU_HELPER_NAME>[];

View File

@ -1,65 +0,0 @@
/*
* Copyright © 2024 Hexastack. All rights reserved.
*
* Licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
* 1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
* 2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).
*/
export namespace RasaNlu {
export interface ExampleEntity {
entity: string;
value: string;
start?: number;
end?: number;
}
export interface CommonExample {
text: string;
intent: string;
entities: ExampleEntity[];
}
export interface LookupTable {
name: string;
elements: string[];
}
export interface EntitySynonym {
value: string;
synonyms: string[];
}
export interface Dataset {
common_examples: CommonExample[];
regex_features: any[];
lookup_tables: LookupTable[];
entity_synonyms: EntitySynonym[];
}
}
export interface ParseEntity {
entity: string; // Entity name
value: string; // Value name
confidence: number;
start?: number;
end?: number;
}
export interface ParseEntities {
entities: ParseEntity[];
}
export interface NlpParseResultType {
intent: {
name: string;
confidence: number;
};
language?: {
name: string;
confidence: number;
};
intent_ranking: any[];
entities: ParseEntity[];
text: string;
}

View File

@ -46,21 +46,6 @@ EMAIL_SMTP_USER=dev_only
EMAIL_SMTP_PASS=dev_only
EMAIL_SMTP_FROM=noreply@example.com
# NLU Server
AUTH_TOKEN=token123
LANGUAGE_CLASSIFIER=language-classifier
INTENT_CLASSIFIERS=en,fr
TFLC_REPO_ID=Hexastack/tflc
INTENT_CLASSIFIER_REPO_ID=Hexastack/intent-classifier
SLOT_FILLER_REPO_ID=Hexastack/slot-filler
NLU_ENGINE_PORT=5000
BERT_MODEL_BY_LANGUAGE_JSON='{
"en": "bert-base-cased",
"fr": "dbmdz/bert-base-french-europeana-cased"
}'
# Huggingface Access token to download private models for NLU inference
HF_AUTH_TOKEN=
# Frontend (Next.js)
NEXT_PUBLIC_API_ORIGIN=http://${APP_DOMAIN}:${API_PORT}/
NEXT_PUBLIC_SSO_ENABLED=false

View File

@ -1,10 +0,0 @@
version: "3.9"
services:
nlu-api:
build:
context: ../nlu
dockerfile: Dockerfile
pull_policy: build
ports:
- ${NLU_ENGINE_PORT}:5000

View File

@ -1,30 +0,0 @@
version: "3.9"
services:
api:
networks:
- nlu-network
depends_on:
nlu-api:
condition: service_healthy
nlu-api:
container_name: nlu-api
image: hexastack/hexabot-nlu:latest
env_file: .env
networks:
- nlu-network
volumes:
- nlu-data:/app/repos
healthcheck:
test: curl --fail http://localhost:5000/health || exit 1
interval: 10s
timeout: 10s
retries: 5
start_period: 10s
volumes:
nlu-data:
networks:
nlu-network:

View File

@ -1,5 +0,0 @@
__pycache__
/experiments/*
/venv
.env.dev
/repos/*

View File

@ -1,6 +0,0 @@
AUTH_TOKEN=123
LANGUAGE_CLASSIFIER=language-classifier
INTENT_CLASSIFIERS=ar,fr,tn
TFLC_REPO_ID=Hexastack/tflc
INTENT_CLASSIFIER_REPO_ID=Hexastack/intent-classifier
SLOT_FILLER_REPO_ID=Hexastack/slot-filler

View File

@ -1,5 +0,0 @@
AUTH_TOKEN=
LANGUAGE_CLASSIFIER=
INTENT_CLASSIFIERS=
INTENT_CLASSIFIER_REPO_ID=
SLOT_FILLER_REPO_ID=

nlu/.gitignore (vendored)
View File

@ -1,23 +0,0 @@
# This repository
data/*
!data/nothing.txt
experiments/*
!experiments/nothing.txt
/repos/*
# Python
*.py[cod]
__pycache__/
# Virtualenv
env/
venv/
virtualenv/
# macOS
Icon?
.DS_Store
# IDEs
*.swp
.env

View File

@ -1,21 +0,0 @@
FROM python:3.11.4
#
WORKDIR /app
#
COPY ./requirements.txt ./requirements.txt
# Update pip
RUN pip3 install --upgrade pip
# Install deps
RUN pip install --no-cache-dir --upgrade -r /app/requirements.txt
# Copy source code
COPY . .
EXPOSE 5000
# Entrypoint
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "5000"]

View File

@ -1,171 +0,0 @@
# Hexabot NLU
The [Hexabot](https://hexabot.ai/) NLU (Natural Language Understanding) engine is a Python-based project that provides tools for building, training, and evaluating machine learning models for natural language tasks such as intent detection and language recognition. It also includes a REST API for inference, built using FastAPI.
## Directory Structure
- **/run.py:** The CLI tool that provides commands for training, evaluating, and managing models.
- **/models:** Contains the different model definitions and logic for training, testing, and evaluation.
- **/data:** Placeholder for datasets used during training and evaluation.
- **/experiments:** Placeholder for stored models generated during training.
- **/data_loaders:** Classes that define the way to load datasets to be used by the different models.
- **/main.py:** The FastAPI-based REST API used for inference, exposing endpoints for real-time predictions.
## Setup
**No dependencies needed besides Python 3.11.6, virtualenv, and TensorFlow.** Start developing your new model on top of this workflow by cloning this repository:
```bash
# Set up a virtualenv
pip install virtualenv
python3.11 -m venv venv
source env.sh
pip install -r requirements.txt
```
## Directory structure
- `data`: gitignore'd, place datasets here.
- `experiments`: gitignore'd, trained models written here.
- `data_loaders`: write your data loaders here.
- `models`: write your models here.
## Usage
**Check `models/mlp.py` and `data_loaders/mnist.py` for fully working examples.**
You should run `source env.sh` on each new shell session. This activates the virtualenv and creates a nice alias for `run.py`:
```bash
$ cat env.sh
source venv/bin/activate
alias run='python run.py'
```
Most routines involve running a command like this:
```bash
# Usage: run [method] [save_dir] [model] [data_loader] [hparams...]
run fit myexperiment1 mlp mnist --batch_size=32 --learning_rate=0.1
```
Examples:
```bash
# Intent classification
run fit intent-classifier-en-30072024 intent_classifier --intent_num_labels=88 --slot_num_labels=17 --language=en
run predict intent-classifier-fr-30072024 --intent_num_labels=7 --slot_num_labels=2 --language=fr
# Language classification
run fit language-classifier-26082023 tflc
run predict language-classifier-26082023
run evaluate language-classifier-26082023
```
where the `model` and `data_loader` args are the module names (i.e., the file names without the `.py`). The command above would run the Keras model's `fit` method, but it could be any custom method as long as it accepts a data loader instance as argument.
**If `save_dir` already has a model**:
- Only the first two arguments are required and the data loader may be changed, but respecifying the model is not allowed: the existing model will always be used.
- Hyperparameter values specified on the command line WILL override previously used ones (for this run only, not on disk).
### `tfbp.Model`
Models pretty much follow the same rules as Keras models, with very slight differences: the constructor's arguments should not be overridden (since the boilerplate code handles instantiation), and the `save` and `restore` methods don't need any arguments.
```python
import tensorflow as tf
import boilerplate as tfbp
@tfbp.default_export
class MyModel(tfbp.Model):
default_hparams = {
"batch_size": 32,
"hidden_size": 512,
"learning_rate": 0.01,
}
# Don't mess with the args and keyword args, `run.py` handles that.
def __init__(self, *a, **kw):
super().__init__(*a, **kw)
self.dense1 = tf.keras.layers.Dense(self.hparams.hidden_size)
...
def call(self, x):
z = self.dense1(x)
...
```
You can also write your own training loops à la PyTorch by overriding the `fit` method or by writing a custom method that you can invoke via `run.py` simply by adding the `@tfbp.runnable` decorator. Examples of both are available in `models/mlp.py`; a rough sketch follows.
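As a rough illustration only (a sketch, not code from `models/mlp.py`; the model, layer, optimizer, and the shape of the data loader's output are placeholder assumptions), a custom runnable method might look like this:
```python
import tensorflow as tf

import boilerplate as tfbp


@tfbp.default_export
class TinyModel(tfbp.Model):
    # Placeholder hyperparameters for the sketch.
    default_hparams = {"learning_rate": 0.01}

    def __init__(self, *a, **kw):
        super().__init__(*a, **kw)
        self.dense = tf.keras.layers.Dense(10)

    # Marked as runnable, so `run my_loop <save_dir> tiny_model <data_loader>`
    # would invoke it with a data loader instance, just like `fit`.
    @tfbp.runnable
    def my_loop(self, data_loader):
        # Whatever the loader's __call__ returns for this method; here we
        # assume a single tf.data.Dataset of (features, labels) batches.
        batches = data_loader()
        opt = tf.keras.optimizers.SGD(self.hparams.learning_rate)
        for x, y in batches:
            with tf.GradientTape() as tape:
                logits = self.dense(x)
                loss = tf.reduce_mean(
                    tf.keras.losses.sparse_categorical_crossentropy(
                        y, logits, from_logits=True
                    )
                )
            grads = tape.gradient(loss, self.trainable_variables)
            opt.apply_gradients(zip(grads, self.trainable_variables))
        self.save()  # persist weights via the boilerplate checkpoint manager
```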
### `tfbp.DataLoader`
Since model methods invoked by `run.py` receive a data loader instance, you may name your data loader methods whatever you wish and call them in your model code. A good practice is to make the data loader handle anything that is specific to a particular dataset, which allows the model to be as general as possible.
```python
import tensorflow as tf
import boilerplate as tfbp
@tfbp.default_export
class MyDataLoader(tfbp.DataLoader):
default_hparams = {
"batch_size": 32,
}
def __call__(self):
if self.method == "fit":
train_data = tf.data.TextLineDataset("data/train.txt").shuffle(10000)
valid_data = tf.data.TextLineDataset("data/valid.txt").shuffle(10000)
return self.prep_dataset(train_data), self.prep_dataset(valid_data)
elif self.method == "eval":
test_data = tf.data.TextLineDataset("data/test.txt")
return self.prep_dataset(test_data)
def prep_dataset(self, ds):
return ds.batch(self.hparams.batch_size).prefetch(1)
```
### API
The API is built using FastAPI: https://fastapi.tiangolo.com/
Run the dev server in standalone mode with:
```sh
ENVIRONMENT=dev uvicorn main:app --host 0.0.0.0 --port 5000 --reload
```
Run the project with Docker:
```sh
docker compose -f "docker-compose.yml" up -d --build
```
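Once the server is running, the `/parse` endpoint expects the auth token as a query parameter and the text in the JSON body (see `ParseInput` and the `authenticate` dependency in `main.py`). A minimal client sketch, assuming the `requests` package is available and using placeholder endpoint/token values:
```python
import requests  # assumed to be installed; not a project dependency

NLU_ENDPOINT = "http://localhost:5000"  # placeholder, see NLU_ENGINE_PORT
AUTH_TOKEN = "token123"                 # placeholder, see AUTH_TOKEN in .env

response = requests.post(
    f"{NLU_ENDPOINT}/parse",
    params={"token": AUTH_TOKEN},  # checked by the `authenticate` dependency
    json={"q": "Hello Joe", "project": "current"},  # fields of ParseInput
    timeout=10,
)
response.raise_for_status()
print(response.json())  # {"text": ..., "intent": {...}, "entities": [...]}
```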
## Pushing models to HuggingFace
Please refer to the official HF documentation on how to host models: https://huggingface.co/docs/hub/en/repositories-getting-started
Note that big files should be tracked with git-lfs, which you can initialize with:
```
git lfs install
```
and if your files are larger than 5GB, you'll also need to run:
```
huggingface-cli lfs-enable-largefiles .
```
## Contributing
We welcome contributions from the community! Whether you want to report a bug, suggest new features, or submit a pull request, your input is valuable to us.
Feel free to join us on [Discord](https://discord.gg/rNb9t2MFkG)
## License
This software is licensed under the GNU Affero General Public License v3.0 (AGPLv3) with the following additional terms:
1. The name "Hexabot" is a trademark of Hexastack. You may not use this name in derivative works without express written permission.
2. All derivative works must include clear attribution to the original creator and software, Hexastack and Hexabot, in a prominent location (e.g., in the software's "About" section, documentation, and README file).

View File

@ -1,228 +0,0 @@
"""TensorFlow Boilerplate main module."""
from collections import namedtuple
import json
import os
import sys
import tensorflow as tf
from huggingface_hub import snapshot_download
import logging
# Set up logging configuration
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def Hyperparameters(value):
"""Turn a dict of hyperparameters into a nameduple.
This method will also check if `value` is a namedtuple, and if so, will return it
unchanged.
"""
# Don't transform `value` if it's a namedtuple.
# https://stackoverflow.com/questions/2166818/how-to-check-if-an-object-is-an-instance-of-a-namedtuple
t = type(value)
b = t.__bases__
if len(b) == 1 and b[0] == tuple:
fields = getattr(t, "_fields", None)
if isinstance(fields, tuple) and all(type(name) == str for name in fields):
return value
_Hyperparameters = namedtuple("Hyperparameters", value.keys())
return _Hyperparameters(**value)
def validate_and_get_project_name(repo_name):
"""
Validate a HuggingFace repository name and return the project name.
Parameters:
repo_name (str): The repository name in the format 'Owner/ProjectName'.
Returns:
str: The project name if the repo_name is valid.
Raises:
ValueError: If the repo_name is not in the correct format.
"""
# Check if the repo name contains exactly one '/'
if repo_name.count('/') != 1:
raise ValueError("Invalid repository name format. It must be in 'Owner/ProjectName' format.")
# Split the repository name into owner and project name
owner, project_name = repo_name.split('/')
# Validate that both owner and project name are non-empty
if not owner or not project_name:
raise ValueError("Invalid repository name. Both owner and project name must be non-empty.")
# Return the project name if the validation is successful
return project_name
class Model(tf.keras.Model):
"""Keras model with hyperparameter parsing and a few other utilities."""
default_hparams = {}
_methods = {}
def __init__(self, save_dir=None, method=None, repo_id=None, **hparams):
super().__init__()
self._method = method
self.hparams = {**self.default_hparams, **hparams}
self.extra_params = {}
self._ckpt = None
self._manager = None
self._repo_id = None
if repo_id is not None:
project_name = validate_and_get_project_name(repo_id)
self._repo_id = repo_id
self._repo_dir = os.path.join("repos", project_name)
if save_dir is not None:
self._save_dir = os.path.join("repos", project_name, save_dir)
else:
self._save_dir = os.path.join("repos", project_name)
self.load_model()
else:
self._save_dir = save_dir
if self._save_dir is None:
raise ValueError(
f"save_dir must be supplied."
)
# If the model's hyperparameters were saved, the saved values will be used as
# the default, but they will be overridden by hyperparameters passed to the
# constructor as keyword args.
hparams_path = os.path.join(self._save_dir, "hparams.json")
if os.path.isfile(hparams_path):
with open(hparams_path) as f:
self.hparams = {**json.load(f), **hparams}
else:
if not os.path.exists(self._save_dir):
os.makedirs(self._save_dir)
with open(hparams_path, "w") as f:
json.dump(self.hparams._asdict(), f, indent=4, # type: ignore
sort_keys=True)
# If the model has extra parameters, the saved values will be loaded
extra_params_path = os.path.join(self._save_dir, "extra_params.json")
if os.path.isfile(extra_params_path):
with open(extra_params_path) as f:
self.extra_params = {**json.load(f)}
@property
def method(self):
return self._method
@property
def hparams(self):
return self._hparams
@hparams.setter
def hparams(self, value):
self._hparams = Hyperparameters(value)
@property
def extra_params(self):
return self._extra_params
@extra_params.setter
def extra_params(self, value):
self._extra_params = value
@property
def save_dir(self):
return self._save_dir
def save(self):
"""Save the model's weights."""
if self._ckpt is None:
self._ckpt = tf.train.Checkpoint(model=self)
self._manager = tf.train.CheckpointManager(
self._ckpt, directory=self.save_dir, max_to_keep=1
)
self._manager.save()
# Save extra parameters
if self.save_dir:
extra_params_path = os.path.join(
self.save_dir, "extra_params.json")
with open(extra_params_path, "w") as f:
json.dump(self.extra_params, f, indent=4, sort_keys=True)
def restore(self):
"""Restore the model's latest saved weights."""
if self._ckpt is None:
self._ckpt = tf.train.Checkpoint(model=self)
self._manager = tf.train.CheckpointManager(
self._ckpt, directory=self.save_dir, max_to_keep=1
)
self._ckpt.restore(self._manager.latest_checkpoint).expect_partial()
extra_params_path = os.path.join(self.save_dir, "extra_params.json")
if os.path.isfile(extra_params_path):
with open(extra_params_path) as f:
self.extra_params = json.load(f)
def make_summary_writer(self, dirname):
"""Create a TensorBoard summary writer."""
return tf.summary.create_file_writer(os.path.join(self.save_dir, dirname)) # type: ignore
def load_model(self):
if not os.path.isfile(os.path.join(self._save_dir, "checkpoint")):
os.makedirs(self._repo_dir, exist_ok=True)
snapshot_download(repo_id=self._repo_id, force_download=True,
local_dir=self._repo_dir, repo_type="model")
self.restore()
class DataLoader:
"""Data loader class akin to `Model`."""
default_hparams = {}
def __init__(self, method=None, **hparams):
self._method = method
self.hparams = {**self.default_hparams, **hparams}
@property
def method(self):
return self._method
@property
def hparams(self):
return self._hparams
@hparams.setter
def hparams(self, value):
self._hparams = Hyperparameters(value)
def runnable(f):
"""Mark a method as runnable from `run.py`."""
setattr(f, "_runnable", True)
return f
def default_export(cls):
"""Make the class the imported object of the module and compile its runnables."""
sys.modules[cls.__module__] = cls
for name, method in cls.__dict__.items():
if "_runnable" in dir(method) and method._runnable:
cls._methods[name] = method
return cls
def get_model(module_str):
"""Import the model in the given module string."""
return getattr(__import__(f"models.{module_str}"), module_str)
def get_data_loader(module_str):
"""Import the data loader in the given module string."""
return getattr(__import__(f"data_loaders.{module_str}"), module_str)

View File

View File

@ -1,237 +0,0 @@
from typing import Dict, List, Union
import tensorflow as tf
import json
import numpy as np
from transformers import PreTrainedTokenizerFast, PreTrainedTokenizer
import boilerplate as tfbp
from utils.json_helper import JsonHelper
class JointRawData(object):
id: str
intent: str
positions: Dict[str, List[int]]
slots: Dict[str, str]
text: str
def __init__(self, id, intent, positions, slots, text):
self.id = id
self.intent = intent
self.positions = positions
self.slots = slots
self.text = text
def __repr__(self):
return str(json.dumps(self.__dict__, indent=2)) # type: ignore
##
# JISFDL : Joint Intent and Slot Filling Model Data Loader
##
class JISFDL(tfbp.DataLoader):
def encode_texts(self, texts: List[str], tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast]):
# https://huggingface.co/transformers/preprocessing.html
return tokenizer(texts, padding=True, truncation=True, return_tensors="tf")
def encode_intents(self, intents, intent_map) -> tf.Tensor:
"""Map to train_data values"""
encoded = []
for i in intents:
encoded.append(intent_map[i])
# convert to tf tensor
return tf.convert_to_tensor(encoded, dtype="int32")
def get_slot_from_token(self, token: str, slot_dict: Dict[str, str]):
""" this function maps a token to its slot label"""
# each token either belongs to a slot or has a null slot
for slot_label, value in slot_dict.items():
if token in value:
return slot_label
return None
def encode_slots(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast],
all_slots: List[Dict[str, str]], all_texts: List[str],
slot_map: Dict[str, int], max_len: int):
encoded_slots = np.zeros(
shape=(len(all_texts), max_len), dtype=np.int32)
# each slot is assigned to the tokenized sentence instead of the raw text
# so that mapping a token to its slots is easier since we can use our bert tokenizer.
for idx, slot_names in enumerate(all_slots):
for slot_name, slot_text in slot_names.items():
slot_names[slot_name] = tokenizer.tokenize(slot_text)
# we now assign the sentence's slot dictionary to its index in all_slots .
all_slots[idx] = slot_names
for idx, text in enumerate(all_texts):
enc = [] # for this idx, to be added at the end to encoded_slots
# for each text, we retrieve all the slots with the
# words in that slot.
slot_names = all_slots[idx]
# we tokenize our input text to match the tokens in the slot dictionary
tokens = tokenizer.tokenize(text)
for token in tokens:
# each token is matched to its individual label
token_slot_name = self.get_slot_from_token(token, slot_names)
# if the token has no label, we give the null label <PAD>
# the label is then appended to the labels of the current text
if token_slot_name:
enc.append(slot_map[token_slot_name])
else:
enc.append(0)
# now add to encoded_slots
# the first and the last elements
# in encoded text are special characters
encoded_slots[idx, 1:len(enc)+1] = enc
return encoded_slots
def get_synonym_map(self):
helper = JsonHelper()
data = helper.read_dataset_json_file('train.json')
synonyms = data["entity_synonyms"]
synonym_map = {}
for entry in synonyms:
value = entry["value"]
for synonym in entry["synonyms"]:
synonym_map[synonym] = value
return synonym_map
def parse_dataset_intents(self, data):
intents = []
k = 0
# Filter examples by language
lang = self.hparams.language
all_examples = data["common_examples"]
if not bool(lang):
examples = all_examples
else:
examples = filter(lambda exp: any(e['entity'] == 'language' and e['value'] == lang for e in exp['entities']), all_examples)
# Parse raw data
for exp in examples:
text = exp["text"].lower()
intent = exp["intent"]
entities = exp["entities"]
# Filter out language entities
slot_entities = list(filter(
lambda e: e["entity"] != "language", entities))
slots = {}
for e in slot_entities:
# Create slots with entity values and resolve synonyms
if "start" in e and "end" in e and isinstance(e["start"], int) and isinstance(e["end"], int):
original_value = text[e["start"]:e["end"]]
entity_value = e["value"]
if entity_value != original_value:
entity_value = original_value.lower()
slots[e["entity"]] = entity_value
else:
continue
positions = [[e.get("start", -1), e.get("end", -1)]
for e in slot_entities]
temp = JointRawData(k, intent, positions, slots, text)
k += 1
intents.append(temp)
return intents
def __call__(self, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params = None):
# The train and test datasets have already been transformed to the new format beforehand.
helper = JsonHelper()
if self.method in ["fit", "train"]:
dataset = helper.read_dataset_json_file('train.json')
train_data = self.parse_dataset_intents(dataset)
return self._transform_dataset(train_data, tokenizer)
elif self.method in ["evaluate"]:
dataset = helper.read_dataset_json_file('test.json')
test_data = self.parse_dataset_intents(dataset)
return self._transform_dataset(test_data, tokenizer, model_params)
else:
raise ValueError("Unknown method!")
def _transform_dataset(self, dataset: List[JointRawData], tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast], model_params = None):
# We have to encode the texts using the tokenizer to create tensors for training
# the classifier.
texts = [d.text for d in dataset]
encoded_texts = self.encode_texts(texts, tokenizer)
# Map intents, load from the model (evaluate), recompute from dataset otherwise (train)
intents = [d.intent for d in dataset]
if not model_params:
intent_names = list(set(intents))
# Map slots, load from the model (evaluate), recompute from dataset otherwise (train)
slot_names = set()
for td in dataset:
slots = td.slots
for slot in slots:
slot_names.add(slot)
slot_names = list(slot_names)
# To pad all the texts to the same length, the tokenizer will use special characters.
# To handle those we need to add <PAD> to slots_names. It can be some other symbol as well.
slot_names.insert(0, "<PAD>")
else:
if "intent_names" in model_params:
intent_names = model_params["intent_names"]
else:
intent_names = None
if "slot_names" in model_params:
slot_names = model_params["slot_names"]
else:
slot_names = None
if intent_names:
intent_map = dict() # Dict : intent -> index
for idx, ui in enumerate(intent_names):
intent_map[ui] = idx
else:
intent_map = None
# Encode intents
if intent_map:
encoded_intents = self.encode_intents(intents, intent_map)
else:
encoded_intents = None
if slot_names:
slot_map: Dict[str, int] = dict() # slot -> index
for idx, us in enumerate(slot_names):
slot_map[us] = idx
else:
slot_map = None
# Encode slots
# Text : Add a tune to my elrow Guest List
# {'music_item': 'tune', 'playlist_owner': 'my', 'playlist': 'elrow Guest List'}
# [ 0 0 0 18 0 26 12 12 12 12 0 0 0 0 0 0 0 0 0 0 0 0 0 0
# 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
max_len = len(encoded_texts["input_ids"][0]) # type: ignore
all_slots = [td.slots for td in dataset]
all_texts = [td.text for td in dataset]
if slot_map:
encoded_slots = self.encode_slots(tokenizer,
all_slots, all_texts, slot_map, max_len)
else:
encoded_slots = None
return encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names
def encode_text(self, text: str, tokenizer: Union[PreTrainedTokenizer, PreTrainedTokenizerFast]):
return self.encode_texts([text], tokenizer)

View File

@ -1,29 +0,0 @@
import tensorflow as tf
import boilerplate as tfbp
@tfbp.default_export
class MNIST(tfbp.DataLoader):
default_hparams = {"batch_size": 32}
def __call__(self):
train_data, test_data = tf.keras.datasets.mnist.load_data()
test_data = tf.data.Dataset.from_tensor_slices(test_data)
if self.method in ["fit", "train"]:
train_data = tf.data.Dataset.from_tensor_slices(train_data).shuffle(10000)
test_data = test_data.shuffle(10000)
train_data = self._transform_dataset(train_data)
return train_data, test_data
return self._transform_dataset(test_data)
def _transform_dataset(self, dataset):
dataset = dataset.batch(self.hparams.batch_size)
return dataset.map(
lambda x, y: (
tf.reshape(tf.cast(x, tf.float32) / 255.0, [-1, 28 * 28]), # type: ignore
tf.cast(y, tf.int64),
)
)

View File

@ -1,130 +0,0 @@
from sklearn.calibration import LabelEncoder
import boilerplate as tfbp
from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
import re
import numpy as np
from typing import Any, Tuple, Dict, List
import os
import joblib
from utils.json_helper import JsonHelper
# TFLC (Term Frequency based Language Classifier) Data Loader
class TFLCDL(tfbp.DataLoader):
default_hparams: Dict[str, Any] = {"ngram_range": (3, 3), "test_size": .2}
# We need to store the fitted preprocessing objects so that we can transform the
# test and predict sets properly.
_save_dir: str
tfidf: TfidfVectorizer
one_hot_encoder: OneHotEncoder
label_encoder: LabelEncoder
language_names: List[str]
json_helper: JsonHelper
def __init__(self, method=None, save_dir=None, **hparams):
super().__init__(method, **hparams)
self.json_helper = JsonHelper("tflc")
self._save_dir = save_dir
# We will opt for a TF-IDF representation of the data as the frequency of word
# roots should give us a good idea about which language we're dealing with.
if method == "fit":
self.tfidf = TfidfVectorizer(analyzer="char_wb",
ngram_range=tuple(self.hparams.ngram_range))
else:
if self._save_dir is not None and os.path.isfile(os.path.join(self._save_dir, "tfidf_vectorizer.joblib")):
self.tfidf = joblib.load(os.path.join(self._save_dir, 'tfidf_vectorizer.joblib'))
else:
raise ValueError(f'Unable to load tfidf in {self._save_dir} ')
def strip_numbers(self, text: str):
return re.sub(r'[0-9]{2,}', '', text.lower())
def get_texts_and_languages(self, dataset: List[dict]):
""" Extracts the text and the language label from the text's JSON object"""
texts = []
languages = []
for item in dataset:
# An item is a JSON object that has text, entities among its keys.
language = ""
entities: List[dict] = item.get("entities", [])
# There can only be at most 1 language for a single piece of text.
# The entity we choose has to have "language" as the name, like this:
# { "name":"language","value":"fr","start":-1,"end":-1 }
language_entities = list(filter(lambda entity: "language" in entity.values(),
entities))
if language_entities:
language = language_entities[0]["value"]
# Numbers and capital letters don't provide information about the language
# so it's better to not have them.
if language:
text = self.strip_numbers(item["text"])
texts.append(text)
languages.append(language)
return texts, languages
def preprocess_train_dataset(self) -> Tuple[np.ndarray, np.ndarray]:
"""Preprocessing the training set and fitting the proprocess steps in the process"""
json = self.json_helper.read_dataset_json_file("train.json")
dataset = json["common_examples"]
# If a sentence has a language label, we include it in our dataset
# Otherwise, we discard it.
texts, languages = self.get_texts_and_languages(dataset)
encoded_texts = np.array(self.tfidf.fit_transform(texts).toarray())
# Encoding language labels as integers
self.label_encoder = LabelEncoder()
integer_encoded = np.array(
self.label_encoder.fit_transform(languages)).reshape(-1, 1)
self.language_names = list(self.label_encoder.classes_)
# Encoding integers to one hot vectors
self.one_hot_encoder = OneHotEncoder(
sparse=False, handle_unknown="error")
encoded_languages = self.one_hot_encoder.fit_transform(integer_encoded)
# Saving the fitted tfidf vectorizer
joblib.dump(self.tfidf, os.path.join(self._save_dir, 'tfidf_vectorizer.joblib'))
# We return the training data in the format of the model input
return encoded_texts, encoded_languages
def __call__(self) -> Tuple[np.ndarray, np.ndarray, List[str]]:
# Regardless of the method, we're required to fit our preprocessing to the training data
if self.method == "fit":
encoded_texts, encoded_languages = self.preprocess_train_dataset()
return encoded_texts, encoded_languages, self.language_names
elif self.method == "evaluate":
dataset = self.json_helper.read_dataset_json_file("test.json")
# We transform the test data.
texts, languages = self.get_texts_and_languages(
dataset["common_examples"])
# Encoding text using TF-IDF.
encoded_texts = np.array(self.tfidf.transform(
texts).toarray()) # type: ignore
# Encoding language labels as integers
self.label_encoder = LabelEncoder()
# Transforming the language labels.
integer_encoded = self.label_encoder.fit_transform(
languages).reshape(-1, 1) # type:ignore
# Encoding integers to one hot vectors
self.one_hot_encoder = OneHotEncoder(
sparse=False, handle_unknown="error")
encoded_languages = np.array(self.one_hot_encoder.fit_transform(
integer_encoded))
return encoded_texts, encoded_languages
else:
raise ValueError("Unknown method!")
def encode_text(self, text: str):
sanitized_text = self.strip_numbers(text)
return self.tfidf.transform([sanitized_text]).toarray() # type: ignore

View File

@ -1,2 +0,0 @@
source venv/bin/activate
alias run='python run.py'

View File

@ -1,115 +0,0 @@
# from typing import Union
import asyncio
import os
from typing import Annotated, Union
from fastapi.responses import JSONResponse
import boilerplate as tfbp
from fastapi import Depends, FastAPI, HTTPException, status
from pydantic import BaseModel
import logging
from huggingface_hub import login
# Set up logging configuration
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
AUTH_TOKEN = os.getenv("AUTH_TOKEN", "TOKEN_MUST_BE_DEFINED")
AVAILABLE_LANGUAGES = os.getenv("AVAILABLE_LANGUAGES", "en,fr").split(',')
TFLC_REPO_ID = os.getenv("TFLC_REPO_ID")
INTENT_CLASSIFIER_REPO_ID = os.getenv("INTENT_CLASSIFIER_REPO_ID")
SLOT_FILLER_REPO_ID = os.getenv("SLOT_FILLER_REPO_ID")
HF_AUTH_TOKEN = os.getenv("HF_AUTH_TOKEN")
# Log in to HuggingFace using the provided access token
if HF_AUTH_TOKEN:
login(token=HF_AUTH_TOKEN)
def load_language_classifier():
# Init language classifier model
Model = tfbp.get_model("tflc")
kwargs = {}
model = Model("", method="predict", repo_id=TFLC_REPO_ID, **kwargs)
model.load_model()
logging.info(f'Successfully loaded the language classifier model')
return model
def load_intent_classifiers():
Model = tfbp.get_model("intent_classifier")
intent_classifiers = {}
for language in AVAILABLE_LANGUAGES:
kwargs = {}
intent_classifiers[language] = Model(save_dir=language, method="predict", repo_id=INTENT_CLASSIFIER_REPO_ID, **kwargs)
intent_classifiers[language].load_model()
logging.info(f'Successfully loaded the intent classifier {language} model')
return intent_classifiers
def load_slot_fillers():
Model = tfbp.get_model("slot_filler")
slot_fillers = {}
for language in AVAILABLE_LANGUAGES:
kwargs = {}
slot_fillers[language] = Model(save_dir=language, method="predict", repo_id=SLOT_FILLER_REPO_ID, **kwargs)
slot_fillers[language].load_model()
logging.info(f'Successfully loaded the slot filler {language} model')
return slot_fillers
def load_models():
app.language_classifier = load_language_classifier() # type: ignore
app.intent_classifiers = load_intent_classifiers() # type: ignore
app.slot_fillers = load_slot_fillers() # type: ignore
app = FastAPI()
def authenticate(
token: str
):
if token != AUTH_TOKEN:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Unauthorized access",
)
return True
class ParseInput(BaseModel):
q: str
project: Union[str, None] = None
@app.on_event("startup")
async def startup_event():
asyncio.create_task(asyncio.to_thread(load_models))
@app.get("/health", status_code=200,)
async def check_health():
return "Startup checked"
@app.post("/parse")
def parse(input: ParseInput, is_authenticated: Annotated[str, Depends(authenticate)]):
if not hasattr(app, 'language_classifier') or not hasattr(app, 'intent_classifiers') or not hasattr(app, 'slot_fillers'):
headers = {"Retry-After": "120"} # Suggest retrying after 2 minutes
return JSONResponse(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, content={"message": "Models are still loading, please retry later."}, headers=headers)
language_prediction = app.language_classifier.get_prediction(input.q) # type: ignore
language = language_prediction.get("value")
intent_prediction = app.intent_classifiers[language].get_prediction(
input.q) # type: ignore
slot_prediction = app.slot_fillers[language].get_prediction(
input.q) # type: ignore
if slot_prediction.get("entities"):
entities = slot_prediction.get("entities")
else:
entities = []
entities.append(language_prediction)
return {
"text": input.q,
"intent": intent_prediction.get("intent"),
"entities": entities,
}

View File

View File

@ -1,239 +0,0 @@
import os
import json
import math
from typing import Tuple, Dict, List
from numpy import ndarray
import tensorflow as tf
from transformers import TFBertModel, AutoTokenizer, BatchEncoding
from keras.layers import Dropout, Dense
from sys import platform
if platform == "darwin":
from keras.optimizers.legacy import Adam
else:
from keras.optimizers import Adam
from keras.metrics import SparseCategoricalAccuracy
from focal_loss import SparseCategoricalFocalLoss
import numpy as np
from data_loaders.jisfdl import JISFDL
import boilerplate as tfbp
##
# Intent Classification with BERT
# This code is based on the paper BERT for Joint Intent Classification and Slot Filling by Chen et al. (2019),
# https://arxiv.org/abs/1902.10909 but on a different dataset made for a class project.
#
# Ideas were also taken from https://github.com/monologg/JointBERT, which is a PyTorch implementation of
# the paper with the original dataset.
##
@tfbp.default_export
class IntentClassifier(tfbp.Model):
default_hparams = {
"language": "",
"num_epochs": 2,
"dropout_prob": 0.1,
"intent_num_labels": 7,
"gamma": 2,
"k": 3
}
data_loader: JISFDL
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Init data loader
self.data_loader = JISFDL(**kwargs)
# Load Tokenizer from transformers
# We will use a pretrained bert model bert-base-cased for both Tokenizer and our classifier.
# Read the environment variable
bert_model_by_language_json = os.getenv('BERT_MODEL_BY_LANGUAGE_JSON')
# Check if the environment variable is set
if not bert_model_by_language_json:
raise ValueError("The BERT_MODEL_BY_LANGUAGE_JSON environment variable is not set.")
# Parse the JSON string into a Python dictionary
try:
bert_models = json.loads(bert_model_by_language_json)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse BERT_MODEL_BY_LANGUAGE_JSON: {e}")
# Ensure the parsed JSON is a dictionary
if not isinstance(bert_models, dict):
raise ValueError("The BERT_MODEL_BY_LANGUAGE_JSON must be a valid JSON object (dictionary).")
# Retrieve the BERT model name for the specified language
language = getattr(self.hparams, 'language', "en")
try:
bert_model_name = bert_models[language]
except KeyError as e:
raise ValueError(f"No BERT model is available for the provided language '{language}': {e}")
self.tokenizer = AutoTokenizer.from_pretrained(
bert_model_name, use_fast=False)
self.bert = TFBertModel.from_pretrained(bert_model_name)
self.dropout = Dropout(self.hparams.dropout_prob)
self.intent_classifier = Dense(self.hparams.intent_num_labels,
name="intent_classifier", activation="softmax")
def call(self, inputs, **kwargs):
trained_bert = self.bert(inputs, **kwargs)
pooled_output = trained_bert.pooler_output
# pooled_output for intent classification
pooled_output = self.dropout(pooled_output,
training=kwargs.get("training", False))
intent_probas = self.intent_classifier(pooled_output)
return intent_probas
def load_data(self, data_loader) -> Tuple[BatchEncoding, tf.Tensor, ndarray, int, int]:
return data_loader(self.tokenizer)
def get_metrics_by_intent(self, intent_probas: List[float], encoded_intents: tf.Tensor) -> Dict[str, dict]:
"""evaluating every intent individually"""
intent_names = self.extra_params["intent_names"] # type: ignore
count = {}
scores = {}
data_size = len(intent_probas)
# The confidence gets computed as the average probability predicted in each intent
for probas, actual_intent in zip(intent_probas, encoded_intents):
intent_name = intent_names[actual_intent]
# We sum and then divide by the number of texts in the intent.
count[intent_name] = count.get(intent_name, 0)+1
scores[intent_name] = scores.get(intent_name, {})
scores[intent_name]["intent_confidence"] = scores[intent_name].get("intent_confidence", 0)\
+ probas[actual_intent]
scores[intent_name]["loss"] = scores[intent_name].get("loss", 0)\
- math.log2(probas[actual_intent])
for intent_name in count.keys():
scores[intent_name]["frequency"] = count[intent_name]/data_size
scores[intent_name]["intent_confidence"] /= count[intent_name]
scores[intent_name]["loss"] /= count[intent_name]
return scores
def aggregate_metric(self, scores, key):
"""Group the intent metrics into a global evaluation"""
return np.sum([(scores[intent]["frequency"] * scores[intent][key]) for intent in scores.keys()])
def format_scores(self, scores: Dict[str, dict]):
for intent in scores.keys():
for metric, score in scores[intent].items():
# keep only 4 decimal places
scores[intent][metric] = "{:.4f}".format(score)
return scores
@tfbp.runnable
def fit(self):
"""Training"""
encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names = self.data_loader(
self.tokenizer)
if self.hparams.intent_num_labels != len(intent_names):
raise ValueError(
f"Hyperparam intent_num_labels mismatch, should be : {len(intent_names)}"
)
# Hyperparams, Optimizer and Loss function
opt = Adam(learning_rate=3e-5, epsilon=1e-08)
losses = SparseCategoricalFocalLoss(gamma=self.hparams.gamma)
metrics = [SparseCategoricalAccuracy("accuracy")]
# Compile model
self.compile(optimizer=opt, loss=losses, metrics=metrics)
x = {"input_ids": encoded_texts["input_ids"], "token_type_ids": encoded_texts["token_type_ids"],
"attention_mask": encoded_texts["attention_mask"]}
super().fit(
x, encoded_intents, epochs=self.hparams.num_epochs, batch_size=32, shuffle=True)
# Persist the model
self.extra_params["intent_names"] = intent_names
self.save()
@tfbp.runnable
def evaluate(self):
encoded_texts, encoded_intents, _, _, _ = self.data_loader(
self.tokenizer, self.extra_params)
metrics = [SparseCategoricalAccuracy("accuracy")]
self.compile(metrics=metrics)
intent_probas = self(encoded_texts) # type: ignore
scores = self.get_metrics_by_intent(intent_probas, encoded_intents)
overall_score = {}
overall_score["intent_confidence"] = self.aggregate_metric(
scores, "intent_confidence")
overall_score["loss"] = self.aggregate_metric(scores, "loss")
scores["Overall Scores"] = overall_score
scores = self.format_scores(scores)
print("\nScores per intent:")
for intent, score in scores.items():
print("{}: {}".format(intent, score))
return scores
def get_prediction(self, text: str):
inputs = self.data_loader.encode_text(text, self.tokenizer)
intent_probas = self(inputs) # type: ignore
intent_probas_np = intent_probas.numpy()
# Get the indices of the maximum values
intent_id = intent_probas_np.argmax(axis=-1)[0]
# get the confidences for each intent
intent_confidences = intent_probas_np[0]
margin = self.compute_normalized_confidence_margin(intent_probas_np)
output = {
"text": text,
"intent": {"name": self.extra_params["intent_names"][intent_id],
"confidence": float(intent_confidences[intent_id])},
"margin": margin,
}
return output
def compute_top_k_confidence(self, probs, k=3):
sorted_probas = np.sort(probs[0])[::-1] # Sort in descending order
top_k_sum = np.sum(sorted_probas[:k])
return top_k_sum
def compute_normalized_confidence_margin(self, probs):
highest_proba = np.max(probs[0])
sum_of_probas = self.compute_top_k_confidence(probs, self.hparams.k)
# Normalized margin
normalized_margin = highest_proba / sum_of_probas
return normalized_margin
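# Worked example (hypothetical probabilities): with probs = [[0.70, 0.20, 0.05, 0.03, 0.02]]
# and k = 3, the top-3 sum is 0.95 and the normalized margin is 0.70 / 0.95 ≈ 0.737;
# values close to 1 mean the winning intent clearly dominates its nearest competitors.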
@tfbp.runnable
def predict(self):
while True:
text = input("Provide text: ")
output = self.get_prediction(text)
print(output)
# Optionally, provide a way to exit the loop
if input("Try again? (y/n): ").lower() != 'y':
break
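# Minimal usage sketch via run.py (assumed module and experiment names):
#   python run.py fit my-intent-run intent_classifier --language en
#   python run.py predict my-intent-run
# Any key of default_hparams can be overridden with a --flag of the same name.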

View File

@ -1,89 +0,0 @@
import tensorflow as tf
from keras import layers as tfkl
import boilerplate as tfbp
@tfbp.default_export
class MLP(tfbp.Model):
default_hparams = {
"layer_sizes": [512, 10],
"learning_rate": 0.001,
"num_epochs": 10,
}
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.forward = tf.keras.Sequential()
for hidden_size in self.hparams.layer_sizes[:-1]:
self.forward.add(tfkl.Dense(hidden_size, activation=tf.nn.relu))
self.forward.add(
tfkl.Dense(self.hparams.layer_sizes[-1], activation=tf.nn.softmax)
)
self.loss = tf.losses.SparseCategoricalCrossentropy()
self.optimizer = tf.optimizers.Adam(self.hparams.learning_rate)
def call(self, x):
return self.forward(x)
@tfbp.runnable
def fit(self, data_loader):
"""Example using keras training loop."""
train_data, valid_data = data_loader.load()
self.compile(self.optimizer, self.loss)
super().fit(
x=train_data,
validation_data=valid_data,
validation_steps=32, # validate 32 batches at a time
validation_freq=1, # validate every 1 epoch
epochs=self.hparams.num_epochs,
shuffle=False, # dataset instances already handle shuffling
)
self.save()
@tfbp.runnable
def train(self, data_loader):
"""Example using custom training loop."""
step = 0
train_data, valid_data = data_loader()
# Allow to call `next` builtin indefinitely.
valid_data = iter(valid_data.repeat())
for epoch in range(self.hparams.num_epochs):
for x, y in train_data:
with tf.GradientTape() as g:
train_loss = self.loss(y, self(x))
grads = g.gradient(train_loss, self.trainable_variables)
self.optimizer.apply_gradients(zip(grads, self.trainable_variables))
# Validate every 1000 training steps.
if step % 1000 == 0:
x, y = next(valid_data)
valid_loss = self.loss(y, self(x))
print(
f"step {step} (train_loss={train_loss} valid_loss={valid_loss})"
)
step += 1
print(f"epoch {epoch} finished")
self.save()
@tfbp.runnable
def evaluate(self, data_loader):
n = 0
accuracy = 0
test_data = data_loader()
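# Accuracy is accumulated as a running (streaming) mean of the per-sample hits,
# so the final value equals the overall test accuracy without storing all predictions.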
for x, y in test_data:
true_pos = tf.math.equal(y, tf.math.argmax(self(x), axis=-1))
for i in true_pos.numpy():
n += 1
accuracy += (i - accuracy) / n
print(accuracy)

View File

@ -1,289 +0,0 @@
import os
import functools
import json
import re
from transformers import TFBertModel, AutoTokenizer
from keras.layers import Dropout, Dense
from sys import platform
if platform == "darwin":
from keras.optimizers.legacy import Adam
else:
from keras.optimizers import Adam
from focal_loss import SparseCategoricalFocalLoss
from keras.metrics import SparseCategoricalAccuracy
import numpy as np
from data_loaders.jisfdl import JISFDL
from sklearn.metrics import classification_report
import boilerplate as tfbp
##
# Slot filling with BERT
# This code is based on the paper BERT for Joint Intent Classification and Slot Filling by Chen et al. (2019),
# https://arxiv.org/abs/1902.10909 but on a different dataset made for a class project.
#
# Ideas were also taken from https://github.com/monologg/JointBERT, which is a PyTorch implementation of
# the paper with the original dataset.
##
@tfbp.default_export
class SlotFiller(tfbp.Model):
default_hparams = {
"language": "",
"num_epochs": 2,
"dropout_prob": 0.1,
"slot_num_labels": 40,
"gamma": 2.0
}
data_loader: JISFDL
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Init data loader
self.data_loader = JISFDL(**kwargs)
# Load the tokenizer from transformers.
# The pretrained BERT model is chosen per language (via BERT_MODEL_BY_LANGUAGE_JSON below)
# and is shared by the tokenizer and the classifier.
# Read the environment variable
bert_model_by_language_json = os.getenv('BERT_MODEL_BY_LANGUAGE_JSON')
# Check if the environment variable is set
if not bert_model_by_language_json:
raise ValueError("The BERT_MODEL_BY_LANGUAGE_JSON environment variable is not set.")
# Parse the JSON string into a Python dictionary
try:
bert_models = json.loads(bert_model_by_language_json)
except json.JSONDecodeError as e:
raise ValueError(f"Failed to parse BERT_MODEL_BY_LANGUAGE_JSON: {e}")
# Ensure the parsed JSON is a dictionary
if not isinstance(bert_models, dict):
raise ValueError("The BERT_MODEL_BY_LANGUAGE_JSON must be a valid JSON object (dictionary).")
# Retrieve the BERT model name for the specified language
language = getattr(self.hparams, 'language', "en")
try:
bert_model_name = bert_models[language]
except KeyError as e:
raise ValueError(f"No BERT model is available for the provided language '{language}': {e}")
self.tokenizer = AutoTokenizer.from_pretrained(
bert_model_name, use_fast=False)
self.bert = TFBertModel.from_pretrained(bert_model_name)
self.dropout = Dropout(self.hparams.dropout_prob)
self.slot_classifier = Dense(self.hparams.slot_num_labels,
name="slot_classifier", activation="softmax")
def call(self, inputs, **kwargs):
trained_bert = self.bert(inputs, **kwargs)
sequence_output = trained_bert.last_hidden_state
# sequence_output will be used for slot_filling
sequence_output = self.dropout(sequence_output,
training=kwargs.get("training", False))
slot_probas = self.slot_classifier(sequence_output)
return slot_probas
@tfbp.runnable
def fit(self):
"""Training"""
encoded_texts, encoded_intents, encoded_slots, intent_names, slot_names = self.data_loader(
self.tokenizer)
if self.hparams.slot_num_labels != len(slot_names):
raise ValueError(
f"Hyperparam slot_num_labels mismatch, should be : {len(slot_names)}"
)
# Hyperparams, Optimizer and Loss function
opt = Adam(learning_rate=3e-5, epsilon=1e-08)
losses = SparseCategoricalFocalLoss(gamma=self.hparams.gamma)
metrics = [SparseCategoricalAccuracy("accuracy")]
# Compile model
self.compile(optimizer=opt, loss=losses, metrics=metrics)
x = {"input_ids": encoded_texts["input_ids"], "token_type_ids": encoded_texts["token_type_ids"],
"attention_mask": encoded_texts["attention_mask"]}
super().fit(
x, encoded_slots, epochs=self.hparams.num_epochs, batch_size=32, shuffle=True)
# Persist the model
self.extra_params["slot_names"] = slot_names
self.extra_params["synonym_map"] = self.data_loader.get_synonym_map()
self.save()
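# The persisted synonym map is expected to map surface forms to canonical slot values,
# e.g. (hypothetical) {"ny": "new york", "nyc": "new york"}; it is reused at prediction
# time to post-process the extracted slot text.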
@tfbp.runnable
def evaluate(self):
"""Evaluation"""
# Load test data
# The data loader is assumed to return the held-out test split here
encoded_texts, _, encoded_slots, _, slot_names = self.data_loader(
self.tokenizer, self.extra_params)
# Get predictions
predictions = self(encoded_texts)
predicted_slot_ids = np.argmax(predictions, axis=-1) # Shape: (batch_size, sequence_length)
true_labels = encoded_slots.flatten()
pred_labels = predicted_slot_ids.flatten()
# Filter out padding tokens (assuming padding label id is 0)
mask = true_labels != 0
filtered_true_labels = true_labels[mask]
filtered_pred_labels = pred_labels[mask]
# Adjust labels to start from 0 (since padding label 0 is removed)
filtered_true_labels -= 1
filtered_pred_labels -= 1
# Get slot names excluding padding
slot_names_no_pad = self.extra_params["slot_names"][1:] # Exclude padding label
report = classification_report(
filtered_true_labels,
filtered_pred_labels,
target_names=slot_names_no_pad,
zero_division=0
)
print(report)
# Optionally, you can return the report as a string or dictionary
return report
@tfbp.runnable
def predict(self):
while True:
text = input("Provide text: ")
info = self.get_prediction(text.lower())
self.summary()
print("Text : " + text)
print(info)
# Optionally, provide a way to exit the loop
if input("Try again? (y/n): ").lower() != 'y':
break
def get_slots_prediction(self, text: str, inputs, slot_probas):
slot_probas_np = slot_probas.numpy()
# Get the indices of the maximum values
slot_ids = slot_probas_np.argmax(axis=-1)[0, :]
# Initialize the output dictionary
out_dict = {}
predicted_slots = set([self.extra_params["slot_names"][s] for s in slot_ids if s != 0])
for ps in predicted_slots:
out_dict[ps] = []
tokens = self.tokenizer.convert_ids_to_tokens(inputs["input_ids"][0])
# Special tokens to exclude
special_tokens = set(self.tokenizer.special_tokens_map.values())
idx = 0 # Initialize index explicitly for token tracking
while idx < len(tokens):
token = tokens[idx]
slot_id = slot_ids[idx]
# Get slot name
slot_name = self.extra_params["slot_names"][slot_id]
if slot_name == "<PAD>":
idx += 1
continue
# Collect tokens for the current slot
collected_tokens = []
# Handle regular tokens and sub-tokens
if not token.startswith("##"):
collected_tokens = [token]
else:
# Collect sub-tokens
while idx > 0 and tokens[idx - 1].startswith("##"):
idx -= 1
collected_tokens.insert(0, tokens[idx])
collected_tokens.append(token)
# Handle subsequent sub-tokens
while idx + 1 < len(tokens) and tokens[idx + 1].startswith("##"):
idx += 1
collected_tokens.append(tokens[idx])
# Add collected tokens to the appropriate slot
if slot_name in out_dict:
out_dict[slot_name].extend(collected_tokens)
idx += 1 # Move to the next token
# Map slot names to IDs
slot_names_to_ids = {value: key for key, value in enumerate(self.extra_params["slot_names"])}
# Create entities from the out_dict
entities = []
for slot_name, slot_tokens in out_dict.items():
slot_id = slot_names_to_ids[slot_name]
# Convert tokens to string
slot_value = self.tokenizer.convert_tokens_to_string(slot_tokens).strip()
slot_value = re.sub(r'\s+', '', slot_value)
# Ensure the slot value exists in the text (avoid -1 for start index)
start_idx = text.find(slot_value)
if start_idx == -1:
print(f"Skipping entity for '{slot_name}' because '{slot_value}' was not found in text.")
continue # Skip this entity if not found in text
# Post Processing
synonym_map = self.extra_params["synonym_map"]
final_slot_value = synonym_map.get(slot_value)
if final_slot_value is None:
final_slot_value = slot_value
# Calculate entity start and end indices
entity = {
"entity": slot_name,
"value": final_slot_value,
"start": start_idx,
"end": start_idx + len(slot_value),
"confidence": 0,
}
# Calculate confidence as the average of token probabilities
indices = [tokens.index(token) for token in slot_tokens]
if slot_tokens:
total_confidence = sum(slot_probas_np[0, idx, slot_id] for idx in indices)
entity["confidence"] = total_confidence / len(slot_tokens)
entities.append(entity)
return entities
def get_prediction(self, text: str):
inputs = self.data_loader.encode_text(text, self.tokenizer)
slot_probas = self(inputs) # type: ignore
entities = []
if slot_probas is not None:
entities = self.get_slots_prediction(text, inputs, slot_probas)
return {
"text": text,
"entities": entities,
}
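# Hypothetical output for the text "i want a pizza" with an assumed "product" slot:
#   {"text": "i want a pizza",
#    "entities": [{"entity": "product", "value": "pizza", "start": 9, "end": 14, "confidence": 0.91}]}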

View File

@ -1,170 +0,0 @@
import tensorflow as tf
from sys import platform
if platform == "darwin":
from keras.optimizers.legacy import Adam
else:
from keras.optimizers import Adam
from keras import layers, Sequential, regularizers
import numpy as np
from typing import Any, Dict, Tuple
from data_loaders.tflcdl import TFLCDL
import boilerplate as tfbp
def mapify(keys: list, values: list) -> dict:
return dict(zip(keys, values))
def format_float(values: np.ndarray, precision: int = 5, padding: int = 5) -> list:
return [np.format_float_positional(v, precision=precision, pad_right=padding,
min_digits=padding) for v in values]
# TFLC (Term Frequency based Language Classifier)
@tfbp.default_export
class TFLC(tfbp.Model):
default_hparams: Dict[str, Any] = {
"layer_sizes": [32, 2],
"num_epochs": 70,
"kernel_regularizer": 1e-4,
"bias_regularizer": 1e-4,
"dropout_proba": .2,
"learning_rate": 1e-3
}
data_loader: TFLCDL
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# Init data loader
self.data_loader = TFLCDL(save_dir=self._save_dir, **kwargs)
# Init layers
self.forward = Sequential()
# Dropout layer to avoid overfitting
self.forward.add(layers.Dropout(self.hparams.dropout_proba))
# Hidden feed forward layers
for hidden_size in self.hparams.layer_sizes[:-1]:
self.forward.add(layers.Dense(hidden_size, activation=tf.nn.sigmoid,
kernel_regularizer=regularizers.L2(
self.hparams.kernel_regularizer),
bias_regularizer=regularizers.L2(self.hparams.bias_regularizer)))
# Output layer
self.forward.add(layers.Dense(self.hparams.layer_sizes[-1], activation=tf.nn.softmax,
kernel_regularizer=regularizers.L2(
self.hparams.kernel_regularizer),
bias_regularizer=regularizers.L2(self.hparams.bias_regularizer)))
self.loss = tf.losses.categorical_crossentropy
self.optimizer = Adam(self.hparams.learning_rate)
def call(self, x: tf.Tensor):
return self.forward(x)
@tfbp.runnable
def fit(self):
# getting our training data
X_train, y_train, languages = self.data_loader()
self.compile(self.optimizer, self.loss)
# fitting the model to the data
super().fit(
x=X_train,
y=y_train,
# validation_split=0.1,
epochs=self.hparams.num_epochs,
shuffle=True)
self.extra_params["languages"] = languages
# Save the model
self.save()
@tfbp.runnable
def evaluate(self):
languages = list(self.extra_params['languages'])
# loading the test set
X_test, y_test = self.data_loader()
y_pred = super().predict(X_test)
self.calculate_metrics(y_test, y_pred, languages)
def preprocess_text(self, text):
# Strip numbers from the input text and encode it with the TF-IDF vectorizer.
stripped_text = self.strip_numbers(text)
encoded_text = np.array(self.tfidf.transform(
[stripped_text]).toarray()) # type: ignore
return np.array([stripped_text]), encoded_text
@tfbp.runnable
def predict(self):
languages = list(self.extra_params['languages'])
input_provided = input("Provide text: ")
text, encoded_text = self.preprocess_text(input_provided)
# convert the softmax output into a language index via argmax
probas = super().predict(encoded_text)
predictions = np.argmax(probas, axis=1)
results = []
for idx, prediction in enumerate(predictions):
print('The sentence "{}" is in {}.'.format(
text[idx], languages[prediction].upper()))
results.append({'text': text[idx], 'language': languages[prediction]})
return results
def get_prediction(self, text: str):
languages = self.extra_params["languages"]
encoded_text = self.data_loader.encode_text(text)
probas = super().predict(encoded_text)
predictions = np.argmax(probas, axis=1)
prediction_id = predictions[0]
return {
'entity': "language",
'value': languages[prediction_id],
'confidence': float(probas[0][prediction_id])
}
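# Hypothetical output: {"entity": "language", "value": "en", "confidence": 0.97}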
def calculate_metrics(self, y_true: np.ndarray, y_pred: np.ndarray, languages: list,
formatting: int = 5) -> Tuple[np.float64, dict, dict, dict]:
argm = np.argmax(y_pred, axis=1)
actual_pred = [i == argm[j] for j in range(
y_pred.shape[0]) for i in range(y_pred.shape[1])]
actual_pred = np.array(actual_pred).reshape(-1, y_true.shape[1])
# we use these to compute the metrics
true_positives = (np.logical_and(
actual_pred == y_true, y_true)).sum(axis=0)
actual_positives = y_true.sum(axis=0)
positive_preds = actual_pred.sum(axis=0)
# our chosen metrics are recall, precision, accuracy and F1 score
recall = (true_positives/actual_positives).T
precision = (true_positives/positive_preds).T
f1_score = (2*recall*precision/(recall+precision)).T
# converting our other metrics into a map (dict)
recall = mapify(languages, format_float(recall, padding=formatting))
precision = mapify(languages, format_float(
precision, padding=formatting))
f1_score = mapify(languages, format_float(
f1_score, padding=formatting))
# from one hot vectors to the language index
y_pred = np.array(np.argmax(y_pred, axis=1))
y_true = np.argmax(y_true, axis=1)
accuracy = (y_pred == y_true).mean()
print("accuracy: {}".format(
np.format_float_positional(accuracy, formatting)))
print("recall:\n{}".format(recall))
print("precision:\n{}".format(precision))
print("F1 score:\n{}".format(f1_score))
return (accuracy, recall, precision, f1_score)

View File

@ -1,10 +0,0 @@
tensorflow==2.13.*
transformers==4.30.2
keras==2.13.*
numpy==1.24.*
scikit_learn==1.2.2
fastapi==0.100.0
uvicorn[standard]==0.23.1
autopep8==2.0.2
focal-loss==0.0.7
--only-binary=h5py
h5py

View File

@ -1,9 +0,0 @@
tensorflow==2.13.0rc1
tensorflow_macos==2.13.0rc1
transformers==4.30.2
keras==2.13.1rc0
numpy==1.25.0
scikit_learn==1.2.2
fastapi==0.100.0
uvicorn[standard]==0.23.1
autopep8==2.0.2
focal-loss==0.0.7

View File

@ -1,109 +0,0 @@
"""Generic script to run any method in a TensorFlow model."""
from argparse import ArgumentParser
import json
import os
import sys
import boilerplate as tfbp
if __name__ == "__main__":
if len(sys.argv) < 3:
print(
"Usage:\n New run: python run.py [method] [save_dir] [model] [data_loader]"
" [hyperparameters...]\n Existing run: python run.py [method] [save_dir] "
"[data_loader]? [hyperparameters...]",
file=sys.stderr,
)
exit(1)
# Avoid errors due to a missing `experiments` directory.
if not os.path.exists("experiments"):
os.makedirs("experiments")
# Dynamically parse arguments from the command line depending on the model and data
# loader provided. The `method` and `save_dir` arguments are always required.
parser = ArgumentParser()
parser.add_argument("method", type=str)
parser.add_argument("save_dir", type=str)
# If modules.json exists, the model and the data loader modules can be inferred from
# `save_dir`, and the data loader can be optionally changed from its default.
#
# Note that we need to use `sys` because we need to read the command line args to
# determine what to parse with argparse.
modules_json_path = os.path.join("experiments", sys.argv[2], "modules.json")
if os.path.exists(modules_json_path):
with open(modules_json_path) as f:
classes = json.load(f)
Model = tfbp.get_model(classes["model"])
else:
Model = tfbp.get_model(sys.argv[3])
parser.add_argument("model", type=str)
if not os.path.exists(os.path.join("experiments", sys.argv[2])):
os.makedirs(os.path.join("experiments", sys.argv[2]))
with open(modules_json_path, "w") as f:
json.dump(
{"model": sys.argv[3]},
f,
indent=4,
sort_keys=True,
)
args = {}
saved_hparams = {}
hparams_json_path = os.path.join("experiments", sys.argv[2], "hparams.json")
if os.path.exists(hparams_json_path):
with open(hparams_json_path) as f:
saved_hparams = json.load(f)
for name, value in Model.default_hparams.items():
if name in saved_hparams:
value = saved_hparams[name]
args[name] = value
# Add a keyword argument to the argument parser for each hyperparameter.
for name, value in args.items():
# Make sure to correctly parse hyperparameters whose values are lists/tuples.
if type(value) in [list, tuple]:
if not len(value):
raise ValueError(
f"Cannot infer type of hyperparameter `{name}`. Please provide a "
"default value with nonzero length."
)
parser.add_argument(
f"--{name}", f"--{name}_", nargs="+", type=type(value[0]), default=value
)
else:
parser.add_argument(f"--{name}", type=type(value), default=value)
# Collect parsed hyperparameters.
FLAGS = parser.parse_args()
kwargs = {k: v for k, v in FLAGS._get_kwargs()}
for k in ["model", "save_dir"]:
if k in kwargs:
del kwargs[k]
# Instantiate model and data loader.
model = Model(os.path.join("experiments", FLAGS.save_dir), **kwargs)
# Restore the model's weights, or save them for a new run.
if os.path.isfile(os.path.join(model.save_dir, "checkpoint")):
model.restore()
else:
model.save()
# Run the specified model method.
if FLAGS.method not in Model._methods:
methods_str = "\n ".join(Model._methods.keys())
raise ValueError(
f"Model does not have a runnable method `{FLAGS.method}`. Methods available:"
f"\n {methods_str}"
)
Model._methods[FLAGS.method](model)
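# Example invocations (hypothetical experiment and module names; modules are resolved by
# boilerplate's get_model, and any default_hparams key can be overridden as a flag):
#   New run:      python run.py fit my-slot-run slot_filler --num_epochs 3
#   Existing run: python run.py evaluate my-slot-run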

View File

View File

@ -1,91 +0,0 @@
"""
Transform a dataset from the Rasa structure to a compliant one.

How to use:
    from utils.jisf_data_mapper import JisfDataMapper
    mapper = JisfDataMapper()
    # mapper.transform_to_new("train.json")
    mapper.transform_to_new("test.json")
"""
from .json_helper import JsonHelper
class JisfDataMapper(object):
def transform_to_new(self, filename: str, reverse: bool = False):
"""this method allows for changing a file's data format."""
helper=JsonHelper()
data = helper.read_dataset_json_file(filename)
copy_file = "copy of "+filename
# we create a copy of the old data format
helper.write_dataset_json_file(data, copy_file)
# alternatively, we could use this method in the opposite direction
if not reverse:
data = self.old_to_new(data)
else:
data = self.new_to_old(data)
helper.write_dataset_json_file(data, filename)
def old_to_new(self,data:dict):
converted_data=dict()
converted_data["common_examples"]=[]
all_intents=set()
all_slots=dict()
for k in data.keys():
common_example=dict()
#text and intent are the same in both formats
common_example["text"]=data[k]["text"]
common_example["intent"]=data[k]["intent"]
common_example["entities"]=[]
all_intents.add(common_example["intent"])
#for every entity, we get its corresponding value as well as the index of its
#start and finish
for slot in data[k]["slots"].keys():
all_slots[slot]=all_slots.get(slot,set())
entity=dict()
entity["entity"]=slot
entity["value"]=data[k]["slots"][slot]
all_slots[slot].add(entity["value"])
entity["start"],entity["end"]=tuple(data[k]["positions"][slot])
common_example["entities"].append(entity)
converted_data["common_examples"].append(common_example)
#lookup tables store all the intents as well as all the slot values seen in the dataset
converted_data["lookup_tables"]=[]
all_slots["intent"]=all_intents
for name,value in all_slots.items():
converted_data["lookup_tables"].append({"name":name,"elements":list(value)})
#regex features and entity synonyms will remain empty for now
converted_data["regex_features"]=[]
converted_data["entity_synonyms"]=[]
return converted_data
def new_to_old(self,data:dict):
old_data=dict()
dataset=data["common_examples"]
#for each piece of text, we make a JSON object.
for i in range(len(dataset)):
item=dict()
item["text"]=dataset[i]["text"]
item["intent"]=dataset[i]["intent"]
item["slots"]=dict()
item["positions"]=dict()
for entity in dataset[i]["entities"]:
item["slots"][entity["entity"]]=entity["value"]
item["positions"][entity["entity"]]=[entity["start"],entity["end"]]
old_data[i]=item
return old_data
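# Sketch of the two formats handled above (field names taken from the code, values hypothetical):
#   old: {"0": {"text": "hi there", "intent": "greeting", "slots": {}, "positions": {}}}
#   new: {"common_examples": [{"text": "hi there", "intent": "greeting", "entities": []}],
#         "lookup_tables": [{"name": "intent", "elements": ["greeting"]}],
#         "regex_features": [], "entity_synonyms": []}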

View File

@ -1,23 +0,0 @@
import os
import json
class JsonHelper:
data_folder: str
def __init__(self, model:str = "intent_classifier"):
self.data_folder = os.path.join("data", model)
def read_dataset_json_file(self, filename):
file_path = os.path.join(self.data_folder, filename)
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as json_file:
data = json.load(json_file)
return data
else:
raise FileNotFoundError("No file found with that path!")
def write_dataset_json_file(self, data: dict, file: str, indent: int = 2):
"""converts a dictionary to a JSON file"""
with open(os.path.join(self.data_folder, file), "w", encoding="utf-8") as outfile:
outfile.write(json.dumps(data, indent=indent))
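# Minimal usage sketch (assumed layout: data/intent_classifier/train.json exists):
#   helper = JsonHelper()  # reads/writes under data/intent_classifier
#   dataset = helper.read_dataset_json_file("train.json")
#   helper.write_dataset_json_file(dataset, "train_copy.json")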