Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow execution of multiple instances of the same plugin. #78

Merged
merged 2 commits into from
Oct 30, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions include/proxy-wasm/context.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,20 +50,23 @@ struct PluginBase {
std::string_view runtime, std::string_view plugin_configuration, bool fail_open)
: name_(std::string(name)), root_id_(std::string(root_id)), vm_id_(std::string(vm_id)),
runtime_(std::string(runtime)), plugin_configuration_(plugin_configuration),
fail_open_(fail_open) {}
fail_open_(fail_open), key_(root_id_ + "||" + plugin_configuration_) {}

const std::string name_;
const std::string root_id_;
const std::string vm_id_;
const std::string runtime_;
std::string plugin_configuration_;
const std::string plugin_configuration_;
const bool fail_open_;

const std::string &key() const { return key_; }
const std::string &log_prefix() const { return log_prefix_; }

private:
std::string makeLogPrefix() const;

std::string log_prefix_;
const std::string key_;
const std::string log_prefix_;
};

struct BufferBase : public BufferInterface {
Expand Down Expand Up @@ -373,16 +376,16 @@ class ContextBase : public RootInterface,
protected:
friend class WasmBase;

void initializeRootBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin);
std::string makeRootLogPrefix(std::string_view vm_id) const;

WasmBase *wasm_{nullptr};
uint32_t id_{0};
uint32_t parent_context_id_{0}; // 0 for roots and the general context.
ContextBase *parent_context_{nullptr}; // set in all contexts.
std::string root_id_; // set only in root context.
std::string root_log_prefix_; // set only in root context.
std::shared_ptr<PluginBase> plugin_;
uint32_t parent_context_id_{0}; // 0 for roots and the general context.
ContextBase *parent_context_{nullptr}; // set in all contexts.
std::string root_id_; // set only in root context.
std::string root_log_prefix_; // set only in root context.
std::shared_ptr<PluginBase> plugin_; // set in root and stream contexts.
std::shared_ptr<PluginBase> temp_plugin_; // Remove once ABI v0.1.0 is gone.
bool in_vm_context_created_ = false;
bool destroyed_ = false;
};
Expand Down
9 changes: 5 additions & 4 deletions include/proxy-wasm/wasm.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,7 @@ class WasmBase : public std::enable_shared_from_this<WasmBase> {
std::string_view vm_key() const { return vm_key_; }
WasmVm *wasm_vm() const { return wasm_vm_.get(); }
ContextBase *vm_context() const { return vm_context_.get(); }
ContextBase *getRootContext(std::string_view root_id);
ContextBase *getOrCreateRootContext(const std::shared_ptr<PluginBase> &plugin);
ContextBase *getRootContext(const std::shared_ptr<PluginBase> &plugin, bool allow_closed);
ContextBase *getContext(uint32_t id) {
auto it = contexts_.find(id);
if (it != contexts_.end())
Expand All @@ -78,6 +77,7 @@ class WasmBase : public std::enable_shared_from_this<WasmBase> {
void timerReady(uint32_t root_context_id);
void queueReady(uint32_t root_context_id, uint32_t token);

void startShutdown(const std::shared_ptr<PluginBase> &plugin);
void startShutdown();
WasmResult done(ContextBase *root_context);
void finishShutdown();
Expand Down Expand Up @@ -164,11 +164,12 @@ class WasmBase : public std::enable_shared_from_this<WasmBase> {
uint32_t next_context_id_ = 1; // 0 is reserved for the VM context.
std::shared_ptr<ContextBase> vm_context_; // Context unrelated to any specific root or stream
// (e.g. for global constructors).
std::unordered_map<std::string, std::unique_ptr<ContextBase>> root_contexts_;
std::unordered_map<std::string, std::unique_ptr<ContextBase>> root_contexts_; // Root contexts.
std::unordered_map<std::string, std::unique_ptr<ContextBase>> pending_done_; // Root contexts.
std::unordered_set<std::unique_ptr<ContextBase>> pending_delete_; // Root contexts.
std::unordered_map<uint32_t, ContextBase *> contexts_; // Contains all contexts.
std::unordered_map<uint32_t, std::chrono::milliseconds> timer_period_; // per root_id.
std::unique_ptr<ShutdownHandle> shutdown_handle_;
std::unordered_set<ContextBase *> pending_done_; // Root contexts not done during shutdown.

WasmCallVoid<0> _initialize_; /* Emscripten v1.39.17+ */
WasmCallVoid<0> _start_; /* Emscripten v1.39.0+ */
Expand Down
27 changes: 10 additions & 17 deletions src/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,10 @@ ContextBase::ContextBase(WasmBase *wasm) : wasm_(wasm), parent_context_(this) {
wasm_->contexts_[id_] = this;
}

ContextBase::ContextBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin) {
initializeRootBase(wasm, plugin);
ContextBase::ContextBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin)
: wasm_(wasm), id_(wasm->allocContextId()), parent_context_(this), root_id_(plugin->root_id_),
root_log_prefix_(makeRootLogPrefix(plugin->vm_id_)), plugin_(plugin) {
wasm_->contexts_[id_] = this;
}

// NB: wasm can be nullptr if it failed to be created successfully.
Expand All @@ -288,15 +290,6 @@ WasmVm *ContextBase::wasmVm() const { return wasm_->wasm_vm(); }

bool ContextBase::isFailed() { return !wasm_ || wasm_->isFailed(); }

void ContextBase::initializeRootBase(WasmBase *wasm, std::shared_ptr<PluginBase> plugin) {
wasm_ = wasm;
id_ = wasm->allocContextId();
root_id_ = plugin->root_id_;
root_log_prefix_ = makeRootLogPrefix(plugin->vm_id_);
parent_context_ = this;
wasm_->contexts_[id_] = this;
}

std::string ContextBase::makeRootLogPrefix(std::string_view vm_id) const {
std::string prefix;
if (!root_id_.empty()) {
Expand All @@ -315,10 +308,10 @@ bool ContextBase::onStart(std::shared_ptr<PluginBase> plugin) {
DeferAfterCallActions actions(this);
bool result = true;
if (wasm_->on_context_create_) {
plugin_ = plugin;
temp_plugin_ = plugin;
wasm_->on_context_create_(this, id_, 0);
in_vm_context_created_ = true;
plugin_.reset();
temp_plugin_.reset();
}
if (wasm_->on_vm_start_) {
// Do not set plugin_ as the on_vm_start handler should be independent of the plugin since the
Expand Down Expand Up @@ -350,11 +343,11 @@ bool ContextBase::onConfigure(std::shared_ptr<PluginBase> plugin) {
}

DeferAfterCallActions actions(this);
plugin_ = plugin;
temp_plugin_ = plugin;
auto result =
wasm_->on_configure_(this, id_, static_cast<uint32_t>(plugin->plugin_configuration_.size()))
.u64_ != 0;
plugin_.reset();
temp_plugin_.reset();
return result;
}

Expand Down Expand Up @@ -644,8 +637,8 @@ WasmResult ContextBase::setTimerPeriod(std::chrono::milliseconds period,
}

ContextBase::~ContextBase() {
// Do not remove vm or root contexts which have the same lifetime as wasm_.
if (parent_context_id_) {
// Do not remove vm context which has the same lifetime as wasm_.
if (id_) {
wasm_->contexts_.erase(id_);
}
}
Expand Down
100 changes: 59 additions & 41 deletions src/wasm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,11 @@ WasmBase::WasmBase(std::unique_ptr<WasmVm> wasm_vm, std::string_view vm_id,
}
}

WasmBase::~WasmBase() {}
WasmBase::~WasmBase() {
root_contexts_.clear();
pending_done_.clear();
pending_delete_.clear();
}

bool WasmBase::initialize(const std::string &code, bool allow_precompiled) {
if (!wasm_vm_) {
Expand Down Expand Up @@ -319,22 +323,19 @@ bool WasmBase::initialize(const std::string &code, bool allow_precompiled) {
return !isFailed();
}

ContextBase *WasmBase::getRootContext(std::string_view root_id) {
auto it = root_contexts_.find(std::string(root_id));
if (it == root_contexts_.end()) {
return nullptr;
ContextBase *WasmBase::getRootContext(const std::shared_ptr<PluginBase> &plugin,
bool allow_closed) {
auto it = root_contexts_.find(plugin->key());
if (it != root_contexts_.end()) {
return it->second.get();
}
return it->second.get();
}

ContextBase *WasmBase::getOrCreateRootContext(const std::shared_ptr<PluginBase> &plugin) {
auto root_context = getRootContext(plugin->root_id_);
if (!root_context) {
auto context = std::unique_ptr<ContextBase>(createRootContext(plugin));
root_context = context.get();
root_contexts_[plugin->root_id_] = std::move(context);
if (allow_closed) {
it = pending_done_.find(plugin->key());
if (it != pending_done_.end()) {
return it->second.get();
}
}
return root_context;
return nullptr;
}

void WasmBase::startVm(ContextBase *root_context) {
Expand All @@ -352,15 +353,14 @@ bool WasmBase::configure(ContextBase *root_context, std::shared_ptr<PluginBase>
}

ContextBase *WasmBase::start(std::shared_ptr<PluginBase> plugin) {
auto root_id = plugin->root_id_;
auto it = root_contexts_.find(root_id);
auto it = root_contexts_.find(plugin->key());
if (it != root_contexts_.end()) {
it->second->onStart(plugin);
return it->second.get();
}
auto context = std::unique_ptr<ContextBase>(createRootContext(plugin));
auto context_ptr = context.get();
root_contexts_[root_id] = std::move(context);
root_contexts_[plugin->key()] = std::move(context);
if (!context_ptr->onStart(plugin)) {
return nullptr;
}
Expand All @@ -377,38 +377,49 @@ uint32_t WasmBase::allocContextId() {
}
}

void WasmBase::startShutdown() {
bool all_done = true;
for (auto &p : root_contexts_) {
if (!p.second->onDone()) {
all_done = false;
pending_done_.insert(p.second.get());
void WasmBase::startShutdown(const std::shared_ptr<PluginBase> &plugin) {
auto it = root_contexts_.find(plugin->key());
if (it != root_contexts_.end()) {
if (it->second->onDone()) {
it->second->onDelete();
} else {
pending_done_[it->first] = std::move(it->second);
}
root_contexts_.erase(it);
}
if (!all_done) {
shutdown_handle_ = std::make_unique<ShutdownHandle>(shared_from_this());
} else {
finishShutdown();
}

void WasmBase::startShutdown() {
auto it = root_contexts_.begin();
while (it != root_contexts_.end()) {
if (it->second->onDone()) {
it->second->onDelete();
} else {
pending_done_[it->first] = std::move(it->second);
}
it = root_contexts_.erase(it);
}
}

WasmResult WasmBase::done(ContextBase *root_context) {
auto it = pending_done_.find(root_context);
auto it = pending_done_.find(root_context->plugin_->key());
if (it == pending_done_.end()) {
return WasmResult::NotFound;
}
pending_delete_.insert(std::move(it->second));
pending_done_.erase(it);
if (pending_done_.empty() && shutdown_handle_) {
// Defer the delete so that onDelete is not called from within the done() handler.
addAfterVmCallAction(
[shutdown_handle = shutdown_handle_.release()]() { delete shutdown_handle; });
}
// Defer the delete so that onDelete is not called from within the done() handler.
shutdown_handle_ = std::make_unique<ShutdownHandle>(shared_from_this());
addAfterVmCallAction(
[shutdown_handle = shutdown_handle_.release()]() { delete shutdown_handle; });
return WasmResult::Ok;
}

void WasmBase::finishShutdown() {
for (auto &p : root_contexts_) {
p.second->onDelete();
auto it = pending_delete_.begin();
while (it != pending_delete_.end()) {
(*it)->onDelete();
it = pending_delete_.erase(it);
}
}

Expand Down Expand Up @@ -520,11 +531,18 @@ getOrCreateThreadLocalWasm(std::shared_ptr<WasmHandleBase> base_wasm,
WasmHandleCloneFactory clone_factory) {
auto wasm_handle = getThreadLocalWasm(base_wasm->wasm()->vm_key());
if (wasm_handle) {
auto root_context = wasm_handle->wasm()->getOrCreateRootContext(plugin);
if (!wasm_handle->wasm()->configure(root_context, plugin)) {
base_wasm->wasm()->fail(FailState::ConfigureFailed,
"Failed to configure thread-local Wasm code");
return nullptr;
auto root_context = wasm_handle->wasm()->getRootContext(plugin, false);
if (!root_context) {
root_context = wasm_handle->wasm()->start(plugin);
if (!root_context) {
base_wasm->wasm()->fail(FailState::StartFailed, "Failed to start thread-local Wasm");
return nullptr;
}
if (!wasm_handle->wasm()->configure(root_context, plugin)) {
base_wasm->wasm()->fail(FailState::ConfigureFailed,
"Failed to configure thread-local Wasm plugin");
return nullptr;
}
}
return wasm_handle;
}
Expand Down