diff --git a/packages/auth/src/app.ts b/packages/auth/src/app.ts index a9d96984b6..caa74215ec 100644 --- a/packages/auth/src/app.ts +++ b/packages/auth/src/app.ts @@ -1,6 +1,5 @@ import { join } from 'path' import http, { Server } from 'http' -import { EventEmitter } from 'events' import { IocContract } from '@adonisjs/fold' import { Knex } from 'knex' @@ -52,7 +51,6 @@ import { InteractionRoutes } from './interaction/routes' export interface AppContextData extends DefaultContext { logger: Logger - closeEmitter: EventEmitter container: AppContainer // Set by @koa/router params: { [key: string]: string } @@ -88,7 +86,6 @@ export interface DatabaseCleanupRule { export interface AppServices { logger: Promise knex: Promise - closeEmitter: Promise config: Promise clientService: Promise grantService: Promise @@ -105,8 +102,6 @@ export class App { private authServer!: Server private introspectionServer!: Server private adminServer!: Server - public apolloServer!: ApolloServer - private closeEmitter!: EventEmitter private logger!: Logger private config!: IAppConfig private databaseCleanupRules!: { @@ -124,7 +119,6 @@ export class App { */ public async boot(): Promise { this.config = await this.container.use('config') - this.closeEmitter = await this.container.use('closeEmitter') this.logger = await this.container.use('logger') this.databaseCleanupRules = { @@ -158,12 +152,12 @@ export class App { }) // Setup Apollo - this.apolloServer = new ApolloServer({ + const apolloServer = new ApolloServer({ schema: schemaWithResolvers, plugins: [ApolloServerPluginDrainHttpServer({ httpServer })] }) - await this.apolloServer.start() + await apolloServer.start() koa.use(bodyParser()) @@ -184,7 +178,7 @@ export class App { ) koa.use( - koaMiddleware(this.apolloServer, { + koaMiddleware(apolloServer, { context: async (): Promise => { return { container: this.container, @@ -374,7 +368,6 @@ export class App { const koa = new Koa() koa.context.container = this.container - koa.context.closeEmitter = await this.container.use('closeEmitter') koa.context.logger = await this.container.use('logger') koa.use( @@ -400,29 +393,28 @@ export class App { } public async shutdown(): Promise { - return new Promise((resolve): void => { - this.isShuttingDown = true - this.closeEmitter.emit('shutdown') - - if (this.adminServer) { - this.adminServer.close((): void => { - resolve() - }) - } + this.isShuttingDown = true - if (this.authServer) { - this.authServer.close((): void => { - resolve() - }) - } + if (this.authServer) { + await this.stopServer(this.authServer) + } + if (this.adminServer) { + await this.stopServer(this.adminServer) + } + if (this.introspectionServer) { + await this.stopServer(this.introspectionServer) + } + } - if (this.introspectionServer) { - this.introspectionServer.close((): void => { - resolve() - }) - } + private async stopServer(server: Server): Promise { + return new Promise((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err) + } - resolve() + resolve() + }) }) } diff --git a/packages/auth/src/index.ts b/packages/auth/src/index.ts index 0693c13c92..8e69503df8 100644 --- a/packages/auth/src/index.ts +++ b/packages/auth/src/index.ts @@ -1,5 +1,4 @@ import path from 'path' -import { EventEmitter } from 'events' import createLogger from 'pino' import { knex } from 'knex' import { Model } from 'objection' @@ -73,8 +72,6 @@ export function initIocContainer( return db }) - container.singleton('closeEmitter', async () => new EventEmitter()) - container.singleton('openPaymentsClient', async (deps) => { const logger = await deps.use('logger') return createOpenPaymentsClient({ logger }) @@ -228,6 +225,14 @@ export const start = async ( logger.info('received SIGTERM attempting graceful shutdown') try { + if (shuttingDown) { + logger.warn( + 'received second SIGTERM during graceful shutdown, exiting forcefully.' + ) + process.exit(1) + } + + shuttingDown = true // Graceful shutdown await gracefulShutdown(container, app) logger.info('completed graceful shutdown.') @@ -254,11 +259,14 @@ export const start = async ( const config = await container.use('config') await app.boot() + await app.startAdminServer(config.adminPort) - await app.startAuthServer(config.authPort) - await app.startIntrospectionServer(config.introspectionPort) logger.info(`Admin listening on ${app.getAdminPort()}`) + + await app.startAuthServer(config.authPort) logger.info(`Auth server listening on ${app.getAuthPort()}`) + + await app.startIntrospectionServer(config.introspectionPort) logger.info(`Introspection server listening on ${app.getIntrospectionPort()}`) } diff --git a/packages/auth/src/signature/middleware.test.ts b/packages/auth/src/signature/middleware.test.ts index 68de6b929c..471314cd47 100644 --- a/packages/auth/src/signature/middleware.test.ts +++ b/packages/auth/src/signature/middleware.test.ts @@ -50,7 +50,7 @@ describe('Signature Service', (): void => { afterAll(async (): Promise => { nock.restore() - appContainer.shutdown() + await appContainer.shutdown() }) describe('Signature middleware', (): void => { diff --git a/packages/auth/src/tests/context.ts b/packages/auth/src/tests/context.ts index 365abad245..c22616b53e 100644 --- a/packages/auth/src/tests/context.ts +++ b/packages/auth/src/tests/context.ts @@ -1,5 +1,4 @@ import crypto from 'crypto' -import EventEmitter from 'events' import * as httpMocks from 'node-mocks-http' import Koa from 'koa' import session from 'koa-session' @@ -36,7 +35,6 @@ export function createContext( ctx.params = params ctx.query = reqOpts.query || {} ctx.session = { ...req.session } - ctx.closeEmitter = new EventEmitter() ctx.container = container return ctx as T } diff --git a/packages/backend/src/app.ts b/packages/backend/src/app.ts index 4a001d9190..c47bada6ec 100644 --- a/packages/backend/src/app.ts +++ b/packages/backend/src/app.ts @@ -1,7 +1,7 @@ import { join } from 'path' import http, { Server } from 'http' -import { EventEmitter } from 'events' import { ParsedUrlQuery } from 'querystring' +import { Client as TigerbeetleClient } from 'tigerbeetle-node' import { IocContract } from '@adonisjs/fold' import { Knex } from 'knex' @@ -74,10 +74,10 @@ import { CombinedPaymentService } from './open_payments/payment/combined/service import { FeeService } from './fee/service' import { AutoPeeringService } from './auto-peering/service' import { AutoPeeringRoutes } from './auto-peering/routes' +import { Rafiki as ConnectorApp } from './connector/core' export interface AppContextData { logger: Logger - closeEmitter: EventEmitter container: AppContainer // Set by @koa/router. params: { [key: string]: string } @@ -177,7 +177,6 @@ const PAYMENT_POINTER_PATH = '/:paymentPointerPath+' export interface AppServices { logger: Promise knex: Promise - closeEmitter: Promise config: Promise httpTokenService: Promise assetService: Promise @@ -207,21 +206,21 @@ export interface AppServices { feeService: Promise autoPeeringService: Promise autoPeeringRoutes: Promise + connectorApp: Promise + tigerbeetle: Promise } export type AppContainer = IocContract export class App { private openPaymentsServer!: Server + private ilpConnectorService!: Server private adminServer!: Server private autoPeeringServer!: Server public apolloServer!: ApolloServer - public closeEmitter!: EventEmitter public isShuttingDown = false private logger!: Logger private config!: IAppConfig - private outgoingPaymentTimer!: NodeJS.Timer - private deactivateInvoiceTimer!: NodeJS.Timer public constructor(private container: IocContract) {} @@ -233,7 +232,6 @@ export class App { */ public async boot(): Promise { this.config = await this.container.use('config') - this.closeEmitter = await this.container.use('closeEmitter') this.logger = await this.container.use('logger') // Workers are in the way during tests. @@ -253,7 +251,7 @@ export class App { } } - public async startAdminServer(port: number | string): Promise { + public async startAdminServer(port: number): Promise { const koa = await this.createKoaServer() const httpServer = http.createServer(koa.callback()) @@ -321,7 +319,7 @@ export class App { this.adminServer = httpServer.listen(port) } - public async startOpenPaymentsServer(port: number | string): Promise { + public async startOpenPaymentsServer(port: number): Promise { const koa = await this.createKoaServer() const router = new Router() @@ -482,7 +480,7 @@ export class App { this.openPaymentsServer = koa.listen(port) } - public async startAutoPeeringServer(port: number | string): Promise { + public async startAutoPeeringServer(port: number): Promise { const koa = await this.createKoaServer() const autoPeeringRoutes = await this.container.use('autoPeeringRoutes') @@ -497,28 +495,37 @@ export class App { this.autoPeeringServer = koa.listen(port) } + public async startIlpConnectorServer(port: number): Promise { + const ilpConnectorService = await this.container.use('connectorApp') + this.ilpConnectorService = ilpConnectorService.listenPublic(port) + } + public async shutdown(): Promise { - return new Promise((resolve): void => { - this.isShuttingDown = true - this.closeEmitter.emit('shutdown') + this.isShuttingDown = true - if (this.openPaymentsServer) { - this.openPaymentsServer.close((): void => { - resolve() - }) - } + if (this.openPaymentsServer) { + await this.stopServer(this.openPaymentsServer) + } + if (this.adminServer) { + await this.stopServer(this.adminServer) + } + if (this.ilpConnectorService) { + await this.stopServer(this.ilpConnectorService) + } + if (this.autoPeeringServer) { + await this.stopServer(this.autoPeeringServer) + } + } - if (this.adminServer) { - this.adminServer.close((): void => { - resolve() - }) - } + private async stopServer(server: Server): Promise { + return new Promise((resolve, reject) => { + server.close((err) => { + if (err) { + reject(err) + } - if (this.autoPeeringServer) { - this.autoPeeringServer.close((): void => { - resolve() - }) - } + resolve() + }) }) } @@ -583,7 +590,6 @@ export class App { const koa = new Koa() koa.context.container = this.container - koa.context.closeEmitter = await this.container.use('closeEmitter') koa.context.logger = await this.container.use('logger') koa.use( diff --git a/packages/backend/src/index.ts b/packages/backend/src/index.ts index 4ab7b19188..f92a49adaf 100644 --- a/packages/backend/src/index.ts +++ b/packages/backend/src/index.ts @@ -1,5 +1,3 @@ -import { EventEmitter } from 'events' -import { Server } from 'http' import path from 'path' import createLogger from 'pino' import { knex } from 'knex' @@ -55,7 +53,6 @@ BigInt.prototype.toJSON = function () { const container = initIocContainer(Config) const app = new App(container) -let connectorServer: Server export function initIocContainer( config: typeof Config @@ -106,7 +103,6 @@ export function initIocContainer( ) return db }) - container.singleton('closeEmitter', async () => new EventEmitter()) container.singleton('redis', async (deps): Promise => { const config = await deps.use('config') return new Redis(config.redisUrl, { @@ -445,19 +441,12 @@ export const gracefulShutdown = async ( const logger = await container.use('logger') logger.info('shutting down.') await app.shutdown() - if (connectorServer) { - await new Promise((resolve, reject) => { - connectorServer.close((err?: Error) => - err ? reject(err) : resolve(null) - ) - }) - } const knex = await container.use('knex') await knex.destroy() const tigerbeetle = await container.use('tigerbeetle') - await tigerbeetle.destroy() + tigerbeetle.destroy() const redis = await container.use('redis') - await redis.disconnect() + redis.disconnect() } export const start = async ( @@ -493,6 +482,15 @@ export const start = async ( logger.info('received SIGTERM attempting graceful shutdown') try { + if (shuttingDown) { + logger.warn( + 'received second SIGTERM during graceful shutdown, exiting forcefully.' + ) + process.exit(1) + } + + shuttingDown = true + // Graceful shutdown await gracefulShutdown(container, app) logger.info('completed graceful shutdown.') @@ -518,12 +516,12 @@ export const start = async ( const config = await container.use('config') await app.boot() await app.startAdminServer(config.adminPort) - await app.startOpenPaymentsServer(config.openPaymentsPort) logger.info(`Admin listening on ${app.getAdminPort()}`) + + await app.startOpenPaymentsServer(config.openPaymentsPort) logger.info(`Open Payments listening on ${app.getOpenPaymentsPort()}`) - const connectorApp = await container.use('connectorApp') - connectorServer = connectorApp.listenPublic(config.connectorPort) + await app.startIlpConnectorServer(config.connectorPort) logger.info(`Connector listening on ${config.connectorPort}`) logger.info('🐒 has 🚀. Get ready for 🍌🍌🍌🍌🍌') diff --git a/packages/backend/src/shared/ilp_plugin.ts b/packages/backend/src/shared/ilp_plugin.ts index 94d4b332a5..c3964f740b 100644 --- a/packages/backend/src/shared/ilp_plugin.ts +++ b/packages/backend/src/shared/ilp_plugin.ts @@ -1,4 +1,4 @@ -import { LiquidityAccount } from '../accounting/service' +import { ConnectorAccount } from '../connector/core' // Maybe @interledger/pay should export this interface. export interface IlpPlugin { @@ -37,6 +37,6 @@ export class OutgoingIlpPlugin implements IlpPlugin { } export interface IlpPluginOptions { - sourceAccount: LiquidityAccount + sourceAccount: ConnectorAccount unfulfillable?: boolean } diff --git a/packages/backend/src/tests/context.ts b/packages/backend/src/tests/context.ts index 464702c8d7..368cb75287 100644 --- a/packages/backend/src/tests/context.ts +++ b/packages/backend/src/tests/context.ts @@ -1,4 +1,3 @@ -import EventEmitter from 'events' import * as httpMocks from 'node-mocks-http' import Koa from 'koa' import { AppContext, AppContextData, AppRequest } from '../app' @@ -18,6 +17,5 @@ export function createContext( if (reqOpts.body !== undefined) { ctx.request.body = reqOpts.body } - ctx.closeEmitter = new EventEmitter() return ctx as T }