feat: separate crawl fetching jobs to pre queue

Gergő Móricz 2024-09-17 20:26:25 +02:00
parent f5b84e15e1
commit c171baf963
8 changed files with 329 additions and 90 deletions

View File

@@ -6,7 +6,7 @@ import { WebSocket } from "ws";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../lib/logger";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength, isCrawlFinished, isCrawlFinishedLocked } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "../../services/queue-service";
import { getJob, getJobs } from "./crawl-status";
import * as Sentry from "@sentry/node";
@@ -94,9 +94,10 @@ async function crawlStatusWS(ws: WebSocket, req: RequestWithAuth<CrawlStatusPara
doneJobIDs = await getDoneJobsOrdered(req.params.jobId);
const preJobState = await getCrawlPreQueue().getJobState(req.params.jobId);
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : preJobState === "failed" ? "failed" : preJobState !== "completed" ? "scraping" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const doneJobs = await getJobs(doneJobIDs);
const data = doneJobs.map(x => x.returnvalue);
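
The new status expression gives the pre-queue job precedence over the scrape jobs: a failed pre job fails the whole crawl, an unfinished pre job keeps the crawl in "scraping", and only once the pre job has completed do the scrape-job states decide between "completed", "failed" and "scraping". A minimal sketch of that precedence as a standalone helper (the function name and separate declaration are illustrative; the commit keeps this inline as a ternary chain in both status controllers):

import type { JobState } from "bullmq";

type CrawlStatus = "cancelled" | "scraping" | "completed" | "failed";

// Illustrative helper mirroring the inline ternary above.
function deriveCrawlStatus(
  cancelled: boolean,
  preJobState: JobState | "unknown",
  jobStatuses: (JobState | "unknown")[],
): CrawlStatus {
  if (cancelled) return "cancelled";
  if (preJobState === "failed") return "failed";       // robots/sitemap expansion failed
  if (preJobState !== "completed") return "scraping";  // pre job still queued or running
  if (jobStatuses.every((x) => x === "completed")) return "completed";
  if (jobStatuses.some((x) => x === "failed")) return "failed";
  return "scraping";
}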

View File

@@ -1,7 +1,7 @@
import { Response } from "express";
import { CrawlStatusParams, CrawlStatusResponse, ErrorResponse, legacyDocumentConverter, RequestWithAuth } from "./types";
import { getCrawl, getCrawlExpiry, getCrawlJobs, getDoneJobsOrdered, getDoneJobsOrderedLength } from "../../lib/crawl-redis";
import { getScrapeQueue } from "../../services/queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "../../services/queue-service";
import { supabaseGetJobById, supabaseGetJobsById } from "../../lib/supabase-jobs";
import { configDotenv } from "dotenv";
configDotenv();
@@ -57,9 +57,10 @@ export async function crawlStatusController(req: RequestWithAuth<CrawlStatusPara
const start = typeof req.query.skip === "string" ? parseInt(req.query.skip, 10) : 0;
const end = typeof req.query.limit === "string" ? (start + parseInt(req.query.limit, 10) - 1) : undefined;
const preJobState = await getCrawlPreQueue().getJobState(req.params.jobId);
const jobIDs = await getCrawlJobs(req.params.jobId);
const jobStatuses = await Promise.all(jobIDs.map(x => getScrapeQueue().getJobState(x)));
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const status: Exclude<CrawlStatusResponse, ErrorResponse>["status"] = sc.cancelled ? "cancelled" : preJobState === "failed" ? "failed" : preJobState !== "completed" ? "scraping" : jobStatuses.every(x => x === "completed") ? "completed" : jobStatuses.some(x => x === "failed") ? "failed" : "scraping";
const doneJobsLength = await getDoneJobsOrderedLength(req.params.jobId);
const doneJobsOrder = await getDoneJobsOrdered(req.params.jobId, start, end ?? -1);

View File

@@ -19,7 +19,7 @@ import {
} from "../../lib/crawl-redis";
import { logCrawl } from "../../services/logging/crawl_log";
import { getScrapeQueue } from "../../services/queue-service";
import { addScrapeJob } from "../../services/queue-jobs";
import { addCrawlPreJob, addScrapeJob } from "../../services/queue-jobs";
import { Logger } from "../../lib/logger";
import { getJobPriority } from "../../lib/job-priority";
import { callWebhook } from "../../services/webhook";
@@ -71,85 +71,15 @@ export async function crawlController(
plan: req.auth.plan,
};
const crawler = crawlToCrawler(id, sc);
try {
sc.robots = await crawler.getRobotsTxt();
} catch (e) {
Logger.debug(
`[Crawl] Failed to get robots.txt (this is probably fine!): ${JSON.stringify(
e
)}`
);
}
await saveCrawl(id, sc);
const sitemap = sc.crawlerOptions.ignoreSitemap
? null
: await crawler.tryGetSitemap();
if (sitemap !== null && sitemap.length > 0) {
let jobPriority = 20;
// If it is over 1000, we need to get the job priority,
// otherwise we can use the default priority of 20
if(sitemap.length > 1000){
// set base to 21
jobPriority = await getJobPriority({plan: req.auth.plan, team_id: req.auth.team_id, basePriority: 21})
}
const jobs = sitemap.map((x) => {
const url = x.url;
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls",
team_id: req.auth.team_id,
crawlerOptions,
pageOptions,
origin: "api",
crawl_id: id,
sitemapped: true,
webhook: req.body.webhook,
v1: true,
},
opts: {
jobId: uuid,
priority: 20,
},
};
});
await lockURLs(
id,
jobs.map((x) => x.data.url)
);
await addCrawlJobs(
id,
jobs.map((x) => x.opts.jobId)
);
await getScrapeQueue().addBulk(jobs);
} else {
await lockURL(id, sc, req.body.url);
const job = await addScrapeJob(
{
url: req.body.url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: req.auth.team_id,
pageOptions: pageOptions,
origin: "api",
crawl_id: id,
webhook: req.body.webhook,
v1: true,
},
{
priority: 15,
}
);
await addCrawlJob(id, job.id);
}
await addCrawlPreJob({
auth: req.auth,
crawlerOptions,
pageOptions,
webhook: req.body.webhook,
url: req.body.url,
}, id);
if(req.body.webhook) {
await callWebhook(req.auth.team_id, id, null, req.body.webhook, true, "crawl.started");

View File

@@ -266,7 +266,7 @@ export type CrawlStatusResponse =
data: Document[];
};
type AuthObject = {
export type AuthObject = {
team_id: string;
plan: PlanType;
};

View File

@@ -4,7 +4,7 @@ import * as Sentry from "@sentry/node";
import express, { NextFunction, Request, Response } from "express";
import bodyParser from "body-parser";
import cors from "cors";
import { getScrapeQueue } from "./services/queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "./services/queue-service";
import { v0Router } from "./routes/v0";
import { initSDK } from "@hyperdx/node-opentelemetry";
import cluster from "cluster";
@@ -67,7 +67,7 @@ if (cluster.isMaster) {
serverAdapter.setBasePath(`/admin/${process.env.BULL_AUTH_KEY}/queues`);
const { addQueue, removeQueue, setQueues, replaceQueues } = createBullBoard({
queues: [new BullAdapter(getScrapeQueue())],
queues: [new BullAdapter(getScrapeQueue()), new BullAdapter(getCrawlPreQueue())],
serverAdapter: serverAdapter,
});

View File

@@ -1,8 +1,9 @@
import { Job, Queue } from "bullmq";
import { getScrapeQueue } from "./queue-service";
import { getCrawlPreQueue, getScrapeQueue } from "./queue-service";
import { v4 as uuidv4 } from "uuid";
import { WebScraperOptions } from "../types";
import * as Sentry from "@sentry/node";
import { AuthObject } from "../controllers/v1/types";
async function addScrapeJobRaw(
webScraperOptions: any,
@@ -49,6 +50,52 @@ export async function addScrapeJob(
}
}
async function addCrawlPreJobRaw(
data: any,
jobId: string,
): Promise<Job> {
return await getCrawlPreQueue().add(jobId, data, {
jobId,
});
}
export async function addCrawlPreJob(
data: {
auth: AuthObject,
crawlerOptions: any,
pageOptions: any,
webhook?: string, // req.body.webhook
url: string, // req.body.url
sentry?: any,
},
jobId: string,
): Promise<Job> {
if (Sentry.isInitialized()) {
const size = JSON.stringify(data).length;
return await Sentry.startSpan({
name: "Add crawl pre job",
op: "queue.publish",
attributes: {
"messaging.message.id": jobId,
"messaging.destination.name": getCrawlPreQueue().name,
"messaging.message.body.size": size,
},
}, async (span) => {
return await addCrawlPreJobRaw({
...data,
sentry: {
trace: Sentry.spanToTraceHeader(span),
baggage: Sentry.spanToBaggageHeader(span),
size,
},
}, jobId);
});
} else {
return await addCrawlPreJobRaw(data, jobId);
}
}
export function waitForJob(jobId: string, timeout: number) {
return new Promise((resolve, reject) => {
const start = Date.now();
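
The payload passed to addCrawlPreJob is the same shape the worker later reads back from job.data, so it effectively defines the contract between the controller and the pre-queue worker. Spelled out as a standalone interface (the name CrawlPreJobData and the separate declaration are illustrative additions, not part of the commit, which keeps the shape inline in both places):

import { AuthObject } from "../controllers/v1/types"; // export added by this commit

// Illustrative contract for the pre-queue payload; the job is added with the
// crawl id as its BullMQ jobId, i.e. addCrawlPreJob(data, crawlId).
interface CrawlPreJobData {
  auth: AuthObject;
  crawlerOptions: any;
  pageOptions: any;
  webhook?: string; // req.body.webhook
  url: string;      // req.body.url
  sentry?: any;     // trace/baggage headers attached when Sentry is initialized
}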

View File

@@ -3,6 +3,7 @@ import { Logger } from "../lib/logger";
import IORedis from "ioredis";
let scrapeQueue: Queue;
let crawlPreQueue: Queue;
export const redisConnection = new IORedis(process.env.REDIS_URL, {
maxRetriesPerRequest: null,
@@ -34,6 +35,33 @@ export function getScrapeQueue() {
return scrapeQueue;
}
export const crawlPreQueueName = "{crawlPreQueue}";
export function getCrawlPreQueue() {
if (!crawlPreQueue) {
crawlPreQueue = new Queue(
crawlPreQueueName,
{
connection: redisConnection,
}
// {
// settings: {
// lockDuration: 1 * 60 * 1000, // 1 minute in milliseconds,
// lockRenewTime: 15 * 1000, // 15 seconds in milliseconds
// stalledInterval: 30 * 1000,
// maxStalledCount: 10,
// },
// defaultJobOptions:{
// attempts: 5
// }
// }
);
Logger.info("Crawl pre queue created");
}
return crawlPreQueue;
}
// === REMOVED IN FAVOR OF POLLING -- NOT RELIABLE
// import { QueueEvents } from 'bullmq';
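
Because the pre job is enqueued with the crawl id as its BullMQ jobId, its state can be looked up from nothing but the crawl id, which is exactly what the two status controllers above do. A minimal sketch of that lookup (assuming a caller inside src/services, hence the "./queue-service" import path):

import { getCrawlPreQueue } from "./queue-service";

// Resolves to a BullMQ state such as "waiting", "active", "completed" or
// "failed", or to "unknown" when no pre job exists for this crawl id.
async function getCrawlPreJobState(crawlId: string) {
  return await getCrawlPreQueue().getJobState(crawlId);
}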

View File

@@ -3,6 +3,7 @@ import "./sentry";
import * as Sentry from "@sentry/node";
import { CustomError } from "../lib/custom-error";
import {
crawlPreQueueName,
getScrapeQueue,
redisConnection,
scrapeQueueName,
@@ -24,7 +25,10 @@ import {
finishCrawl,
getCrawl,
getCrawlJobs,
lockURLs,
saveCrawl,
lockURL,
addCrawlJobs,
} from "../lib/crawl-redis";
import { StoredCrawl } from "../lib/crawl-redis";
import { addScrapeJob } from "./queue-jobs";
@@ -37,6 +41,7 @@ import {
import { PlanType } from "../types";
import { getJobs } from "../../src/controllers/v1/crawl-status";
import { configDotenv } from "dotenv";
import { AuthObject } from "../../src/controllers/v1/types";
configDotenv();
if (process.env.ENV === "production") {
@@ -138,7 +143,7 @@ const workerFun = async (
() => {
Sentry.startSpan(
{
name: "Scrape job",
name: queueName === scrapeQueueName ? "Scrape job" : "Crawl pre job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
@@ -147,11 +152,11 @@
async (span) => {
await Sentry.startSpan(
{
name: "Process scrape job",
name: queueName === scrapeQueueName ? "Process scrape job" : "Process crawl pre job",
op: "queue.process",
attributes: {
"messaging.message.id": job.id,
"messaging.destination.name": getScrapeQueue().name,
"messaging.destination.name": queueName,
"messaging.message.body.size": job.data.sentry.size,
"messaging.message.receive.latency":
Date.now() - (job.processedOn ?? job.timestamp),
@@ -174,7 +179,7 @@ const workerFun = async (
} else {
Sentry.startSpan(
{
name: "Scrape job",
name: queueName === scrapeQueueName ? "Scrape job" : "Crawl pre job",
attributes: {
job: job.id,
worker: process.env.FLY_MACHINE_ID ?? worker.id,
@@ -560,3 +565,230 @@ async function processJob(job: Job, token: string) {
// wsq.on("paused", j => ScrapeEvents.logJobEvent(j, "paused"));
// wsq.on("resumed", j => ScrapeEvents.logJobEvent(j, "resumed"));
// wsq.on("removed", j => ScrapeEvents.logJobEvent(j, "removed"));
const processCrawlPreJobInternal = async (token: string, job: Job) => {
const extendLockInterval = setInterval(async () => {
Logger.info(`🐂 Worker extending lock on crawl pre job ${job.id}`);
await job.extendLock(token, jobLockExtensionTime);
}, jobLockExtendInterval);
let err = null;
try {
const result = await processCrawlPreJob(job, token);
try {
await job.moveToCompleted(null, token, false);
} catch (e) {}
} catch (error) {
console.log("Job failed, error:", error);
Sentry.captureException(error);
err = error;
await job.moveToFailed(error, token, false);
} finally {
clearInterval(extendLockInterval);
}
return err;
};
workerFun(crawlPreQueueName, processCrawlPreJobInternal);
async function processCrawlPreJob(job: Job, token: string) {
Logger.info(`🐂 Worker taking crawl pre job ${job.id}`);
try {
const data = job.data as {
auth: AuthObject,
crawlerOptions: any,
pageOptions: any,
webhook?: string, // req.body.webhook
url: string, // req.body.url
};
const id = job.id as string;
const sc = await getCrawl(id);
if (!sc) throw new Error("Stored crawl with id " + id + " not found");
const crawler = crawlToCrawler(id, sc);
try {
sc.robots = await crawler.getRobotsTxt();
} catch (e) {
Logger.debug(
`[Crawl] Failed to get robots.txt (this is probably fine!): ${JSON.stringify(
e
)}`
);
}
await saveCrawl(id, sc);
const sitemap = sc.crawlerOptions.ignoreSitemap
? null
: await crawler.tryGetSitemap();
if (sitemap !== null && sitemap.length > 0) {
let jobPriority = 20;
// If it is over 1000, we need to get the job priority,
// otherwise we can use the default priority of 20
if(sitemap.length > 1000){
// set base to 21
jobPriority = await getJobPriority({plan: data.auth.plan, team_id: data.auth.team_id, basePriority: 21})
}
const jobs = sitemap.map((x) => {
const url = x.url;
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls",
team_id: data.auth.team_id,
crawlerOptions: data.crawlerOptions,
pageOptions: data.pageOptions,
origin: "api",
crawl_id: id,
sitemapped: true,
webhook: data.webhook,
v1: true,
},
opts: {
jobId: uuid,
priority: 20,
},
};
});
await lockURLs(
id,
jobs.map((x) => x.data.url)
);
await addCrawlJobs(
id,
jobs.map((x) => x.opts.jobId)
);
await getScrapeQueue().addBulk(jobs);
} else {
await lockURL(id, sc, data.url);
const job = await addScrapeJob(
{
url: data.url,
mode: "single_urls",
crawlerOptions: data.crawlerOptions,
team_id: data.auth.team_id,
pageOptions: data.pageOptions,
origin: "api",
crawl_id: id,
webhook: data.webhook,
v1: true,
},
{
priority: 15,
}
);
await addCrawlJob(id, job.id);
}
Logger.info(`🐂 Crawl pre job done ${job.id}`);
} catch (error) {
Logger.error(`🐂 Job errored ${job.id} - ${error}`);
Sentry.captureException(error, {
data: {
job: job.id,
},
});
if (error instanceof CustomError) {
// Here we handle the error, then save the failed job
Logger.error(error.message); // or any other error handling
logtail.error("Custom error while ingesting", {
job_id: job.id,
error: error.message,
dataIngestionJob: error.dataIngestionJob,
});
}
Logger.error(error);
if (error.stack) {
Logger.error(error.stack);
}
logtail.error("Overall error ingesting", {
job_id: job.id,
error: error.message,
});
const data = {
success: false,
docs: [],
project_id: job.data.project_id,
error:
"Something went wrong... Contact help@mendable.ai or try again." /* etc... */,
};
if (!job.data.v1 && (job.data.mode === "crawl" || job.data.crawl_id)) {
callWebhook(
job.data.team_id,
job.data.crawl_id ?? (job.id as string),
data,
job.data.webhook,
job.data.v1
);
}
if (job.data.v1) {
callWebhook(
job.data.team_id,
job.id as string,
[],
job.data.webhook,
job.data.v1,
"crawl.failed"
);
}
if (job.data.crawl_id) {
await logJob({
job_id: job.id as string,
success: false,
message:
typeof error === "string"
? error
: error.message ??
"Something went wrong... Contact help@mendable.ai",
num_docs: 0,
docs: [],
time_taken: 0,
team_id: job.data.team_id,
mode: job.data.mode,
url: job.data.url,
crawlerOptions: job.data.crawlerOptions,
pageOptions: job.data.pageOptions,
origin: job.data.origin,
crawl_id: job.data.crawl_id,
});
const sc = await getCrawl(job.data.crawl_id);
await logJob({
job_id: job.data.crawl_id,
success: false,
message:
typeof error === "string"
? error
: error.message ??
"Something went wrong... Contact help@mendable.ai",
num_docs: 0,
docs: [],
time_taken: 0,
team_id: job.data.team_id,
mode: "crawl",
url: sc ? sc.originUrl : job.data.url,
crawlerOptions: sc ? sc.crawlerOptions : job.data.crawlerOptions,
pageOptions: sc ? sc.pageOptions : job.data.pageOptions,
origin: job.data.origin,
});
}
// done(null, data);
return data;
}
}
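
The pre queue is consumed by the repo's existing workerFun loop (registered above as workerFun(crawlPreQueueName, processCrawlPreJobInternal)), which handles lock extension and Sentry spans itself. For readers more used to stock BullMQ, the equivalent wiring would look roughly like the sketch below; this is not the repo's implementation, only an illustration of what the new queue expects from its consumer:

import { Worker } from "bullmq";
import IORedis from "ioredis";

// Same connection options the repo uses for its queues (see queue-service.ts).
const connection = new IORedis(process.env.REDIS_URL, { maxRetriesPerRequest: null });

// "{crawlPreQueue}" must match crawlPreQueueName from queue-service.ts.
const crawlPreWorker = new Worker(
  "{crawlPreQueue}",
  async (job) => {
    // Fetch robots.txt, try the sitemap, and fan the URLs out into scrape
    // jobs -- i.e. what processCrawlPreJob above does.
  },
  { connection },
);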