Skip to content

Commit

Permalink
feat(sdk): [NET-1316] Use StreamRegistry v5 permission methods (#2780)
Browse files Browse the repository at this point in the history
**NOTE: Do not make releases until we've deployed `StreamRegistry` v5 to
production.**

## Summary

Updated the sdk to use `StreamRegistry` v5 instead of v4. In v5 there
are `*forUserId`-permission methods which support arbitrary length user
ids.

The API is same for all permission types, but longer/shorter IDs is can
be used only for publish and subscribe permissions. as there is no
contract-level support for other types (edit, grant, delete).

## Changes

Replaced usage:
- `hasPermission`, `hasDirectPermission` -> `getPermissionsForUserId`,
`getDirectPermissionsForUserId`
  - i.e. read a permissions struct instead of reading a boolean
- it still is ok to use `hasPublicPermission` contract call to check
public access, no changes in that logic
- `grantPermission` -> `grantPermissionForUserId`
- `revokePermission` -> `revokePermissionForUserId`
- `setPermissionsMultipleStreams` ->
`setMultipleStreamPermissionsForUserIds`

Also changed The Graph queries to use `userId` instead of `userAddress`.

## Refactoring

Refactoring the internal call flow in `StreamRegistry` by introducing a
helper method `isStreamPublisherOrSubscriber_nonCached`.

## Tests

Changed `randomUserId` to return random-length user ids.
  • Loading branch information
teogeb authored Nov 1, 2024
1 parent 85a9e03 commit 6b73740
Show file tree
Hide file tree
Showing 12 changed files with 680 additions and 500 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ Changes before Tatum release are not documented in this file.

#### Added

- Add support for arbitrary length user IDs (https://github.com/streamr-dev/network/pull/2774, https://github.com/streamr-dev/network/pull/2780)
- it is supported for `PUBLISH` and `SUBSCRIBE` permissions
- new `StreamrClient#getUserId()` method
- Method `StreamrClient#getDiagnosticInfo()` provides diagnostic info about network (https://github.com/streamr-dev/network/pull/2740, https://github.com/streamr-dev/network/pull/2741)
- Add `StreamrClient#getUserId()` method (https://github.com/streamr-dev/network/pull/2774)
- Add `Stream#getPartitionCount()` method (https://github.com/streamr-dev/network/pull/2825)
Expand All @@ -21,6 +24,7 @@ Changes before Tatum release are not documented in this file.
- **BREAKING CHANGE:** Field `StreamMetadata#partitions` is nullable (https://github.com/streamr-dev/network/pull/2825)
- **BREAKING CHANGE:** Method `Stream#update()` overwrites metadata instead of merging it (https://github.com/streamr-dev/network/pull/2826)
- **BREAKING CHANGE:** Method `Stream#addToStorageNode()` doesn't wait for acknowledgment by default (https://github.com/streamr-dev/network/pull/2810)
- Upgrade `StreamRegistry` from v4 to v5 (https://github.com/streamr-dev/network/pull/2780)
- Network-level changes
- Avoid routing through proxy connections (https://github.com/streamr-dev/network/pull/2801)
- Internal record `StreamPartitionInfo` format changed (https://github.com/streamr-dev/network/pull/2738, https://github.com/streamr-dev/network/pull/2790)
Expand Down
1 change: 1 addition & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

81 changes: 49 additions & 32 deletions packages/sdk/src/contracts/StreamRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,27 +8,29 @@ import {
UserID,
collect,
isENSName,
isEthereumAddressUserId,
toEthereumAddress,
toStreamID,
toUserId
} from '@streamr/utils'
import { ContractTransactionResponse } from 'ethers'
import { intersection } from 'lodash'
import { Lifecycle, inject, scoped } from 'tsyringe'
import { Authentication, AuthenticationInjectionToken } from '../Authentication'
import { ConfigInjectionToken, StrictStreamrClientConfig } from '../Config'
import { RpcProviderSource } from '../RpcProviderSource'
import { Stream, StreamMetadata } from '../Stream'
import { StreamIDBuilder } from '../StreamIDBuilder'
import { StreamrClientError } from '../StreamrClientError'
import type { StreamRegistryV4 as StreamRegistryContract } from '../ethereumArtifacts/StreamRegistryV4'
import StreamRegistryArtifact from '../ethereumArtifacts/StreamRegistryV4Abi.json'
import type { StreamRegistryV5 as StreamRegistryContract } from '../ethereumArtifacts/StreamRegistryV5'
import StreamRegistryArtifact from '../ethereumArtifacts/StreamRegistryV5Abi.json'
import { getEthersOverrides } from '../ethereumUtils'
import { StreamrClientEventEmitter } from '../events'
import {
ChainPermissions,
InternalPermissionAssignment,
InternalPermissionQuery,
PUBLIC_PERMISSION_ADDRESS,
PUBLIC_PERMISSION_USER_ID,
PermissionQueryResult,
StreamPermission,
convertChainPermissionsToStreamPermissions,
Expand Down Expand Up @@ -60,7 +62,7 @@ export interface StreamQueryResult {

interface StreamPublisherOrSubscriberItem {
id: string
userAddress: string
userId: string
}

export interface StreamCreationEvent {
Expand All @@ -69,6 +71,24 @@ export interface StreamCreationEvent {
readonly blockNumber: number
}

const validatePermissionAssignments = (assignments: InternalPermissionAssignment[]): void | never => {
for (const assignment of assignments) {
if (!isPublicPermissionAssignment(assignment)) {
if (!isEthereumAddressUserId(assignment.user)) {
// not supported by StreamRegistry v5 contract
const ETHEREUM_ONLY_PERMISSION_TYPES = [StreamPermission.EDIT, StreamPermission.DELETE, StreamPermission.GRANT]
const invalidPermissions = intersection(assignment.permissions, ETHEREUM_ONLY_PERMISSION_TYPES)
if (invalidPermissions.length > 0) {
throw new StreamrClientError(
`Non-Ethereum user id is not supported for permission types: ${invalidPermissions.map((p) => p.toUpperCase()).join(', ')}`,
'UNSUPPORTED_OPERATION'
)
}
}
}
}
}

const streamContractErrorProcessor = (err: any, streamId: StreamID, registry: string): never => {
if (err.reason?.code === 'CALL_EXCEPTION') {
throw new StreamrClientError('Stream not found: id=' + streamId, 'STREAM_NOT_FOUND')
Expand Down Expand Up @@ -149,15 +169,15 @@ export class StreamRegistry {
}
})
this.isStreamPublisher_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => {
return this.isStreamPublisher_nonCached(streamId, userId)
return this.isStreamPublisher(streamId, userId, false)
}, {
...config.cache,
cacheKey([streamId, userId]): string {
return [streamId, userId].join(CACHE_KEY_SEPARATOR)
}
})
this.isStreamSubscriber_cached = CacheAsyncFn((streamId: StreamID, userId: UserID) => {
return this.isStreamSubscriber_nonCached(streamId, userId)
return this.isStreamSubscriber(streamId, userId, false)
}, {
...config.cache,
cacheKey([streamId, userId]): string {
Expand Down Expand Up @@ -315,7 +335,7 @@ export class StreamRegistry {
const validItems = filter<StreamPublisherOrSubscriberItem>(backendResults, (p) => (p as any).stream !== null)
yield* map<StreamPublisherOrSubscriberItem, UserID>(
validItems,
(item) => toUserId(item.userAddress)
(item) => toUserId(item.userId)
)
}

Expand All @@ -337,7 +357,7 @@ export class StreamRegistry {
}
) {
id
userAddress
userId
stream {
id
}
Expand All @@ -352,13 +372,15 @@ export class StreamRegistry {

async hasPermission(query: InternalPermissionQuery): Promise<boolean> {
const streamId = await this.streamIdBuilder.toStreamID(query.streamId)
const permissionType = streamPermissionToSolidityType(query.permission)
if (isPublicPermissionQuery(query)) {
const permissionType = streamPermissionToSolidityType(query.permission)
return this.streamRegistryContractReadonly.hasPublicPermission(streamId, permissionType)
} else if (query.allowPublic) {
return this.streamRegistryContractReadonly.hasPermission(streamId, query.user, permissionType)
} else {
return this.streamRegistryContractReadonly.hasDirectPermission(streamId, query.user, permissionType)
const chainPermissions = query.allowPublic
? await this.streamRegistryContractReadonly.getPermissionsForUserId(streamId, query.user)
: await this.streamRegistryContractReadonly.getDirectPermissionsForUserId(streamId, query.user)
const permissions = convertChainPermissionsToStreamPermissions(chainPermissions)
return permissions.includes(query.permission)
}
}

Expand All @@ -372,7 +394,7 @@ export class StreamRegistry {
metadata
permissions(first: ${pageSize} orderBy: "id" where: { id_gt: "${lastId}"}) {
id
userAddress
userId
canEdit
canDelete
publishExpiration
Expand Down Expand Up @@ -401,14 +423,14 @@ export class StreamRegistry {
* empty assignments cleanup in The Graph.
*/
if (permissions.length > 0) {
if (permissionResult.userAddress === PUBLIC_PERMISSION_ADDRESS) {
if (permissionResult.userId === PUBLIC_PERMISSION_USER_ID) {
assignments.push({
public: true,
permissions
})
} else {
assignments.push({
user: toUserId(permissionResult.userAddress),
user: toUserId(permissionResult.userId),
permissions
})
}
Expand All @@ -418,21 +440,23 @@ export class StreamRegistry {
}

async grantPermissions(streamIdOrPath: string, ...assignments: InternalPermissionAssignment[]): Promise<void> {
validatePermissionAssignments(assignments)
const overrides = await getEthersOverrides(this.rpcProviderSource, this.config)
return this.updatePermissions(streamIdOrPath, (streamId: StreamID, userId: UserID | undefined, solidityType: bigint) => {
return (userId === undefined)
? this.streamRegistryContract!.grantPublicPermission(streamId, solidityType, overrides)
: this.streamRegistryContract!.grantPermission(streamId, userId, solidityType, overrides)
: this.streamRegistryContract!.grantPermissionForUserId(streamId, userId, solidityType, overrides)
}, ...assignments)
}

/* eslint-disable max-len */
async revokePermissions(streamIdOrPath: string, ...assignments: InternalPermissionAssignment[]): Promise<void> {
validatePermissionAssignments(assignments)
const overrides = await getEthersOverrides(this.rpcProviderSource, this.config)
return this.updatePermissions(streamIdOrPath, (streamId: StreamID, userId: UserID | undefined, solidityType: bigint) => {
return (userId === undefined)
? this.streamRegistryContract!.revokePublicPermission(streamId, solidityType, overrides)
: this.streamRegistryContract!.revokePermission(streamId, userId, solidityType, overrides)
: this.streamRegistryContract!.revokePermissionForUserId(streamId, userId, solidityType, overrides)
}, ...assignments)
}

Expand All @@ -459,22 +483,23 @@ export class StreamRegistry {
assignments: InternalPermissionAssignment[]
}[]): Promise<void> {
const streamIds: StreamID[] = []
const targets: (UserID | typeof PUBLIC_PERMISSION_ADDRESS)[][] = []
const targets: (UserID | typeof PUBLIC_PERMISSION_USER_ID)[][] = []
const chainPermissions: ChainPermissions[][] = []
for (const item of items) {
validatePermissionAssignments(item.assignments)
const streamId = await this.streamIdBuilder.toStreamID(item.streamId)
this.clearStreamCache(streamId)
streamIds.push(streamId)
targets.push(item.assignments.map((assignment) => {
return isPublicPermissionAssignment(assignment) ? PUBLIC_PERMISSION_ADDRESS : assignment.user
return isPublicPermissionAssignment(assignment) ? PUBLIC_PERMISSION_USER_ID : assignment.user
}))
chainPermissions.push(item.assignments.map((assignment) => {
return convertStreamPermissionsToChainPermission(assignment.permissions)
}))
}
await this.connectToContract()
const ethersOverrides = await getEthersOverrides(this.rpcProviderSource, this.config)
const txToSubmit = this.streamRegistryContract!.setPermissionsMultipleStreams(
const txToSubmit = this.streamRegistryContract!.setMultipleStreamPermissionsForUserIds(
streamIds,
targets,
chainPermissions,
Expand All @@ -483,17 +508,9 @@ export class StreamRegistry {
await waitForTx(txToSubmit)
}

private async isStreamPublisher_nonCached(streamId: StreamID, userId: UserID): Promise<boolean> {
try {
return await this.streamRegistryContractReadonly.hasPermission(streamId, userId, streamPermissionToSolidityType(StreamPermission.PUBLISH))
} catch (err) {
return streamContractErrorProcessor(err, streamId, 'StreamPermission')
}
}

private async isStreamSubscriber_nonCached(streamId: StreamID, userId: UserID): Promise<boolean> {
private async isStreamPublisherOrSubscriber_nonCached(streamId: StreamID, userId: UserID, permission: StreamPermission): Promise<boolean> {
try {
return await this.streamRegistryContractReadonly.hasPermission(streamId, userId, streamPermissionToSolidityType(StreamPermission.SUBSCRIBE))
return await this.hasPermission({ streamId, user: userId, permission, allowPublic: true })
} catch (err) {
return streamContractErrorProcessor(err, streamId, 'StreamPermission')
}
Expand All @@ -515,15 +532,15 @@ export class StreamRegistry {
if (useCache) {
return this.isStreamPublisher_cached(streamId, userId)
} else {
return this.isStreamPublisher_nonCached(streamId, userId)
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.PUBLISH)
}
}

isStreamSubscriber(streamId: StreamID, userId: UserID, useCache = true): Promise<boolean> {
if (useCache) {
return this.isStreamSubscriber_cached(streamId, userId)
} else {
return this.isStreamSubscriber_nonCached(streamId, userId)
return this.isStreamPublisherOrSubscriber_nonCached(streamId, userId, StreamPermission.SUBSCRIBE)
}
}

Expand Down
10 changes: 5 additions & 5 deletions packages/sdk/src/contracts/searchStreams.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { ChangeFieldType, GraphQLQuery, HexString, Logger, StreamID, TheGraphClient, toStreamID, toUserId, UserID } from '@streamr/utils'
import { Stream } from '../Stream'
import { ChainPermissions, PUBLIC_PERMISSION_ADDRESS, StreamPermission, convertChainPermissionsToStreamPermissions } from '../permission'
import { ChainPermissions, PUBLIC_PERMISSION_USER_ID, StreamPermission, convertChainPermissionsToStreamPermissions } from '../permission'
import { filter, map, unique } from '../utils/GeneratorUtils'
import { StreamQueryResult } from './StreamRegistry'

Expand Down Expand Up @@ -74,7 +74,7 @@ async function* fetchSearchStreamsResultFromTheGraph(
const withoutOrphaned = filter(backendResults, (p) => p.stream !== null)
/*
* As we query via permissions entity, any stream can appear multiple times (once per
* permission user) if we don't do have exactly one userAddress in the GraphQL query.
* permission user) if we don't do have exactly one userId in the GraphQL query.
* That is the case if no permission filter is defined at all, or if permission.allowPublic
* is true (then it appears twice: once for the user, and once for the public address).
*/
Expand Down Expand Up @@ -119,9 +119,9 @@ const buildQuery = (
id_gt: lastId
}
if (permissionFilter !== undefined) {
variables.userAddress_in = [permissionFilter.user]
variables.userId_in = [permissionFilter.user]
if (permissionFilter.allowPublic) {
variables.userAddress_in.push(PUBLIC_PERMISSION_ADDRESS)
variables.userId_in.push(PUBLIC_PERMISSION_USER_ID)
}
if (permissionFilter.allOf !== undefined) {
const now = String(Math.round(Date.now() / 1000))
Expand All @@ -135,7 +135,7 @@ const buildQuery = (
const query = `
query (
$stream_contains: String,
$userAddress_in: [Bytes!]
$userId_in: [Bytes!]
$canEdit: Boolean
$canDelete: Boolean
$publishExpiration_gt: BigInt
Expand Down
Loading

0 comments on commit 6b73740

Please sign in to comment.