-
Notifications
You must be signed in to change notification settings - Fork 6
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Documentation of the wasmer plugin prototype (wip)
- Loading branch information
Showing
12 changed files
with
650 additions
and
59 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
# Conclusion | ||
|
||
With the given approach plugins can be integrated easily into the host application, eg. in case of a `MessageProducer`: | ||
|
||
```rust,no_run | ||
let source_plugin_factory = SourcePluginFactory::new(); | ||
let source_plugin = source_plugin_factory.create(0).unwrap(); | ||
let source = ByteSourceProxy::new(source_plugin, ...); | ||
let dlt_plugin_factory = DltPluginFactory::new(); | ||
let dlt_plugin = dlt_plugin_factory.create(1).unwrap(); | ||
let dlt_parser = DltParserProxy::new(dlt_plugin, ...); | ||
let mut dlt_msg_producer = MessageProducer::new(dlt_parser, source, ...); | ||
``` | ||
|
||
Though, some difficulties with the current one-step processing logic of Chipmunk in contrast to performance saving bulk operations between the host and plugins have to be considered. | ||
|
||
Given the acceptable performance impact of this low-level WebAssembly approach using memory-copy, it could serve as a first integration step towards a plugin support for Chipmunk, however. |
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,165 @@ | ||
# Parser Plugin for Dlt | ||
|
||
The parser plugin for DLT is realized by a `wasm` module in `application/apps/indexer/plugin/dlt` together with a corresponding host proxy for the `Parser` trait in `application/apps/indexer/session/src/plugin`. | ||
|
||
In order to minimize actual calls to the plugin, each data chunk provided to the proxy is being processed in a bulk operation within the plugin. | ||
|
||
## DltParser WASM module | ||
|
||
The plugin will parse a given chunk of data and return all results to the host. | ||
|
||
```rust,no_run | ||
// Singleton instance of the DltParser within the plugin. | ||
lazy_static! { | ||
static ref PARSER: Mutex<DltParser<'static>> = DltParser::default().into(); | ||
} | ||
#[repr(C)] | ||
pub struct Response(*mut u8, u32); | ||
#[allow(clippy::missing_safety_doc)] | ||
#[no_mangle] | ||
pub unsafe extern "C" fn message(ptr: *const u8, len: u32) -> Response { | ||
// Get the instance of our parser. | ||
let mut parser = PARSER.lock().unwrap_or_else(|error| panic("lock", &error.to_string())); | ||
// Deserialize the request from the module's memory. | ||
let input = unsafe { std::slice::from_raw_parts(ptr, len.try_into().unwrap()) }; | ||
let request = rkyv::from_bytes(input).unwrap_or_else(|error| panic("from_bytes", &error.to_string())); | ||
// Process the request. | ||
let response = match request { | ||
PluginRpc::Request(DltParserRpc::Setup(ParserSettings { | ||
with_storage_header, | ||
})) => { | ||
print("init parser"); | ||
parser.with_storage_header = with_storage_header; | ||
PluginRpc::Response(DltParserRpc::SetupDone) | ||
} | ||
PluginRpc::Request(DltParserRpc::Parse(ParseInput { bytes })) => { | ||
let response: PluginRpc<DltParserRpc>; | ||
let mut results: Vec<ParserResult> = Vec::new(); | ||
let mut input: &[u8] = &bytes; | ||
loop { | ||
match parser.parse(input, None) { | ||
Ok((rest, Some(result))) => { | ||
let bytes_remaining = rest.len(); | ||
let message = match result { | ||
ParseYield::Message(message) => { | ||
print(&format!("parse message ({} bytes remaining)", bytes_remaining)); | ||
Some(format!("{}", message)) // TODO | ||
} | ||
... | ||
}; | ||
results.push(ParserResult::ParseOk(ParseOutput { | ||
bytes_remaining, | ||
message, | ||
})); | ||
if rest.is_empty() { | ||
response = PluginRpc::Response(DltParserRpc::ParseResult(results)); | ||
break; | ||
} else { | ||
input = rest; | ||
} | ||
} | ||
... | ||
Err(ParserError::Parse(error)) => { | ||
// Ignore expected parse errors at end of provided data. | ||
if results.is_empty() { | ||
results.push(ParserResult::ParseError(error)); | ||
} | ||
response = PluginRpc::Response(DltParserRpc::ParseResult(results)); | ||
break; | ||
} | ||
}; | ||
} | ||
response | ||
} | ||
... | ||
}; | ||
// Serialize the response to the module's memory. | ||
let mut output = rkyv::to_bytes::<_, 256>(&response).unwrap_or_else(|error| panic("to_bytes", &error.to_string())); | ||
let ptr = output.as_mut_ptr(); | ||
let len = output.len(); | ||
mem::forget(output); | ||
Response(ptr, len as u32) | ||
} | ||
``` | ||
|
||
## DltParser Host proxy | ||
|
||
The proxy will store the results retrieved from the plugin and returns them to the host as long as unconsumed results are available. | ||
|
||
```rust,no_run | ||
impl DltParserProxy { | ||
pub fn new(mut proxy: PluginProxy, with_storage_header: bool) -> Self { | ||
// Init plugin with given settings. | ||
let request: PluginRpc<DltParserRpc> = | ||
PluginRpc::Request(DltParserRpc::Setup(ParserSettings { | ||
with_storage_header, | ||
})); | ||
let request_bytes = rkyv::to_bytes::<_, 256>(&request).unwrap(); | ||
match proxy.call(&request_bytes) { | ||
Ok(response_bytes) => { | ||
let response: PluginRpc<DltParserRpc> = rkyv::from_bytes(&response_bytes).unwrap(); | ||
if let PluginRpc::Response(DltParserRpc::SetupDone) = response { | ||
// nothing | ||
} | ||
... | ||
} | ||
... | ||
} | ||
Self { | ||
proxy, | ||
results: VecDeque::new(), // Current bulk results from plugin. | ||
} | ||
} | ||
// Returns the next item from stored results. | ||
fn next_result<'b>( | ||
&mut self, | ||
input: &'b [u8], | ||
) -> Option<Result<(&'b [u8], Option<ParseYield<DltProxyMessage>>), ParserError>> { | ||
... | ||
} | ||
} | ||
impl Parser<DltProxyMessage> for DltParserProxy { | ||
fn parse<'b>( | ||
&mut self, | ||
input: &'b [u8], | ||
_timestamp: Option<u64>, | ||
) -> Result<(&'b [u8], Option<ParseYield<DltProxyMessage>>), ParserError> { | ||
// As long as we have stored results we return next item. | ||
if let Some(result) = self.next_result(input) { | ||
return result; | ||
} | ||
// Create request with next chunk of data to be parsed. | ||
let request: PluginRpc<DltParserRpc> = | ||
PluginRpc::Request(DltParserRpc::Parse(ParseInput { | ||
bytes: input.to_vec(), | ||
})); | ||
let request_bytes = rkyv::to_bytes::<_, 256>(&request).unwrap(); | ||
match self.proxy.call(&request_bytes) { | ||
Ok(response_bytes) => { | ||
let response: PluginRpc<DltParserRpc> = rkyv::from_bytes(&response_bytes).unwrap(); | ||
if let PluginRpc::Response(DltParserRpc::ParseResult(results)) = response { | ||
// Store results and return first item, if any. | ||
self.results = VecDeque::from(results); | ||
if let Some(result) = self.next_result(input) { | ||
result | ||
} | ||
... | ||
} | ||
... | ||
} | ||
} | ||
} | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
# Plugin Factory | ||
|
||
In `application/apps/indexer/plugin/host` some generic host components are defined, such as the trait for a plugin factory and its actual implementations for `wasm` and `wasi` plugins: | ||
|
||
```rust,no_run | ||
pub trait PluginFactory { | ||
fn create(&self, id: PluginId) -> Result<PluginProxy, PluginError>; | ||
} | ||
``` | ||
|
||
## Wasm Plugin Factory | ||
|
||
The implementation for a plain `wasm` plugin factory would get a precompiled WebAssembly binary and instantiate a proxy based on the `wasmer` runtime: | ||
|
||
```rust,no_run | ||
use wasmer::{ | ||
imports, AsStoreRef, Function, FunctionEnv, FunctionEnvMut, Instance, Memory, Module, Store, | ||
TypedFunction, WasmPtr, WasmSlice, | ||
}; | ||
... | ||
impl WasmPluginFactory { | ||
pub fn new(binary: Vec<u8>) -> Self { | ||
WasmPluginFactory { binary } | ||
} | ||
} | ||
impl PluginFactory for WasmPluginFactory { | ||
fn create(&self, id: PluginId) -> Result<PluginProxy, PluginError> { | ||
let mut store = Store::default(); | ||
// Load the module from the precompiled binary. | ||
let module = Module::from_binary(&store, &self.binary).expect("from_binary"); | ||
// Create a plugin environment to be used at runtime. | ||
let plugin_env = FunctionEnv::new(&mut store, PluginEnv { id, memory: None }); | ||
// Add a debug-print function from the host to be imported in the plugin. | ||
let host_print = Function::new_typed_with_env( | ||
&mut store, | ||
&plugin_env, | ||
|env: FunctionEnvMut<PluginEnv>, ptr: WasmPtr<u8>, len: u32| { | ||
let store = env.as_store_ref(); | ||
let memory = env.data().memory.as_ref().unwrap(); | ||
let memory_view = memory.view(store.borrow()); | ||
let string = ptr.read_utf8_string(&memory_view, len).unwrap(); | ||
debug!("proxy<{}> : {}", env.data().id, string); | ||
}, | ||
); | ||
let imports = imports! { | ||
"host" => { | ||
"host_print" => host_print, | ||
}, | ||
}; | ||
// Create an instance of the plugin runtime. | ||
let instance = Instance::new(&mut store, &module, &imports).expect("instance"); | ||
// Add the instance's memory to the plugin-env so it could be accessed from import functions. | ||
let env = plugin_env.as_mut(&mut store); | ||
env.memory = Some( | ||
instance.exports.get_memory("memory").expect("memory").clone(), | ||
); | ||
Ok(PluginProxy::new(id, store, instance)) | ||
} | ||
} | ||
``` | ||
|
||
## Wasi Plugin Factory | ||
|
||
The implementation for a `wasi` plugin factory would respectively get a precompiled WebAssembly binary and instantiate a proxy based on the `wasmer` runtime and a `wasmer_wasix` environment to support eg. system I/O operations: | ||
|
||
```rust,no_run | ||
use wasmer::{ | ||
AsStoreRef, Function, FunctionEnv, FunctionEnvMut, Instance, Memory, Module, Store, | ||
TypedFunction, WasmPtr, WasmSlice, | ||
}; | ||
use wasmer_wasix::{default_fs_backing, WasiEnv}; | ||
impl WasiPluginFactory { | ||
pub fn new(binary: Vec<u8>) -> Self { | ||
WasiPluginFactory { binary } | ||
} | ||
} | ||
impl PluginFactory for WasiPluginFactory { | ||
fn create(&self, id: PluginId) -> Result<PluginProxy, PluginError> { | ||
let mut store = Store::default(); | ||
// Load the module from the precompiled binary. | ||
let module = Module::from_binary(&store, &self.binary).expect("from_binary"); | ||
// Create a wasi environment to be used at runtime and map pre-opened host file handles. | ||
let mut wasi_env = WasiEnv::builder(format!("wasi-proxy<{}>", id)) | ||
.fs(default_fs_backing()) | ||
.preopen_dir(Path::new("/")) // NOTE: Map currently only the working-directory of the application. | ||
.expect("preopen_dir") | ||
.map_dir("/", ".") | ||
.expect("map_dir") | ||
.finalize(&mut store) | ||
.expect("finalize_env"); | ||
let mut imports = wasi_env | ||
.import_object(&mut store, &module) | ||
.expect("imports"); | ||
// Create a plugin environment to be used at runtime. | ||
let plugin_env = FunctionEnv::new(&mut store, PluginEnv { id, memory: None }); | ||
// Add a debug-print function from the host to be imported in the plugin. | ||
let host_print = Function::new_typed_with_env( | ||
&mut store, | ||
&plugin_env, | ||
|env: FunctionEnvMut<PluginEnv>, ptr: WasmPtr<u8>, len: u32| { | ||
let store = env.as_store_ref(); | ||
let memory = env.data().memory.as_ref().unwrap(); | ||
let memory_view = memory.view(store.borrow()); | ||
let string = ptr.read_utf8_string(&memory_view, len).unwrap(); | ||
debug!("proxy<{}> : {}", env.data().id, string); | ||
}, | ||
); | ||
imports.define("host", "host_print", host_print); | ||
// Create an instance of the plugin runtime. | ||
let instance = Instance::new(&mut store, &module, &imports).expect("instance"); | ||
// Initialize the wasi environment | ||
wasi_env | ||
.initialize(&mut store, instance.clone()) | ||
.expect("initialize_env"); | ||
// Add the instance's memory to the plugin-env so it could be accessed from import functions. | ||
let env = plugin_env.as_mut(&mut store); | ||
env.memory = Some( | ||
instance.exports.get_memory("memory").expect("memory").clone(), | ||
); | ||
// Start the wasi plugin as a reactor. | ||
let start = instance | ||
.exports | ||
.get_function("_initialize") | ||
.expect("exports"); | ||
start.call(&mut store, &[]).expect("start"); | ||
Ok(PluginProxy::new(id, store, instance)) | ||
} | ||
} | ||
``` |
Oops, something went wrong.