Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(server): job discovery #13838

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions server/src/app.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { controllers } from 'src/controllers';
import { entities } from 'src/entities';
import { ImmichWorker } from 'src/enum';
import { IEventRepository } from 'src/interfaces/event.interface';
import { IJobRepository } from 'src/interfaces/job.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { ITelemetryRepository } from 'src/interfaces/telemetry.interface';
import { AuthGuard } from 'src/middleware/auth.guard';
Expand Down Expand Up @@ -64,6 +65,7 @@ abstract class BaseModule implements OnModuleInit, OnModuleDestroy {
constructor(
@Inject(ILoggerRepository) logger: ILoggerRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(IJobRepository) private jobRepository: IJobRepository,
@Inject(ITelemetryRepository) private telemetryRepository: ITelemetryRepository,
) {
logger.setAppName(this.worker);
Expand All @@ -73,6 +75,12 @@ abstract class BaseModule implements OnModuleInit, OnModuleDestroy {

async onModuleInit() {
this.telemetryRepository.setup({ repositories: repositories.map(({ useClass }) => useClass) });

this.jobRepository.setup({ services });
if (this.worker === ImmichWorker.MICROSERVICES) {
this.jobRepository.startWorkers();
}

this.eventRepository.setup({ services });
await this.eventRepository.emit('app.bootstrap', this.worker);
}
Expand Down
7 changes: 7 additions & 0 deletions server/src/decorators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import _ from 'lodash';
import { ADDED_IN_PREFIX, DEPRECATED_IN_PREFIX, LIFECYCLE_EXTENSION } from 'src/constants';
import { MetadataKey } from 'src/enum';
import { EmitEvent } from 'src/interfaces/event.interface';
import { JobName, QueueName } from 'src/interfaces/job.interface';
import { setUnion } from 'src/utils/set';

// PostgreSQL uses a 16-bit integer to indicate the number of bound parameters. This means that the
Expand Down Expand Up @@ -122,6 +123,12 @@ export type EventConfig = {
};
export const OnEvent = (config: EventConfig) => SetMetadata(MetadataKey.EVENT_CONFIG, config);

export type JobConfig = {
name: JobName;
queue: QueueName;
};
export const OnJob = (config: JobConfig) => SetMetadata(MetadataKey.JOB_CONFIG, config);

type LifecycleRelease = 'NEXT_RELEASE' | string;
type LifecycleMetadata = {
addedAt?: LifecycleRelease;
Expand Down
1 change: 1 addition & 0 deletions server/src/enum.ts
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,7 @@ export enum MetadataKey {
SHARED_ROUTE = 'shared_route',
API_KEY_SECURITY = 'api_key',
EVENT_CONFIG = 'event_config',
JOB_CONFIG = 'job_config',
TELEMETRY_ENABLED = 'telemetry_enabled',
}

Expand Down
3 changes: 3 additions & 0 deletions server/src/interfaces/event.interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { SystemConfig } from 'src/config';
import { AssetResponseDto } from 'src/dtos/asset-response.dto';
import { ReleaseNotification, ServerVersionResponseDto } from 'src/dtos/server.dto';
import { ImmichWorker } from 'src/enum';
import { JobItem, QueueName } from 'src/interfaces/job.interface';

export const IEventRepository = 'IEventRepository';

Expand Down Expand Up @@ -38,6 +39,8 @@ type EventMap = {
'assets.delete': [{ assetIds: string[]; userId: string }];
'assets.restore': [{ assetIds: string[]; userId: string }];

'job.start': [QueueName, JobItem];

// session events
'session.delete': [{ sessionId: string }];

Expand Down
16 changes: 9 additions & 7 deletions server/src/interfaces/job.interface.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ClassConstructor } from 'class-transformer';
import { EmailImageAttachment } from 'src/interfaces/notification.interface';

export enum QueueName {
Expand Down Expand Up @@ -238,8 +239,8 @@ export type JobItem =

// Migration
| { name: JobName.QUEUE_MIGRATION; data?: IBaseJob }
| { name: JobName.MIGRATE_ASSET; data?: IEntityJob }
| { name: JobName.MIGRATE_PERSON; data?: IEntityJob }
| { name: JobName.MIGRATE_ASSET; data: IEntityJob }
| { name: JobName.MIGRATE_PERSON; data: IEntityJob }

// Metadata Extraction
| { name: JobName.QUEUE_METADATA_EXTRACTION; data: IBaseJob }
Expand Down Expand Up @@ -286,7 +287,7 @@ export type JobItem =
| { name: JobName.LIBRARY_SYNC_FILE; data: ILibraryFileJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_FILES; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ASSETS; data: IEntityJob }
| { name: JobName.LIBRARY_SYNC_ASSET; data: IEntityJob }
| { name: JobName.LIBRARY_SYNC_ASSET; data: ILibraryAssetJob }
| { name: JobName.LIBRARY_DELETE; data: IEntityJob }
| { name: JobName.LIBRARY_QUEUE_SYNC_ALL; data?: IBaseJob }
| { name: JobName.LIBRARY_QUEUE_CLEANUP; data: IBaseJob }
Expand All @@ -305,14 +306,15 @@ export enum JobStatus {
FAILED = 'failed',
SKIPPED = 'skipped',
}

export type JobHandler<T = any> = (data: T) => Promise<JobStatus>;
export type JobItemHandler = (item: JobItem) => Promise<void>;
export type Jobs = { [K in JobItem['name']]: (JobItem & { name: K })['data'] };
export type JobOf<T extends JobName> = Jobs[T];

export const IJobRepository = 'IJobRepository';

export interface IJobRepository {
addHandler(queueName: QueueName, concurrency: number, handler: JobItemHandler): void;
setup(options: { services: ClassConstructor<unknown>[] }): void;
startWorkers(): void;
run(job: JobItem): Promise<JobStatus>;
addCronJob(name: string, expression: string, onTick: () => void, start?: boolean): void;
updateCronJob(name: string, expression?: string, start?: boolean): void;
setConcurrency(queueName: QueueName, concurrency: number): void;
Expand Down
190 changes: 96 additions & 94 deletions server/src/repositories/job.repository.ts
Original file line number Diff line number Diff line change
@@ -1,124 +1,122 @@
import { getQueueToken } from '@nestjs/bullmq';
import { Inject, Injectable } from '@nestjs/common';
import { ModuleRef } from '@nestjs/core';
import { ModuleRef, Reflector } from '@nestjs/core';
import { SchedulerRegistry } from '@nestjs/schedule';
import { Job, JobsOptions, Processor, Queue, Worker, WorkerOptions } from 'bullmq';
import { JobsOptions, Queue, Worker } from 'bullmq';
import { ClassConstructor } from 'class-transformer';
import { CronJob, CronTime } from 'cron';
import { setTimeout } from 'node:timers/promises';
import { JobConfig } from 'src/decorators';
import { MetadataKey } from 'src/enum';
import { IConfigRepository } from 'src/interfaces/config.interface';
import { IEventRepository } from 'src/interfaces/event.interface';
import {
IEntityJob,
IJobRepository,
JobCounts,
JobItem,
JobName,
JobOf,
JobStatus,
QueueCleanType,
QueueName,
QueueStatus,
} from 'src/interfaces/job.interface';
import { ILoggerRepository } from 'src/interfaces/logger.interface';
import { getKeyByValue, getMethodNames, ImmichStartupError } from 'src/utils/misc';

export const JOBS_TO_QUEUE: Record<JobName, QueueName> = {
// misc
[JobName.ASSET_DELETION]: QueueName.BACKGROUND_TASK,
[JobName.ASSET_DELETION_CHECK]: QueueName.BACKGROUND_TASK,
[JobName.USER_DELETE_CHECK]: QueueName.BACKGROUND_TASK,
[JobName.USER_DELETION]: QueueName.BACKGROUND_TASK,
[JobName.DELETE_FILES]: QueueName.BACKGROUND_TASK,
[JobName.CLEAN_OLD_AUDIT_LOGS]: QueueName.BACKGROUND_TASK,
[JobName.CLEAN_OLD_SESSION_TOKENS]: QueueName.BACKGROUND_TASK,
[JobName.PERSON_CLEANUP]: QueueName.BACKGROUND_TASK,
[JobName.USER_SYNC_USAGE]: QueueName.BACKGROUND_TASK,

// backups
[JobName.BACKUP_DATABASE]: QueueName.BACKUP_DATABASE,

// conversion
[JobName.QUEUE_VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,
[JobName.VIDEO_CONVERSION]: QueueName.VIDEO_CONVERSION,

// thumbnails
[JobName.QUEUE_GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION,
[JobName.GENERATE_THUMBNAILS]: QueueName.THUMBNAIL_GENERATION,
[JobName.GENERATE_PERSON_THUMBNAIL]: QueueName.THUMBNAIL_GENERATION,

// tags
[JobName.TAG_CLEANUP]: QueueName.BACKGROUND_TASK,

// metadata
[JobName.QUEUE_METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
[JobName.METADATA_EXTRACTION]: QueueName.METADATA_EXTRACTION,
[JobName.LINK_LIVE_PHOTOS]: QueueName.METADATA_EXTRACTION,

// storage template
[JobName.STORAGE_TEMPLATE_MIGRATION]: QueueName.STORAGE_TEMPLATE_MIGRATION,
[JobName.STORAGE_TEMPLATE_MIGRATION_SINGLE]: QueueName.STORAGE_TEMPLATE_MIGRATION,

// migration
[JobName.QUEUE_MIGRATION]: QueueName.MIGRATION,
[JobName.MIGRATE_ASSET]: QueueName.MIGRATION,
[JobName.MIGRATE_PERSON]: QueueName.MIGRATION,

// facial recognition
[JobName.QUEUE_FACE_DETECTION]: QueueName.FACE_DETECTION,
[JobName.FACE_DETECTION]: QueueName.FACE_DETECTION,
[JobName.QUEUE_FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION,
[JobName.FACIAL_RECOGNITION]: QueueName.FACIAL_RECOGNITION,

// smart search
[JobName.QUEUE_SMART_SEARCH]: QueueName.SMART_SEARCH,
[JobName.SMART_SEARCH]: QueueName.SMART_SEARCH,

// duplicate detection
[JobName.QUEUE_DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,
[JobName.DUPLICATE_DETECTION]: QueueName.DUPLICATE_DETECTION,

// XMP sidecars
[JobName.QUEUE_SIDECAR]: QueueName.SIDECAR,
[JobName.SIDECAR_DISCOVERY]: QueueName.SIDECAR,
[JobName.SIDECAR_SYNC]: QueueName.SIDECAR,
[JobName.SIDECAR_WRITE]: QueueName.SIDECAR,

// Library management
[JobName.LIBRARY_SYNC_FILE]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_SYNC_FILES]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_SYNC_ASSETS]: QueueName.LIBRARY,
[JobName.LIBRARY_DELETE]: QueueName.LIBRARY,
[JobName.LIBRARY_SYNC_ASSET]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_SYNC_ALL]: QueueName.LIBRARY,
[JobName.LIBRARY_QUEUE_CLEANUP]: QueueName.LIBRARY,

// Notification
[JobName.SEND_EMAIL]: QueueName.NOTIFICATION,
[JobName.NOTIFY_ALBUM_INVITE]: QueueName.NOTIFICATION,
[JobName.NOTIFY_ALBUM_UPDATE]: QueueName.NOTIFICATION,
[JobName.NOTIFY_SIGNUP]: QueueName.NOTIFICATION,

// Version check
[JobName.VERSION_CHECK]: QueueName.BACKGROUND_TASK,

// Trash
[JobName.QUEUE_TRASH_EMPTY]: QueueName.BACKGROUND_TASK,
type JobMapItem = {
jobName: JobName;
queueName: QueueName;
handler: (job: JobOf<any>) => Promise<JobStatus>;
label: string;
};

@Injectable()
export class JobRepository implements IJobRepository {
private workers: Partial<Record<QueueName, Worker>> = {};
private handlers: Partial<Record<JobName, JobMapItem>> = {};

constructor(
private moduleReference: ModuleRef,
private schedulerReqistry: SchedulerRegistry,
private moduleRef: ModuleRef,
private schedulerRegistry: SchedulerRegistry,
@Inject(IConfigRepository) private configRepository: IConfigRepository,
@Inject(IEventRepository) private eventRepository: IEventRepository,
@Inject(ILoggerRepository) private logger: ILoggerRepository,
) {
this.logger.setContext(JobRepository.name);
}

addHandler(queueName: QueueName, concurrency: number, handler: (item: JobItem) => Promise<void>) {
setup({ services }: { services: ClassConstructor<unknown>[] }) {
const reflector = this.moduleRef.get(Reflector, { strict: false });

// discovery
for (const Service of services) {
const instance = this.moduleRef.get<any>(Service);
for (const methodName of getMethodNames(instance)) {
const handler = instance[methodName];
const config = reflector.get<JobConfig>(MetadataKey.JOB_CONFIG, handler);
if (!config) {
continue;
}

const { name: jobName, queue: queueName } = config;
const label = `${Service.name}.${handler.name}`;

// one handler per job
if (this.handlers[jobName]) {
const jobKey = getKeyByValue(JobName, jobName);
const errorMessage = `Failed to add job handler for ${label}`;
this.logger.error(
`${errorMessage}. JobName.${jobKey} is already handled by ${this.handlers[jobName].label}.`,
);
throw new ImmichStartupError(errorMessage);
}

this.handlers[jobName] = {
label,
jobName,
queueName,
handler: handler.bind(instance),
};

this.logger.verbose(`Added job handler: ${jobName} => ${label}`);
}
}

// no missing handlers
for (const [jobKey, jobName] of Object.entries(JobName)) {
const item = this.handlers[jobName];
if (!item) {
const errorMessage = `Failed to find job handler for Job.${jobKey} ("${jobName}")`;
this.logger.error(
`${errorMessage}. Make sure to add the @OnJob({ name: JobName.${jobKey}, queue: QueueName.XYZ }) decorator for the new job.`,
);
throw new ImmichStartupError(errorMessage);
}
}
}

startWorkers() {
const { bull } = this.configRepository.getEnv();
const workerHandler: Processor = async (job: Job) => handler(job as JobItem);
const workerOptions: WorkerOptions = { ...bull.config, concurrency };
this.workers[queueName] = new Worker(queueName, workerHandler, workerOptions);
for (const queueName of Object.values(QueueName)) {
this.logger.debug(`Starting worker for queue: ${queueName}`);
this.workers[queueName] = new Worker(
queueName,
(job) => this.eventRepository.emit('job.start', queueName, job as JobItem),
{ ...bull.config, concurrency: 1 },
);
}
}

async run({ name, data }: JobItem) {
const item = this.handlers[name as JobName];
if (!item) {
this.logger.warn(`Skipping unknown job: "${name}"`);
return JobStatus.SKIPPED;
}

return item.handler(data);
}

addCronJob(name: string, expression: string, onTick: () => void, start = true): void {
Expand All @@ -141,11 +139,11 @@ export class JobRepository implements IJobRepository {
true,
);

this.schedulerReqistry.addCronJob(name, job);
this.schedulerRegistry.addCronJob(name, job);
}

updateCronJob(name: string, expression?: string, start?: boolean): void {
const job = this.schedulerReqistry.getCronJob(name);
const job = this.schedulerRegistry.getCronJob(name);
if (expression) {
job.setTime(new CronTime(expression));
}
Expand Down Expand Up @@ -204,6 +202,10 @@ export class JobRepository implements IJobRepository {
) as unknown as Promise<JobCounts>;
}

private getQueueName(name: JobName) {
return (this.handlers[name] as JobMapItem).queueName;
}

async queueAll(items: JobItem[]): Promise<void> {
if (items.length === 0) {
return;
Expand All @@ -212,7 +214,7 @@ export class JobRepository implements IJobRepository {
const promises = [];
const itemsByQueue = {} as Record<string, (JobItem & { data: any; options: JobsOptions | undefined })[]>;
for (const item of items) {
const queueName = JOBS_TO_QUEUE[item.name];
const queueName = this.getQueueName(item.name);
const job = {
name: item.name,
data: item.data || {},
Expand Down Expand Up @@ -273,11 +275,11 @@ export class JobRepository implements IJobRepository {
}

private getQueue(queue: QueueName): Queue {
return this.moduleReference.get<Queue>(getQueueToken(queue), { strict: false });
return this.moduleRef.get<Queue>(getQueueToken(queue), { strict: false });
}

public async removeJob(jobId: string, name: JobName): Promise<IEntityJob | undefined> {
const existingJob = await this.getQueue(JOBS_TO_QUEUE[name]).getJob(jobId);
const existingJob = await this.getQueue(this.getQueueName(name)).getJob(jobId);
if (!existingJob) {
return;
}
Expand Down
Loading
Loading