Merge pull request #594 from mendableai/v1/webhooks

[v1] Webhooks
This commit is contained in:
Nicolas 2024-09-01 15:13:10 -03:00 committed by GitHub
commit 4f3d421c70
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 346 additions and 148 deletions
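
In short, this PR threads webhook notifications through the v1 crawl flow: the v1 crawl controller fires a `crawl.started` event when a request specifies a webhook, the queue worker emits `crawl.page` per scraped page plus `crawl.completed` or `crawl.failed` at the end, `callWebhook` gains an event type and an optional awaited mode, and the JS SDK exposes a `webhook` field on `CrawlParams`. A rough sketch of how a caller opts in, assuming the hosted v1 crawl endpoint and bearer auth (URL, API key, and webhook address are placeholders):

```ts
// Sketch only: assumes the hosted v1 endpoint at api.firecrawl.dev and a
// bearer API key; the `webhook` field is the new CrawlParams option added
// in this PR.
const res = await fetch("https://api.firecrawl.dev/v1/crawl", {
  method: "POST",
  headers: {
    "Content-Type": "application/json",
    Authorization: `Bearer ${process.env.FIRECRAWL_API_KEY}`,
  },
  body: JSON.stringify({
    url: "https://example.com",
    webhook: "https://my-service.example.com/hooks/firecrawl",
  }),
});

// The controller fires the crawl.started event and responds with
// { success: true, id }; the id identifies the crawl in later events.
const { id } = await res.json();
```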

View File

@@ -22,6 +22,7 @@ import { getScrapeQueue } from "../../services/queue-service";
import { addScrapeJob } from "../../services/queue-jobs";
import { Logger } from "../../lib/logger";
import { getJobPriority } from "../../lib/job-priority";
import { callWebhook } from "../../services/webhook";
export async function crawlController(
req: RequestWithAuth<{}, CrawlResponse, CrawlRequest>,
@@ -150,6 +151,10 @@ export async function crawlController(
await addCrawlJob(id, job.id);
}
if(req.body.webhook) {
await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started");
}
return res.status(200).json({
success: true,
id,

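Note on the controller change above: when `req.body.webhook` is set, `callWebhook` is invoked with `data = null`, `v1 = true`, and event type `"crawl.started"`; `awaitWebhook` keeps its default of `false`, so the delivery itself is fire-and-forget (with the 10 second v1 timeout). Based on the payload assembled in the webhook service further down, a receiver should expect roughly this body; a sketch, not an exported type:

```ts
// Approximate v1 "crawl.started" body: data is null in the controller call,
// so dataToSend stays empty, success is forced to true for non-"crawl.page"
// events, and the v1 branch keys the identifier as "id" rather than the
// v0 "jobId".
const crawlStarted = {
  success: true,
  type: "crawl.started" as const,
  id: "<crawl id>",
  data: [] as unknown[],
  // no "error" field for v1 events other than crawl.page
};
```
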
View File

@@ -1,5 +1,5 @@
import "dotenv/config";
import "./sentry"
import "./sentry";
import * as Sentry from "@sentry/node";
import { CustomError } from "../lib/custom-error";
import {
@@ -17,12 +17,25 @@ import { Logger } from "../lib/logger";
import { Worker } from "bullmq";
import systemMonitor from "./system-monitor";
import { v4 as uuidv4 } from "uuid";
import { addCrawlJob, addCrawlJobDone, crawlToCrawler, finishCrawl, getCrawl, getCrawlJobs, lockURL } from "../lib/crawl-redis";
import {
addCrawlJob,
addCrawlJobDone,
crawlToCrawler,
finishCrawl,
getCrawl,
getCrawlJobs,
lockURL,
} from "../lib/crawl-redis";
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
import { supabaseGetJobById } from "../../src/lib/supabase-jobs";
import { addJobPriority, deleteJobPriority, getJobPriority } from "../../src/lib/job-priority";
import {
addJobPriority,
deleteJobPriority,
getJobPriority,
} from "../../src/lib/job-priority";
import { PlanType } from "../types";
import { getJobs } from "../../src/controllers/v1/crawl-status";
if (process.env.ENV === "production") {
initSDK({
@@ -52,25 +65,24 @@ const processJobInternal = async (token: string, job: Job) => {
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
await addJobPriority(job.data.team_id, job.id );
await addJobPriority(job.data.team_id, job.id);
let err = null;
try {
const result = await processJob(job, token);
try{
try {
if (job.data.crawl_id && process.env.USE_DB_AUTHENTICATION === "true") {
await job.moveToCompleted(null, token, false);
} else {
await job.moveToCompleted(result.docs, token, false);
}
}catch(e){
}
} catch (e) {}
} catch (error) {
console.log("Job failed, error:", error);
Sentry.captureException(error);
err = error;
await job.moveToFailed(error, token, false);
} finally {
await deleteJobPriority(job.data.team_id, job.id );
await deleteJobPriority(job.data.team_id, job.id);
clearInterval(extendLockInterval);
}
@@ -84,7 +96,10 @@ process.on("SIGINT", () => {
isShuttingDown = true;
});
const workerFun = async (queueName: string, processJobInternal: (token: string, job: Job) => Promise<any>) => {
const workerFun = async (
queueName: string,
processJobInternal: (token: string, job: Job) => Promise<any>
) => {
const worker = new Worker(queueName, null, {
connection: redisConnection,
lockDuration: 1 * 60 * 1000, // 1 minute
@@ -113,46 +128,62 @@ const workerFun = async (queueName: string, processJobInternal: (token: string,
const job = await worker.getNextJob(token);
if (job) {
if (job.data && job.data.sentry && Sentry.isInitialized()) {
Sentry.continueTrace({ sentryTrace: job.data.sentry.trace, baggage: job.data.sentry.baggage }, () => {
Sentry.startSpan({
Sentry.continueTrace(
{
sentryTrace: job.data.sentry.trace,
baggage: job.data.sentry.baggage,
},
() => {
Sentry.startSpan(
{
name: "Scrape job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
},
async (span) => {
await Sentry.startSpan(
{
name: "Process scrape job",
op: "queue.process",
attributes: {
"messaging.message.id": job.id,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": job.data.sentry.size,
"messaging.message.receive.latency":
Date.now() - (job.processedOn ?? job.timestamp),
"messaging.message.retry.count": job.attemptsMade,
},
},
async () => {
const res = await processJobInternal(token, job);
if (res !== null) {
span.setStatus({ code: 2 }); // ERROR
} else {
span.setStatus({ code: 1 }); // OK
}
}
);
}
);
}
);
} else {
Sentry.startSpan(
{
name: "Scrape job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
}, async (span) => {
await Sentry.startSpan({
name: "Process scrape job",
op: "queue.process",
attributes: {
"messaging.message.id": job.id,
"messaging.destination.name": getScrapeQueue().name,
"messaging.message.body.size": job.data.sentry.size,
"messaging.message.receive.latency": Date.now() - (job.processedOn ?? job.timestamp),
"messaging.message.retry.count": job.attemptsMade,
}
}, async () => {
const res = await processJobInternal(token, job);
if (res !== null) {
span.setStatus({ code: 2 }); // ERROR
} else {
span.setStatus({ code: 1 }); // OK
}
});
});
});
} else {
Sentry.startSpan({
name: "Scrape job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
},
}, () => {
processJobInternal(token, job);
});
() => {
processJobInternal(token, job);
}
);
}
await sleep(gotJobInterval);
} else {
await sleep(connectionMonitorInterval);
@@ -167,13 +198,20 @@ async function processJob(job: Job, token: string) {
// Check if the job URL is researchhub and block it immediately
// TODO: remove this once solve the root issue
if (job.data.url && (job.data.url.includes("researchhub.com") || job.data.url.includes("ebay.com") || job.data.url.includes("youtube.com") || job.data.url.includes("microsoft.com") )) {
if (
job.data.url &&
(job.data.url.includes("researchhub.com") ||
job.data.url.includes("ebay.com") ||
job.data.url.includes("youtube.com") ||
job.data.url.includes("microsoft.com"))
) {
Logger.info(`🐂 Blocking job ${job.id} with URL ${job.data.url}`);
const data = {
success: false,
docs: [],
project_id: job.data.project_id,
error: "URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.",
error:
"URL is blocked. Suspecious activity detected. Please contact hello@firecrawl.com if you believe this is an error.",
};
await job.moveToCompleted(data.docs, token, false);
return data;
@@ -187,14 +225,14 @@ async function processJob(job: Job, token: string) {
current_url: "",
});
const start = Date.now();
const { success, message, docs } = await startWebScraperPipeline({
job,
token,
});
// Better if we throw here so we capture with the correct error
if(!success) {
if (!success) {
throw new Error(message);
}
const end = Date.now();
@@ -217,8 +255,26 @@ async function processJob(job: Job, token: string) {
docs,
};
if (job.data.mode === "crawl") {
await callWebhook(job.data.team_id, job.id as string, data, job.data.webhook, job.data.v1);
// No idea what this does and when it is called.
if (job.data.mode === "crawl" && !job.data.v1) {
callWebhook(
job.data.team_id,
job.id as string,
data,
job.data.webhook,
job.data.v1
);
}
if (job.data.webhook && job.data.mode !== "crawl" && job.data.v1) {
await callWebhook(
job.data.team_id,
job.data.crawl_id,
data,
job.data.webhook,
job.data.v1,
"crawl.page",
true
);
}
if (job.data.crawl_id) {
@@ -240,7 +296,7 @@ async function processJob(job: Job, token: string) {
await addCrawlJobDone(job.data.crawl_id, job.id);
const sc = await getCrawl(job.data.crawl_id) as StoredCrawl;
const sc = (await getCrawl(job.data.crawl_id)) as StoredCrawl;
if (!job.data.sitemapped) {
if (!sc.cancelled) {
@@ -250,13 +306,16 @@ async function processJob(job: Job, token: string) {
crawler.extractLinksFromHTML(rawHtml ?? "", sc.originUrl),
Infinity,
sc.crawlerOptions?.maxDepth ?? 10
)
);
for (const link of links) {
if (await lockURL(job.data.crawl_id, sc, link)) {
// This seems to work really well
const jobPriority = await getJobPriority({plan:sc.plan as PlanType, team_id: sc.team_id, basePriority: job.data.crawl_id ? 20 : 10})
const jobPriority = await getJobPriority({
plan: sc.plan as PlanType,
team_id: sc.team_id,
basePriority: job.data.crawl_id ? 20 : 10,
});
const jobId = uuidv4();
// console.log("plan: ", sc.plan);
@@ -264,16 +323,21 @@ async function processJob(job: Job, token: string) {
// console.log("base priority: ", job.data.crawl_id ? 20 : 10)
// console.log("job priority: " , jobPriority, "\n\n\n")
const newJob = await addScrapeJob({
url: link,
mode: "single_urls",
crawlerOptions: sc.crawlerOptions,
team_id: sc.team_id,
pageOptions: sc.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
v1: job.data.v1,
}, {}, jobId, jobPriority);
const newJob = await addScrapeJob(
{
url: link,
mode: "single_urls",
crawlerOptions: sc.crawlerOptions,
team_id: sc.team_id,
pageOptions: sc.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
v1: job.data.v1,
},
{},
jobId,
jobPriority
);
await addCrawlJob(job.data.crawl_id, newJob.id);
}
@@ -282,67 +346,98 @@ async function processJob(job: Job, token: string) {
}
if (await finishCrawl(job.data.crawl_id)) {
const jobIDs = await getCrawlJobs(job.data.crawl_id);
const jobs = (await Promise.all(jobIDs.map(async x => {
if (x === job.id) {
return {
async getState() {
return "completed"
},
timestamp: Date.now(),
returnvalue: docs,
}
if (!job.data.v1) {
const jobIDs = await getCrawlJobs(job.data.crawl_id);
const jobs = (await getJobs(jobIDs)).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map((x) => x.getState()));
const jobStatus =
sc.cancelled || jobStatuses.some((x) => x === "failed")
? "failed"
: "completed";
const fullDocs = jobs.map((x) =>
Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue
);
await logJob({
job_id: job.data.crawl_id,
success: jobStatus === "completed",
message: sc.cancelled ? "Cancelled" : message,
num_docs: fullDocs.length,
docs: [],
time_taken: (Date.now() - sc.createdAt) / 1000,
team_id: job.data.team_id,
mode: "crawl",
url: sc.originUrl,
crawlerOptions: sc.crawlerOptions,
pageOptions: sc.pageOptions,
origin: job.data.origin,
});
const data = {
success: jobStatus !== "failed",
result: {
links: fullDocs.map((doc) => {
return {
content: doc,
source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
};
}),
},
project_id: job.data.project_id,
error: message /* etc... */,
docs: fullDocs,
};
// v0 web hooks, call when done with all the data
if (!job.data.v1) {
callWebhook(
job.data.team_id,
job.data.crawl_id,
data,
job.data.webhook,
job.data.v1,
"crawl.completed"
);
}
} else {
const jobIDs = await getCrawlJobs(job.data.crawl_id);
const jobStatuses = await Promise.all(jobIDs.map((x) => getScrapeQueue().getJobState(x)));
const jobStatus =
sc.cancelled || jobStatuses.some((x) => x === "failed")
? "failed"
: "completed";
const j = await getScrapeQueue().getJob(x);
if (process.env.USE_DB_AUTHENTICATION === "true") {
const supabaseData = await supabaseGetJobById(j.id);
if (supabaseData) {
j.returnvalue = supabaseData.docs;
// v1 web hooks, call when done with no data, but with event completed
if (job.data.v1 && job.data.webhook) {
callWebhook(
job.data.team_id,
job.data.crawl_id,
[],
job.data.webhook,
job.data.v1,
"crawl.completed"
);
}
}
return j;
}))).sort((a, b) => a.timestamp - b.timestamp);
const jobStatuses = await Promise.all(jobs.map(x => x.getState()));
const jobStatus = sc.cancelled || jobStatuses.some(x => x === "failed") ? "failed" : "completed";
const fullDocs = jobs.map(x => Array.isArray(x.returnvalue) ? x.returnvalue[0] : x.returnvalue);
await logJob({
job_id: job.data.crawl_id,
success: jobStatus === "completed",
message: sc.cancelled ? "Cancelled" : message,
num_docs: fullDocs.length,
docs: [],
time_taken: (Date.now() - sc.createdAt) / 1000,
team_id: job.data.team_id,
mode: "crawl",
url: sc.originUrl,
crawlerOptions: sc.crawlerOptions,
pageOptions: sc.pageOptions,
origin: job.data.origin,
});
const data = {
success: jobStatus !== "failed",
result: {
links: fullDocs.map((doc) => {
return {
content: doc,
source: doc?.metadata?.sourceURL ?? doc?.url ?? "",
};
}),
},
project_id: job.data.project_id,
error: message /* etc... */,
docs: fullDocs,
};
await callWebhook(job.data.team_id, job.data.crawl_id, data, job.data.webhook, job.data.v1);
await logJob({
job_id: job.data.crawl_id,
success: jobStatus === "completed",
message: sc.cancelled ? "Cancelled" : message,
num_docs: jobIDs.length,
docs: [],
time_taken: (Date.now() - sc.createdAt) / 1000,
team_id: job.data.team_id,
mode: "crawl",
url: sc.originUrl,
crawlerOptions: sc.crawlerOptions,
pageOptions: sc.pageOptions,
origin: job.data.origin,
});
}
}
}
@@ -353,9 +448,9 @@ async function processJob(job: Job, token: string) {
Sentry.captureException(error, {
data: {
job: job.id
job: job.id,
},
})
});
if (error instanceof CustomError) {
// Here we handle the error, then save the failed job
@@ -384,11 +479,27 @@ async function processJob(job: Job, token: string) {
error:
"Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
};
if (job.data.mode === "crawl" || job.data.crawl_id) {
await callWebhook(job.data.team_id, job.data.crawl_id ?? job.id as string, data, job.data.webhook, job.data.v1);
if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
callWebhook(
job.data.team_id,
job.data.crawl_id ?? (job.id as string),
data,
job.data.webhook,
job.data.v1
);
}
if (job.data.v1) {
callWebhook(
job.data.team_id,
job.id as string,
[],
job.data.webhook,
job.data.v1,
"crawl.failed"
);
}
if (job.data.crawl_id) {
await logJob({
job_id: job.id as string,
@@ -396,7 +507,8 @@ async function processJob(job: Job, token: string) {
message:
typeof error === "string"
? error
: error.message ?? "Something went wrong... Contact help@mendable.ai",
: error.message ??
"Something went wrong... Contact help@mendable.ai",
num_docs: 0,
docs: [],
time_taken: 0,
@@ -417,7 +529,8 @@ async function processJob(job: Job, token: string) {
message:
typeof error === "string"
? error
: error.message ?? "Something went wrong... Contact help@mendable.ai",
: error.message ??
"Something went wrong... Contact help@mendable.ai",
num_docs: 0,
docs: [],
time_taken: 0,

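Taken together, the worker now splits the two generations: v0 keeps a single webhook call carrying the full crawl result once `finishCrawl` resolves, while v1 sends an awaited `crawl.page` event per scraped document, an empty-bodied `crawl.completed` at the end, and `crawl.failed` on error. A minimal receiver sketch for the v1 events, assuming a plain Node HTTP server (port and path are arbitrary; the payload fields mirror the body posted by `callWebhook` below):

```ts
import http from "node:http";

// Event type union matching the WebhookEventType added in types.ts below.
type WebhookEventType =
  | "crawl.page"
  | "crawl.started"
  | "crawl.completed"
  | "crawl.failed";

interface WebhookEvent {
  success: boolean;
  type: WebhookEventType;
  id: string;      // crawl/job id (v1); v0 payloads use "jobId" instead
  data: unknown[]; // scraped documents for crawl.page, empty otherwise
  error?: string;
}

http
  .createServer((req, res) => {
    let body = "";
    req.on("data", (chunk) => (body += chunk));
    req.on("end", () => {
      const event = JSON.parse(body) as WebhookEvent;
      switch (event.type) {
        case "crawl.started":
          console.log(`crawl ${event.id} started`);
          break;
        case "crawl.page":
          console.log(`crawl ${event.id}: received ${event.data.length} page(s)`);
          break;
        case "crawl.completed":
          console.log(`crawl ${event.id} completed`);
          break;
        case "crawl.failed":
          console.error(`crawl ${event.id} failed`, event.error);
          break;
      }
      res.statusCode = 200;
      res.end();
    });
  })
  .listen(3000);
```
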
View File

@@ -1,11 +1,24 @@
import axios from "axios";
import { legacyDocumentConverter } from "../../src/controllers/v1/types";
import { Logger } from "../../src/lib/logger";
import { supabase_service } from "./supabase";
import { WebhookEventType } from "../types";
export const callWebhook = async (teamId: string, jobId: string, data: any, specified?: string, v1 = false) => {
export const callWebhook = async (
teamId: string,
id: string,
data: any | null,
specified?: string,
v1 = false,
eventType: WebhookEventType = "crawl.page",
awaitWebhook: boolean = false
) => {
try {
const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace("{{JOB_ID}}", jobId);
const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === 'true';
const selfHostedUrl = process.env.SELF_HOSTED_WEBHOOK_URL?.replace(
"{{JOB_ID}}",
id
);
const useDbAuthentication = process.env.USE_DB_AUTHENTICATION === "true";
let webhookUrl = specified ?? selfHostedUrl;
// Only fetch the webhook URL from the database if the self-hosted webhook URL and specified webhook are not set
@@ -17,7 +30,9 @@ export const callWebhook = async (teamId: string, jobId: string, data: any, spec
.eq("team_id", teamId)
.limit(1);
if (error) {
Logger.error(`Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`);
Logger.error(
`Error fetching webhook URL for team ID: ${teamId}, error: ${error.message}`
);
return null;
}
@@ -29,10 +44,17 @@ export const callWebhook = async (teamId: string, jobId: string, data: any, spec
}
let dataToSend = [];
if (data.result.links && data.result.links.length !== 0) {
if (
data &&
data.result &&
data.result.links &&
data.result.links.length !== 0
) {
for (let i = 0; i < data.result.links.length; i++) {
if (v1) {
dataToSend.push(legacyDocumentConverter(data.result.links[i].content))
dataToSend.push(
legacyDocumentConverter(data.result.links[i].content)
);
} else {
dataToSend.push({
content: data.result.links[i].content.content,
@@ -43,19 +65,72 @@ export const callWebhook = async (teamId: string, jobId: string, data: any, spec
}
}
await fetch(webhookUrl, {
method: "POST",
headers: {
"Content-Type": "application/json",
},
body: JSON.stringify({
success: data.success,
jobId: jobId,
data: dataToSend,
error: data.error || undefined,
}),
});
if (awaitWebhook) {
try {
await axios.post(
webhookUrl,
{
success: !v1
? data.success
: eventType === "crawl.page"
? data.success
: true,
type: eventType,
[v1 ? "id" : "jobId"]: id,
data: dataToSend,
error: !v1
? data?.error || undefined
: eventType === "crawl.page"
? data?.error || undefined
: undefined,
},
{
headers: {
"Content-Type": "application/json",
},
timeout: v1 ? 10000 : 30000, // 10 seconds timeout (v1)
}
);
} catch (error) {
Logger.error(
`Axios error (0) sending webhook for team ID: ${teamId}, error: ${error.message}`
);
}
} else {
axios
.post(
webhookUrl,
{
success: !v1
? data.success
: eventType === "crawl.page"
? data.success
: true,
type: eventType,
[v1 ? "id" : "jobId"]: id,
data: dataToSend,
error: !v1
? data?.error || undefined
: eventType === "crawl.page"
? data?.error || undefined
: undefined,
},
{
headers: {
"Content-Type": "application/json",
},
timeout: v1 ? 10000 : 30000, // 10 seconds timeout (v1)
}
)
.catch((error) => {
Logger.error(
`Axios error sending webhook for team ID: ${teamId}, error: ${error.message}`
);
});
}
} catch (error) {
Logger.debug(`Error sending webhook for team ID: ${teamId}, error: ${error.message}`);
Logger.debug(
`Error sending webhook for team ID: ${teamId}, error: ${error.message}`
);
}
};
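
The payload now branches on `v1` and `eventType`: v1 deliveries key the identifier as `id`, always carry `type`, and only pass `success`/`error` through for `crawl.page` (other events report `success: true` with no error), while v0 keeps the old `jobId` key; v1 posts also time out after 10 seconds instead of 30. The target URL is the `specified` webhook if present, otherwise `SELF_HOSTED_WEBHOOK_URL` with `{{JOB_ID}}` replaced by the id, and the team's database-configured webhook is only looked up when neither is set. Illustrative typings of the two bodies, inferred from the `axios.post` call above (these names are not exported anywhere):

```ts
// Same import the webhook service itself uses.
import { WebhookEventType } from "../types";

// Names are illustrative; callWebhook builds these objects inline.
interface V1WebhookBody {
  success: boolean;       // true unless eventType === "crawl.page" and the page failed
  type: WebhookEventType;
  id: string;             // crawl id
  data: unknown[];        // converted documents; empty for started/completed/failed
  error?: string;         // only populated for crawl.page
}

interface V0WebhookBody {
  success: boolean;
  type: WebhookEventType; // still sent, defaults to "crawl.page"
  jobId: string;
  data: unknown[];
  error?: string;
}
```
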

View File

@@ -153,4 +153,7 @@ export type PlanType =
| "growth"
| "growthdouble"
| "free"
| "";
| "";
export type WebhookEventType = "crawl.page" | "crawl.started" | "crawl.completed" | "crawl.failed";

View File

@@ -1,6 +1,6 @@
{
"name": "@mendable/firecrawl-js",
"version": "1.2.0",
"version": "1.2.1",
"description": "JavaScript SDK for Firecrawl API",
"main": "build/cjs/index.js",
"types": "types/index.d.ts",

View File

@@ -111,6 +111,7 @@ export interface CrawlParams {
allowExternalLinks?: boolean;
ignoreSitemap?: boolean;
scrapeOptions?: ScrapeParams;
webhook?: string;
}
/**

View File

@@ -103,6 +103,7 @@ export interface CrawlParams {
allowExternalLinks?: boolean;
ignoreSitemap?: boolean;
scrapeOptions?: ScrapeParams;
webhook?: string;
}
/**
* Response interface for crawling operations.
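
With the SDK bumped to 1.2.1, the new `webhook` field rides along with the rest of `CrawlParams`, so JS SDK callers can opt in without touching the raw API. A usage sketch, assuming the v1 `FirecrawlApp.crawlUrl(url, params)` signature (API key and webhook URL are placeholders):

```ts
import FirecrawlApp from "@mendable/firecrawl-js";

const app = new FirecrawlApp({ apiKey: process.env.FIRECRAWL_API_KEY ?? "" });

// The crawl.* events from this PR are POSTed to the given webhook URL while
// the crawl runs; crawlUrl itself still waits for the crawl to finish.
const result = await app.crawlUrl("https://example.com", {
  webhook: "https://my-service.example.com/hooks/firecrawl",
});
```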