Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(client)[!]: Rename client events #2604

Merged
merged 3 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ Changes before Tatum release are not documented in this file.
- Update to ethers.js library to v6 (https://github.com/streamr-dev/network/pull/2506)
- Restructured `contracts` config structure (https://github.com/streamr-dev/network/pull/2581)
- Improve reliability of JSON RPC interactions by adding retry redundancy (https://github.com/streamr-dev/network/pull/2562)
- Renamed events: (https://github.com/streamr-dev/network/pull/2604)
- `createStream` -> `streamCreated`
- `addToStorageNode` -> `streamAddedToStorageNode`
- `removeFromStorageNode` -> `streamRemovedFromFromStorageNode`

#### Deprecated

Expand Down
8 changes: 4 additions & 4 deletions packages/broker/src/plugins/storage/StorageEventListener.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ export class StorageEventListener {
}

start(): void {
this.streamrClient.on('addToStorageNode', this.onAddToStorageNode)
this.streamrClient.on('removeFromStorageNode', this.onRemoveFromStorageNode)
this.streamrClient.on('streamAddedToStorageNode', this.onAddToStorageNode)
this.streamrClient.on('streamRemovedFromFromStorageNode', this.onRemoveFromStorageNode)
}

destroy(): void {
this.streamrClient.off('addToStorageNode', this.onAddToStorageNode)
this.streamrClient.off('removeFromStorageNode', this.onRemoveFromStorageNode)
this.streamrClient.off('streamAddedToStorageNode', this.onAddToStorageNode)
this.streamrClient.off('streamRemovedFromFromStorageNode', this.onRemoveFromStorageNode)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ describe(StorageConfig, () => {
describe('on event-based results', () => {
beforeEach(async () => {
await storageConfig.start()
const addToStorageNodeListener = storageEventListeners.get('addToStorageNode')!
const removeFromStorageNodeListener = storageEventListeners.get('removeFromStorageNode')!
const addToStorageNodeListener = storageEventListeners.get('streamAddedToStorageNode')!
const removeFromStorageNodeListener = storageEventListeners.get('streamRemovedFromFromStorageNode')!
addToStorageNodeListener({
streamId: toStreamID('stream-1'),
nodeAddress: CLUSTER_ID,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ describe(StorageEventListener, () => {
})

function addToStorageNode(recipient: EthereumAddress) {
storageEventListeners.get('addToStorageNode')!({
storageEventListeners.get('streamAddedToStorageNode')!({
nodeAddress: recipient,
streamId: toStreamID('streamId'),
blockNumber: 1234
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/ContractFactory.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export class ContractFactory {
999999
)
contract.eventEmitter.on('onTransactionConfirm', (methodName: string, receipt: ContractTransactionReceipt | null) => {
this.eventEmitter.emit('confirmContractTransaction', {
this.eventEmitter.emit('contractTransactionConfirmed', {
methodName,
receipt
})
Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/MetricsPublisher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,8 @@ export class MetricsPublisher {
})
})
if (this.config.periods.length > 0) {
this.eventEmitter.on('publish', () => ensureStarted())
this.eventEmitter.on('subscribe', () => ensureStarted())
this.eventEmitter.on('messagePublished', () => ensureStarted())
this.eventEmitter.on('streamPartSubscribed', () => ensureStarted())
}
}

Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/Stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ export class Stream {
*/
async publish(content: unknown, metadata?: PublishMetadata): Promise<Message> {
const result = await this._publisher.publish(this.id, content, metadata)
this._eventEmitter.emit('publish', result)
this._eventEmitter.emit('messagePublished', result)
return convertStreamMessageToMessage(result)
}

Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/StreamrClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ export class StreamrClient {
metadata?: PublishMetadata
): Promise<Message> {
const result = await this.publisher.publish(streamDefinition, content, metadata)
this.eventEmitter.emit('publish', result)
this.eventEmitter.emit('messagePublished', result)
return convertStreamMessageToMessage(result)
}

Expand Down Expand Up @@ -232,7 +232,7 @@ export class StreamrClient {
if (onMessage !== undefined) {
sub.useLegacyOnMessageHandler(onMessage)
}
this.eventEmitter.emit('subscribe', undefined)
this.eventEmitter.emit('streamPartSubscribed', undefined)
return sub
}

Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ export class StreamRegistry {
initContractEventGateway({
sourceName: 'StreamCreated',
sourceEmitter: chainEventPoller,
targetName: 'createStream',
targetName: 'streamCreated',
targetEmitter: eventEmitter,
transformation: (streamId: string, metadata: string, blockNumber: number) => ({
streamId: toStreamID(streamId),
Expand Down
4 changes: 2 additions & 2 deletions packages/client/src/contracts/StreamStorageRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,15 +94,15 @@ export class StreamStorageRegistry {
initContractEventGateway({
sourceName: 'Added',
sourceEmitter: chainEventPoller,
targetName: 'addToStorageNode',
targetName: 'streamAddedToStorageNode',
targetEmitter: eventEmitter,
transformation,
loggerFactory
})
initContractEventGateway({
sourceName: 'Removed',
sourceEmitter: chainEventPoller,
targetName: 'removeFromStorageNode',
targetName: 'streamRemovedFromFromStorageNode',
targetEmitter: eventEmitter,
transformation,
loggerFactory
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/encryption/GroupKeyManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export class GroupKeyManager {
const groupKeyIds = await waitForEvent(
// TODO remove "as any" type casing in NET-889
this.eventEmitter as any,
'storeEncryptionKeyToLocalStore',
'encryptionKeyStoredToLocalStore',
this.config.encryption.keyRequestTimeout,
(storedGroupKeyId: string) => storedGroupKeyId === groupKeyId,
this.destroySignal.abortSignal
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/encryption/LocalGroupKeyStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ export class LocalGroupKeyStore {
const persistence = await this.persistenceManager.getPersistence(NAMESPACES.ENCRYPTION_KEYS)
await persistence.set(formLookupKey1(keyId, publisherId), Buffer.from(data).toString('hex'))
this.logger.debug('Set key', { keyId, publisherId })
this.eventEmitter.emit('storeEncryptionKeyToLocalStore', keyId)
this.eventEmitter.emit('encryptionKeyStoredToLocalStore', keyId)
}

async setLatestEncryptionKeyId(keyId: string, publisherId: EthereumAddress, streamId: StreamID): Promise<void> {
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/encryption/PublisherKeyExchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ export class PublisherKeyExchange {
node.addMessageListener((msg: StreamMessage) => this.onMessage(msg))
this.logger.debug('Started')
})
eventEmitter.on('publish', (msg) => {
eventEmitter.on('messagePublished', (msg) => {
if (msg.signatureType === SignatureType.ERC_1271) {
const address = msg.getPublisherId()
if (!this.erc1271ContractAddresses.has(address)) {
Expand Down
14 changes: 7 additions & 7 deletions packages/client/src/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,19 +6,19 @@ import { StreamMessage } from '@streamr/protocol'
import { ContractTransactionReceipt } from 'ethers'

export interface StreamrClientEvents {
createStream: (payload: StreamCreationEvent) => void
addToStorageNode: (payload: StorageNodeAssignmentEvent) => void
removeFromStorageNode: (payload: StorageNodeAssignmentEvent) => void
streamCreated: (payload: StreamCreationEvent) => void
streamAddedToStorageNode: (payload: StorageNodeAssignmentEvent) => void
streamRemovedFromFromStorageNode: (payload: StorageNodeAssignmentEvent) => void
/** @internal */
storeEncryptionKeyToLocalStore: (keyId: string) => void
encryptionKeyStoredToLocalStore: (keyId: string) => void
/** @internal */
confirmContractTransaction: (payload: { methodName: string, receipt: ContractTransactionReceipt | null }) => void
contractTransactionConfirmed: (payload: { methodName: string, receipt: ContractTransactionReceipt | null }) => void
}

// events for internal communication between StreamrClient components
export interface InternalEvents {
publish: (message: StreamMessage) => void
subscribe: () => void
messagePublished: (message: StreamMessage) => void
streamPartSubscribed: () => void
}

@scoped(Lifecycle.ContainerScoped)
Expand Down
2 changes: 1 addition & 1 deletion packages/client/src/utils/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ export const createTheGraphClient = (
// eslint-disable-next-line no-underscore-dangle
indexPollInterval: config._timeouts.theGraph.indexPollInterval
})
eventEmitter.on('confirmContractTransaction', (payload: { receipt: ContractTransactionReceipt | null }) => {
eventEmitter.on('contractTransactionConfirmed', (payload: { receipt: ContractTransactionReceipt | null }) => {
if (payload.receipt != null) {
instance.updateRequiredBlockNumber(payload.receipt.blockNumber)
}
Expand Down
4 changes: 2 additions & 2 deletions packages/client/test/end-to-end/StorageNodeRegistry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ describe('StorageNodeRegistry', () => {

const onAddPayloads: any[] = []
const onRemovePayloads: any[] = []
listenerClient.on('addToStorageNode', (payload: any) => {
listenerClient.on('streamAddedToStorageNode', (payload: any) => {
onAddPayloads.push(payload)
})
listenerClient.on('removeFromStorageNode', (payload: any) => {
listenerClient.on('streamRemovedFromFromStorageNode', (payload: any) => {
onRemovePayloads.push(payload)
})

Expand Down
10 changes: 5 additions & 5 deletions packages/client/test/end-to-end/StreamRegistry.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ describe('StreamRegistry', () => {
}, TIMEOUT)

it('listener', async () => {
const onCreateSteam = jest.fn()
client.on('createStream', onCreateSteam)
const onStreamCreated = jest.fn()
client.on('streamCreated', onStreamCreated)
const invalidStream = await client.createStream({
id: createRelativeTestStreamId(module, 'invalid'),
partitions: 150
Expand All @@ -96,12 +96,12 @@ describe('StreamRegistry', () => {
description: 'Foobar'
})
const hasBeenCalledFor = (stream: Stream) => {
const streamIds = onCreateSteam.mock.calls.map((c) => c[0].streamId)
const streamIds = onStreamCreated.mock.calls.map((c) => c[0].streamId)
return streamIds.includes(stream.id)
}
await waitForCondition(() => hasBeenCalledFor(validStream))
client.off('createStream', onCreateSteam)
expect(onCreateSteam).toHaveBeenCalledWith({
client.off('streamCreated', onStreamCreated)
expect(onStreamCreated).toHaveBeenCalledWith({
streamId: validStream.id,
metadata: {
partitions: 3,
Expand Down
2 changes: 1 addition & 1 deletion packages/client/test/integration/events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ describe('events', () => {

it('client', async () => {
const client = environment.createClient()
client.on('addToStorageNode', () => {})
client.on('streamAddedToStorageNode', () => {})
await client.destroy()
// @ts-expect-error private
expect(client.eventEmitter.getListenerCount()).toBe(0)
Expand Down
4 changes: 2 additions & 2 deletions packages/client/test/integration/json-rpc-provider.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ describe('use JsonRpcProvider', () => {
const runErrorTest = async (errorState: ErrorState, extraWait = 0): Promise<void> => {
servers.forEach((s) => s.setError('eth_getLogs', errorState))
const receivedEvents: StreamCreationEvent[] = []
client.on('createStream', (event: StreamCreationEvent) => {
client.on('streamCreated', (event: StreamCreationEvent) => {
receivedEvents.push(event)
})
await waitForCondition(() => getRequests().some((r) => r.method === 'eth_getLogs'), 5000 + extraWait)
Expand All @@ -124,7 +124,7 @@ describe('use JsonRpcProvider', () => {
it('happy path', async () => {
const receivedEvents: StreamCreationEvent[] = []
const now = Date.now()
client.on('createStream', (event: StreamCreationEvent) => {
client.on('streamCreated', (event: StreamCreationEvent) => {
receivedEvents.push(event)
})
await wait(0.5 * POLL_INTERVAL)
Expand Down
2 changes: 1 addition & 1 deletion packages/client/test/unit/GroupKeyManager.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ describe('GroupKeyManager', () => {
it('key present in subscriber key exchange', async () => {
subscriberKeyExchange.requestGroupKey.mockImplementation(async () => {
groupKeyStore.get.mockResolvedValue(groupKey)
setTimeout(() => eventEmitter.emit('storeEncryptionKeyToLocalStore', groupKey.id), 0)
setTimeout(() => eventEmitter.emit('encryptionKeyStoredToLocalStore', groupKey.id), 0)
})

const key = await groupKeyManager.fetchKey(toStreamPartID(streamId, 0), groupKeyId, publisherId)
Expand Down
2 changes: 1 addition & 1 deletion packages/client/test/unit/MetricsPublisher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ describe('MetricsPublisher', () => {
)

// trigger metric publisher to start
eventEmitter.emit('subscribe', undefined)
eventEmitter.emit('streamPartSubscribed', undefined)
}

const assertPublisherEnabled = async (
Expand Down
2 changes: 1 addition & 1 deletion packages/client/test/unit/events.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ describe('events', () => {
const client = environment.createClient()
const onEmit = jest.fn()
// @ts-expect-error internal event
client.on('publish', onEmit)
client.on('messagePublished', onEmit)
const stream = await client.createStream('/test')
await client.publish(stream.id, {})
await stream.publish({})
Expand Down
Loading