From 0e2136ce63133e1a13c558380665124ef4a4b705 Mon Sep 17 00:00:00 2001 From: Grzegorz Godlewski Date: Thu, 25 Jan 2024 21:56:34 +0100 Subject: [PATCH] fix(task-executor): propagate issues with invoice and debit note filter to user Right now the payment service has silently tolerated user triggered errors from the invoice or debit note filters - the buggy code was still operating producing bad market outcomes - leading to the providers terminating agreements due to not accepted debit notes. The users were left clueless. What the PR changes is: given there's any issue with processing an invoice or debit note, then it's not only logged in the SDK on error level, but also an `error` event is raised from the `PaymentService`. This error is then listened to by the `TaskExecutor`, which cancels itself in response - producing the information to the user and stopping the buggy code to operate. One side effect here is that _any_ issue with the processing will lead to the shutdown of the TaskExecutor. BREAKING CHANGE: The TaskExecutor will now be cancelled when an issue will be raised with processing the payments - including errors in the invoice and debit note filters. --- src/executor/executor.spec.ts | 3 ++ src/executor/executor.ts | 54 +++++++++++++++++------- src/payment/service.ts | 37 +++++++++++----- tests/unit/payment_service.test.ts | 68 +++++++++++++++++++++++++++--- 4 files changed, 131 insertions(+), 31 deletions(-) diff --git a/src/executor/executor.spec.ts b/src/executor/executor.spec.ts index 42a876435..e0ada58d1 100644 --- a/src/executor/executor.spec.ts +++ b/src/executor/executor.spec.ts @@ -30,6 +30,9 @@ jest.mock("../payment/service", () => { createAllocation: jest.fn(), run: serviceRunSpy, end: jest.fn(), + events: { + on: jest.fn(), + }, }; }), }; diff --git a/src/executor/executor.ts b/src/executor/executor.ts index b15091537..84e5700e8 100644 --- a/src/executor/executor.ts +++ b/src/executor/executor.ts @@ -258,7 +258,12 @@ export class TaskExecutor { this.statsService.run(), this.storageProvider?.init(), ]).catch((e) => this.handleCriticalError(e)); + + // Start listening to issues reported by the services + this.paymentService.events.on("error", (e) => this.handleCriticalError(e)); + this.taskService.run().catch((e) => this.handleCriticalError(e)); + if (runtimeContextChecker.isNode) this.installSignalHandlers(); this.options.eventTarget.dispatchEvent(new Events.ComputationStarted()); this.logger.info(`Task Executor has started`, { @@ -405,9 +410,29 @@ export class TaskExecutor { } } - private handleCriticalError(e: Error) { - this.options.eventTarget?.dispatchEvent(new Events.ComputationFailed({ reason: e.toString() })); - this.logger.error("Critical error", e); + public async cancel(reason: string) { + try { + if (this.isCanceled) { + this.logger.warn("The executor is already cancelled, ignoring second request"); + return; + } + + this.isCanceled = true; + + if (runtimeContextChecker.isNode) { + this.removeSignalHandlers(); + } + + const message = `Executor has interrupted by the user. Reason: ${reason}.`; + + this.logger.info(`${message}. Stopping all tasks...`, { + tasksInProgress: this.taskQueue.size, + }); + + await this.shutdown(); + } catch (error) { + this.logger.error(`Error while cancelling the executor`, error); + } } private installSignalHandlers() { @@ -424,19 +449,16 @@ export class TaskExecutor { }); } - public async cancel(reason?: string) { - try { - if (this.isCanceled) return; - if (runtimeContextChecker.isNode) this.removeSignalHandlers(); - const message = `Executor has interrupted by the user. Reason: ${reason}.`; - this.logger.warn(`${message}. Stopping all tasks...`, { - tasksInProgress: this.taskQueue.size, - }); - this.isCanceled = true; - await this.shutdown(); - } catch (error) { - this.logger.error(`Error while cancelling the executor`, error); - } + private handleCriticalError(err: Error) { + this.options.eventTarget?.dispatchEvent(new Events.ComputationFailed({ reason: err.toString() })); + const message = + "TaskExecutor faced a critical error and will now cancel work, terminate agreements and request settling payments"; + this.logger.error(message, err); + // Make sure users know in case they didn't configure logging + console.error(message, err); + this.cancel(`Cancelling due to critical error ${err}`).catch((cancelErr) => + this.logger.error("Issue when cancelling Task Executor", { err: cancelErr }), + ); } private printStats() { diff --git a/src/payment/service.ts b/src/payment/service.ts index c1717aa95..7f76067b7 100644 --- a/src/payment/service.ts +++ b/src/payment/service.ts @@ -7,6 +7,17 @@ import { DebitNoteEvent, InvoiceEvent, PAYMENT_EVENT_TYPE, Payments } from "./pa import { Agreement } from "../agreement"; import { AgreementPaymentProcess } from "./agreement_payment_process"; import { GolemPaymentError, PaymentErrorCode } from "./error"; +import { EventEmitter } from "eventemitter3"; + +interface PaymentServiceEvents { + /** + * Triggered when the service encounters an issue in an "asynchronous sub-process" (like accepting payments) + * that should be notified to the caller + * + * @param err The error raised during an asynchronous process executed by the PaymentService + */ + error: (err: Error) => void; +} export interface PaymentOptions extends BasePaymentOptions { /** Interval for checking new invoices */ @@ -39,6 +50,8 @@ export class PaymentService { private processes: Map = new Map(); private payments?: Payments; + public events = new EventEmitter(); + constructor( private readonly yagnaApi: YagnaApi, options?: PaymentOptions, @@ -194,19 +207,23 @@ export class PaymentService { private async subscribePayments(event: Event) { if (event instanceof InvoiceEvent) { - this.processInvoice(event.invoice) - .then(() => this.logger.debug(`Invoice event processed`, { agreementId: event.invoice.agreementId })) - .catch((err) => - this.logger.error(`Failed to process InvoiceEvent`, { agreementId: event.invoice.agreementId, err }), - ); + try { + await this.processInvoice(event.invoice); + this.logger.debug(`Invoice event processed`, { agreementId: event.invoice.agreementId }); + } catch (err) { + this.logger.error(`Failed to process InvoiceEvent`, { agreementId: event.invoice.agreementId, err }); + this.events.emit("error", err); + } } if (event instanceof DebitNoteEvent) { - this.processDebitNote(event.debitNote) - .then(() => this.logger.debug(`DebitNote event processed`, { agreementId: event.debitNote.agreementId })) - .catch((err) => - this.logger.error(`Failed to process DebitNoteEvent`, { agreementId: event.debitNote.agreementId, err }), - ); + try { + await this.processDebitNote(event.debitNote); + this.logger.debug(`DebitNote event processed`, { agreementId: event.debitNote.agreementId }); + } catch (err) { + this.logger.error(`Failed to process DebitNoteEvent`, { agreementId: event.debitNote.agreementId, err }); + this.events.emit("error", err); + } } } diff --git a/tests/unit/payment_service.test.ts b/tests/unit/payment_service.test.ts index be23158d7..0b2ce380c 100644 --- a/tests/unit/payment_service.test.ts +++ b/tests/unit/payment_service.test.ts @@ -1,6 +1,6 @@ import { clear, setExpectedDebitNotes, setExpectedEvents, setExpectedInvoices } from "../mock/rest/payment"; import { LoggerMock, YagnaMock } from "../mock"; -import { PaymentService, Allocation, PaymentFilters, GolemPaymentError, PaymentErrorCode } from "../../src/payment"; +import { Allocation, GolemPaymentError, PaymentErrorCode, PaymentFilters, PaymentService } from "../../src/payment"; import { agreement } from "../mock/entities/agreement"; import { debitNotes, debitNotesEvents, invoiceEvents, invoices } from "../mock/fixtures"; import { anything, reset, spy, when } from "@johanblumenberg/ts-mockito"; @@ -108,7 +108,7 @@ describe("Payment Service", () => { setExpectedEvents(debitNotesEvents); setExpectedDebitNotes(debitNotes); await paymentService.createAllocation(); - await paymentService.acceptPayments(agreement); + paymentService.acceptPayments(agreement); await paymentService.run(); await logger.expectToInclude( `DebitNote rejected`, @@ -129,7 +129,7 @@ describe("Payment Service", () => { setExpectedDebitNotes(debitNotes); setExpectedInvoices(invoices); await paymentService.createAllocation(); - await paymentService.acceptPayments(agreement); + paymentService.acceptPayments(agreement); await paymentService.run(); await logger.expectToInclude( `DebitNote rejected`, @@ -170,7 +170,7 @@ describe("Payment Service", () => { setExpectedEvents(debitNotesEvents); setExpectedDebitNotes(debitNotes); await paymentService.createAllocation(); - await paymentService.acceptPayments(agreement); + paymentService.acceptPayments(agreement); await paymentService.run(); await logger.expectToInclude( `DebitNote rejected`, @@ -213,7 +213,7 @@ describe("Payment Service", () => { setExpectedEvents(debitNotesEvents); setExpectedDebitNotes(debitNotes); await paymentService.createAllocation(); - await paymentService.acceptPayments(agreement); + paymentService.acceptPayments(agreement); await paymentService.run(); await logger.expectToInclude( `DebitNote accepted`, @@ -249,6 +249,64 @@ describe("Payment Service", () => { await paymentService.end(); }); + describe("emitting 'error' event", () => { + it("should emit when there's an issue with processing the debit note", async () => { + // Given + const error = new Error("Broken debit note filter"); + + const paymentService = new PaymentService(yagnaApi, { + logger, + debitNotesFilter: () => { + throw error; + }, + paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, + }); + + setExpectedEvents(debitNotesEvents); + setExpectedDebitNotes(debitNotes); + + const handler = jest.fn(); + paymentService.events.once("error", handler); + + // When + await paymentService.createAllocation(); + paymentService.acceptPayments(agreement); + await paymentService.run(); + await paymentService.end(); + + // Then + expect(handler).toHaveBeenCalledWith(error); + }); + + it("should emit an error event when there's an issue with processing the invoice", async () => { + // Given + const error = new Error("Broken invoice filter"); + + const paymentService = new PaymentService(yagnaApi, { + logger, + invoiceFilter: () => { + throw error; + }, + paymentTimeout: TEST_PAYMENT_TIMEOUT_MS, + }); + + setExpectedEvents(invoiceEvents); + setExpectedInvoices(invoices); + + const handler = jest.fn(); + paymentService.events.once("error", handler); + + // When + await paymentService.createAllocation(); + paymentService.acceptPayments(agreement); + await paymentService.run(); + await paymentService.end(); + + // Then + expect(handler).toHaveBeenCalledWith(error); + }); + }); + it("should throw GolemPaymentError if allocation is not created", async () => { const paymentService = new PaymentService(yagnaApi, { logger,