Skip to content

Commit

Permalink
sdk: update asyncio wit, simplify some methods
Browse files Browse the repository at this point in the history
  • Loading branch information
fuxiaohei committed Aug 23, 2024
1 parent 9d21d60 commit bccd0af
Show file tree
Hide file tree
Showing 11 changed files with 254 additions and 162 deletions.
28 changes: 14 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[workspace.package]
version = "0.5.0-rc.6"
version = "0.5.0-rc.7"
edition = "2021"
authors = ["fuxiaohei <fudong0797@gmail.com>"]

Expand Down
79 changes: 65 additions & 14 deletions lib/sdk/src/execution_ctx.rs
Original file line number Diff line number Diff line change
@@ -1,34 +1,78 @@
use super::http_service::land::asyncio::asyncio;
use std::sync::{Arc, Mutex};
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

type WaitUntilHandler = Box<dyn Fn() + Send + 'static>;

struct Inner {
pub handlers: Vec<(u32, WaitUntilHandler)>,
pub handlers: HashMap<u32, WaitUntilHandler>,
}

impl Inner {
pub fn new() -> Self {
Self { handlers: vec![] }
Self {
handlers: HashMap::new(),
}
}
pub fn wait_until(&mut self, f: WaitUntilHandler) {
let seq_id = asyncio::new().unwrap();
self.handlers.push((seq_id, f));
self.handlers.insert(seq_id, f);
}
pub fn execute(&mut self) {
let current = self.handlers.pop();
if let Some((seq_id, handler)) = current {

/// sleep add empty sleep task to asyncio task with seq_id
pub fn sleep(&mut self, ms: u32) -> u32 {
asyncio::sleep(ms).unwrap()
}

/// sleep_callback add callback function to asyncio task with seq_id
pub fn sleep_callback(&mut self, seq_id: u32, f: WaitUntilHandler) {
self.handlers.insert(seq_id, f);
}

/*
fn execute_runnable(&mut self) -> bool {
let (handle, is_wait) = asyncio::select();
if self.handlers.is_empty() {
return false;
}
while let Some(idx) = self
.handlers
.iter()
.position(|(handle, _)| asyncio::is_runnable(*handle))
{
let (seq_id, handler) = self.handlers.remove(idx).unwrap();
println!("asyncio->execute_runnable: {:?}", seq_id);
handler();
asyncio::finish(seq_id);
} else {
// if nothing pop, check is-pending to wait sleep timer tasks
if asyncio::is_pending() {
asyncio::wait();
}
return true;
}
return false;
}*/

pub fn execute(&mut self) {
let (handle, is_wait) = asyncio::select();
if !is_wait {
return;
}
// no handle to run, but is-wait=true, do wait
if handle.is_none() {
asyncio::ready();
// after ready, select runnable when next time
return;
}
let handle = handle.unwrap();
let handler = self.handlers.remove(&handle);
if let Some(handler) = handler {
// call callback function
handler();
}
}
pub fn is_pending(&self) -> bool {
!self.handlers.is_empty() || asyncio::is_pending()
!self.handlers.is_empty()
}
}

Expand Down Expand Up @@ -114,7 +158,14 @@ impl ExecutionCtx {
}
/// `sleep` sleep for `ms` milliseconds in hostcall tokio spawn task
pub fn sleep(&self, ms: u32) -> u32 {
asyncio::sleep(ms).unwrap()
self.inner.lock().unwrap().sleep(ms)
}
/// `sleep_callback` add callback function to asyncio task with seq_id
pub fn sleep_callback<F>(&self, id: u32, f: F)
where
F: Fn() + 'static + Send,
{
self.inner.lock().unwrap().sleep_callback(id, Box::new(f));
}
}

Expand Down
71 changes: 36 additions & 35 deletions lib/sdk/src/http_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,48 +90,50 @@ pub mod land {
}
}
#[allow(unused_unsafe, clippy::all)]
/// finish async task
pub fn finish(handle: Handle) {
unsafe {
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "land:asyncio/asyncio")]
extern "C" {
#[link_name = "finish"]
fn wit_import(_: i32);
}
#[cfg(not(target_arch = "wasm32"))]
fn wit_import(_: i32) {
unreachable!()
}
wit_import(_rt::as_i32(handle));
}
}
#[allow(unused_unsafe, clippy::all)]
/// is-pending checks if the task is pending
pub fn is_pending() -> bool {
/// select async task, return task handle or need waiting
pub fn select() -> (Option<Handle>, bool) {
unsafe {
#[repr(align(4))]
struct RetArea([::core::mem::MaybeUninit<u8>; 12]);
let mut ret_area = RetArea([::core::mem::MaybeUninit::uninit(); 12]);
let ptr0 = ret_area.0.as_mut_ptr().cast::<u8>();
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "land:asyncio/asyncio")]
extern "C" {
#[link_name = "is-pending"]
fn wit_import() -> i32;
#[link_name = "select"]
fn wit_import(_: *mut u8);
}
#[cfg(not(target_arch = "wasm32"))]
fn wit_import() -> i32 {
fn wit_import(_: *mut u8) {
unreachable!()
}
let ret = wit_import();
_rt::bool_lift(ret as u8)
wit_import(ptr0);
let l1 = i32::from(*ptr0.add(0).cast::<u8>());
let l3 = i32::from(*ptr0.add(8).cast::<u8>());
(
match l1 {
0 => None,
1 => {
let e = {
let l2 = *ptr0.add(4).cast::<i32>();
l2 as u32
};
Some(e)
}
_ => _rt::invalid_enum_discriminant(),
},
_rt::bool_lift(l3 as u8),
)
}
}
#[allow(unused_unsafe, clippy::all)]
/// wait for async task to finish
pub fn wait() {
/// ready wating async task ready
pub fn ready() {
unsafe {
#[cfg(target_arch = "wasm32")]
#[link(wasm_import_module = "land:asyncio/asyncio")]
extern "C" {
#[link_name = "wait"]
#[link_name = "ready"]
fn wit_import();
}
#[cfg(not(target_arch = "wasm32"))]
Expand Down Expand Up @@ -1098,8 +1100,8 @@ mod _rt {
#[cfg(target_arch = "wasm32")]
#[link_section = "component-type:wit-bindgen:0.30.0:http-service-with-all-of-its-exports-removed:encoded world"]
#[doc(hidden)]
pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 1359] = *b"\
\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\xac\x09\x01A\x02\x01\
pub static __WIT_BINDGEN_COMPONENT_TYPE: [u8; 1340] = *b"\
\0asm\x0d\0\x01\0\0\x19\x16wit-component-encoding\x04\0\x07\x99\x09\x01A\x02\x01\
A\x10\x01B\x16\x01{\x04\0\x0bstatus-code\x03\0\0\x01s\x04\0\x06method\x03\0\x02\x01\
o\x02ss\x01p\x04\x04\0\x07headers\x03\0\x05\x01s\x04\0\x03uri\x03\0\x07\x01y\x04\
\0\x0bbody-handle\x03\0\x09\x01k\x0a\x01r\x04\x06method\x03\x03uri\x08\x07header\
Expand All @@ -1125,12 +1127,11 @@ response\x03\0\x02\x02\x03\x02\x01\x05\x04\0\x0drequest-error\x03\0\x04\x02\x03\
ching\x05\x07\x01B\x02\x01y\x04\0\x06handle\x03\0\0\x03\x01\x12land:asyncio/type\
s\x05\x08\x02\x03\0\x03\x06handle\x01B\x0d\x02\x03\x02\x01\x09\x04\0\x06handle\x03\
\0\0\x01j\x01\x01\0\x01@\0\0\x02\x04\0\x03new\x01\x03\x01@\x01\x02msy\0\x02\x04\0\
\x05sleep\x01\x04\x01@\x01\x06handle\x01\x01\0\x04\0\x06finish\x01\x05\x01@\0\0\x7f\
\x04\0\x0ais-pending\x01\x06\x01@\0\x01\0\x04\0\x04wait\x01\x07\x03\x01\x14land:\
asyncio/asyncio\x05\x0a\x04\x018land:worker/http-service-with-all-of-its-exports\
-removed\x04\0\x0b2\x01\0,http-service-with-all-of-its-exports-removed\x03\0\0\0\
G\x09producers\x01\x0cprocessed-by\x02\x0dwit-component\x070.215.0\x10wit-bindge\
n-rust\x060.30.0";
\x05sleep\x01\x04\x01k\x01\x01o\x02\x05\x7f\x01@\0\0\x06\x04\0\x06select\x01\x07\
\x01@\0\x01\0\x04\0\x05ready\x01\x08\x03\x01\x14land:asyncio/asyncio\x05\x0a\x04\
\x018land:worker/http-service-with-all-of-its-exports-removed\x04\0\x0b2\x01\0,h\
ttp-service-with-all-of-its-exports-removed\x03\0\0\0G\x09producers\x01\x0cproce\
ssed-by\x02\x0dwit-component\x070.215.0\x10wit-bindgen-rust\x060.30.0";
#[inline(never)]
#[doc(hidden)]
pub fn __link_custom_section_describing_imports() {
Expand Down
10 changes: 8 additions & 2 deletions lib/wasm-host/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ pub struct Context {
table: ResourceTable,
host_ctx: HostContext,
pub limiter: Limiter,
req_id: String,
}

impl WasiView for Context {
Expand All @@ -56,12 +57,12 @@ impl WasiView for Context {

impl Default for Context {
fn default() -> Self {
Self::new(None)
Self::new(None, String::new())
}
}

impl Context {
pub fn new(envs: Option<HashMap<String, String>>) -> Self {
pub fn new(envs: Option<HashMap<String, String>>, req_id: String) -> Self {
let table = ResourceTable::new();
let mut wasi_ctx_builder = WasiCtxBuilder::new();
wasi_ctx_builder.inherit_stdio();
Expand All @@ -76,6 +77,7 @@ impl Context {
host_ctx: HostContext::new(),
limiter: Limiter::default(),
table,
req_id,
}
}
/// get host_ctx
Expand All @@ -94,4 +96,8 @@ impl Context {
pub fn elapsed(&self) -> tokio::time::Duration {
self.host_ctx.elapsed()
}
/// req_id returns the request id
pub fn req_id(&self) -> &str {
&self.req_id
}
}
Loading

0 comments on commit bccd0af

Please sign in to comment.