From 114b63f26d6ffffa0d807d3f22575c074aab2a6b Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Thu, 20 Jun 2024 12:59:52 +0300 Subject: [PATCH 1/3] rename events --- .../src/plugins/storage/StorageEventListener.ts | 8 ++++---- packages/client/src/ContractFactory.ts | 2 +- packages/client/src/MetricsPublisher.ts | 4 ++-- packages/client/src/Stream.ts | 2 +- packages/client/src/StreamrClient.ts | 4 ++-- packages/client/src/contracts/StreamRegistry.ts | 2 +- .../client/src/contracts/StreamStorageRegistry.ts | 4 ++-- packages/client/src/encryption/GroupKeyManager.ts | 2 +- .../client/src/encryption/LocalGroupKeyStore.ts | 2 +- .../client/src/encryption/PublisherKeyExchange.ts | 2 +- packages/client/src/events.ts | 14 +++++++------- packages/client/src/utils/utils.ts | 2 +- .../test/end-to-end/StorageNodeRegistry.test.ts | 4 ++-- .../client/test/end-to-end/StreamRegistry.test.ts | 10 +++++----- packages/client/test/integration/events.test.ts | 2 +- .../test/integration/json-rpc-provider.test.ts | 4 ++-- packages/client/test/unit/GroupKeyManager.test.ts | 2 +- packages/client/test/unit/MetricsPublisher.test.ts | 2 +- packages/client/test/unit/events.test.ts | 2 +- 19 files changed, 37 insertions(+), 37 deletions(-) diff --git a/packages/broker/src/plugins/storage/StorageEventListener.ts b/packages/broker/src/plugins/storage/StorageEventListener.ts index cff3dfe4f6..0aa451217c 100644 --- a/packages/broker/src/plugins/storage/StorageEventListener.ts +++ b/packages/broker/src/plugins/storage/StorageEventListener.ts @@ -40,12 +40,12 @@ export class StorageEventListener { } start(): void { - this.streamrClient.on('addToStorageNode', this.onAddToStorageNode) - this.streamrClient.on('removeFromStorageNode', this.onRemoveFromStorageNode) + this.streamrClient.on('streamAddedToStorageNode', this.onAddToStorageNode) + this.streamrClient.on('streamRemovedFromFromStorageNode', this.onRemoveFromStorageNode) } destroy(): void { - this.streamrClient.off('addToStorageNode', this.onAddToStorageNode) - this.streamrClient.off('removeFromStorageNode', this.onRemoveFromStorageNode) + this.streamrClient.off('streamAddedToStorageNode', this.onAddToStorageNode) + this.streamrClient.off('streamRemovedFromFromStorageNode', this.onRemoveFromStorageNode) } } diff --git a/packages/client/src/ContractFactory.ts b/packages/client/src/ContractFactory.ts index 490ca97452..6537d0b235 100644 --- a/packages/client/src/ContractFactory.ts +++ b/packages/client/src/ContractFactory.ts @@ -57,7 +57,7 @@ export class ContractFactory { 999999 ) contract.eventEmitter.on('onTransactionConfirm', (methodName: string, receipt: ContractTransactionReceipt | null) => { - this.eventEmitter.emit('confirmContractTransaction', { + this.eventEmitter.emit('contractTransactionConfirmed', { methodName, receipt }) diff --git a/packages/client/src/MetricsPublisher.ts b/packages/client/src/MetricsPublisher.ts index dcbf0f9f45..2ff5434459 100644 --- a/packages/client/src/MetricsPublisher.ts +++ b/packages/client/src/MetricsPublisher.ts @@ -78,8 +78,8 @@ export class MetricsPublisher { }) }) if (this.config.periods.length > 0) { - this.eventEmitter.on('publish', () => ensureStarted()) - this.eventEmitter.on('subscribe', () => ensureStarted()) + this.eventEmitter.on('messagePublished', () => ensureStarted()) + this.eventEmitter.on('streamPartSubscribed', () => ensureStarted()) } } diff --git a/packages/client/src/Stream.ts b/packages/client/src/Stream.ts index a8b2a48aeb..e0792afa5f 100644 --- a/packages/client/src/Stream.ts +++ b/packages/client/src/Stream.ts @@ -286,7 +286,7 @@ export class Stream { */ async publish(content: unknown, metadata?: PublishMetadata): Promise { const result = await this._publisher.publish(this.id, content, metadata) - this._eventEmitter.emit('publish', result) + this._eventEmitter.emit('messagePublished', result) return convertStreamMessageToMessage(result) } diff --git a/packages/client/src/StreamrClient.ts b/packages/client/src/StreamrClient.ts index e5c2c01ed8..a359f2c4f2 100644 --- a/packages/client/src/StreamrClient.ts +++ b/packages/client/src/StreamrClient.ts @@ -152,7 +152,7 @@ export class StreamrClient { metadata?: PublishMetadata ): Promise { const result = await this.publisher.publish(streamDefinition, content, metadata) - this.eventEmitter.emit('publish', result) + this.eventEmitter.emit('messagePublished', result) return convertStreamMessageToMessage(result) } @@ -232,7 +232,7 @@ export class StreamrClient { if (onMessage !== undefined) { sub.useLegacyOnMessageHandler(onMessage) } - this.eventEmitter.emit('subscribe', undefined) + this.eventEmitter.emit('streamPartSubscribed', undefined) return sub } diff --git a/packages/client/src/contracts/StreamRegistry.ts b/packages/client/src/contracts/StreamRegistry.ts index e560cff6cc..3e0657f527 100644 --- a/packages/client/src/contracts/StreamRegistry.ts +++ b/packages/client/src/contracts/StreamRegistry.ts @@ -120,7 +120,7 @@ export class StreamRegistry { initContractEventGateway({ sourceName: 'StreamCreated', sourceEmitter: chainEventPoller, - targetName: 'createStream', + targetName: 'streamCreated', targetEmitter: eventEmitter, transformation: (streamId: string, metadata: string, blockNumber: number) => ({ streamId: toStreamID(streamId), diff --git a/packages/client/src/contracts/StreamStorageRegistry.ts b/packages/client/src/contracts/StreamStorageRegistry.ts index f9d127d540..06ca7cab41 100644 --- a/packages/client/src/contracts/StreamStorageRegistry.ts +++ b/packages/client/src/contracts/StreamStorageRegistry.ts @@ -94,7 +94,7 @@ export class StreamStorageRegistry { initContractEventGateway({ sourceName: 'Added', sourceEmitter: chainEventPoller, - targetName: 'addToStorageNode', + targetName: 'streamAddedToStorageNode', targetEmitter: eventEmitter, transformation, loggerFactory @@ -102,7 +102,7 @@ export class StreamStorageRegistry { initContractEventGateway({ sourceName: 'Removed', sourceEmitter: chainEventPoller, - targetName: 'removeFromStorageNode', + targetName: 'streamRemovedFromFromStorageNode', targetEmitter: eventEmitter, transformation, loggerFactory diff --git a/packages/client/src/encryption/GroupKeyManager.ts b/packages/client/src/encryption/GroupKeyManager.ts index 3c3cfdf711..b294046356 100644 --- a/packages/client/src/encryption/GroupKeyManager.ts +++ b/packages/client/src/encryption/GroupKeyManager.ts @@ -64,7 +64,7 @@ export class GroupKeyManager { const groupKeyIds = await waitForEvent( // TODO remove "as any" type casing in NET-889 this.eventEmitter as any, - 'storeEncryptionKeyToLocalStore', + 'encryptionKeyStoredToLocalStore', this.config.encryption.keyRequestTimeout, (storedGroupKeyId: string) => storedGroupKeyId === groupKeyId, this.destroySignal.abortSignal diff --git a/packages/client/src/encryption/LocalGroupKeyStore.ts b/packages/client/src/encryption/LocalGroupKeyStore.ts index e1d7d0f181..69f7082f68 100644 --- a/packages/client/src/encryption/LocalGroupKeyStore.ts +++ b/packages/client/src/encryption/LocalGroupKeyStore.ts @@ -90,7 +90,7 @@ export class LocalGroupKeyStore { const persistence = await this.persistenceManager.getPersistence(NAMESPACES.ENCRYPTION_KEYS) await persistence.set(formLookupKey1(keyId, publisherId), Buffer.from(data).toString('hex')) this.logger.debug('Set key', { keyId, publisherId }) - this.eventEmitter.emit('storeEncryptionKeyToLocalStore', keyId) + this.eventEmitter.emit('encryptionKeyStoredToLocalStore', keyId) } async setLatestEncryptionKeyId(keyId: string, publisherId: EthereumAddress, streamId: StreamID): Promise { diff --git a/packages/client/src/encryption/PublisherKeyExchange.ts b/packages/client/src/encryption/PublisherKeyExchange.ts index a883f58f0e..48a447ea0e 100644 --- a/packages/client/src/encryption/PublisherKeyExchange.ts +++ b/packages/client/src/encryption/PublisherKeyExchange.ts @@ -74,7 +74,7 @@ export class PublisherKeyExchange { node.addMessageListener((msg: StreamMessage) => this.onMessage(msg)) this.logger.debug('Started') }) - eventEmitter.on('publish', (msg) => { + eventEmitter.on('messagePublished', (msg) => { if (msg.signatureType === SignatureType.ERC_1271) { const address = msg.getPublisherId() if (!this.erc1271ContractAddresses.has(address)) { diff --git a/packages/client/src/events.ts b/packages/client/src/events.ts index d97f5fc811..d7239f4d45 100644 --- a/packages/client/src/events.ts +++ b/packages/client/src/events.ts @@ -6,19 +6,19 @@ import { StreamMessage } from '@streamr/protocol' import { ContractTransactionReceipt } from 'ethers' export interface StreamrClientEvents { - createStream: (payload: StreamCreationEvent) => void - addToStorageNode: (payload: StorageNodeAssignmentEvent) => void - removeFromStorageNode: (payload: StorageNodeAssignmentEvent) => void + streamCreated: (payload: StreamCreationEvent) => void + streamAddedToStorageNode: (payload: StorageNodeAssignmentEvent) => void + streamRemovedFromFromStorageNode: (payload: StorageNodeAssignmentEvent) => void /** @internal */ - storeEncryptionKeyToLocalStore: (keyId: string) => void + encryptionKeyStoredToLocalStore: (keyId: string) => void /** @internal */ - confirmContractTransaction: (payload: { methodName: string, receipt: ContractTransactionReceipt | null }) => void + contractTransactionConfirmed: (payload: { methodName: string, receipt: ContractTransactionReceipt | null }) => void } // events for internal communication between StreamrClient components export interface InternalEvents { - publish: (message: StreamMessage) => void - subscribe: () => void + messagePublished: (message: StreamMessage) => void + streamPartSubscribed: () => void } @scoped(Lifecycle.ContainerScoped) diff --git a/packages/client/src/utils/utils.ts b/packages/client/src/utils/utils.ts index 8d7bd21f6f..edec52054e 100644 --- a/packages/client/src/utils/utils.ts +++ b/packages/client/src/utils/utils.ts @@ -183,7 +183,7 @@ export const createTheGraphClient = ( // eslint-disable-next-line no-underscore-dangle indexPollInterval: config._timeouts.theGraph.indexPollInterval }) - eventEmitter.on('confirmContractTransaction', (payload: { receipt: ContractTransactionReceipt | null }) => { + eventEmitter.on('contractTransactionConfirmed', (payload: { receipt: ContractTransactionReceipt | null }) => { if (payload.receipt != null) { instance.updateRequiredBlockNumber(payload.receipt.blockNumber) } diff --git a/packages/client/test/end-to-end/StorageNodeRegistry.test.ts b/packages/client/test/end-to-end/StorageNodeRegistry.test.ts index dbcada31f0..5d6aac9869 100644 --- a/packages/client/test/end-to-end/StorageNodeRegistry.test.ts +++ b/packages/client/test/end-to-end/StorageNodeRegistry.test.ts @@ -83,10 +83,10 @@ describe('StorageNodeRegistry', () => { const onAddPayloads: any[] = [] const onRemovePayloads: any[] = [] - listenerClient.on('addToStorageNode', (payload: any) => { + listenerClient.on('streamAddedToStorageNode', (payload: any) => { onAddPayloads.push(payload) }) - listenerClient.on('removeFromStorageNode', (payload: any) => { + listenerClient.on('streamRemovedFromFromStorageNode', (payload: any) => { onRemovePayloads.push(payload) }) diff --git a/packages/client/test/end-to-end/StreamRegistry.test.ts b/packages/client/test/end-to-end/StreamRegistry.test.ts index 070078b638..6b77b6f637 100644 --- a/packages/client/test/end-to-end/StreamRegistry.test.ts +++ b/packages/client/test/end-to-end/StreamRegistry.test.ts @@ -84,8 +84,8 @@ describe('StreamRegistry', () => { }, TIMEOUT) it('listener', async () => { - const onCreateSteam = jest.fn() - client.on('createStream', onCreateSteam) + const onStreamCreated = jest.fn() + client.on('streamCreated', onStreamCreated) const invalidStream = await client.createStream({ id: createRelativeTestStreamId(module, 'invalid'), partitions: 150 @@ -96,12 +96,12 @@ describe('StreamRegistry', () => { description: 'Foobar' }) const hasBeenCalledFor = (stream: Stream) => { - const streamIds = onCreateSteam.mock.calls.map((c) => c[0].streamId) + const streamIds = onStreamCreated.mock.calls.map((c) => c[0].streamId) return streamIds.includes(stream.id) } await waitForCondition(() => hasBeenCalledFor(validStream)) - client.off('createStream', onCreateSteam) - expect(onCreateSteam).toHaveBeenCalledWith({ + client.off('streamCreated', onStreamCreated) + expect(onStreamCreated).toHaveBeenCalledWith({ streamId: validStream.id, metadata: { partitions: 3, diff --git a/packages/client/test/integration/events.test.ts b/packages/client/test/integration/events.test.ts index 76f50ac10b..f303d45740 100644 --- a/packages/client/test/integration/events.test.ts +++ b/packages/client/test/integration/events.test.ts @@ -18,7 +18,7 @@ describe('events', () => { it('client', async () => { const client = environment.createClient() - client.on('addToStorageNode', () => {}) + client.on('streamAddedToStorageNode', () => {}) await client.destroy() // @ts-expect-error private expect(client.eventEmitter.getListenerCount()).toBe(0) diff --git a/packages/client/test/integration/json-rpc-provider.test.ts b/packages/client/test/integration/json-rpc-provider.test.ts index 2727f2e384..4ec8d4a5fd 100644 --- a/packages/client/test/integration/json-rpc-provider.test.ts +++ b/packages/client/test/integration/json-rpc-provider.test.ts @@ -106,7 +106,7 @@ describe('use JsonRpcProvider', () => { const runErrorTest = async (errorState: ErrorState, extraWait = 0): Promise => { servers.forEach((s) => s.setError('eth_getLogs', errorState)) const receivedEvents: StreamCreationEvent[] = [] - client.on('createStream', (event: StreamCreationEvent) => { + client.on('streamCreated', (event: StreamCreationEvent) => { receivedEvents.push(event) }) await waitForCondition(() => getRequests().some((r) => r.method === 'eth_getLogs'), 5000 + extraWait) @@ -124,7 +124,7 @@ describe('use JsonRpcProvider', () => { it('happy path', async () => { const receivedEvents: StreamCreationEvent[] = [] const now = Date.now() - client.on('createStream', (event: StreamCreationEvent) => { + client.on('streamCreated', (event: StreamCreationEvent) => { receivedEvents.push(event) }) await wait(0.5 * POLL_INTERVAL) diff --git a/packages/client/test/unit/GroupKeyManager.test.ts b/packages/client/test/unit/GroupKeyManager.test.ts index 150bc10ff1..523332103a 100644 --- a/packages/client/test/unit/GroupKeyManager.test.ts +++ b/packages/client/test/unit/GroupKeyManager.test.ts @@ -80,7 +80,7 @@ describe('GroupKeyManager', () => { it('key present in subscriber key exchange', async () => { subscriberKeyExchange.requestGroupKey.mockImplementation(async () => { groupKeyStore.get.mockResolvedValue(groupKey) - setTimeout(() => eventEmitter.emit('storeEncryptionKeyToLocalStore', groupKey.id), 0) + setTimeout(() => eventEmitter.emit('encryptionKeyStoredToLocalStore', groupKey.id), 0) }) const key = await groupKeyManager.fetchKey(toStreamPartID(streamId, 0), groupKeyId, publisherId) diff --git a/packages/client/test/unit/MetricsPublisher.test.ts b/packages/client/test/unit/MetricsPublisher.test.ts index 4e6801367b..24d5a9e209 100644 --- a/packages/client/test/unit/MetricsPublisher.test.ts +++ b/packages/client/test/unit/MetricsPublisher.test.ts @@ -39,7 +39,7 @@ describe('MetricsPublisher', () => { ) // trigger metric publisher to start - eventEmitter.emit('subscribe', undefined) + eventEmitter.emit('streamPartSubscribed', undefined) } const assertPublisherEnabled = async ( diff --git a/packages/client/test/unit/events.test.ts b/packages/client/test/unit/events.test.ts index 1429c11c1b..36701e247e 100644 --- a/packages/client/test/unit/events.test.ts +++ b/packages/client/test/unit/events.test.ts @@ -9,7 +9,7 @@ describe('events', () => { const client = environment.createClient() const onEmit = jest.fn() // @ts-expect-error internal event - client.on('publish', onEmit) + client.on('messagePublished', onEmit) const stream = await client.createStream('/test') await client.publish(stream.id, {}) await stream.publish({}) From c7f5a1d2ba2b60e0693b1a18501ccb523ca6089e Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Thu, 20 Jun 2024 13:09:28 +0300 Subject: [PATCH 2/3] update tests --- .../broker/test/unit/plugins/storage/StorageConfig.test.ts | 4 ++-- .../test/unit/plugins/storage/StorageEventListener.test.ts | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/broker/test/unit/plugins/storage/StorageConfig.test.ts b/packages/broker/test/unit/plugins/storage/StorageConfig.test.ts index 5fbaee2677..e0f59d5015 100644 --- a/packages/broker/test/unit/plugins/storage/StorageConfig.test.ts +++ b/packages/broker/test/unit/plugins/storage/StorageConfig.test.ts @@ -103,8 +103,8 @@ describe(StorageConfig, () => { describe('on event-based results', () => { beforeEach(async () => { await storageConfig.start() - const addToStorageNodeListener = storageEventListeners.get('addToStorageNode')! - const removeFromStorageNodeListener = storageEventListeners.get('removeFromStorageNode')! + const addToStorageNodeListener = storageEventListeners.get('streamAddedToStorageNode')! + const removeFromStorageNodeListener = storageEventListeners.get('streamRemovedFromFromStorageNode')! addToStorageNodeListener({ streamId: toStreamID('stream-1'), nodeAddress: CLUSTER_ID, diff --git a/packages/broker/test/unit/plugins/storage/StorageEventListener.test.ts b/packages/broker/test/unit/plugins/storage/StorageEventListener.test.ts index ec87864600..0f750141ef 100644 --- a/packages/broker/test/unit/plugins/storage/StorageEventListener.test.ts +++ b/packages/broker/test/unit/plugins/storage/StorageEventListener.test.ts @@ -49,7 +49,7 @@ describe(StorageEventListener, () => { }) function addToStorageNode(recipient: EthereumAddress) { - storageEventListeners.get('addToStorageNode')!({ + storageEventListeners.get('streamAddedToStorageNode')!({ nodeAddress: recipient, streamId: toStreamID('streamId'), blockNumber: 1234 From 9a935331a1d07d4a5c062ec9fbfd084f8a851d6a Mon Sep 17 00:00:00 2001 From: Teo Gebhard Date: Thu, 20 Jun 2024 13:09:36 +0300 Subject: [PATCH 3/3] changelog --- CHANGELOG.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index c0a6a3c2cf..9775161dfd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,10 @@ Changes before Tatum release are not documented in this file. - Update to ethers.js library to v6 (https://github.com/streamr-dev/network/pull/2506) - Restructured `contracts` config structure (https://github.com/streamr-dev/network/pull/2581) - Improve reliability of JSON RPC interactions by adding retry redundancy (https://github.com/streamr-dev/network/pull/2562) +- Renamed events: (https://github.com/streamr-dev/network/pull/2604) + - `createStream` -> `streamCreated` + - `addToStorageNode` -> `streamAddedToStorageNode` + - `removeFromStorageNode` -> `streamRemovedFromFromStorageNode` #### Deprecated