Skip to content

Commit

Permalink
feat: add task log (#1173)
Browse files Browse the repository at this point in the history
Co-authored-by: jinbing.jb <jinbing.jb@alibaba-inc.com>
  • Loading branch information
stone-jin and jinbing.jb authored Jul 30, 2021
1 parent 562236c commit 00ca5e8
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 12 deletions.
4 changes: 3 additions & 1 deletion packages/task/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@
"@midwayjs/decorator": "^2.11.5",
"@midwayjs/koa": "^2.11.6",
"@midwayjs/mock": "^2.11.6",
"@midwayjs/logger": "^2.11.3",
"@types/bull": "^3.15.0",
"@types/cron": "^1.7.2"
},
"dependencies": {
"bull": "^3.22.0",
"cron": "^1.8.2"
"cron": "^1.8.2",
"uuid": "^8.3.2"
}
}
95 changes: 85 additions & 10 deletions packages/task/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,25 @@ import {
} from '@midwayjs/decorator';
import * as Bull from 'bull';
import { CronJob } from 'cron';
import { v4 } from 'uuid';
import { ScheduleContextLogger } from './service/scheduleContextLogger';

function isAsync(fn) {
return fn[Symbol.toStringTag] === 'AsyncFunction';
}

function wrapAsync(fn) {
return async function (...args) {
if (isAsync(fn)) {
await fn.call(...args);
} else {
const result = fn.call(...args);
if (result && result.then) {
await result;
}
}
};
}

@Configuration({
namespace: 'task',
Expand All @@ -35,13 +54,24 @@ export class AutoConfiguration {

async onReady(
container: IMidwayContainer,
app: IMidwayApplication
_: IMidwayApplication
): Promise<void> {
this.createLogger();
await this.loadTask(container);
await this.loadLocalTask();
await this.loadQueue(container);
}

createLogger() {
this.app.createLogger('midway-task', {
fileLogName: 'midway-task.log',
errorLogName: 'midway-task-error.log',
printFormat: info => {
return `${info.timestamp} ${info.LEVEL} ${info.pid} ${info.label} ${info.message}`;
},
});
}

async onStop() {
this.queueList.map(queue => {
queue.close();
Expand All @@ -51,6 +81,17 @@ export class AutoConfiguration {
});
}

getContext(options: { type: string; id: any; trigger: string }) {
const ctx = this.app.createAnonymousContext({ logger: console });
ctx.logger = new ScheduleContextLogger(
ctx,
this.app.getLogger('midway-task')
);
ctx.requestContext.registerObject('logger', ctx.logger);
ctx.taskInfo = options;
return ctx;
}

async loadTask(container) {
const modules = listModule(MODULE_TASK_KEY);
const queueTaskMap = {};
Expand All @@ -62,9 +103,20 @@ export class AutoConfiguration {
this.taskConfig
);
queue.process(async job => {
const ctx = this.app.createAnonymousContext();
const service = await ctx.requestContext.getAsync(module);
rule.value.call(service, job.data);
const ctx = this.getContext({
type: 'Task',
id: job.id,
trigger: `${rule.name}:${rule.propertyKey}`,
});
const { logger } = ctx;
try {
logger.info('task start.');
const service = await ctx.requestContext.getAsync(module);
await wrapAsync(rule.value)(service, job.data);
} catch (e) {
logger.error(`${e.stack}`);
}
logger.info('task end.');
});
queueTaskMap[`${rule.name}:${rule.propertyKey}`] = queue;
const allJobs = await queue.getRepeatableJobs();
Expand Down Expand Up @@ -95,9 +147,21 @@ export class AutoConfiguration {
const job = new CronJob(
rule.options,
async () => {
const ctx = this.app.createAnonymousContext();
const service = await ctx.requestContext.getAsync(module);
rule.value.call(service);
const requestId = v4();
const ctx = this.getContext({
type: 'LocalTask',
id: requestId,
trigger: `${module.name}:${rule.propertyKey}`,
});
const { logger } = ctx;
try {
const service = await ctx.requestContext.getAsync(module);
logger.info('local task start.');
await wrapAsync(rule.value)(service);
} catch (e) {
logger.error(`${e.stack}`);
}
logger.info('local task end.');
},
null,
true,
Expand All @@ -118,9 +182,20 @@ export class AutoConfiguration {
const rule = getClassMetadata(MODULE_TASK_QUEUE_OPTIONS, module);
const queue = new Bull(`${rule.name}:execute`, config);
queue.process(async job => {
const ctx = this.app.createAnonymousContext();
const service = await ctx.requestContext.getAsync(module);
await service.execute.call(service, job.data, job);
const ctx = this.getContext({
type: 'Queue',
id: job.id,
trigger: `${module.name}`,
});
const { logger } = ctx;
try {
logger.info('queue process start.');
const service = await ctx.requestContext.getAsync(module);
await wrapAsync(service.execute)(service, job.data, job);
} catch (e) {
logger.error(`${e.stack}`);
}
logger.info('queue process end.');
});
queueMap[`${rule.name}:execute`] = queue;
this.queueList.push(queue);
Expand Down
8 changes: 8 additions & 0 deletions packages/task/src/service/scheduleContextLogger.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
import { MidwayContextLogger } from '@midwayjs/logger';

export class ScheduleContextLogger extends MidwayContextLogger<any> {
formatContextLabel() {
const { taskInfo } = this.ctx;
return `[${taskInfo.type}][${taskInfo.id}][${taskInfo.trigger}]`;
}
}
6 changes: 5 additions & 1 deletion packages/task/test/fixtures/base-app/src/task/queue.task.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { App, Provide, Queue } from "@midwayjs/decorator";
import { App, Inject, Provide, Queue } from "@midwayjs/decorator";
import { Application } from "@midwayjs/koa";

@Queue()
Expand All @@ -8,7 +8,11 @@ export class QueueTask{
@App()
app: Application;

@Inject()
logger;

async execute(params){
this.logger.info(`====>QueueTask execute`)
this.app.getApplicationContext().registerObject(`queueConfig`, JSON.stringify(params));
}
}

0 comments on commit 00ca5e8

Please sign in to comment.