From dc189e1e9ded529e15ed0375c357142166cd61b4 Mon Sep 17 00:00:00 2001
From: rafaelsideguide <150964962+rafaelsideguide@users.noreply.github.com>
Date: Fri, 30 Aug 2024 16:22:59 -0300
Subject: [PATCH 01/12] feat: webhooks config on v1

---
 apps/api/src/services/queue-worker.ts | 10 ++--------
 apps/api/src/services/webhook.ts      |  6 +++---
 2 files changed, 5 insertions(+), 11 deletions(-)

diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 31d70a0..c373e6f 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -218,8 +218,8 @@ async function processJob(job: Job, token: string) {
       docs,
     };
 
-    if (job.data.mode === "crawl") {
-      await callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
+    if (job.data.webhook && job.data.mode !== "crawl") {
+      await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
     }
 
     if (job.data.crawl_id) {
@@ -342,8 +342,6 @@ async function processJob(job: Job, token: string) {
           error: message /* etc... */,
           docs: fullDocs,
         };
-
-        await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
       }
     }
 
@@ -386,10 +384,6 @@ async function processJob(job: Job, token: string) {
         "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
     };
 
-    if (job.data.mode === "crawl" || job.data.crawl_id) {
-      await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1);
-    }
-
     if (job.data.crawl_id) {
       await logJob({
         job_id: job.id as string,
diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts
index b60774e..a3af9c3 100644
--- a/apps/api/src/services/webhook.ts
+++ b/apps/api/src/services/webhook.ts
@@ -2,9 +2,9 @@ import { legacyDocumentConverter } from "../../src/controllers/v1/types";
 import { Logger } from "../../src/lib/logger";
 import { supabase_service } from "./supabase";
 
-export const callWebhook = async (teamId: string, jobId: string, data: any, specified?: string, v1 = false) => {
+export const callWebhook = async (teamId: string, id: string, data: any, specified?: string, v1 = false) => {
   try {
-    const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace("{{JOB_ID}}", jobId);
+    const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace("{{JOB_ID}}", id);
     const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === 'true';
     let webhookUrl = specified ?? selfHostedUrl;
 
@@ -50,7 +50,7 @@ export const callWebhook = async (teamId: string, jobId: string, data: any, spec
       },
       body: JSON.stringify({
         success: data.success,
-        jobId: jobId,
+        id: id,
         data: dataToSend,
         error: data.error || undefined,
       }),

From 87e61f2d51d6ed9749c6e7e3c9951d871880091d Mon Sep 17 00:00:00 2001
From: rafaelsideguide <150964962+rafaelsideguide@users.noreply.github.com>
Date: Fri, 30 Aug 2024 16:38:55 -0300
Subject: [PATCH 02/12] v0 working

---
 apps/api/src/services/queue-worker.ts | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index c373e6f..74ba544 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -218,6 +218,9 @@ async function processJob(job: Job, token: string) {
       docs,
     };
 
+    if (job.data.mode === "crawl") {
+      await callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
+    }
     if (job.data.webhook && job.data.mode !== "crawl") {
       await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
     }
@@ -345,6 +348,10 @@ async function processJob(job: Job, token: string) {
       }
     }
 
+    if (!job.data.v1) {
+      await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
+    }
+
     Logger.info(`🐂 Job done ${job.id}`);
     return data;
   } catch (error) {
@@ -383,6 +390,10 @@ async function processJob(job: Job, token: string) {
       error:
         "Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
     };
+
+    if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
+      await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1);
+    }
 
     if (job.data.crawl_id) {
       await logJob({

From 95b9dc915d6bd0342abe1f8e4e43be4c9421af3b Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 13:44:36 -0300
Subject: [PATCH 03/12] Nick: webhooks v1 working great

---
 apps/api/src/controllers/v1/crawl.ts  |  5 +++
 apps/api/src/services/queue-worker.ts | 37 +++++++++++++----
 apps/api/src/services/webhook.ts      | 60 +++++++++++++++++++--------
 apps/api/src/types.ts                 |  5 ++-
 4 files changed, 82 insertions(+), 25 deletions(-)

diff --git a/apps/api/src/controllers/v1/crawl.ts b/apps/api/src/controllers/v1/crawl.ts
index fd72c8c..c2d5bdc 100644
--- a/apps/api/src/controllers/v1/crawl.ts
+++ b/apps/api/src/controllers/v1/crawl.ts
@@ -22,6 +22,7 @@ import { getScrapeQueue } from "../../services/queue-service";
 import { addScrapeJob } from "../../services/queue-jobs";
 import { Logger } from "../../lib/logger";
 import { getJobPriority } from "../../lib/job-priority";
+import { callWebhook } from "../../services/webhook";
 
 export async function crawlController(
   req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>,
@@ -150,6 +151,10 @@ export async function crawlController(
     await addCrawlJob(id, job.id);
   }
 
+  if(req.body.webhook) {
+    await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started");
+  }
+
   return res.status(200).json({
     success: true,
     id,
diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index e0bb0df..f8675ac 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -217,10 +217,15 @@ async function processJob(job: Job, token: string) {
       docs,
     };
 
-    if (job.data.mode === "crawl") {
-      await callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
+
+
+    // No idea what this does and when it is called.
+    if (job.data.mode === "crawl" && !job.data.v1) {
+      callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1).catch((error) => {
+        Logger.error(`Error calling webhook for job (1 - mode crawl - v0) ${job.id} - ${error}`);
+      });
     }
-    if (job.data.webhook && job.data.mode !== "crawl") {
+    if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
       await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
     }
 
@@ -344,13 +349,24 @@ async function processJob(job: Job, token: string) {
           error: message /* etc... */,
           docs: fullDocs,
         };
+        // v0 web hooks, call when done with all the data
+        if (!job.data.v1) {
+          callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
+            Logger.error(`Error calling webhook for job ${job.id} - ${error}`);
+          });
+        }
+        // v1 web hooks, call when done with no data, but with event completed
+        if (job.data.v1 && job.data.webhook) {
+          callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
+            Logger.error(`Error calling webhook for job ${job.id} - ${error}`);
+          });
+        }
       }
     }
 
-    if (!job.data.v1) {
-      await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
-    }
+
+
 
     Logger.info(`🐂 Job done ${job.id}`);
     return data;
   } catch (error) {
@@ -391,7 +407,14 @@ async function processJob(job: Job, token: string) {
     };
 
     if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
-      await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1);
+      callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1).catch((error) => {
+        Logger.error(`Error calling webhook for job (catch - v0) ${job.id} - ${error}`);
+      });
+    }
+    if(job.data.v1) {
+      callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed").catch((error) => {
+        Logger.error(`Error calling webhook for job (catch - v1) ${job.id} - ${error}`);
+      });
     }
     if (job.data.crawl_id) {
       await logJob({
diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts
index a3af9c3..1ca7dba 100644
--- a/apps/api/src/services/webhook.ts
+++ b/apps/api/src/services/webhook.ts
@@ -1,11 +1,23 @@
+import axios from "axios";
 import { legacyDocumentConverter } from "../../src/controllers/v1/types";
 import { Logger } from "../../src/lib/logger";
 import { supabase_service } from "./supabase";
+import { WebhookEventType } from "../types";
 
-export const callWebhook = async (teamId: string, id: string, data: any, specified?: string, v1 = false) => {
+export const callWebhook = async (
+  teamId: string,
+  id: string,
+  data: any | null,
+  specified?: string,
+  v1 = false,
+  eventType: WebhookEventType = "crawl.page"
+) => {
   try {
-    const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace("{{JOB_ID}}", id);
-    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === 'true';
+    const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace(
+      "{{JOB_ID}}",
+      id
+    );
+    const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
     let webhookUrl = specified ?? selfHostedUrl;
 
     // Only fetch the webhook URL from the database if the self-hosted webhook URL and specified webhook are not set
     if (!webhookUrl && useDbAuthentication) {
@@ -17,7 +29,9 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi
         .eq("team_id", teamId)
         .limit(1);
       if (error) {
-        Logger.error(`Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`);
+        Logger.error(
+          `Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`
+        );
         return null;
       }
 
@@ -29,10 +43,12 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi
     }
 
     let dataToSend = [];
-    if (data.result.links && data.result.links.length !== 0) {
+    if (data && data.result && data.result.links && data.result.links.length !== 0) {
       for (let i = 0; i < data.result.links.length; i++) {
         if (v1) {
-          dataToSend.push(legacyDocumentConverter(data.result.links[i].content))
+          dataToSend.push(
+            legacyDocumentConverter(data.result.links[i].content)
+          );
         } else {
           dataToSend.push({
             content: data.result.links[i].content.content,
@@ -43,19 +59,29 @@ export const callWebhook = async (teamId: string, id: string, data: any, specifi
       }
     }
 
-    await fetch(webhookUrl, {
-      method: "POST",
-      headers: {
-        "Content-Type": "application/json",
-      },
-      body: JSON.stringify({
-        success: data.success,
-        id: id,
+    axios.post(
+      webhookUrl,
+      {
+        success: !v1 ? data.success : eventType === "crawl.page" ? data.success : true,
+        type: eventType,
+        [v1 ? 'id' : 'jobId']: id,
         data: dataToSend,
-        error: data.error || undefined,
-      }),
+        error: !v1 ? data?.error || undefined : eventType === "crawl.page" ? data?.error || undefined : undefined,
+      },
+      {
+        headers: {
+          "Content-Type": "application/json",
+        },
+        timeout: 10000, // 10 seconds timeout
+      }
+    ).catch((error) => {
+      Logger.error(
+        `Error sending webhook for team ID: ${teamId}, error: ${error.message}`
+      );
     });
   } catch (error) {
-    Logger.debug(`Error sending webhook for team ID: ${teamId}, error: ${error.message}`);
+    Logger.debug(
+      `Error sending webhook for team ID: ${teamId}, error: ${error.message}`
+    );
   }
 };
diff --git a/apps/api/src/types.ts b/apps/api/src/types.ts
index 431c012..50fb6ee 100644
--- a/apps/api/src/types.ts
+++ b/apps/api/src/types.ts
@@ -153,4 +153,7 @@ export type PlanType =
   | "growth"
   | "growthdouble"
   | "free"
-  | "";
\ No newline at end of file
+  | "";
+
+
+export type WebhookEventType = "crawl.page" | "crawl.started" | "crawl.completed" | "crawl.failed";
\ No newline at end of file

From 979697df1c6315fbc8e623647868bed745558248 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 13:47:51 -0300
Subject: [PATCH 04/12] Update queue-worker.ts

---
 apps/api/src/services/queue-worker.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index f8675ac..fbf7293 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -357,7 +357,7 @@ async function processJob(job: Job, token: string) {
         }
         // v1 web hooks, call when done with no data, but with event completed
         if (job.data.v1 && job.data.webhook) {
-          callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
+          callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
             Logger.error(`Error calling webhook for job ${job.id} - ${error}`);
           });
         }

From b68a50fea3b9693be63d679e570d36ad5b59d148 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 13:52:34 -0300
Subject: [PATCH 05/12] Nick:

---
 apps/api/src/services/queue-worker.ts | 22 +++--------
 apps/api/src/services/webhook.ts      | 55 +++++++++++++++++----------
 2 files changed, 41 insertions(+), 36 deletions(-)

diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index fbf7293..d3a34e9 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -221,12 +221,10 @@ async function processJob(job: Job, token: string) {
 
     // No idea what this does and when it is called.
     if (job.data.mode === "crawl" && !job.data.v1) {
-      callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1).catch((error) => {
-        Logger.error(`Error calling webhook for job (1 - mode crawl - v0) ${job.id} - ${error}`);
-      });
+      callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
     }
     if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
-      await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
+      callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
     }
 
     if (job.data.crawl_id) {
@@ -351,15 +349,11 @@ async function processJob(job: Job, token: string) {
         };
         // v0 web hooks, call when done with all the data
         if (!job.data.v1) {
-          callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
-            Logger.error(`Error calling webhook for job ${job.id} - ${error}`);
-          });
+          callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed");
         }
         // v1 web hooks, call when done with no data, but with event completed
         if (job.data.v1 && job.data.webhook) {
-          callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed").catch((error) => {
-            Logger.error(`Error calling webhook for job ${job.id} - ${error}`);
-          });
+          callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed");
         }
       }
     }
@@ -407,14 +401,10 @@ async function processJob(job: Job, token: string) {
     };
 
     if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
-      callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1).catch((error) => {
-        Logger.error(`Error calling webhook for job (catch - v0) ${job.id} - ${error}`);
-      });
+      callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1);
     }
     if(job.data.v1) {
-      callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed").catch((error) => {
-        Logger.error(`Error calling webhook for job (catch - v1) ${job.id} - ${error}`);
-      });
+      callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed");
     }
 
     if (job.data.crawl_id) {
diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts
index 1ca7dba..e088129 100644
--- a/apps/api/src/services/webhook.ts
+++ b/apps/api/src/services/webhook.ts
@@ -43,7 +43,12 @@ export const callWebhook = async (
     }
 
     let dataToSend = [];
-    if (data && data.result && data.result.links && data.result.links.length !== 0) {
+    if (
+      data &&
+      data.result &&
+      data.result.links &&
+      data.result.links.length !== 0
+    ) {
       for (let i = 0; i < data.result.links.length; i++) {
         if (v1) {
           dataToSend.push(
             legacyDocumentConverter(data.result.links[i].content)
           );
         } else {
           dataToSend.push({
             content: data.result.links[i].content.content,
@@ -59,26 +64,36 @@ export const callWebhook = async (
       }
     }
 
-    axios.post(
-      webhookUrl,
-      {
-        success: !v1 ? data.success : eventType === "crawl.page" ? data.success : true,
-        type: eventType,
-        [v1 ? 'id' : 'jobId']: id,
-        data: dataToSend,
-        error: !v1 ? data?.error || undefined : eventType === "crawl.page" ? data?.error || undefined : undefined,
-      },
-      {
-        headers: {
-          "Content-Type": "application/json",
-        },
-        timeout: 10000, // 10 seconds timeout
-      }
-    ).catch((error) => {
-      Logger.error(
-        `Error sending webhook for team ID: ${teamId}, error: ${error.message}`
-      );
-    });
+    axios
+      .post(
+        webhookUrl,
+        {
+          success: !v1
+            ? data.success
+            : eventType === "crawl.page"
+            ? data.success
+            : true,
+          type: eventType,
+          [v1 ? "id" : "jobId"]: id,
+          data: dataToSend,
+          error: !v1
+            ? data?.error || undefined
+            : eventType === "crawl.page"
+            ? data?.error || undefined
+            : undefined,
        },
+        {
+          headers: {
+            "Content-Type": "application/json",
          },
+          timeout: 10000, // 10 seconds timeout
+        }
+      )
+      .catch((error) => {
+        Logger.error(
+          `Error sending webhook for team ID: ${teamId}, error: ${error.message}`
+        );
+      });
   } catch (error) {
     Logger.debug(
       `Error sending webhook for team ID: ${teamId}, error: ${error.message}`
     );
   }
 };

From 0df2441d7fc602cbf901080c01464c3c5f627fc9 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 13:54:41 -0300
Subject: [PATCH 06/12] Nick: sdks good

---
 apps/js-sdk/firecrawl/package.json     | 2 +-
 apps/js-sdk/firecrawl/src/index.ts     | 1 +
 apps/js-sdk/firecrawl/types/index.d.ts | 1 +
 3 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/apps/js-sdk/firecrawl/package.json b/apps/js-sdk/firecrawl/package.json
index 002e10d..e68b301 100644
--- a/apps/js-sdk/firecrawl/package.json
+++ b/apps/js-sdk/firecrawl/package.json
@@ -1,6 +1,6 @@
 {
   "name": "@mendable/firecrawl-js",
-  "version": "1.2.0",
+  "version": "1.2.1",
   "description": "JavaScript SDK for Firecrawl API",
   "main": "build/cjs/index.js",
   "types": "types/index.d.ts",
diff --git a/apps/js-sdk/firecrawl/src/index.ts b/apps/js-sdk/firecrawl/src/index.ts
index ee55343..1d1bb4e 100644
--- a/apps/js-sdk/firecrawl/src/index.ts
+++ b/apps/js-sdk/firecrawl/src/index.ts
@@ -111,6 +111,7 @@ export interface CrawlParams {
   allowExternalLinks?: boolean;
   ignoreSitemap?: boolean;
   scrapeOptions?: ScrapeParams;
+  webhook?: string;
 }
 
 /**
diff --git a/apps/js-sdk/firecrawl/types/index.d.ts b/apps/js-sdk/firecrawl/types/index.d.ts
index 8b620f8..36356c4 100644
--- a/apps/js-sdk/firecrawl/types/index.d.ts
+++ b/apps/js-sdk/firecrawl/types/index.d.ts
@@ -103,6 +103,7 @@ export interface CrawlParams {
   allowExternalLinks?: boolean;
   ignoreSitemap?: boolean;
   scrapeOptions?: ScrapeParams;
+  webhook?: string;
 }
 /**
  * Response interface for crawling operations.
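
With patches 01-06 applied, a v1 crawl can register a webhook either directly in the /v1/crawl request body or through the JS SDK's new `webhook` field in CrawlParams, and the target URL receives a JSON POST shaped by webhook.ts above: `success`, `type` (a WebhookEventType), `id` on v1 (`jobId` on v0), `data`, and an optional `error`. Below is a minimal sketch of both sides; the `asyncCrawlUrl` method name, the Express receiver, and the route path are illustrative assumptions and not part of these patches.

// Sketch only: assumes @mendable/firecrawl-js >= 1.2.1 and an Express-based receiver.
import FirecrawlApp from "@mendable/firecrawl-js";
import express from "express";

type WebhookEventType = "crawl.page" | "crawl.started" | "crawl.completed" | "crawl.failed";

// Payload shape as assembled in webhook.ts for v1 (v0 payloads use "jobId" instead of "id").
interface FirecrawlWebhookBody {
  success: boolean;
  type: WebhookEventType;
  id: string;
  data: unknown[]; // scraped documents on crawl.page; empty array on v1 crawl.completed/failed
  error?: string;
}

// Kick off a crawl that reports to our webhook; the SDK method name here is an assumption.
async function startCrawl() {
  const firecrawl = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY ?? "" });
  await firecrawl.asyncCrawlUrl("https://example.com", {
    webhook: "https://my-service.example.com/webhooks/firecrawl", // new CrawlParams field from patch 06
  });
}

// Minimal receiver: acknowledge quickly, since the API posts with a 10 second timeout.
const app = express();
app.post("/webhooks/firecrawl", express.json(), (req, res) => {
  const event = req.body as FirecrawlWebhookBody;
  switch (event.type) {
    case "crawl.started":
      console.log(`crawl ${event.id} started`);
      break;
    case "crawl.page":
      console.log(`crawl ${event.id}: received ${event.data.length} document(s)`);
      break;
    case "crawl.completed":
    case "crawl.failed":
      console.log(`crawl ${event.id} ended: ${event.type}`, event.error ?? "");
      break;
  }
  res.sendStatus(200);
});

app.listen(3000, () => startCrawl().catch(console.error));

Note the v0/v1 difference visible in the patches: v0 still posts the full document set in `data` once the crawl finishes, while v1 streams per-page `crawl.page` events and sends `crawl.completed` with an empty `data` array, so a v1 receiver should accumulate pages rather than wait for the final event.
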
From ae90370543dbe2f13c2f8b5629ff38a731059011 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 13:58:07 -0300
Subject: [PATCH 07/12] Update webhook.ts

---
 apps/api/src/services/webhook.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts
index e088129..f077db4 100644
--- a/apps/api/src/services/webhook.ts
+++ b/apps/api/src/services/webhook.ts
@@ -91,7 +91,7 @@ export const callWebhook = async (
       )
       .catch((error) => {
         Logger.error(
-          `Error sending webhook for team ID: ${teamId}, error: ${error.message}`
+          `Axios error sending webhook for team ID: ${teamId}, error: ${error.message}`
         );
       });
   } catch (error) {

From 5c05bb12a7b08ccc22b922f35cad867bf58c8d8a Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 13:58:50 -0300
Subject: [PATCH 08/12] Update webhook.ts

---
 apps/api/src/services/webhook.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts
index f077db4..06c1fdf 100644
--- a/apps/api/src/services/webhook.ts
+++ b/apps/api/src/services/webhook.ts
@@ -86,7 +86,7 @@ export const callWebhook = async (
           headers: {
             "Content-Type": "application/json",
           },
-          timeout: 10000, // 10 seconds timeout
+          timeout: v1 ? 10000 : 30000, // 10 seconds timeout (v1)
         }
       )
       .catch((error) => {

From 758f729ae27ca379297ec5e87368ef92dd35e015 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 14:10:47 -0300
Subject: [PATCH 09/12] Update queue-worker.ts

---
 apps/api/src/services/queue-worker.ts | 10 ++++++----
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index d3a34e9..477b318 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -288,6 +288,11 @@ async function processJob(job: Job, token: string) {
       }
 
       if (await finishCrawl(job.data.crawl_id)) {
+        // v1 web hooks, call when done with no data, but with event completed
+        if (job.data.v1 && job.data.webhook) {
+          callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed");
+        }
+
         const jobIDs = await getCrawlJobs(job.data.crawl_id);
 
         const jobs = (await Promise.all(jobIDs.map(async x => {
@@ -351,10 +356,7 @@ async function processJob(job: Job, token: string) {
         if (!job.data.v1) {
           callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed");
         }
-        // v1 web hooks, call when done with no data, but with event completed
-        if (job.data.v1 && job.data.webhook) {
-          callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed");
-        }
+
       }
     }
 
From 44fe741c3551590e28e147d3a00254a47b718586 Mon Sep 17 00:00:00 2001
From: Nicolas
Date: Sun, 1 Sep 2024 14:19:43 -0300
Subject: [PATCH 10/12] Update queue-worker.ts

---
 apps/api/src/services/queue-worker.ts | 371 ++++++++++++++++----------
 1 file changed, 232 insertions(+), 139 deletions(-)

diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts
index 477b318..627185e 100644
--- a/apps/api/src/services/queue-worker.ts
+++ b/apps/api/src/services/queue-worker.ts
@@ -1,5 +1,5 @@
 import "dotenv/config";
-import "./sentry"
+import "./sentry";
 import * as Sentry from "@sentry/node";
 import { CustomError } from "../lib/custom-error";
 import {
@@ -17,11 +17,23 @@ import { Logger } from "../lib/logger";
 import { Worker } from "bullmq";
 import systemMonitor from
"./system-monitor"; import { v4 as uuidv4 } from "uuid"; -import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, lockURL } from "../lib/crawl-redis"; +import { + addCrawlJob, + addCrawlJobDone, + crawlToCrawler, + finishCrawl, + getCrawl, + getCrawlJobs, + lockURL, +} from "../lib/crawl-redis"; import { StoredCrawl } from "../lib/crawl-redis"; import { addScrapeJob } from "./queue-jobs"; import { supabaseGetJobById } from "../../src/lib/supabase-jobs"; -import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority"; +import { + addJobPriority, + deleteJobPriority, + getJobPriority, +} from "../../src/lib/job-priority"; import { PlanType } from "../types"; if (process.env.ENV === "production") { @@ -52,25 +64,24 @@ const processJobInternal = async (token: string, job: Job) => { await job.extendLock(token, jobLockExtensionTime); }, jobLockExtendInterval); - await addJobPriority(job.data.team_id, job.id ); + await addJobPriority(job.data.team_id, job.id); let err = null; try { const result = await processJob(job, token); - try{ + try { if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") { await job.moveToCompleted(null, token, false); } else { await job.moveToCompleted(result.docs, token, false); } - }catch(e){ - } + } catch (e) {} } catch (error) { console.log("Job failed, error:", error); Sentry.captureException(error); err = error; await job.moveToFailed(error, token, false); } finally { - await deleteJobPriority(job.data.team_id, job.id ); + await deleteJobPriority(job.data.team_id, job.id); clearInterval(extendLockInterval); } @@ -84,7 +95,10 @@ process.on("SIGINT", () => { isShuttingDown = true; }); -const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise) => { +const workerFun = async ( + queueName: string, + processJobInternal: (token: string, job: Job) => Promise +) => { const worker = new Worker(queueName, null, { connection: redisConnection, lockDuration: 1 * 60 * 1000, // 1 minute @@ -113,46 +127,62 @@ const workerFun = async (queueName: string, processJobInternal: (token: string, const job = await worker.getNextJob(token); if (job) { if (job.data && job.data.sentry && Sentry.isInitialized()) { - Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => { - Sentry.startSpan({ + Sentry.continueTrace( + { + sentryTrace: job.data.sentry.trace, + baggage: job.data.sentry.baggage, + }, + () => { + Sentry.startSpan( + { + name: "Scrape job", + attributes: { + job: job.id, + worker: process.env.FLY_MACHINE_ID ?? worker.id, + }, + }, + async (span) => { + await Sentry.startSpan( + { + name: "Process scrape job", + op: "queue.process", + attributes: { + "messaging.message.id": job.id, + "messaging.destination.name": getScrapeQueue().name, + "messaging.message.body.size": job.data.sentry.size, + "messaging.message.receive.latency": + Date.now() - (job.processedOn ?? job.timestamp), + "messaging.message.retry.count": job.attemptsMade, + }, + }, + async () => { + const res = await processJobInternal(token, job); + if (res !== null) { + span.setStatus({ code: 2 }); // ERROR + } else { + span.setStatus({ code: 1 }); // OK + } + } + ); + } + ); + } + ); + } else { + Sentry.startSpan( + { name: "Scrape job", attributes: { job: job.id, worker: process.env.FLY_MACHINE_ID ?? 
worker.id, }, - }, async (span) => { - await Sentry.startSpan({ - name: "Process scrape job", - op: "queue.process", - attributes: { - "messaging.message.id": job.id, - "messaging.destination.name": getScrapeQueue().name, - "messaging.message.body.size": job.data.sentry.size, - "messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp), - "messaging.message.retry.count": job.attemptsMade, - } - }, async () => { - const res = await processJobInternal(token, job); - if (res !== null) { - span.setStatus({ code: 2 }); // ERROR - } else { - span.setStatus({ code: 1 }); // OK - } - }); - }); - }); - } else { - Sentry.startSpan({ - name: "Scrape job", - attributes: { - job: job.id, - worker: process.env.FLY_MACHINE_ID ?? worker.id, }, - }, () => { - processJobInternal(token, job); - }); + () => { + processJobInternal(token, job); + } + ); } - + await sleep(gotJobInterval); } else { await sleep(connectionMonitorInterval); @@ -167,13 +197,20 @@ async function processJob(job: Job, token: string) { // Check if the job URL is researchhub and block it immediately // TODO: remove this once solve the root issue - if (job.data.url && (job.data.url.includes("researchhub.com") || job.data.url.includes("ebay.com") || job.data.url.includes("youtube.com") || job.data.url.includes("microsoft.com") )) { + if ( + job.data.url && + (job.data.url.includes("researchhub.com") || + job.data.url.includes("ebay.com") || + job.data.url.includes("youtube.com") || + job.data.url.includes("microsoft.com")) + ) { Logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`); const data = { success: false, docs: [], project_id: job.data.project_id, - error: "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.", + error: + "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.", }; await job.moveToCompleted(data.docs, token, false); return data; @@ -187,14 +224,14 @@ async function processJob(job: Job, token: string) { current_url: "", }); const start = Date.now(); - + const { success, message, docs } = await startWebScraperPipeline({ job, token, }); // Better if we throw here so we capture with the correct error - if(!success) { + if (!success) { throw new Error(message); } const end = Date.now(); @@ -217,14 +254,24 @@ async function processJob(job: Job, token: string) { docs, }; - - // No idea what this does and when it is called. if (job.data.mode === "crawl" && !job.data.v1) { - callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1); + callWebhook( + job.data.team_id, + job.id as string, + data, + job.data.webhook, + job.data.v1 + ); } if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { - callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1); + callWebhook( + job.data.team_id, + job.data.crawl_id, + data, + job.data.webhook, + job.data.v1 + ); } if (job.data.crawl_id) { @@ -246,7 +293,7 @@ async function processJob(job: Job, token: string) { await addCrawlJobDone(job.data.crawl_id, job.id); - const sc = await getCrawl(job.data.crawl_id) as StoredCrawl; + const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl; if (!job.data.sitemapped) { if (!sc.cancelled) { @@ -256,13 +303,16 @@ async function processJob(job: Job, token: string) { crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl), Infinity, sc.crawlerOptions?.maxDepth ?? 
10 - ) - + ); + for (const link of links) { if (await lockURL(job.data.crawl_id, sc, link)) { - // This seems to work really welel - const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10}) + const jobPriority = await getJobPriority({ + plan: sc.plan as PlanType, + team_id: sc.team_id, + basePriority: job.data.crawl_id ? 20 : 10, + }); const jobId = uuidv4(); // console.log("plan: ", sc.plan); @@ -270,16 +320,21 @@ async function processJob(job: Job, token: string) { // console.log("base priority: ", job.data.crawl_id ? 20 : 10) // console.log("job priority: " , jobPriority, "\n\n\n") - const newJob = await addScrapeJob({ - url: link, - mode: "single_urls", - crawlerOptions: sc.crawlerOptions, - team_id: sc.team_id, - pageOptions: sc.pageOptions, - origin: job.data.origin, - crawl_id: job.data.crawl_id, - v1: job.data.v1, - }, {}, jobId, jobPriority); + const newJob = await addScrapeJob( + { + url: link, + mode: "single_urls", + crawlerOptions: sc.crawlerOptions, + team_id: sc.team_id, + pageOptions: sc.pageOptions, + origin: job.data.origin, + crawl_id: job.data.crawl_id, + v1: job.data.v1, + }, + {}, + jobId, + jobPriority + ); await addCrawlJob(job.data.crawl_id, newJob.id); } @@ -290,79 +345,102 @@ async function processJob(job: Job, token: string) { if (await finishCrawl(job.data.crawl_id)) { // v1 web hooks, call when done with no data, but with event completed if (job.data.v1 && job.data.webhook) { - callWebhook(job.data.team_id, job.data.crawl_id, [], job.data.webhook, job.data.v1, "crawl.completed"); + callWebhook( + job.data.team_id, + job.data.crawl_id, + [], + job.data.webhook, + job.data.v1, + "crawl.completed" + ); } - - const jobIDs = await getCrawlJobs(job.data.crawl_id); - const jobs = (await Promise.all(jobIDs.map(async x => { - if (x === job.id) { - return { - async getState() { - return "completed" - }, - timestamp: Date.now(), - returnvalue: docs, - } - } - - const j = await getScrapeQueue().getJob(x); - - if (process.env.USE_DB_AUTHENTICATION === "true") { - const supabaseData = await supabaseGetJobById(j.id); - - if (supabaseData) { - j.returnvalue = supabaseData.docs; - } - } - - return j; - }))).sort((a, b) => a.timestamp - b.timestamp); - const jobStatuses = await Promise.all(jobs.map(x => x.getState())); - const jobStatus = sc.cancelled || jobStatuses.some(x => x === "failed") ? "failed" : "completed"; - - const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue); - - await logJob({ - job_id: job.data.crawl_id, - success: jobStatus === "completed", - message: sc.cancelled ? "Cancelled" : message, - num_docs: fullDocs.length, - docs: [], - time_taken: (Date.now() - sc.createdAt) / 1000, - team_id: job.data.team_id, - mode: "crawl", - url: sc.originUrl, - crawlerOptions: sc.crawlerOptions, - pageOptions: sc.pageOptions, - origin: job.data.origin, - }); - - const data = { - success: jobStatus !== "failed", - result: { - links: fullDocs.map((doc) => { - return { - content: doc, - source: doc?.metadata?.sourceURL ?? doc?.url ?? "", - }; - }), - }, - project_id: job.data.project_id, - error: message /* etc... 
*/, - docs: fullDocs, - }; - // v0 web hooks, call when done with all the data if (!job.data.v1) { - callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1, "crawl.completed"); + const jobIDs = await getCrawlJobs(job.data.crawl_id); + + const jobs = ( + await Promise.all( + jobIDs.map(async (x) => { + if (x === job.id) { + return { + async getState() { + return "completed"; + }, + timestamp: Date.now(), + returnvalue: docs, + }; + } + + const j = await getScrapeQueue().getJob(x); + + if (process.env.USE_DB_AUTHENTICATION === "true") { + const supabaseData = await supabaseGetJobById(j.id); + + if (supabaseData) { + j.returnvalue = supabaseData.docs; + } + } + + return j; + }) + ) + ).sort((a, b) => a.timestamp - b.timestamp); + const jobStatuses = await Promise.all(jobs.map((x) => x.getState())); + const jobStatus = + sc.cancelled || jobStatuses.some((x) => x === "failed") + ? "failed" + : "completed"; + + const fullDocs = jobs.map((x) => + Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue + ); + + await logJob({ + job_id: job.data.crawl_id, + success: jobStatus === "completed", + message: sc.cancelled ? "Cancelled" : message, + num_docs: fullDocs.length, + docs: [], + time_taken: (Date.now() - sc.createdAt) / 1000, + team_id: job.data.team_id, + mode: "crawl", + url: sc.originUrl, + crawlerOptions: sc.crawlerOptions, + pageOptions: sc.pageOptions, + origin: job.data.origin, + }); + + const data = { + success: jobStatus !== "failed", + result: { + links: fullDocs.map((doc) => { + return { + content: doc, + source: doc?.metadata?.sourceURL ?? doc?.url ?? "", + }; + }), + }, + project_id: job.data.project_id, + error: message /* etc... */, + docs: fullDocs, + }; + + console.log(fullDocs.length); + // v0 web hooks, call when done with all the data + if (!job.data.v1) { + callWebhook( + job.data.team_id, + job.data.crawl_id, + data, + job.data.webhook, + job.data.v1, + "crawl.completed" + ); + } } - } } - - - Logger.info(`🐂 Job done ${job.id}`); return data; } catch (error) { @@ -370,9 +448,9 @@ async function processJob(job: Job, token: string) { Sentry.captureException(error, { data: { - job: job.id + job: job.id, }, - }) + }); if (error instanceof CustomError) { // Here we handle the error, then save the failed job @@ -403,12 +481,25 @@ async function processJob(job: Job, token: string) { }; if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) { - callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1); + callWebhook( + job.data.team_id, + job.data.crawl_id ?? (job.id as string), + data, + job.data.webhook, + job.data.v1 + ); } - if(job.data.v1) { - callWebhook(job.data.team_id, job.id as string, [], job.data.webhook, job.data.v1, "crawl.failed"); + if (job.data.v1) { + callWebhook( + job.data.team_id, + job.id as string, + [], + job.data.webhook, + job.data.v1, + "crawl.failed" + ); } - + if (job.data.crawl_id) { await logJob({ job_id: job.id as string, @@ -416,7 +507,8 @@ async function processJob(job: Job, token: string) { message: typeof error === "string" ? error - : error.message ?? "Something went wrong... Contact help@mendable.ai", + : error.message ?? + "Something went wrong... Contact help@mendable.ai", num_docs: 0, docs: [], time_taken: 0, @@ -437,7 +529,8 @@ async function processJob(job: Job, token: string) { message: typeof error === "string" ? error - : error.message ?? "Something went wrong... Contact help@mendable.ai", + : error.message ?? + "Something went wrong... 
Contact help@mendable.ai", num_docs: 0, docs: [], time_taken: 0, From 980293652d27ced915538c7ce72c2cd9894228f2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Gerg=C5=91=20M=C3=B3ricz?= Date: Sun, 1 Sep 2024 19:29:35 +0200 Subject: [PATCH 11/12] fix(queue-worker): new getJobs, log on v0 --- apps/api/src/services/queue-worker.ts | 51 +++++++++++++-------------- 1 file changed, 24 insertions(+), 27 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index 627185e..c133b5c 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -35,6 +35,7 @@ import { getJobPriority, } from "../../src/lib/job-priority"; import { PlanType } from "../types"; +import { getJobs } from "../../src/controllers/v1/crawl-status"; if (process.env.ENV === "production") { initSDK({ @@ -358,33 +359,7 @@ async function processJob(job: Job, token: string) { if (!job.data.v1) { const jobIDs = await getCrawlJobs(job.data.crawl_id); - const jobs = ( - await Promise.all( - jobIDs.map(async (x) => { - if (x === job.id) { - return { - async getState() { - return "completed"; - }, - timestamp: Date.now(), - returnvalue: docs, - }; - } - - const j = await getScrapeQueue().getJob(x); - - if (process.env.USE_DB_AUTHENTICATION === "true") { - const supabaseData = await supabaseGetJobById(j.id); - - if (supabaseData) { - j.returnvalue = supabaseData.docs; - } - } - - return j; - }) - ) - ).sort((a, b) => a.timestamp - b.timestamp); + const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp); const jobStatuses = await Promise.all(jobs.map((x) => x.getState())); const jobStatus = sc.cancelled || jobStatuses.some((x) => x === "failed") @@ -437,6 +412,28 @@ async function processJob(job: Job, token: string) { "crawl.completed" ); } + } else { + const jobIDs = await getCrawlJobs(job.data.crawl_id); + const jobStatuses = await Promise.all(jobIDs.map((x) => getScrapeQueue().getJobState(x))); + const jobStatus = + sc.cancelled || jobStatuses.some((x) => x === "failed") + ? "failed" + : "completed"; + + await logJob({ + job_id: job.data.crawl_id, + success: jobStatus === "completed", + message: sc.cancelled ? 
"Cancelled" : message, + num_docs: jobIDs.length, + docs: [], + time_taken: (Date.now() - sc.createdAt) / 1000, + team_id: job.data.team_id, + mode: "crawl", + url: sc.originUrl, + crawlerOptions: sc.crawlerOptions, + pageOptions: sc.pageOptions, + origin: job.data.origin, + }); } } } From cb2dfe29be577a1e7e7d5a9f511dae57ba4f1f87 Mon Sep 17 00:00:00 2001 From: Nicolas Date: Sun, 1 Sep 2024 15:06:36 -0300 Subject: [PATCH 12/12] Nick: --- apps/api/src/services/queue-worker.ts | 31 +++++---- apps/api/src/services/webhook.ts | 92 ++++++++++++++++++--------- 2 files changed, 80 insertions(+), 43 deletions(-) diff --git a/apps/api/src/services/queue-worker.ts b/apps/api/src/services/queue-worker.ts index c133b5c..6488759 100644 --- a/apps/api/src/services/queue-worker.ts +++ b/apps/api/src/services/queue-worker.ts @@ -266,12 +266,14 @@ async function processJob(job: Job, token: string) { ); } if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) { - callWebhook( + await callWebhook( job.data.team_id, job.data.crawl_id, data, job.data.webhook, - job.data.v1 + job.data.v1, + "crawl.page", + true ); } @@ -344,17 +346,7 @@ async function processJob(job: Job, token: string) { } if (await finishCrawl(job.data.crawl_id)) { - // v1 web hooks, call when done with no data, but with event completed - if (job.data.v1 && job.data.webhook) { - callWebhook( - job.data.team_id, - job.data.crawl_id, - [], - job.data.webhook, - job.data.v1, - "crawl.completed" - ); - } + if (!job.data.v1) { const jobIDs = await getCrawlJobs(job.data.crawl_id); @@ -400,7 +392,6 @@ async function processJob(job: Job, token: string) { docs: fullDocs, }; - console.log(fullDocs.length); // v0 web hooks, call when done with all the data if (!job.data.v1) { callWebhook( @@ -420,6 +411,18 @@ async function processJob(job: Job, token: string) { ? "failed" : "completed"; + // v1 web hooks, call when done with no data, but with event completed + if (job.data.v1 && job.data.webhook) { + callWebhook( + job.data.team_id, + job.data.crawl_id, + [], + job.data.webhook, + job.data.v1, + "crawl.completed" + ); + } + await logJob({ job_id: job.data.crawl_id, success: jobStatus === "completed", diff --git a/apps/api/src/services/webhook.ts b/apps/api/src/services/webhook.ts index 06c1fdf..56dd5c5 100644 --- a/apps/api/src/services/webhook.ts +++ b/apps/api/src/services/webhook.ts @@ -10,7 +10,8 @@ export const callWebhook = async ( data: any | null, specified?: string, v1 = false, - eventType: WebhookEventType = "crawl.page" + eventType: WebhookEventType = "crawl.page", + awaitWebhook: boolean = false ) => { try { const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace( @@ -64,36 +65,69 @@ export const callWebhook = async ( } } - axios - .post( - webhookUrl, - { - success: !v1 - ? data.success - : eventType === "crawl.page" - ? data.success - : true, - type: eventType, - [v1 ? "id" : "jobId"]: id, - data: dataToSend, - error: !v1 - ? data?.error || undefined - : eventType === "crawl.page" - ? data?.error || undefined - : undefined, - }, - { - headers: { - "Content-Type": "application/json", + if (awaitWebhook) { + try { + await axios.post( + webhookUrl, + { + success: !v1 + ? data.success + : eventType === "crawl.page" + ? data.success + : true, + type: eventType, + [v1 ? "id" : "jobId"]: id, + data: dataToSend, + error: !v1 + ? data?.error || undefined + : eventType === "crawl.page" + ? data?.error || undefined + : undefined, }, - timeout: v1 ? 
10000 : 30000, // 10 seconds timeout (v1) - } - ) - .catch((error) => { - Logger.error( - `Axios error sending webhook for team ID: ${teamId}, error: ${error.message}` + { + headers: { + "Content-Type": "application/json", + }, + timeout: v1 ? 10000 : 30000, // 10 seconds timeout (v1) + } ); - }); + } catch (error) { + Logger.error( + `Axios error (0) sending webhook for team ID: ${teamId}, error: ${error.message}` + ); + } + } else { + axios + .post( + webhookUrl, + { + success: !v1 + ? data.success + : eventType === "crawl.page" + ? data.success + : true, + type: eventType, + [v1 ? "id" : "jobId"]: id, + data: dataToSend, + error: !v1 + ? data?.error || undefined + : eventType === "crawl.page" + ? data?.error || undefined + : undefined, + }, + { + headers: { + "Content-Type": "application/json", + }, + timeout: v1 ? 10000 : 30000, // 10 seconds timeout (v1) + } + ) + .catch((error) => { + Logger.error( + `Axios error sending webhook for team ID: ${teamId}, error: ${error.message}` + ); + }); + } } catch (error) { Logger.debug( `Error sending webhook for team ID: ${teamId}, error: ${error.message}`