Skip to content

Commit

Permalink
Merge pull request #2348 from Agoric/mfig-fewer-threads
Browse files Browse the repository at this point in the history
fix: remove unnecessary use of threads from Node-Golang bridge
  • Loading branch information
michaelfig authored Feb 5, 2021
2 parents 6b7ee0f + 5ce3ce5 commit b529446
Showing 1 changed file with 27 additions and 48 deletions.
75 changes: 27 additions & 48 deletions golang/cosmos/src/agcosmosdaemon-node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,6 @@ namespace ss {

static std::shared_ptr<ThreadSafeCallback> dispatcher;

class NodeReply {
public:
NodeReply(bool isRejection, std::string value) :
_isRejection(isRejection), _value(value) {}
std::string value() {
return _value;
}
bool isRejection() {
return _isRejection;
}
private:
bool _isRejection;
std::string _value;
};

class NodeReplier : public Napi::ObjectWrap<NodeReplier> {
public:
static Napi::Object Init(Napi::Env env, Napi::Object exports) {
Expand All @@ -42,18 +27,28 @@ class NodeReplier : public Napi::ObjectWrap<NodeReplier> {
// Do nothing, we're initialized by New.
}

static Napi::Object New(Napi::Env env, std::shared_ptr<std::promise<NodeReply>> promise) {
static Napi::Object New(Napi::Env env, int replyPort) {
Napi::Object obj = constructor.New({});
NodeReplier* self = Unwrap(obj);
self->_promise.swap(promise);
self->replyPort_ = replyPort;
return obj;
}

private:
static Napi::FunctionReference constructor;
void doReply(bool isRejection, std::string value) {
NodeReply reply(isRejection, value);
_promise->set_value(reply);
try {
// std::cerr << "Replying to Go with " << value << " " << isRejection << std::endl;
if (replyPort_) {
ReplyToGo(replyPort_, isRejection, value.c_str());
}
} catch (std::exception& e) {
// std::cerr << "Exceptioning " << e.what() << std::endl;
if (replyPort_) {
ReplyToGo(replyPort_, true, e.what());
}
}
// std::cerr << "Replier is finished" << std::endl;
}

void Resolve(const Napi::CallbackInfo& info) {
Expand All @@ -64,46 +59,30 @@ class NodeReplier : public Napi::ObjectWrap<NodeReplier> {
doReply(true, info[0].As<Napi::String>().Utf8Value());
}

std::shared_ptr<std::promise<NodeReply>> _promise;
int replyPort_;
};

Napi::FunctionReference NodeReplier::constructor;

static int daemonPort = -1;
int SendToNode(int port, int replyPort, Body str) {
//std::cerr << "Send to node port " << port << " " << str << std::endl;
// std::cerr << "Send to node port " << port << " " << str << std::endl;
// FIXME: Make a better bootstrap, honouring an AG_COSMOS_START message.
if (daemonPort < 0) {
daemonPort = replyPort;
}
std::string instr(str);
std::thread([instr, port, replyPort]{
auto promise = std::make_shared<std::promise<NodeReply>>();
dispatcher->call(
// Prepare arguments.
[port, instr, promise](Napi::Env env, std::vector<napi_value>& args){
// std::cerr << "Calling threadsafe callback with " << instr << std::endl;
args = {
Napi::Number::New(env, port),
Napi::String::New(env, instr),
NodeReplier::New(env, promise),
};
});
// std::cerr << "Waiting on future" << std::endl;
try {
NodeReply ret = promise->get_future().get();
// std::cerr << "Replying to Go with " << ret.value() << " " << ret.isRejection() << std::endl;
if (replyPort) {
ReplyToGo(replyPort, ret.isRejection(), ret.value().c_str());
}
} catch (std::exception& e) {
// std::cerr << "Exceptioning " << e.what() << std::endl;
if (replyPort) {
ReplyToGo(replyPort, true, e.what());
}
}
// std::cerr << "Thread is finished" << std::endl;
}).detach();
// This call is queued on the main thread, so we can return immediately to Golang.
dispatcher->call(
// Prepare arguments.
[instr, port, replyPort](Napi::Env env, std::vector<napi_value>& args){
// std::cerr << "Calling threadsafe callback with " << instr << std::endl;
args = {
Napi::Number::New(env, port),
Napi::String::New(env, instr),
NodeReplier::New(env, replyPort),
};
});
// std::cerr << "Ending Send to Node " << str << std::endl;
return 0;
}
Expand Down

0 comments on commit b529446

Please sign in to comment.