fix: return all data when calling webhook

Gergő Móricz 2024-08-14 17:53:47 +02:00
parent 2e5e480cc2
commit a6c81f9d62
1 changed file with 50 additions and 7 deletions

apps/api/src/services/queue-worker.ts

@@ -10,17 +10,15 @@ import { startWebScraperPipeline } from "../main/runWebScraper";
import { callWebhook } from "./webhook";
import { logJob } from "./logging/log_job";
import { initSDK } from "@hyperdx/node-opentelemetry";
-import { Job, QueueEvents, tryCatch } from "bullmq";
+import { Job } from "bullmq";
import { Logger } from "../lib/logger";
import { ScrapeEvents } from "../lib/scrape-events";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
import { WebCrawler } from "../scraper/WebScraper/crawler";
import { getAdjustedMaxDepth } from "../scraper/WebScraper/utils/maxDepthUtils";
-import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, isCrawlFinished, lockURL } from "../lib/crawl-redis";
+import { addCrawlJob, addCrawlJobDone, crawlToCrawler, getCrawl, getCrawlJobs, isCrawlFinished, lockURL } from "../lib/crawl-redis";
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
+import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
if (process.env.ENV === "production") {
initSDK({
@@ -170,9 +168,9 @@ async function processJob(job: Job, token: string) {
if (job.data.crawl_id) {
  await addCrawlJobDone(job.data.crawl_id, job.id);
-  if (!job.data.sitemapped) {
-    const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
+  const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
+  if (!job.data.sitemapped) {
    if (!sc.cancelled) {
      const crawler = crawlToCrawler(job.data.crawl_id, sc);
@@ -202,6 +200,51 @@ async function processJob(job: Job, token: string) {
  }

+  if (await isCrawlFinished(job.data.crawl_id)) {
+    const jobIDs = await getCrawlJobs(job.data.crawl_id);
+
+    const jobs = (await Promise.all(jobIDs.map(async x => {
+      if (x === job.id) {
+        // The current job's result is still in memory and is not yet
+        // readable from the queue, so synthesize a completed entry for it.
+        return {
+          async getState() {
+            return "completed";
+          },
+          timestamp: Date.now(),
+          returnvalue: docs,
+        };
+      }
+
+      const j = await getScrapeQueue().getJob(x);
+
+      if (process.env.USE_DB_AUTHENTICATION === "true") {
+        const supabaseData = await supabaseGetJobById(j.id);
+
+        if (supabaseData) {
+          j.returnvalue = supabaseData.docs;
+        }
+      }
+
+      return j;
+    }))).sort((a, b) => a.timestamp - b.timestamp);
+
+    const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
+    const jobStatus = sc.cancelled
+      ? "failed"
+      : jobStatuses.every(x => x === "completed")
+      ? "completed"
+      : jobStatuses.some(x => x === "failed")
+      ? "failed"
+      : "active";
+
+    // Named fullDocs rather than docs: shadowing the outer docs here would
+    // leave the returnvalue reference in the map callback above in the
+    // temporal dead zone and throw a ReferenceError at runtime.
+    const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
+
+    const data = {
+      success: jobStatus !== "failed",
+      result: {
+        links: fullDocs.map((doc) => {
+          return {
+            content: doc,
+            source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
+          };
+        }),
+      },
+      project_id: job.data.project_id,
+      error: message /* etc... */,
+      docs: fullDocs,
+    };
+
+    await callWebhook(job.data.team_id, job.id as string, data);
+  }
}
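
For reference, below is a minimal sketch of a receiver for this webhook, with the payload type transcribed from the data object assembled above. The endpoint path, port, and Express wiring are illustrative assumptions, not part of this commit.

import express from "express";

// Shape of one scraped document, inferred from the diff above; the optional
// fields mirror the doc?.metadata?.sourceURL access in the links mapping.
type WebhookDocument = {
  metadata?: { sourceURL?: string };
  url?: string;
  [key: string]: unknown;
};

// Shape of the webhook body, transcribed from the data object in the diff.
type CrawlWebhookPayload = {
  success: boolean; // false when the crawl was cancelled or any job failed
  result: {
    links: { content: WebhookDocument | null; source: string }[];
  };
  project_id: string;
  error?: string;
  docs: (WebhookDocument | null)[]; // one entry per crawl job, oldest first
};

const app = express();
// Assumption: a whole crawl's documents now arrive in a single call, so the
// body-size limit is raised well above Express's 100kb default.
app.use(express.json({ limit: "50mb" }));

// Hypothetical endpoint name; register whatever URL the crawl was created with.
app.post("/webhook/crawl-finished", (req, res) => {
  const payload = req.body as CrawlWebhookPayload;
  if (payload.success) {
    for (const link of payload.result.links) {
      console.log(`received ${link.source}`);
    }
  }
  res.sendStatus(200);
});

app.listen(3000);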