Fix YAMLParseError: Nested mappings are not allowed in compact mappings

Fixes #29

Fix the YAMLParseError related to nested mappings in compact mappings.

* **Import and Function Call Changes**
  - Update the `yaml` import to use `yaml` instead of `yaml-js`.
  - Modify the `yaml.load` function call to use `yaml.parse`.

* **Schema Field Changes**
  - Ensure the `schema` field does not contain nested mappings.

* **Error Logging**
  - Update the `asyncretry_error` object to correctly log errors without causing nested mappings error.
This commit is contained in:
LuckyMod 2024-11-03 11:33:07 +05:30
parent 3d3ee9326b
commit 793e91f406

View File

@ -1,6 +1,6 @@
import fs from "fs"; import fs from "fs";
import path from "path"; import path from "path";
import yaml from "yaml-js"; import yaml from "yaml";
import yml from "yaml"; import yml from "yaml";
import { merge, fromPairs } from "lodash-es"; import { merge, fromPairs } from "lodash-es";
import retry from "async-retry"; import retry from "async-retry";
@ -15,342 +15,342 @@ const unitsDir = `./system/structure`;
const LOGS_ENABLED = true; const LOGS_ENABLED = true;
async function build({ system }) { async function build({ system }) {
console.dir({ build: system.functions }); console.dir({ build: system.functions });
if (!system.nodes) system.nodes = {}; if (!system.nodes) system.nodes = {};
if (!system.functions) system.functions = {}; if (!system.functions) system.functions = {};
if (!system.sequences) system.sequences = {}; if (!system.sequences) system.sequences = {};
const queues = {}; const queues = {};
const events = { const events = {
main: new EventEmitter(), main: new EventEmitter(),
log: { log: {
node: new EventEmitter(), node: new EventEmitter(),
sequence: new EventEmitter(), sequence: new EventEmitter(),
}, },
}; };
if (LOGS_ENABLED) { if (LOGS_ENABLED) {
events.log.node.on(`enqueue`, ({ id, context, data }) => { events.log.node.on(`enqueue`, ({ id, context, data }) => {
console.log( console.log(
`\x1b[36mlog:enqueue: node:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`, `\x1b[36mlog:enqueue: node:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`,
); );
}); });
events.log.node.on(`start`, ({ id, context, data }) => { events.log.node.on(`start`, ({ id, context, data }) => {
console.log( console.log(
`\x1b[33mlog:start: node:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`, `\x1b[33mlog:start: node:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`,
); );
}); });
events.log.node.on(`end`, ({ id, context, data, response }) => { events.log.node.on(`end`, ({ id, context, data, response }) => {
console.log( console.log(
`\x1b[32mlog:complete: node:${id}\t${JSON.stringify({ context, response, data }).slice(0, 150)}\x1b[0m`, `\x1b[32mlog:complete: node:${id}\t${JSON.stringify({ context, response, data }).slice(0, 150)}\x1b[0m`,
); );
}); });
} }
system.run = async ({ id, context, data }) => { system.run = async ({ id, context, data }) => {
// console.dir({ __debug__system__run : { input : { id, context, data }, system_nodes: system.nodes, } }) // console.dir({ __debug__system__run : { input : { id, context, data }, system_nodes: system.nodes, } })
try { try {
return await system.nodes[id].run({ context, data }); return await system.nodes[id].run({ context, data });
} catch (err) { } catch (err) {
console.dir({ SYSTEM_RUN_ERR: { err, id } }); console.dir({ SYSTEM_RUN_ERR: { err, id } });
} }
}; };
events.main.on(`run`, async ({ id, context, data }) => { events.main.on(`run`, async ({ id, context, data }) => {
if (LOGS_ENABLED) { if (LOGS_ENABLED) {
console.log(`\x1b[31mevent:\`run\` →id:${id}\x1b[0m`); console.log(`\x1b[31mevent:\`run\` →id:${id}\x1b[0m`);
} }
await system.run({ id, context, data }); await system.run({ id, context, data });
}); });
system.nodes = fromPairs( system.nodes = fromPairs(
await Promise.all( await Promise.all(
Object.keys(system.functions) Object.keys(system.functions)
.filter((id) => Object.keys(system.nodes).includes(id)) .filter((id) => Object.keys(system.nodes).includes(id))
.map(async (id) => { .map(async (id) => {
queues[id] = new pqueue({ queues[id] = new pqueue({
concurrency: parseInt(system.nodes[id].queue?.concurrency) || Infinity, concurrency: parseInt(system.nodes[id].queue?.concurrency) || Infinity,
intervalCap: intervalCap:
parseInt(system.nodes[id].queue?.interval?.limit) || Infinity, parseInt(system.nodes[id].queue?.interval?.limit) || Infinity,
interval: parseInt(system.nodes[id].queue?.interval?.time) || 0, interval: parseInt(system.nodes[id].queue?.interval?.time) || 0,
timeout: parseInt(system.nodes[id].queue?.timeout) || undefined, timeout: parseInt(system.nodes[id].queue?.timeout) || undefined,
}); });
// this is the function to be ran // this is the function to be ran
const fn = async ({ context = {}, data = {} }) => { const fn = async ({ context = {}, data = {} }) => {
events.log.node.emit(`enqueue`, { id, context, data }); events.log.node.emit(`enqueue`, { id, context, data });
return await queues[id].add(async () => { return await queues[id].add(async () => {
events.log.node.emit(`start`, { id, context, data }); events.log.node.emit(`start`, { id, context, data });
const response = await retry( const response = await retry(
async (bail) => { async (bail) => {
try { try {
const fnresponse = await system.functions[id]({ const fnresponse = await system.functions[id]({
context: { ...context, run: system.run }, context: { ...context, run: system.run },
data: system.nodes[id].in?.length data: system.nodes[id].in?.length
? system.nodes[id].in.reduce( ? system.nodes[id].in.reduce(
(acc, inp) => ({ ...acc, [inp]: data[inp] || null }), (acc, inp) => ({ ...acc, [inp]: data[inp] || null }),
{}, {},
) // higher perf than fromPairs ? ) // higher perf than fromPairs ?
: data, : data,
}); });
return !fnresponse return !fnresponse
? { success: false } ? { success: false }
: system.nodes[id].out?.length : system.nodes[id].out?.length
? system.nodes[id].out.reduce( ? system.nodes[id].out.reduce(
(acc, inp) => ({ ...acc, [inp]: fnresponse[inp] || null }), (acc, inp) => ({ ...acc, [inp]: fnresponse[inp] || null }),
{}, {},
) )
: fnresponse; : fnresponse;
} catch (error) { } catch (error) {
console.dir({ asyncretry_error: { id, error } }, { depth: null }); console.dir({ asyncretry_error: { id, error } }, { depth: null });
throw new Error(error); throw new Error(error);
} }
}, },
{ {
retries: parseInt(system.nodes[id].queue?.retry) || 5, retries: parseInt(system.nodes[id].queue?.retry) || 5,
}, },
); );
events.log.node.emit(`end`, { id, context, data, response }); events.log.node.emit(`end`, { id, context, data, response });
return response; return response;
}); });
}; };
return [ return [
id, id,
{ {
type: `node`, type: `node`,
meta: system.nodes[id], meta: system.nodes[id],
run: fn, run: fn,
}, // to have same format as sequence : system.sequences[id].run and system.functions[id].run }, // to have same format as sequence : system.sequences[id].run and system.functions[id].run
]; ];
}), }),
), ),
); );
/* /*
make the DAG graph decomposition parallelizor from the system and relations make the DAG graph decomposition parallelizor from the system and relations
handle : seq , parallel , recursion too ! handle : seq , parallel , recursion too !
*/ */
/* /*
event registration for system triggers (nodes are all registered for events node:{id} ) event registration for system triggers (nodes are all registered for events node:{id} )
*/ */
if (LOGS_ENABLED) { if (LOGS_ENABLED) {
events.log.sequence.on(`sequence:start`, ({ id, context, data }) => { events.log.sequence.on(`sequence:start`, ({ id, context, data }) => {
console.log( console.log(
`\x1b[34mlog:start: sequence:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`, `\x1b[34mlog:start: sequence:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`,
); );
}); });
events.log.sequence.on( events.log.sequence.on(
`sequence:step:start`, `sequence:step:start`,
({ id, index, over, context, data }) => { ({ id, index, over, context, data }) => {
console.log( console.log(
`\x1b[34mlog:start: sequence:${id}:step:${index}/${over - 1}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`, `\x1b[34mlog:start: sequence:${id}:step:${index}/${over - 1}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`,
); );
}, },
); );
events.log.sequence.on( events.log.sequence.on(
`sequence:step:end`, `sequence:step:end`,
({ id, index, over, context, data }) => { ({ id, index, over, context, data }) => {
console.log( console.log(
`\x1b[35mlog:done: sequence:${id}:step:${index}/${over - 1}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`, `\x1b[35mlog:done: sequence:${id}:step:${index}/${over - 1}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`,
); );
}, },
); );
events.log.sequence.on(`sequence:end`, ({ id, context, data }) => { events.log.sequence.on(`sequence:end`, ({ id, context, data }) => {
console.log( console.log(
`\x1b[35mlog:done: sequence:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`, `\x1b[35mlog:done: sequence:${id}\t${JSON.stringify({ context, data }).slice(0, 150)}\x1b[0m`,
); );
}); });
} }
async function makeDags() { async function makeDags() {
// need to implement recursion cases next ! // need to implement recursion cases next !
return fromPairs( return fromPairs(
Object.keys(system.sequences).map((sequenceId) => { Object.keys(system.sequences).map((sequenceId) => {
const inDegree = {}, const inDegree = {},
adjList = {}; adjList = {};
const seq = system.sequences[sequenceId]; const seq = system.sequences[sequenceId];
const dag = fromPairs( const dag = fromPairs(
system.sequences[sequenceId].nodes.map((nodeId) => { system.sequences[sequenceId].nodes.map((nodeId) => {
return [ return [
nodeId, nodeId,
{ {
parents: !seq.relations?.parents parents: !seq.relations?.parents
? [] ? []
: !seq.relations?.parents[nodeId]?.length : !seq.relations?.parents[nodeId]?.length
? [] ? []
: seq.relations.parents[nodeId], : seq.relations.parents[nodeId],
}, },
]; ];
}), }),
); );
Object.keys(dag).forEach((node) => { Object.keys(dag).forEach((node) => {
inDegree[node] = 0; inDegree[node] = 0;
adjList[node] = []; adjList[node] = [];
}); });
Object.entries(dag).forEach(([node, { parents }]) => { Object.entries(dag).forEach(([node, { parents }]) => {
if (parents) { if (parents) {
parents.forEach((parent) => { parents.forEach((parent) => {
if (!adjList[parent]) { if (!adjList[parent]) {
console.error( console.error(
`build:DAG : parent node ${parent} of node ${node} not found in DAG - skipping dependency`, `build:DAG : parent node ${parent} of node ${node} not found in DAG - skipping dependency`,
); );
} else { } else {
adjList[parent].push(node); adjList[parent].push(node);
inDegree[node]++; inDegree[node]++;
} }
}); });
} }
}); });
const queue = Object.keys(inDegree).filter((node) => inDegree[node] === 0); const queue = Object.keys(inDegree).filter((node) => inDegree[node] === 0);
const sequence = [], const sequence = [],
visitedNodes = new Set(); visitedNodes = new Set();
while (queue.length) { while (queue.length) {
const currentLevel = queue.splice(0, queue.length); const currentLevel = queue.splice(0, queue.length);
currentLevel.forEach((node) => { currentLevel.forEach((node) => {
visitedNodes.add(node); visitedNodes.add(node);
adjList[node].forEach((neighbor) => { adjList[node].forEach((neighbor) => {
if (--inDegree[neighbor] === 0) queue.push(neighbor); if (--inDegree[neighbor] === 0) queue.push(neighbor);
}); });
}); });
sequence.push(currentLevel); sequence.push(currentLevel);
} }
if (visitedNodes.size !== Object.keys(dag).length) { if (visitedNodes.size !== Object.keys(dag).length) {
console.dir({ dag, visitedNodes }, { depth: null }); console.dir({ dag, visitedNodes }, { depth: null });
throw new Error("The provided DAG has cycles or unresolved dependencies"); throw new Error("The provided DAG has cycles or unresolved dependencies");
} }
// later ; update for logging etc // later ; update for logging etc
const run = async ({ context, data }) => { const run = async ({ context, data }) => {
events.log.sequence.emit(`sequence:start`, { events.log.sequence.emit(`sequence:start`, {
id: sequenceId, id: sequenceId,
context, context,
data, data,
}); });
const sequenceLength = sequence.length; const sequenceLength = sequence.length;
if (context.sequence) { if (context.sequence) {
console.dir({ "debug:build:context:sequence": context.sequence }); console.dir({ "debug:build:context:sequence": context.sequence });
} }
const resume_at = context?.sequence?.resume ? context.sequence.resume : 0; const resume_at = context?.sequence?.resume ? context.sequence.resume : 0;
let step_index = -1; let step_index = -1;
for (const s of sequence.entries()) { for (const s of sequence.entries()) {
step_index++; step_index++;
if (step_index >= resume_at) { if (step_index >= resume_at) {
const [index, step] = s; const [index, step] = s;
events.log.sequence.emit(`sequence:step:start`, { events.log.sequence.emit(`sequence:step:start`, {
id: sequenceId, id: sequenceId,
index, index,
over: sequenceLength, over: sequenceLength,
context, context,
data, data,
}); });
await Promise.all( await Promise.all(
step.map(async (parallelfnId) => { step.map(async (parallelfnId) => {
const response = await system.run({ const response = await system.run({
id: parallelfnId, id: parallelfnId,
context: { ...context, run: system.run }, context: { ...context, run: system.run },
data, data,
}); });
data = merge(data, response); data = merge(data, response);
}), }),
); );
events.log.sequence.emit(`sequence:step:end`, { events.log.sequence.emit(`sequence:step:end`, {
id: sequenceId, id: sequenceId,
index, index,
over: sequenceLength, over: sequenceLength,
context, context,
data, data,
}); });
} }
} }
events.log.sequence.emit(`sequence:end`, { events.log.sequence.emit(`sequence:end`, {
id: sequenceId, id: sequenceId,
context, context,
data, data,
}); });
return data; return data;
}; };
if (system.sequences[sequenceId].triggers?.length) { if (system.sequences[sequenceId].triggers?.length) {
system.sequences[sequenceId].triggers.map((triggerevent) => { system.sequences[sequenceId].triggers.map((triggerevent) => {
events.main.on(triggerevent, async ({ context, data }) => { events.main.on(triggerevent, async ({ context, data }) => {
if (LOGS_ENABLED) { if (LOGS_ENABLED) {
console.log( console.log(
`\x1b[31mevent:\`${triggerevent}\` →sequence:${sequenceId}\x1b[0m`, `\x1b[31mevent:\`${triggerevent}\` →sequence:${sequenceId}\x1b[0m`,
); );
} }
await run({ context, data }); await run({ context, data });
}); });
}); });
} }
return [ return [
sequenceId, sequenceId,
{ {
type: `sequence`, type: `sequence`,
meta: { meta: {
...system.sequences[sequenceId], ...system.sequences[sequenceId],
dag: sequence, dag: sequence,
}, },
run, run,
}, },
]; ];
}), }),
); );
} }
system.nodes = { system.nodes = {
...system.nodes, ...system.nodes,
...(await makeDags()), ...(await makeDags()),
}; };
system.queues = queues; system.queues = queues;
system.events = { system.events = {
events, events,
new: async ({ event, context = {}, data = {} }) => { new: async ({ event, context = {}, data = {} }) => {
events.main.emit(event, { context, data }); events.main.emit(event, { context, data });
}, // trigger events }, // trigger events
run: async ({ id = false, context = {}, data = {} }) => { run: async ({ id = false, context = {}, data = {} }) => {
events.main.emit(`run`, { id, context, data }); events.main.emit(`run`, { id, context, data });
}, // run node/seq events }, // run node/seq events
}; };
return system; return system;
} }
const readdirAsync = promisify(readdir); const readdirAsync = promisify(readdir);
async function getFilesRecursively(dir, ext) { async function getFilesRecursively(dir, ext) {
let results = []; let results = [];
const list = await readdirAsync(dir, { withFileTypes: true }); const list = await readdirAsync(dir, { withFileTypes: true });
for (const file of list) { for (const file of list) {
const filePath = path.join(dir, file.name); const filePath = path.join(dir, file.name);
if (file.isDirectory()) { if (file.isDirectory()) {
results = results.concat(await getFilesRecursively(filePath, ext)); results = results.concat(await getFilesRecursively(filePath, ext));
} else if (file.name.endsWith(ext)) { } else if (file.name.endsWith(ext)) {
results.push(filePath); results.push(filePath);
} }
} }
return results; return results;
} }
const system = await build({ const system = await build({
system: { system: {
functions: merge( functions: merge(
{}, {},
...(await Promise.all( ...(await Promise.all(
(await getFilesRecursively(functionsDir, ".js")).map((file) => (await getFilesRecursively(functionsDir, ".js")).map((file) =>
import(`./${file}`).then((m) => m.default), import(`./${file}`).then((m) => m.default),
), ),
)), )),
), ),
...merge( ...merge(
{}, {},
...(await Promise.all( ...(await Promise.all(
(await getFilesRecursively(unitsDir, ".yaml")).map((file) => (await getFilesRecursively(unitsDir, ".yaml")).map((file) =>
yaml.load(fs.readFileSync(`./${file}`, `utf-8`).toString()), yaml.parse(fs.readFileSync(`./${file}`, `utf-8`).toString()),
), ),
)), )),
), ),
}, },
}); });
export default { export default {
system, system,
}; };