Skip to content

Commit

Permalink
SERVER-19697 AsyncCommand responses return proper elapsed time
Browse files Browse the repository at this point in the history
  • Loading branch information
samantharitter committed Aug 14, 2015
1 parent 35cb5b7 commit 3211eea
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 55 deletions.
20 changes: 9 additions & 11 deletions src/mongo/executor/network_interface_asio.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,20 +120,16 @@ class NetworkInterfaceASIO final : public NetworkInterface {
*/
class AsyncCommand {
public:
AsyncCommand(AsyncConnection* conn);

// This method resets the Messages and associated information held inside
// an AsyncCommand so that it may be reused to run a new network roundtrip.
void reset();
AsyncCommand(AsyncConnection* conn, Message&& command, Date_t now);

NetworkInterfaceASIO::AsyncConnection& conn();

Message& toSend();
void setToSend(Message&& message);

Message& toRecv();
MSGHEADER::Value& header();

ResponseStatus response(rpc::Protocol protocol, Date_t now);

private:
NetworkInterfaceASIO::AsyncConnection* const _conn;

Expand All @@ -142,6 +138,8 @@ class NetworkInterfaceASIO final : public NetworkInterface {

// TODO: Investigate efficiency of storing header separately.
MSGHEADER::Value _header;

const Date_t _start;
};

/**
Expand All @@ -166,8 +164,10 @@ class NetworkInterfaceASIO final : public NetworkInterface {
// AsyncOp may run multiple commands over its lifetime (for example, an ismaster
// command, the command provided to the NetworkInterface via startCommand(), etc.)
// Calling beginCommand() resets internal state to prepare to run newCommand.
AsyncCommand& beginCommand(const RemoteCommandRequest& request, rpc::Protocol protocol);
AsyncCommand& beginCommand(Message&& newCommand);
AsyncCommand& beginCommand(const RemoteCommandRequest& request,
rpc::Protocol protocol,
Date_t now);
AsyncCommand& beginCommand(Message&& newCommand, Date_t now);
AsyncCommand& command();

void finish(const TaskExecutor::ResponseStatus& status);
Expand Down Expand Up @@ -232,8 +232,6 @@ class NetworkInterfaceASIO final : public NetworkInterface {
handler();
}

ResponseStatus _responseFromMessage(const Message& received, rpc::Protocol protocol);

// Connection
void _connect(AsyncOp* op);

Expand Down
7 changes: 3 additions & 4 deletions src/mongo/executor/network_interface_asio_auth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
requestBuilder.setCommandArgs(BSON("isMaster" << 1));

// Set current command to ismaster request and run
auto& cmd = op->beginCommand(std::move(*(requestBuilder.done())));
auto& cmd = op->beginCommand(std::move(*(requestBuilder.done())), now());

// Callback to parse protocol information out of received ismaster response
auto parseIsMaster = [this, op]() {
Expand Down Expand Up @@ -119,11 +119,10 @@ void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
// authenticateClient will use this to run auth-related commands over our connection.
auto runCommandHook = [this, op](executor::RemoteCommandRequest request,
auth::AuthCompletionHandler handler) {
auto& cmd = op->beginCommand(request, op->operationProtocol());
auto& cmd = op->beginCommand(request, op->operationProtocol(), now());

auto callAuthCompletionHandler = [this, op, handler]() {
auto authResponse =
_responseFromMessage(op->command().toRecv(), op->operationProtocol());
auto authResponse = op->command().response(op->operationProtocol(), now());
handler(authResponse);
};

Expand Down
51 changes: 22 additions & 29 deletions src/mongo/executor/network_interface_asio_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,12 +124,11 @@ void asyncRecvMessageBody(AsyncStreamInterface& stream,

} // namespace

NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn) : _conn(conn) {}

void NetworkInterfaceASIO::AsyncCommand::reset() {
// TODO: Optimize reuse of Messages to be more space-efficient.
_toSend.reset();
_toRecv.reset();
NetworkInterfaceASIO::AsyncCommand::AsyncCommand(AsyncConnection* conn,
Message&& command,
Date_t now)
: _conn(conn), _toSend(std::move(command)), _start(now) {
_toSend.header().setResponseTo(0);
}

NetworkInterfaceASIO::AsyncConnection& NetworkInterfaceASIO::AsyncCommand::conn() {
Expand All @@ -140,10 +139,6 @@ Message& NetworkInterfaceASIO::AsyncCommand::toSend() {
return _toSend;
}

void NetworkInterfaceASIO::AsyncCommand::setToSend(Message&& message) {
_toSend = std::move(message);
}

Message& NetworkInterfaceASIO::AsyncCommand::toRecv() {
return _toRecv;
}
Expand All @@ -152,22 +147,9 @@ MSGHEADER::Value& NetworkInterfaceASIO::AsyncCommand::header() {
return _header;
}

void NetworkInterfaceASIO::_startCommand(AsyncOp* op) {
LOG(3) << "running command " << op->request().cmdObj << " against database "
<< op->request().dbname << " across network to " << op->request().target.toString();
if (inShutdown()) {
return;
}

// _connect() will continue the state machine.
_connect(op);
}

ResponseStatus NetworkInterfaceASIO::_responseFromMessage(const Message& received,
rpc::Protocol protocol) {
ResponseStatus NetworkInterfaceASIO::AsyncCommand::response(rpc::Protocol protocol, Date_t now) {
auto& received = _toRecv;
try {
// TODO: elapsed isn't going to be correct here, SERVER-19697
auto start = now();
auto reply = rpc::makeReply(&received);

if (reply->getProtocol() != protocol) {
Expand All @@ -185,16 +167,27 @@ ResponseStatus NetworkInterfaceASIO::_responseFromMessage(const Message& receive
// unavoidable copy
auto ownedCommandReply = reply->getCommandReply().getOwned();
auto ownedReplyMetadata = reply->getMetadata().getOwned();
return ResponseStatus(
RemoteCommandResponse(ownedCommandReply, ownedReplyMetadata, now() - start));
return ResponseStatus(RemoteCommandResponse(
std::move(ownedCommandReply), std::move(ownedReplyMetadata), now - _start));
} catch (...) {
// makeReply can throw if the reply was invalid.
return exceptionToStatus();
}
}

void NetworkInterfaceASIO::_startCommand(AsyncOp* op) {
LOG(3) << "running command " << op->request().cmdObj << " against database "
<< op->request().dbname << " across network to " << op->request().target.toString();
if (inShutdown()) {
return;
}

// _connect() will continue the state machine.
_connect(op);
}

void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
auto& cmd = op->beginCommand(op->request(), op->operationProtocol());
auto& cmd = op->beginCommand(op->request(), op->operationProtocol(), now());

_asyncRunCommand(&cmd,
[this, op](std::error_code ec, size_t bytes) {
Expand All @@ -204,7 +197,7 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {

void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) {
// TODO: handle metadata readers.
auto response = _responseFromMessage(op->command().toRecv(), op->operationProtocol());
auto response = op->command().response(op->operationProtocol(), now());
_completeOperation(op, response);
}

Expand Down
17 changes: 6 additions & 11 deletions src/mongo/executor/network_interface_asio_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,25 +92,20 @@ void NetworkInterfaceASIO::AsyncOp::setConnection(AsyncConnection&& conn) {
}

NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand(
Message&& newCommand) {
Message&& newCommand, Date_t now) {
// NOTE: We operate based on the assumption that AsyncOp's
// AsyncConnection does not change over its lifetime.
invariant(_connection.is_initialized());
if (_command.is_initialized()) {
// We can just reset our state if initialized.
_command->reset();
} else {
_command.emplace(_connection.get_ptr());
}
newCommand.header().setResponseTo(0);
_command->setToSend(std::move(newCommand));

// Construct a new AsyncCommand object for each command.
_command.emplace(_connection.get_ptr(), std::move(newCommand), now);
return _command.get();
}

NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand(
const RemoteCommandRequest& request, rpc::Protocol protocol) {
const RemoteCommandRequest& request, rpc::Protocol protocol, Date_t now) {
auto newCommand = messageFromRequest(request, protocol);
return beginCommand(std::move(*newCommand));
return beginCommand(std::move(*newCommand), now);
}

NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() {
Expand Down

0 comments on commit 3211eea

Please sign in to comment.