Skip to content

Commit

Permalink
SERVER-19155 NetworkInterfaceASIO authenticates asynchronously
Browse files Browse the repository at this point in the history
  • Loading branch information
samantharitter committed Aug 13, 2015
1 parent a694a77 commit 5c30b38
Show file tree
Hide file tree
Showing 6 changed files with 141 additions and 67 deletions.
6 changes: 4 additions & 2 deletions src/mongo/executor/SConscript
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,12 @@ env.Library(
'network_interface_asio_operation.cpp',
],
LIBDEPS=[
'network_interface',
'task_executor_interface',
'$BUILD_DIR/mongo/client/authentication',
'$BUILD_DIR/mongo/db/auth/authcommon',
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/third_party/shim_asio',
'network_interface',
'task_executor_interface',
])

env.CppUnitTest(
Expand Down
4 changes: 2 additions & 2 deletions src/mongo/executor/network_interface_asio.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,7 @@ class NetworkInterfaceASIO final : public NetworkInterface {
// AsyncOp may run multiple commands over its lifetime (for example, an ismaster
// command, the command provided to the NetworkInterface via startCommand(), etc.)
// Calling beginCommand() resets internal state to prepare to run newCommand.
AsyncCommand& beginCommand(const RemoteCommandRequest& request, rpc::Protocol protocol);
AsyncCommand& beginCommand(Message&& newCommand);
AsyncCommand& command();

Expand Down Expand Up @@ -230,8 +231,7 @@ class NetworkInterfaceASIO final : public NetworkInterface {
handler();
}

std::unique_ptr<Message> _messageFromRequest(const RemoteCommandRequest& request,
rpc::Protocol protocol);
ResponseStatus _responseFromMessage(const Message& received, rpc::Protocol protocol);

// Connection
void _connect(AsyncOp* op);
Expand Down
75 changes: 68 additions & 7 deletions src/mongo/executor/network_interface_asio_auth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,22 @@

#include "mongo/executor/network_interface_asio.h"

#include "mongo/rpc/reply_interface.h"
#include "mongo/client/authenticate.h"
#include "mongo/config.h"
#include "mongo/db/auth/authorization_manager_global.h"
#include "mongo/db/auth/internal_user_auth.h"
#include "mongo/db/server_options.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/legacy_request_builder.h"
#include "mongo/rpc/reply_interface.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/net/ssl_manager.h"

namespace mongo {
namespace executor {

using ResponseStatus = TaskExecutor::ResponseStatus;

void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
// We use a legacy builder to create our ismaster request because we may
// have to communicate with servers that do not support OP_COMMAND
Expand All @@ -50,7 +59,6 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {

// Set current command to ismaster request and run
auto& cmd = op->beginCommand(std::move(*(requestBuilder.done())));
cmd.toSend().header().setResponseTo(0);

// Callback to parse protocol information out of received ismaster response
auto parseIsMaster = [this, op]() {
Expand All @@ -64,12 +72,22 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {

op->connection().setServerProtocols(protocolSet.getValue());

// Set the operation protocol
auto negotiatedProtocol = rpc::negotiate(op->connection().serverProtocols(),
op->connection().clientProtocols());

if (!negotiatedProtocol.isOK()) {
return _completeOperation(op, negotiatedProtocol.getStatus());
}

op->setOperationProtocol(negotiatedProtocol.getValue());

// Advance the state machine
_beginCommunication(op);
return _authenticate(op);

} catch (...) {
// makeReply will throw if the rely was invalid.
_completeOperation(op, exceptionToStatus());
// makeReply will throw if the reply was invalid.
return _completeOperation(op, exceptionToStatus());
}
};

Expand All @@ -80,8 +98,51 @@ void NetworkInterfaceASIO::_runIsMaster(AsyncOp* op) {
}

void NetworkInterfaceASIO::_authenticate(AsyncOp* op) {
// TODO: Implement asynchronous authentication, SERVER-19155
asio::post(_io_service, [this, op]() { _runIsMaster(op); });
// There is currently no way for NetworkInterfaceASIO's users to run a command
// without going through _authenticate(). Callers may want to run certain commands,
// such as ismasters, pre-auth. We may want to offer this choice in the future.

// This check is sufficient to see if auth is enabled on the system,
// and avoids creating dependencies on deeper, less accessible auth code.
if (!isInternalAuthSet()) {
return asio::post(_io_service, [this, op]() { _beginCommunication(op); });
}

// We will only have a valid clientName if SSL is enabled.
std::string clientName;
#ifdef MONGO_CONFIG_SSL
if (getSSLManager()) {
clientName = getSSLManager()->getSSLConfiguration().clientSubjectName;
}
#endif

// authenticateClient will use this to run auth-related commands over our connection.
auto runCommandHook = [this, op](executor::RemoteCommandRequest request,
auth::AuthCompletionHandler handler) {
auto& cmd = op->beginCommand(request, op->operationProtocol());

auto callAuthCompletionHandler = [this, op, handler]() {
auto authResponse =
_responseFromMessage(op->command().toRecv(), op->operationProtocol());
handler(authResponse);
};

_asyncRunCommand(&cmd,
[this, op, callAuthCompletionHandler](std::error_code ec, size_t bytes) {
_validateAndRun(op, ec, callAuthCompletionHandler);
});
};

// This will be called when authentication has completed.
auto authHook = [this, op](auth::AuthResponse response) {
if (!response.isOK())
return _completeOperation(op, response);
return _beginCommunication(op);
};

auto params = getInternalUserAuthParamsWithFallback();
auth::authenticateClient(
params, op->request().target.host(), clientName, runCommandHook, authHook);
}

} // namespace executor
Expand Down
91 changes: 36 additions & 55 deletions src/mongo/executor/network_interface_asio_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@
#include "mongo/db/dbmessage.h"
#include "mongo/db/jsobj.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/protocol.h"
#include "mongo/rpc/reply_interface.h"
#include "mongo/rpc/request_builder_interface.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
Expand All @@ -56,6 +57,7 @@ namespace executor {
namespace {

using asio::ip::tcp;
using ResponseStatus = TaskExecutor::ResponseStatus;

// A type conforms to the NetworkHandler concept if it is a callable type that takes a
// std::error_code and std::size_t and returns void. The std::error_code parameter is used
Expand Down Expand Up @@ -164,37 +166,38 @@ void NetworkInterfaceASIO::_startCommand(AsyncOp* op) {
_connect(op);
}

std::unique_ptr<Message> NetworkInterfaceASIO::_messageFromRequest(
const RemoteCommandRequest& request, rpc::Protocol protocol) {
BSONObj query = request.cmdObj;
auto requestBuilder = rpc::makeRequestBuilder(protocol);

// TODO: handle metadata writers
auto toSend = rpc::makeRequestBuilder(protocol)
->setDatabase(request.dbname)
.setCommandName(request.cmdObj.firstElementFieldName())
.setMetadata(request.metadata)
.setCommandArgs(request.cmdObj)
.done();

toSend->header().setId(nextMessageId());
toSend->header().setResponseTo(0);
ResponseStatus NetworkInterfaceASIO::_responseFromMessage(const Message& received,
rpc::Protocol protocol) {
try {
// TODO: elapsed isn't going to be correct here, SERVER-19697
auto start = now();
auto reply = rpc::makeReply(&received);

if (reply->getProtocol() != protocol) {
auto requestProtocol = rpc::toString(static_cast<rpc::ProtocolSet>(protocol));
if (!requestProtocol.isOK())
return requestProtocol.getStatus();

return Status(ErrorCodes::RPCProtocolNegotiationFailed,
str::stream() << "Mismatched RPC protocols - request was '"
<< requestProtocol.getValue().toString() << "' '"
<< " but reply was '" << opToString(received.operation())
<< "'");
}

return toSend;
// unavoidable copy
auto ownedCommandReply = reply->getCommandReply().getOwned();
auto ownedReplyMetadata = reply->getMetadata().getOwned();
return ResponseStatus(
RemoteCommandResponse(ownedCommandReply, ownedReplyMetadata, now() - start));
} catch (...) {
// makeReply can throw if the reply was invalid.
return exceptionToStatus();
}
}

void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {
auto negotiatedProtocol =
rpc::negotiate(op->connection().serverProtocols(), op->connection().clientProtocols());

if (!negotiatedProtocol.isOK()) {
return _completeOperation(op, negotiatedProtocol.getStatus());
}

op->setOperationProtocol(negotiatedProtocol.getValue());

auto& cmd = op->beginCommand(
std::move(*_messageFromRequest(op->request(), negotiatedProtocol.getValue())));
auto& cmd = op->beginCommand(op->request(), op->operationProtocol());

_asyncRunCommand(&cmd,
[this, op](std::error_code ec, size_t bytes) {
Expand All @@ -204,37 +207,15 @@ void NetworkInterfaceASIO::_beginCommunication(AsyncOp* op) {

void NetworkInterfaceASIO::_completedOpCallback(AsyncOp* op) {
// If we were told to send an empty message, toRecv will be empty here.

// TODO: handle metadata readers
const auto elapsed = [this, op]() { return now() - op->start(); };

if (op->command().toRecv().empty()) {
LOG(3) << "received an empty message";
return _completeOperation(op, RemoteCommandResponse(BSONObj(), BSONObj(), elapsed()));
auto elapsed = now() - op->start();
return _completeOperation(op, RemoteCommandResponse(BSONObj(), BSONObj(), elapsed));
}

try {
auto reply = rpc::makeReply(&(op->command().toRecv()));

if (reply->getProtocol() != op->operationProtocol()) {
return _completeOperation(
op,
Status(ErrorCodes::RPCProtocolNegotiationFailed,
str::stream() << "Mismatched RPC protocols - request was '"
<< opToString(op->command().toSend().operation()) << "' '"
<< " but reply was '"
<< opToString(op->command().toRecv().operation()) << "'"));
}

_completeOperation(op,
// unavoidable copy
RemoteCommandResponse(reply->getCommandReply().getOwned(),
reply->getMetadata().getOwned(),
elapsed()));
} catch (...) {
// makeReply can throw if the reply was invalid.
_completeOperation(op, exceptionToStatus());
}
// TODO: handle metadata readers.
auto response = _responseFromMessage(op->command().toRecv(), op->operationProtocol());
_completeOperation(op, response);
}

void NetworkInterfaceASIO::_networkErrorCallback(AsyncOp* op, const std::error_code& ec) {
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/executor/network_interface_asio_connect.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void NetworkInterfaceASIO::_setupSocket(AsyncOp* op, tcp::resolver::iterator end

stream.connect(std::move(endpoints),
[this, op](std::error_code ec) {
_validateAndRun(op, ec, [this, op]() { _authenticate(op); });
_validateAndRun(op, ec, [this, op]() { _runIsMaster(op); });
});
}

Expand Down
30 changes: 30 additions & 0 deletions src/mongo/executor/network_interface_asio_operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,35 @@

#include "mongo/executor/network_interface_asio.h"
#include "mongo/executor/async_stream_interface.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/request_builder_interface.h"

namespace mongo {
namespace executor {

using asio::ip::tcp;

namespace {

std::unique_ptr<Message> messageFromRequest(const RemoteCommandRequest& request,
rpc::Protocol protocol) {
BSONObj query = request.cmdObj;
auto requestBuilder = rpc::makeRequestBuilder(protocol);

// TODO: handle metadata writers
auto toSend = rpc::makeRequestBuilder(protocol)
->setDatabase(request.dbname)
.setCommandName(request.cmdObj.firstElementFieldName())
.setMetadata(request.metadata)
.setCommandArgs(request.cmdObj)
.done();

toSend->header().setId(nextMessageId());
return toSend;
}

} // namespace

NetworkInterfaceASIO::AsyncOp::AsyncOp(const TaskExecutor::CallbackHandle& cbHandle,
const RemoteCommandRequest& request,
const RemoteCommandCompletionFn& onFinish,
Expand Down Expand Up @@ -79,10 +102,17 @@ NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand(
} else {
_command.emplace(_connection.get_ptr());
}
newCommand.header().setResponseTo(0);
_command->setToSend(std::move(newCommand));
return _command.get();
}

NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::beginCommand(
const RemoteCommandRequest& request, rpc::Protocol protocol) {
auto newCommand = messageFromRequest(request, protocol);
return beginCommand(std::move(*newCommand));
}

NetworkInterfaceASIO::AsyncCommand& NetworkInterfaceASIO::AsyncOp::command() {
invariant(_command.is_initialized());
return _command.get();
Expand Down

0 comments on commit 5c30b38

Please sign in to comment.