Skip to content

Commit

Permalink
feat(cli-tool): add binary support to publish (#2947)
Browse files Browse the repository at this point in the history
## Summary

Subcommand `streamr stream publish` now supports binary publishing. If
the input is detected to be a hexadecimal string, it will be interpreted
as binary and pushed to the network as such.
  • Loading branch information
harbu authored Dec 20, 2024
1 parent 06476ce commit 9e99462
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 12 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ Changes before Tatum release are not documented in this file.

#### Added

- Add binary data support to `streamr stream publish` (https://github.com/streamr-dev/network/pull/2947)
- Add binary data support to `streamr stream susbcribe` (https://github.com/streamr-dev/network/pull/2948)
- Add binary data support to `streamr mock-data generate` command (https://github.com/streamr-dev/network/pull/2946)

Expand Down
34 changes: 22 additions & 12 deletions packages/cli-tools/bin/streamr-stream-publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ import '../src/logLevel'

import { Writable } from 'stream'
import { StreamrClient } from '@streamr/sdk'
import { wait } from '@streamr/utils'
import { hexToBinary, wait } from '@streamr/utils'
import es from 'event-stream'
import { createClientCommand, Options as BaseOptions } from '../src/command'

interface Options extends BaseOptions {
partitionKeyField?: string
}

const isHexadecimal = (str: string): boolean => {
return /^[0-9a-fA-F]+$/.test(str)
}

const publishStream = (
stream: string,
partitionKeyField: string | undefined,
Expand All @@ -19,21 +23,26 @@ const publishStream = (
const writable = new Writable({
objectMode: true,
write: (data: any, _: any, done: any) => {
let json = null
let message = null
// ignore newlines, etc
if (!data || String(data).trim() === '') {
done()
return
}
try {
json = JSON.parse(data)
} catch (e) {
console.error(data.toString())
done(e)
return
const trimmedData = String(data).trim()
if (isHexadecimal(trimmedData)) {
message = hexToBinary(trimmedData)
} else {
try {
message = JSON.parse(trimmedData)
} catch (e) {
console.error(data.toString())
done(e)
return
}
}
const partitionKey = (partitionKeyField !== undefined) ? json[partitionKeyField] : undefined
client.publish(stream, json, { partitionKey }).then(
const partitionKey = (partitionKeyField !== undefined && typeof message === 'object') ? message[partitionKeyField] : undefined
client.publish(stream, message, { partitionKey }).then(
() => done(),
(err) => done(err)
)
Expand Down Expand Up @@ -63,6 +72,7 @@ createClientCommand(async (client: StreamrClient, streamId: string, options: Opt
})
})
.arguments('<streamId>')
.description('publish to a stream by reading JSON messages from stdin line-by-line')
.option('-k, --partition-key-field <string>', 'field name in each message to use for assigning the message to a stream partition')
.description('publish to a stream by reading JSON messages from stdin line-by-line or hexadecimal strings for binary data')
// eslint-disable-next-line max-len
.option('-k, --partition-key-field <string>', 'field name in each message to use for assigning the message to a stream partition (only for JSON data)')
.parseAsync()

0 comments on commit 9e99462

Please sign in to comment.