fix(crawl): submit sitemapped jobs in bulk

This commit is contained in:
Gergő Móricz 2024-08-14 20:34:19 +02:00
parent 2ca1017fc3
commit b8ec40dd72
2 changed files with 39 additions and 15 deletions

View File

@ -10,7 +10,8 @@ import { createIdempotencyKey } from "../../src/services/idempotency/create";
import { defaultCrawlPageOptions, defaultCrawlerOptions, defaultOrigin } from "../../src/lib/default-values";
import { v4 as uuidv4 } from "uuid";
import { Logger } from "../../src/lib/logger";
import { addCrawlJob, crawlToCrawler, lockURL, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
import { addCrawlJob, addCrawlJobs, crawlToCrawler, lockURL, lockURLs, saveCrawl, StoredCrawl } from "../../src/lib/crawl-redis";
import { getScrapeQueue } from "../../src/services/queue-service";
export async function crawlController(req: Request, res: Response) {
try {
@ -115,20 +116,31 @@ export async function crawlController(req: Request, res: Response) {
const sitemap = sc.crawlerOptions?.ignoreSitemap ? null : await crawler.tryGetSitemap();
if (sitemap !== null) {
for (const url of sitemap.map(x => x.url)) {
await lockURL(id, sc, url);
const job = await addScrapeJob({
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
origin: req.body.origin ?? defaultOrigin,
crawl_id: id,
sitemapped: true,
});
await addCrawlJob(id, job.id);
}
const jobs = sitemap.map(x => {
const url = x.url;
const uuid = uuidv4();
return {
name: uuid,
data: {
url,
mode: "single_urls",
crawlerOptions: crawlerOptions,
team_id: team_id,
pageOptions: pageOptions,
origin: req.body.origin ?? defaultOrigin,
crawl_id: id,
sitemapped: true,
},
opts: {
jobId: uuid,
priority: 2,
}
};
})
await lockURLs(id, jobs.map(x => x.data.url));
await addCrawlJobs(id, jobs.map(x => x.opts.jobId));
await getScrapeQueue().addBulk(jobs);
} else {
await lockURL(id, sc, url);
const job = await addScrapeJob({

View File

@ -30,6 +30,11 @@ export async function addCrawlJob(id: string, job_id: string) {
await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
}
export async function addCrawlJobs(id: string, job_ids: string[]) {
await redisConnection.sadd("crawl:" + id + ":jobs", ...job_ids);
await redisConnection.expire("crawl:" + id + ":jobs", 24 * 60 * 60, "NX");
}
export async function addCrawlJobDone(id: string, job_id: string) {
await redisConnection.sadd("crawl:" + id + ":jobs_done", job_id);
await redisConnection.expire("crawl:" + id + ":jobs_done", 24 * 60 * 60, "NX");
@ -54,6 +59,13 @@ export async function lockURL(id: string, sc: StoredCrawl, url: string): Promise
return res;
}
/// NOTE: does not check limit. only use if limit is checked beforehand e.g. with sitemap
export async function lockURLs(id: string, urls: string[]): Promise<boolean> {
const res = (await redisConnection.sadd("crawl:" + id + ":visited", ...urls)) !== 0
await redisConnection.expire("crawl:" + id + ":visited", 24 * 60 * 60, "NX");
return res;
}
export function crawlToCrawler(id: string, sc: StoredCrawl): WebCrawler {
const crawler = new WebCrawler({
jobId: id,