diff --git a/apps/workers/crawlerWorker.ts b/apps/workers/crawlerWorker.ts index f607234e..c8141d39 100644 --- a/apps/workers/crawlerWorker.ts +++ b/apps/workers/crawlerWorker.ts @@ -166,20 +166,20 @@ export class CrawlerWorker { /* timeoutSec */ serverConfig.crawler.jobTimeoutSec, ), onComplete: async (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.info(`[Crawler][${jobId}] Completed successfully`); - const bookmarkId = job?.data.bookmarkId; + const bookmarkId = job.data.bookmarkId; if (bookmarkId) { await changeBookmarkStatus(bookmarkId, "success"); } }, onError: async (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.error( `[Crawler][${jobId}] Crawling job failed: ${job.error}\n${job.error.stack}`, ); const bookmarkId = job.data?.bookmarkId; - if (bookmarkId) { + if (bookmarkId && job.numRetriesLeft == 0) { await changeBookmarkStatus(bookmarkId, "failure"); } }, diff --git a/apps/workers/openaiWorker.ts b/apps/workers/openaiWorker.ts index 571d5b73..948e92a7 100644 --- a/apps/workers/openaiWorker.ts +++ b/apps/workers/openaiWorker.ts @@ -68,16 +68,18 @@ export class OpenAiWorker { { run: runOpenAI, onComplete: async (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.info(`[inference][${jobId}] Completed successfully`); - await attemptMarkTaggingStatus(job?.data, "success"); + await attemptMarkTaggingStatus(job.data, "success"); }, onError: async (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.error( `[inference][${jobId}] inference job failed: ${job.error}\n${job.error.stack}`, ); - await attemptMarkTaggingStatus(job?.data, "failure"); + if (job.numRetriesLeft == 0) { + await attemptMarkTaggingStatus(job?.data, "failure"); + } }, }, { @@ -387,7 +389,7 @@ async function connectTags( } async function runOpenAI(job: DequeuedJob) { - const jobId = job.id ?? "unknown"; + const jobId = job.id; const inferenceClient = InferenceClientFactory.build(); if (!inferenceClient) { diff --git a/apps/workers/searchWorker.ts b/apps/workers/searchWorker.ts index d2f1dffc..1fbdbe73 100644 --- a/apps/workers/searchWorker.ts +++ b/apps/workers/searchWorker.ts @@ -19,12 +19,12 @@ export class SearchIndexingWorker { { run: runSearchIndexing, onComplete: (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.info(`[search][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.error( `[search][${jobId}] search job failed: ${job.error}\n${job.error.stack}`, ); @@ -117,7 +117,7 @@ async function runDelete( } async function runSearchIndexing(job: DequeuedJob) { - const jobId = job.id ?? "unknown"; + const jobId = job.id; const request = zSearchIndexingRequestSchema.safeParse(job.data); if (!request.success) { diff --git a/apps/workers/tidyAssetsWorker.ts b/apps/workers/tidyAssetsWorker.ts index c70736f2..bea0b7d9 100644 --- a/apps/workers/tidyAssetsWorker.ts +++ b/apps/workers/tidyAssetsWorker.ts @@ -19,12 +19,12 @@ export class TidyAssetsWorker { { run: runTidyAssets, onComplete: (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.info(`[tidyAssets][${jobId}] Completed successfully`); return Promise.resolve(); }, onError: (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.error( `[tidyAssets][${jobId}] tidy assets job failed: ${job.error}\n${job.error.stack}`, ); @@ -86,7 +86,7 @@ async function handleAsset( } async function runTidyAssets(job: DequeuedJob) { - const jobId = job.id ?? "unknown"; + const jobId = job.id; const request = zTidyAssetsRequestSchema.safeParse(job.data); if (!request.success) { diff --git a/apps/workers/videoWorker.ts b/apps/workers/videoWorker.ts index a85a8cae..444448f7 100644 --- a/apps/workers/videoWorker.ts +++ b/apps/workers/videoWorker.ts @@ -14,7 +14,11 @@ import { } from "@hoarder/shared/assetdb"; import serverConfig from "@hoarder/shared/config"; import logger from "@hoarder/shared/logger"; -import { VideoWorkerQueue, ZVideoRequest } from "@hoarder/shared/queues"; +import { + VideoWorkerQueue, + ZVideoRequest, + zvideoRequestSchema, +} from "@hoarder/shared/queues"; import { withTimeout } from "./utils"; import { getBookmarkDetails, updateAsset } from "./workerUtils"; @@ -33,14 +37,14 @@ export class VideoWorker { /* timeoutSec */ serverConfig.crawler.downloadVideoTimeout, ), onComplete: async (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.info( `[VideoCrawler][${jobId}] Video Download Completed successfully`, ); return Promise.resolve(); }, onError: async (job) => { - const jobId = job?.id ?? "unknown"; + const jobId = job.id; logger.error( `[VideoCrawler][${jobId}] Video Download job failed: ${job.error}`, ); @@ -51,6 +55,7 @@ export class VideoWorker { pollIntervalMs: 1000, timeoutSecs: serverConfig.crawler.downloadVideoTimeout, concurrency: 1, + validator: zvideoRequestSchema, }, ); } @@ -71,7 +76,7 @@ function prepareYtDlpArguments(url: string, assetPath: string) { } async function runWorker(job: DequeuedJob) { - const jobId = job.id ?? "unknown"; + const jobId = job.id; const { bookmarkId } = job.data; const {