diff --git a/src/event-stream/event-stream.service.ts b/src/event-stream/event-stream.service.ts index 85655b4..45a8cc1 100644 --- a/src/event-stream/event-stream.service.ts +++ b/src/event-stream/event-stream.service.ts @@ -315,6 +315,22 @@ export class EventStreamService { ); } + async deleteSubscriptionByName(ctx: Context, streamId: string, name: string) { + const existingSubscriptions = await this.getSubscriptions(ctx); + const sub = existingSubscriptions.find(s => s.name === name && s.stream === streamId); + if (!sub) { + this.logger.log(`No subscription found for ${name}`); + return false; + } + await lastValueFrom( + this.http.delete( + new URL(`/subscriptions/${sub.id}`, this.baseUrl).href, + this.requestOptions(ctx), + ), + ); + return true; + } + connect( url: string, topic: string, diff --git a/src/main.ts b/src/main.ts index 308b816..549da08 100644 --- a/src/main.ts +++ b/src/main.ts @@ -14,7 +14,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -import { NestApplicationOptions, ShutdownSignal, ValidationPipe } from '@nestjs/common'; +import { ShutdownSignal, ValidationPipe } from '@nestjs/common'; import { ConfigService } from '@nestjs/config'; import { NestFactory } from '@nestjs/core'; import { WsAdapter } from '@nestjs/platform-ws'; diff --git a/src/tokens/tokens.controller.ts b/src/tokens/tokens.controller.ts index 5b9da16..2daeeb2 100644 --- a/src/tokens/tokens.controller.ts +++ b/src/tokens/tokens.controller.ts @@ -28,6 +28,7 @@ import { TokenMint, TokenPool, TokenPoolActivate, + TokenPoolDeactivate, TokenPoolEvent, TokenTransfer, } from './tokens.interfaces'; @@ -70,13 +71,23 @@ export class TokensController { @Post('activatepool') @HttpCode(200) @ApiOperation({ - summary: 'Activate a token pool to begin receiving transfer events', + summary: 'Activate a token pool to begin receiving transfer and approval events', }) @ApiBody({ type: TokenPoolActivate }) activatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolActivate) { return this.service.activatePool(ctx, dto); } + @Post('deactivatepool') + @HttpCode(204) + @ApiOperation({ + summary: 'Deactivate a token pool to delete all listeners and stop receiving events', + }) + @ApiBody({ type: TokenPoolDeactivate }) + async deactivatePool(@RequestContext() ctx: Context, @Body() dto: TokenPoolDeactivate) { + await this.service.deactivatePool(ctx, dto); + } + @Post('checkinterface') @HttpCode(200) @ApiOperation({ diff --git a/src/tokens/tokens.interfaces.ts b/src/tokens/tokens.interfaces.ts index f46729c..d174053 100644 --- a/src/tokens/tokens.interfaces.ts +++ b/src/tokens/tokens.interfaces.ts @@ -221,9 +221,15 @@ export class TokenPoolActivate { @IsOptional() config?: TokenPoolConfig; - @ApiProperty({ description: requestIdDescription }) + @ApiProperty() @IsOptional() - requestId?: string; + poolData?: string; +} + +export class TokenPoolDeactivate { + @ApiProperty() + @IsNotEmpty() + poolLocator: string; @ApiProperty() @IsOptional() diff --git a/src/tokens/tokens.service.ts b/src/tokens/tokens.service.ts index e996720..b82c961 100644 --- a/src/tokens/tokens.service.ts +++ b/src/tokens/tokens.service.ts @@ -34,6 +34,7 @@ import { TokenPool, TokenPoolActivate, TokenPoolConfig, + TokenPoolDeactivate, TokenPoolEvent, TokenTransfer, TokenType, @@ -316,6 +317,24 @@ export class TokensService { } } + private getEventAbis(poolLocator: IValidPoolLocator) { + const transferAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Transfer : ERC721Transfer; + if (transferAbi?.name === undefined) { + throw new NotFoundException('Transfer event ABI not found'); + } + const approvalAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Approval : ERC721Approval; + if (approvalAbi?.name === undefined) { + throw new NotFoundException('Approval event ABI not found'); + } + const approvalForAllAbi = + poolLocator.type === TokenType.FUNGIBLE ? undefined : ERC721ApprovalForAll; + return { + transferAbi, + approvalAbi, + approvalForAllAbi, + }; + } + async activatePool(ctx: Context, dto: TokenPoolActivate) { const poolLocator = unpackPoolLocator(dto.poolLocator); if (!validatePoolLocator(poolLocator)) { @@ -327,26 +346,16 @@ export class TokensService { abi, poolLocator.type === TokenType.FUNGIBLE, ); - + const eventAbis = this.getEventAbis(poolLocator); const stream = await this.getStream(ctx); - const transferAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Transfer : ERC721Transfer; - if (transferAbi?.name === undefined) { - throw new NotFoundException('Transfer event ABI not found'); - } - const approvalAbi = poolLocator.type === TokenType.FUNGIBLE ? ERC20Approval : ERC721Approval; - if (approvalAbi?.name === undefined) { - throw new NotFoundException('Approval event ABI not found'); - } - const approvalForAllAbi = - poolLocator.type === TokenType.FUNGIBLE ? undefined : ERC721ApprovalForAll; const promises = [ this.eventstream.getOrCreateSubscription( ctx, this.baseUrl, - transferAbi, + eventAbis.transferAbi, stream.id, - packSubscriptionName(dto.poolLocator, transferAbi.name, dto.poolData), + packSubscriptionName(dto.poolLocator, eventAbis.transferAbi.name, dto.poolData), poolLocator.address, possibleMethods, this.getSubscriptionBlockNumber(dto.config), @@ -354,22 +363,22 @@ export class TokensService { this.eventstream.getOrCreateSubscription( ctx, this.baseUrl, - approvalAbi, + eventAbis.approvalAbi, stream.id, - packSubscriptionName(dto.poolLocator, approvalAbi.name, dto.poolData), + packSubscriptionName(dto.poolLocator, eventAbis.approvalAbi.name, dto.poolData), poolLocator.address, possibleMethods, this.getSubscriptionBlockNumber(dto.config), ), ]; - if (approvalForAllAbi?.name !== undefined) { + if (eventAbis.approvalForAllAbi?.name !== undefined) { promises.push( this.eventstream.getOrCreateSubscription( ctx, this.baseUrl, - approvalForAllAbi, + eventAbis.approvalForAllAbi, stream.id, - packSubscriptionName(dto.poolLocator, approvalForAllAbi.name, dto.poolData), + packSubscriptionName(dto.poolLocator, eventAbis.approvalForAllAbi.name, dto.poolData), poolLocator.address, possibleMethods, this.getSubscriptionBlockNumber(dto.config), @@ -397,6 +406,42 @@ export class TokensService { return tokenPoolEvent; } + async deactivatePool(ctx: Context, dto: TokenPoolDeactivate) { + const poolLocator = unpackPoolLocator(dto.poolLocator); + if (!validatePoolLocator(poolLocator)) { + throw new BadRequestException('Invalid pool locator'); + } + + const stream = await this.getStream(ctx); + const eventAbis = this.getEventAbis(poolLocator); + const promises = [ + this.eventstream.deleteSubscriptionByName( + ctx, + stream.id, + packSubscriptionName(dto.poolLocator, eventAbis.transferAbi.name, dto.poolData), + ), + this.eventstream.deleteSubscriptionByName( + ctx, + stream.id, + packSubscriptionName(dto.poolLocator, eventAbis.approvalAbi.name, dto.poolData), + ), + ]; + if (eventAbis.approvalForAllAbi?.name !== undefined) { + promises.push( + this.eventstream.deleteSubscriptionByName( + ctx, + stream.id, + packSubscriptionName(dto.poolLocator, eventAbis.approvalForAllAbi.name, dto.poolData), + ), + ); + } + + const results = await Promise.all(promises); + if (results.every(deleted => !deleted)) { + throw new NotFoundException('No listeners found'); + } + } + checkInterface(dto: CheckInterfaceRequest): CheckInterfaceResponse { const poolLocator = unpackPoolLocator(dto.poolLocator); if (!validatePoolLocator(poolLocator)) {