Skip to content

Commit

Permalink
Merge pull request #14 from bots-garden/nats-publisher
Browse files Browse the repository at this point in the history
πŸ›Ÿ Updated: publisher ok, next->doc
  • Loading branch information
k33g authored Aug 20, 2023
2 parents 29e7b3c + bc5000e commit e5f52d2
Show file tree
Hide file tree
Showing 29 changed files with 1,257 additions and 42 deletions.
9 changes: 9 additions & 0 deletions docs/host-functions/intro.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,12 @@ This section explains how to use the host functions with an Extism wasm plugin.
- [Memory Cache](mem.md)
- [Redis Cache](redis.md)
- [Redis PubSub](redis-pubsub.md)
- [Nats Publish](nats.md)

!!! info "About Extism Host Functions"
You can find more information into the **Extism** documentation: [https://extism.org/docs/concepts/host-functions](https://extism.org/docs/concepts/host-functions).
Each [Plug-in Development Kits](https://extism.org/docs/concepts/pdk) provides the "bridge helpers" allowing using the host function callbacks of the host application from the guest wasm plug-in. Then you need to "assemble" the "bridge helpers" to call the host function. It's not always straightforward, and you need to do it for every language you use.

Right now, we want to make sure that **SlingShot** is compliant with **Extism** PDKs and that's why we're only working on the "host" part.

In the future, the **Slingshot** project will propose **layers of abstraction** to simplify the use of host functions.
5 changes: 2 additions & 3 deletions docs/host-functions/nats.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# πŸ› οΈ Host functions

## hostNatsPublish

πŸ‘€ Look at this sample: [Nats publisher plug-in](../)
## hostInitNatsConnection & hostNatsPublish

πŸ‘€ Look at this sample: [Nats publisher plug-in](../write-nats-publisher.md)
5 changes: 3 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
```bash title="Trigger a wasm plug-in with NATS messages"
./slingshot nats subscribe --wasm=./hello.wasm --handler=message \
--url=${NATS_SERVER_URL} \
--client-id=007 \
--connection-id=007 \
--subject=news
```

Expand All @@ -44,4 +44,5 @@ Slingshot is developed in Go with **[Wazero](https://wazero.io/)**[^1] as the Wa
- [Write & serve a plug-in as a nano-service](write-service.md)
- [Write a Redis subscriber plug-in](write-redis-subscriber.md)
- [Write a Redis publisher plug-in](write-redis-publisher.md)
- [Write a NATS subscriber plug-in](write-nats-subscriber.md)
- [Write a NATS subscriber plug-in](write-nats-subscriber.md)
- [Write a NATS publisher plug-in](write-nats-publisher.md)
348 changes: 348 additions & 0 deletions docs/write-nats-publisher.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,348 @@
# Write a NATS publisher plug-in

> We are going to use it to publish message to the [NATS subscriber](write-nats-subscriber.md)
=== "Go"
```golang linenums="1"
package main

import (
"errors"

"github.com/extism/go-pdk"
"github.com/valyala/fastjson"
)

//export hostPrint
func hostPrint(offset uint64) uint64

func Print(text string) {
memoryText := pdk.AllocateString(text)
hostPrint(memoryText.Offset())
}

//export hostGetEnv
func hostGetEnv(offset uint64) uint64

func GetEnv(name string) string {
// copy the name of the environment variable to the shared memory
variableName := pdk.AllocateString(name)
// call the host function
offset := hostGetEnv(variableName.Offset())

// read the value of the result from the shared memory
variableValue := pdk.FindMemory(offset)
buffer := make([]byte, variableValue.Length())
variableValue.Load(buffer)

// cast the buffer to string and return the value
envVarValue := string(buffer)
return envVarValue
}

var parser = fastjson.Parser{}

//export hostInitNatsConnection
func hostInitNatsConnection(offset uint64) uint64

func InitNatsConnection(natsConnectionId string, natsUrl string) (string, error) {
jsonStrArguments := `{"id":"` + natsConnectionId + `","url":"` + natsUrl + `"}`
// Copy the string value to the shared memory
arguments := pdk.AllocateString(jsonStrArguments)

// Call the host function with Json string argument
offset := hostInitNatsConnection(arguments.Offset())

// Get result from the shared memory
// The host function (hostInitNatsConnection) returns a JSON buffer:
// {
// "success": "the NATS connexion id",
// "failure": "error message if error, else empty"
// }
memoryResult := pdk.FindMemory(offset)
buffer := make([]byte, memoryResult.Length())
memoryResult.Load(buffer)

JSONData, err := parser.ParseBytes(buffer)
if err != nil {
return "", err
}
if len(JSONData.GetStringBytes("failure")) == 0 {
return string(JSONData.GetStringBytes("success")), nil
} else {
return "", errors.New(string(JSONData.GetStringBytes("failure")))
}

}


//export hostNatsPublish
func hostNatsPublish(offset uint64) uint64

func NatsPublish(natsServer string, subject string, data string) (string, error) {
// Prepare the arguments for the host function
// with a JSON string:
// {
// "id": "id of the NATS client",
// "subject": "name",
// "data": "Bob Morane"
// }
//jsonStr := `{"url":"` + natsServer + `","subject":"` + subject + `","data":"` + data + `"}`
jsonStr := `{"id":"` + natsServer + `","subject":"` + subject + `","data":"` + data + `"}`

// Copy the string value to the shared memory
arguments := pdk.AllocateString(jsonStr)

// Call host function with the offset of the arguments
offset := hostNatsPublish(arguments.Offset())

// Get result from the shared memory
// The host function (hostNatsPublish) returns a JSON buffer:
// {
// "success": "the message",
// "failure": "error message if error, else empty"
// }
memoryResult := pdk.FindMemory(offset)
buffResult := make([]byte, memoryResult.Length())
memoryResult.Load(buffResult)

JSONData, err := parser.ParseBytes(buffResult)

if err != nil {
return "", err
}
if len(JSONData.GetStringBytes("failure")) == 0 {
return string(JSONData.GetStringBytes("success")), nil
} else {
return "", errors.New(string(JSONData.GetStringBytes("failure")))
}
}

//export publish
func publish() uint64 {
input := pdk.Input()

natsURL := GetEnv("NATS_URL")
Print("πŸ’œ NATS_URL: " + natsURL)
idNatsConnection, errInit := InitNatsConnection("natsconn01", natsURL)
if errInit != nil {
Print("😑 " + errInit.Error())
} else {
Print("πŸ™‚ " + idNatsConnection)
}

res, err := NatsPublish("natsconn01", "news", string(input))

if err != nil {
Print("😑 " + err.Error())
} else {
Print("πŸ™‚ " + res)
}
return 0
}

func main() {}
```

=== "Rust"
```rust linenums="1"
use extism_pdk::*;
use serde::{Serialize, Deserialize};
use thiserror::Error;

extern "C" {
fn hostPrint(ptr: u64) -> u64;
}

pub fn print(text: String) {
let mut memory_text: Memory = extism_pdk::Memory::new(text.len());
memory_text.store(text);
unsafe { hostPrint(memory_text.offset) };
}

extern "C" {
fn hostGetEnv(ptr: u64) -> u64;
}

pub fn get_env(name: String) -> String {
// copy the name of the environment variable to the shared memory
let mut variable_name: Memory = extism_pdk::Memory::new(name.len());
variable_name.store(name);

// call the host function
let offset: u64 = unsafe { hostGetEnv(variable_name.offset) };

// read the value of the result from the shared memory
let variable_value: Memory = extism_pdk::Memory::find(offset).unwrap();

// return the value
return variable_value.to_string().unwrap()
}

#[derive(Serialize, Deserialize, Debug)]
struct NatsConfig {
pub id: String,
pub url: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct NatsMessage {
pub id: String,
pub subject: String,
pub data: String,
}

#[derive(Serialize, Deserialize, Debug)]
struct StringResult {
pub success: String,
pub failure: String,
}

#[derive(Error, Debug)]
pub enum NatsError {
#[error("Nats Connection issue")]
ConnectionFailure,
#[error("Store issue")]
MessageFailure,
#[error("Not found")]
NotFound,
}

extern "C" {
fn hostInitNatsConnection(offset: u64) -> u64;
}

pub fn init_nats_connection(nats_connection_id: String, nats_url: String) -> Result<String, Error> {
// Prepare the arguments for the host function
// with a JSON string:
// {
// "id": "id of the NATS connection",
// "url": "URL of the NATS server"
// }
let args = NatsConfig {
id: nats_connection_id,
url: nats_url,
};
let json_str: String = serde_json::to_string(&args).unwrap();

// Copy the string value to the shared memory
let mut memory_json_str: Memory = extism_pdk::Memory::new(json_str.len());
memory_json_str.store(json_str);

// Call host function with the offset of the arguments
let offset: u64 = unsafe { hostInitNatsConnection(memory_json_str.offset) };

// Get result from the shared memory
// The host function (hostInitNatsConnection) returns a JSON buffer:
// {
// "success": "id of the connection",
// "failure": "error message if error, else empty"
// }
let memory_result: Memory = extism_pdk::Memory::find(offset).unwrap();
let json_string:String = memory_result.to_string().unwrap();
let result: StringResult = serde_json::from_str(&json_string).unwrap();

if result.failure.is_empty() {
return Ok(result.success);
} else {
return Err(NatsError::ConnectionFailure.into());
}
}

extern "C" {
fn hostNatsPublish(offset: u64) -> u64;
}

pub fn nats_publish(nats_connection_id: String, subject: String, data: String) -> Result<String, Error> {
// Prepare the arguments for the host function
// with a JSON string:
// {
// "id": "id of the NATS client",
// "subject": "name",
// "data": "Bob Morane"
// }
let args = NatsMessage {
id: nats_connection_id,
subject: subject,
data: data,
};
let json_str: String = serde_json::to_string(&args).unwrap();

// Copy the string value to the shared memory
let mut memory_json_str: Memory = extism_pdk::Memory::new(json_str.len());
memory_json_str.store(json_str);

// Call host function with the offset of the arguments
let offset: u64 = unsafe { hostNatsPublish(memory_json_str.offset) };

// Get result from the shared memory
// The host function returns a JSON buffer:
// {
// "success": "OK",
// "failure": "error message if error, else empty"
// }
let memory_result: Memory = extism_pdk::Memory::find(offset).unwrap();
let json_string:String = memory_result.to_string().unwrap();
let result: StringResult = serde_json::from_str(&json_string).unwrap();

if result.failure.is_empty() {
return Ok(result.success);
} else {
return Err(NatsError::MessageFailure.into());
}


}

#[plugin_fn]
pub fn publish(input: String) -> FnResult<u64> {

let nats_url : String = get_env("NATS_URL".to_string());
let nats_connection : Result<String, Error> = init_nats_connection("natsconn01".to_string(), nats_url);

match nats_connection {
Ok(value) => print("πŸ¦€ nats connection: ".to_string() + &value),
Err(error) => print("😑 error: ".to_string() + &error.to_string()),
}

match nats_publish("natsconn01".to_string(), "news".to_string(), input.to_string()) {
Ok(value) => print("πŸ¦€ πŸ™‚ ".to_string() + &value),
Err(error) => print("😑 error: ".to_string() + &error.to_string()),
}
Ok(0)
}
```

## Build

=== "Go"
```golang linenums="1"
tinygo build -scheduler=none --no-debug \
-o natspub.wasm \
-target wasi main.go
```

=== "Rust"
```rust linenums="1"
cargo clean
cargo build --release --target wasm32-wasi #--offline
ls -lh ./target/wasm32-wasi/release/*.wasm
cp ./target/wasm32-wasi/release/*.wasm .
```

## Run the plug-in to publish a message

```bash linenums="1"
export NATS_URL="nats://0.0.0.0:4222"
./slingshot run --wasm=./natspub.wasm \
--handler=publish \
--input="I πŸ’œ Wasm ✨"
```

On the subscriber side, you shoul read:
```bash linenums="1"
πŸ‘‹ message: {"subject":"news","data":"I πŸ’œ Wasm ✨"}
```
Loading

0 comments on commit e5f52d2

Please sign in to comment.