Skip to content

Commit

Permalink
fix(task-executor): propagate issues with invoice and debit note filt…
Browse files Browse the repository at this point in the history
…er to user

Right now the payment service has silently tolerated user triggered
errors from the invoice or debit note filters - the buggy code was still
operating producing bad market outcomes - leading to the providers
terminating agreements due to not accepted debit notes. The users were
left clueless.

What the PR changes is: given there's any issue with
processing an invoice or debit note, then it's not only logged in the
SDK on error level, but also an `error` event is raised from the
`PaymentService`. This error is then listened to by the `TaskExecutor`,
which cancels itself in response - producing the information to the user
and stopping the buggy code to operate.

One side effect here is that
_any_ issue with the processing will lead to the shutdown of the
TaskExecutor.

BREAKING CHANGE: The TaskExecutor will now be cancelled when an issue
will be raised with processing the payments - including errors in the
invoice and debit note filters.
  • Loading branch information
grisha87 committed Jan 31, 2024
1 parent dd6f412 commit 0e2136c
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 31 deletions.
3 changes: 3 additions & 0 deletions src/executor/executor.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ jest.mock("../payment/service", () => {
createAllocation: jest.fn(),
run: serviceRunSpy,
end: jest.fn(),
events: {
on: jest.fn(),
},
};
}),
};
Expand Down
54 changes: 38 additions & 16 deletions src/executor/executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,12 @@ export class TaskExecutor {
this.statsService.run(),
this.storageProvider?.init(),
]).catch((e) => this.handleCriticalError(e));

// Start listening to issues reported by the services
this.paymentService.events.on("error", (e) => this.handleCriticalError(e));

this.taskService.run().catch((e) => this.handleCriticalError(e));

if (runtimeContextChecker.isNode) this.installSignalHandlers();
this.options.eventTarget.dispatchEvent(new Events.ComputationStarted());
this.logger.info(`Task Executor has started`, {
Expand Down Expand Up @@ -405,9 +410,29 @@ export class TaskExecutor {
}
}

private handleCriticalError(e: Error) {
this.options.eventTarget?.dispatchEvent(new Events.ComputationFailed({ reason: e.toString() }));
this.logger.error("Critical error", e);
public async cancel(reason: string) {
try {
if (this.isCanceled) {
this.logger.warn("The executor is already cancelled, ignoring second request");
return;
}

this.isCanceled = true;

if (runtimeContextChecker.isNode) {
this.removeSignalHandlers();
}

const message = `Executor has interrupted by the user. Reason: ${reason}.`;

this.logger.info(`${message}. Stopping all tasks...`, {
tasksInProgress: this.taskQueue.size,
});

await this.shutdown();
} catch (error) {
this.logger.error(`Error while cancelling the executor`, error);
}
}

private installSignalHandlers() {
Expand All @@ -424,19 +449,16 @@ export class TaskExecutor {
});
}

public async cancel(reason?: string) {
try {
if (this.isCanceled) return;
if (runtimeContextChecker.isNode) this.removeSignalHandlers();
const message = `Executor has interrupted by the user. Reason: ${reason}.`;
this.logger.warn(`${message}. Stopping all tasks...`, {
tasksInProgress: this.taskQueue.size,
});
this.isCanceled = true;
await this.shutdown();
} catch (error) {
this.logger.error(`Error while cancelling the executor`, error);
}
private handleCriticalError(err: Error) {
this.options.eventTarget?.dispatchEvent(new Events.ComputationFailed({ reason: err.toString() }));
const message =
"TaskExecutor faced a critical error and will now cancel work, terminate agreements and request settling payments";
this.logger.error(message, err);
// Make sure users know in case they didn't configure logging
console.error(message, err);
this.cancel(`Cancelling due to critical error ${err}`).catch((cancelErr) =>
this.logger.error("Issue when cancelling Task Executor", { err: cancelErr }),
);
}

private printStats() {
Expand Down
37 changes: 27 additions & 10 deletions src/payment/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,17 @@ import { DebitNoteEvent, InvoiceEvent, PAYMENT_EVENT_TYPE, Payments } from "./pa
import { Agreement } from "../agreement";
import { AgreementPaymentProcess } from "./agreement_payment_process";
import { GolemPaymentError, PaymentErrorCode } from "./error";
import { EventEmitter } from "eventemitter3";

interface PaymentServiceEvents {
/**
* Triggered when the service encounters an issue in an "asynchronous sub-process" (like accepting payments)
* that should be notified to the caller
*
* @param err The error raised during an asynchronous process executed by the PaymentService
*/
error: (err: Error) => void;
}

export interface PaymentOptions extends BasePaymentOptions {
/** Interval for checking new invoices */
Expand Down Expand Up @@ -39,6 +50,8 @@ export class PaymentService {
private processes: Map<string, AgreementPaymentProcess> = new Map();
private payments?: Payments;

public events = new EventEmitter<PaymentServiceEvents>();

constructor(
private readonly yagnaApi: YagnaApi,
options?: PaymentOptions,
Expand Down Expand Up @@ -194,19 +207,23 @@ export class PaymentService {

private async subscribePayments(event: Event) {
if (event instanceof InvoiceEvent) {
this.processInvoice(event.invoice)
.then(() => this.logger.debug(`Invoice event processed`, { agreementId: event.invoice.agreementId }))
.catch((err) =>
this.logger.error(`Failed to process InvoiceEvent`, { agreementId: event.invoice.agreementId, err }),
);
try {
await this.processInvoice(event.invoice);
this.logger.debug(`Invoice event processed`, { agreementId: event.invoice.agreementId });
} catch (err) {
this.logger.error(`Failed to process InvoiceEvent`, { agreementId: event.invoice.agreementId, err });
this.events.emit("error", err);
}
}

if (event instanceof DebitNoteEvent) {
this.processDebitNote(event.debitNote)
.then(() => this.logger.debug(`DebitNote event processed`, { agreementId: event.debitNote.agreementId }))
.catch((err) =>
this.logger.error(`Failed to process DebitNoteEvent`, { agreementId: event.debitNote.agreementId, err }),
);
try {
await this.processDebitNote(event.debitNote);
this.logger.debug(`DebitNote event processed`, { agreementId: event.debitNote.agreementId });
} catch (err) {
this.logger.error(`Failed to process DebitNoteEvent`, { agreementId: event.debitNote.agreementId, err });
this.events.emit("error", err);
}
}
}

Expand Down
68 changes: 63 additions & 5 deletions tests/unit/payment_service.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { clear, setExpectedDebitNotes, setExpectedEvents, setExpectedInvoices } from "../mock/rest/payment";
import { LoggerMock, YagnaMock } from "../mock";
import { PaymentService, Allocation, PaymentFilters, GolemPaymentError, PaymentErrorCode } from "../../src/payment";
import { Allocation, GolemPaymentError, PaymentErrorCode, PaymentFilters, PaymentService } from "../../src/payment";
import { agreement } from "../mock/entities/agreement";
import { debitNotes, debitNotesEvents, invoiceEvents, invoices } from "../mock/fixtures";
import { anything, reset, spy, when } from "@johanblumenberg/ts-mockito";
Expand Down Expand Up @@ -108,7 +108,7 @@ describe("Payment Service", () => {
setExpectedEvents(debitNotesEvents);
setExpectedDebitNotes(debitNotes);
await paymentService.createAllocation();
await paymentService.acceptPayments(agreement);
paymentService.acceptPayments(agreement);
await paymentService.run();
await logger.expectToInclude(
`DebitNote rejected`,
Expand All @@ -129,7 +129,7 @@ describe("Payment Service", () => {
setExpectedDebitNotes(debitNotes);
setExpectedInvoices(invoices);
await paymentService.createAllocation();
await paymentService.acceptPayments(agreement);
paymentService.acceptPayments(agreement);
await paymentService.run();
await logger.expectToInclude(
`DebitNote rejected`,
Expand Down Expand Up @@ -170,7 +170,7 @@ describe("Payment Service", () => {
setExpectedEvents(debitNotesEvents);
setExpectedDebitNotes(debitNotes);
await paymentService.createAllocation();
await paymentService.acceptPayments(agreement);
paymentService.acceptPayments(agreement);
await paymentService.run();
await logger.expectToInclude(
`DebitNote rejected`,
Expand Down Expand Up @@ -213,7 +213,7 @@ describe("Payment Service", () => {
setExpectedEvents(debitNotesEvents);
setExpectedDebitNotes(debitNotes);
await paymentService.createAllocation();
await paymentService.acceptPayments(agreement);
paymentService.acceptPayments(agreement);
await paymentService.run();
await logger.expectToInclude(
`DebitNote accepted`,
Expand Down Expand Up @@ -249,6 +249,64 @@ describe("Payment Service", () => {
await paymentService.end();
});

describe("emitting 'error' event", () => {
it("should emit when there's an issue with processing the debit note", async () => {
// Given
const error = new Error("Broken debit note filter");

const paymentService = new PaymentService(yagnaApi, {
logger,
debitNotesFilter: () => {
throw error;
},
paymentTimeout: TEST_PAYMENT_TIMEOUT_MS,
});

setExpectedEvents(debitNotesEvents);
setExpectedDebitNotes(debitNotes);

const handler = jest.fn();
paymentService.events.once("error", handler);

// When
await paymentService.createAllocation();
paymentService.acceptPayments(agreement);
await paymentService.run();
await paymentService.end();

// Then
expect(handler).toHaveBeenCalledWith(error);
});

it("should emit an error event when there's an issue with processing the invoice", async () => {
// Given
const error = new Error("Broken invoice filter");

const paymentService = new PaymentService(yagnaApi, {
logger,
invoiceFilter: () => {
throw error;
},
paymentTimeout: TEST_PAYMENT_TIMEOUT_MS,
});

setExpectedEvents(invoiceEvents);
setExpectedInvoices(invoices);

const handler = jest.fn();
paymentService.events.once("error", handler);

// When
await paymentService.createAllocation();
paymentService.acceptPayments(agreement);
await paymentService.run();
await paymentService.end();

// Then
expect(handler).toHaveBeenCalledWith(error);
});
});

it("should throw GolemPaymentError if allocation is not created", async () => {
const paymentService = new PaymentService(yagnaApi, {
logger,
Expand Down

0 comments on commit 0e2136c

Please sign in to comment.