Skip to content

Commit

Permalink
SERVER-19554 populate lastOpVisible in ReplSetMetadata
Browse files Browse the repository at this point in the history
  • Loading branch information
matt dannenberg committed Aug 14, 2015
1 parent f075019 commit b77f580
Show file tree
Hide file tree
Showing 11 changed files with 119 additions and 27 deletions.
60 changes: 60 additions & 0 deletions jstests/replsets/last_op_visible.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Tests that the lastOpVisible field of the ReplSetMetadata is populated correctly.
// First we do a writeConcern-free write and ensure that a local read will return the same
// lastOpVisible, and that a majority read with afterOpTime set to that lastOpVisible will
// return it as well.
// We then confirm that a writeConcern majority write will be reported as the lastOpVisible by a
// majority read.

(function() {
    "use strict";

    var name = 'lastOpVisible';
    var replTest = new ReplSetTest({name: name, nodes: 3});

    replTest.startSet();
    replTest.initiate();

    var primary = replTest.getPrimary();
    var testDB = primary.getDB(name);

    if (!testDB.serverStatus().storageEngine.supportsCommittedReads) {
        print("Skipping last_op_visible.js since storageEngine doesn't support it.");
        // Shut the set down on the skip path too, so no nodes are left running.
        replTest.stopSet();
        return;
    }

    /**
     * Runs 'commandName' with 'commandArgs' against the primary while requesting $replData
     * metadata, asserts that the command succeeded, and returns the lastOpVisible value found
     * in the reply's $replData metadata.
     */
    function runAndGetLastOpVisible(commandName, commandArgs) {
        var res = testDB.runCommandWithMetadata(commandName, commandArgs, {"$replData": 1});
        assert.commandWorked(res.commandReply);
        return res.metadata["$replData"].lastOpVisible;
    }

    // Do an insert without writeConcern.
    var last_op_visible = runAndGetLastOpVisible("insert", {insert: name, documents: [{x: 1}]});

    // A local find should return the same lastOpVisible.
    assert.eq(last_op_visible,
              runAndGetLastOpVisible("find", {find: name, readConcern: {level: "local"}}));

    // A majority readConcern with afterOpTime: lastOpVisible should also return the same
    // lastOpVisible.
    assert.eq(last_op_visible,
              runAndGetLastOpVisible(
                  "find",
                  {find: name, readConcern: {level: "majority", afterOpTime: last_op_visible}}));

    // Do an insert with writeConcern majority.
    last_op_visible = runAndGetLastOpVisible(
        "insert", {insert: name, documents: [{x: 1}], writeConcern: {w: "majority"}});

    // A majority readConcern should return the same lastOpVisible.
    assert.eq(last_op_visible,
              runAndGetLastOpVisible("find", {find: name, readConcern: {level: "majority"}}));

    // Shut down the replica set started by this test.
    replTest.stopSet();
}());
23 changes: 12 additions & 11 deletions src/mongo/db/dbcommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1286,9 +1286,9 @@ bool Command::run(OperationContext* txn,

repl::ReplicationCoordinator* replCoord = repl::getGlobalReplicationCoordinator();

repl::ReadConcernArgs readConcern;
{
// parse and validate ReadConcernArgs
repl::ReadConcernArgs readConcern;
auto readConcernParseStatus = readConcern.initialize(request.getCommandArgs());
if (!readConcernParseStatus.isOK()) {
replyBuilder->setMetadata(rpc::makeEmptyMetadata())
Expand Down Expand Up @@ -1343,18 +1343,19 @@ bool Command::run(OperationContext* txn,

BSONObjBuilder metadataBob;

// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18326
bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;

if (isReplSet && ShardingState::get(txn)->enabled()) {
rpc::ShardingMetadata(
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp().getTimestamp(),
replCoord->getElectionId()).writeToMetadata(&metadataBob);
}

if (isReplSet) {
replCoord->prepareReplResponseMetadata(request, &metadataBob);
repl::OpTime lastOpTimeFromClient =
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp();
replCoord->prepareReplResponseMetadata(
request, lastOpTimeFromClient, readConcern, &metadataBob);

// For commands from mongos, append some info to help getLastError(w) work.
// TODO: refactor out of here as part of SERVER-18326
if (ShardingState::get(txn)->enabled()) {
rpc::ShardingMetadata(lastOpTimeFromClient.getTimestamp(), replCoord->getElectionId())
.writeToMetadata(&metadataBob);
}
}

auto cmdResponse = replyBuilderBob.done();
Expand Down
7 changes: 7 additions & 0 deletions src/mongo/db/repl/replication_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -617,6 +617,8 @@ class ReplicationCoordinator : public SyncSourceSelector {
* Prepares a metadata object describing the current term, primary, and lastOp information.
*/
virtual void prepareReplResponseMetadata(const rpc::RequestInterface& request,
const OpTime& lastOpTimeFromClient,
const ReadConcernArgs& readConcern,
BSONObjBuilder* builder) = 0;

/**
Expand Down Expand Up @@ -678,6 +680,11 @@ class ReplicationCoordinator : public SyncSourceSelector {
*/
virtual void dropAllSnapshots() = 0;

/**
* Gets the latest OpTime of the currentCommittedSnapshot.
*/
virtual OpTime getCurrentCommittedSnapshotOpTime() = 0;

protected:
ReplicationCoordinator();
};
Expand Down
17 changes: 14 additions & 3 deletions src/mongo/db/repl/replication_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ ReplicaSetConfig ReplicationCoordinatorImpl::getReplicaSetConfig_forTest() {
return _rsConfig;
}

OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshot_forTest() {
OpTime ReplicationCoordinatorImpl::getCurrentCommittedSnapshotOpTime() {
stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_currentCommittedSnapshot) {
return _currentCommittedSnapshot->opTime;
Expand Down Expand Up @@ -2749,6 +2749,8 @@ void ReplicationCoordinatorImpl::_processReplSetDeclareElectionWinner_finish(
}

void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestInterface& request,
const OpTime& lastOpTimeFromClient,
const ReadConcernArgs& readConcern,
BSONObjBuilder* builder) {
if (request.getMetadata().hasField(rpc::kReplSetMetadataFieldName)) {
rpc::ReplSetMetadata metadata;
Expand All @@ -2757,6 +2759,8 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI
stdx::bind(&ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish,
this,
stdx::placeholders::_1,
lastOpTimeFromClient,
readConcern,
&metadata));

if (cbh.getStatus() == ErrorCodes::ShutdownInProgress) {
Expand All @@ -2771,8 +2775,15 @@ void ReplicationCoordinatorImpl::prepareReplResponseMetadata(const rpc::RequestI
}

void ReplicationCoordinatorImpl::_prepareReplResponseMetadata_finish(
const ReplicationExecutor::CallbackArgs& cbData, rpc::ReplSetMetadata* metadata) {
_topCoord->prepareReplResponseMetadata(metadata, getLastCommittedOpTime());
const ReplicationExecutor::CallbackArgs& cbData,
const OpTime& lastOpTimeFromClient,
const ReadConcernArgs& readConcern,
rpc::ReplSetMetadata* metadata) {
OpTime lastReadableOpTime = readConcern.getLevel() == ReadConcernLevel::kMajorityReadConcern
? getCurrentCommittedSnapshotOpTime()
: getMyLastOptime();
OpTime lastVisibleOpTime = std::max(lastOpTimeFromClient, lastReadableOpTime);
_topCoord->prepareReplResponseMetadata(metadata, lastVisibleOpTime, getLastCommittedOpTime());
}

bool ReplicationCoordinatorImpl::isV1ElectionProtocol() {
Expand Down
11 changes: 6 additions & 5 deletions src/mongo/db/repl/replication_coordinator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,8 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
long long* responseTerm) override;

void prepareReplResponseMetadata(const rpc::RequestInterface&,
const OpTime& lastOpTimeFromClient,
const ReadConcernArgs& readConcern,
BSONObjBuilder* builder) override;

virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
Expand All @@ -282,6 +284,8 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL

virtual void onSnapshotCreate(OpTime timeOfSnapshot, SnapshotName name) override;

virtual OpTime getCurrentCommittedSnapshotOpTime() override;

// ================== Test support API ===================

/**
Expand All @@ -295,11 +299,6 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
*/
ReplicaSetConfig getReplicaSetConfig_forTest();

/**
* Gets the latest OpTime of the currentCommittedSnapshot.
*/
OpTime getCurrentCommittedSnapshot_forTest();

/**
* Simple wrapper around _setLastOptime_inlock to make it easier to test.
*/
Expand Down Expand Up @@ -547,6 +546,8 @@ class ReplicationCoordinatorImpl : public ReplicationCoordinator, public KillOpL
* Bottom half of prepareReplResponseMetadata.
*/
void _prepareReplResponseMetadata_finish(const ReplicationExecutor::CallbackArgs& cbData,
const OpTime& lastOpTimeFromClient,
const ReadConcernArgs& readConcern,
rpc::ReplSetMetadata* metadata);
/**
* Scheduled to cause the ReplicationCoordinator to reconsider any state that might
Expand Down
13 changes: 6 additions & 7 deletions src/mongo/db/repl/replication_coordinator_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2586,8 +2586,7 @@ TEST_F(ReplCoordTest, MetadataUpdatesLastCommittedOpTime) {
<< 2 << "primaryIndex" << 2 << "term" << 1)));
getReplCoord()->processReplSetMetadata(metadata.getValue());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getLastCommittedOpTime());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1),
getReplCoord()->getCurrentCommittedSnapshot_forTest());
ASSERT_EQUALS(OpTime(Timestamp(10, 0), 1), getReplCoord()->getCurrentCommittedSnapshotOpTime());

// lower OpTime, should not change
StatusWith<rpc::ReplSetMetadata> metadata2 = rpc::ReplSetMetadata::readFromMetadata(BSON(
Expand Down Expand Up @@ -2678,21 +2677,21 @@ TEST_F(ReplCoordTest, SnapshotCommitting) {

// ensure current snapshot follows price is right rules (closest but not greater than)
getReplCoord()->setMyLastOptime(time3);
ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshot_forTest());
ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime());
getReplCoord()->setMyLastOptime(time4);
ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshot_forTest());
ASSERT_EQUALS(time2, getReplCoord()->getCurrentCommittedSnapshotOpTime());

// ensure current snapshot will not advance beyond existing snapshots
getReplCoord()->setMyLastOptime(time6);
ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshot_forTest());
ASSERT_EQUALS(time5, getReplCoord()->getCurrentCommittedSnapshotOpTime());

// ensure current snapshot updates on new snapshot if we are that far
getReplCoord()->onSnapshotCreate(time6, SnapshotName(4));
ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshot_forTest());
ASSERT_EQUALS(time6, getReplCoord()->getCurrentCommittedSnapshotOpTime());

// ensure dropping all snapshots should reset the current committed snapshot
getReplCoord()->dropAllSnapshots();
ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshot_forTest());
ASSERT_EQUALS(OpTime(), getReplCoord()->getCurrentCommittedSnapshotOpTime());
}

TEST_F(ReplCoordTest, MoveOpTimeForward) {
Expand Down
6 changes: 6 additions & 0 deletions src/mongo/db/repl/replication_coordinator_mock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,8 @@ Status ReplicationCoordinatorMock::processReplSetDeclareElectionWinner(
}

void ReplicationCoordinatorMock::prepareReplResponseMetadata(const rpc::RequestInterface& request,
                                                             const OpTime& lastOpTimeFromClient,
                                                             const ReadConcernArgs& readConcern,
                                                             BSONObjBuilder* builder) {
    // Intentionally empty: this mock ignores its arguments and appends nothing to 'builder'.
}

Status ReplicationCoordinatorMock::processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
Expand Down Expand Up @@ -368,5 +370,9 @@ void ReplicationCoordinatorMock::onSnapshotCreate(OpTime timeOfSnapshot, Snapsho

void ReplicationCoordinatorMock::dropAllSnapshots() {}

OpTime ReplicationCoordinatorMock::getCurrentCommittedSnapshotOpTime() {
    // The mock always reports a value-initialized OpTime.
    return OpTime{};
}

} // namespace repl
} // namespace mongo
4 changes: 4 additions & 0 deletions src/mongo/db/repl/replication_coordinator_mock.h
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,8 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator {
long long* responseTerm);

void prepareReplResponseMetadata(const rpc::RequestInterface& request,
const OpTime& lastOpTimeFromClient,
const ReadConcernArgs& readConcern,
BSONObjBuilder* builder) override;

virtual Status processHeartbeatV1(const ReplSetHeartbeatArgsV1& args,
Expand All @@ -214,6 +216,8 @@ class ReplicationCoordinatorMock : public ReplicationCoordinator {

virtual void dropAllSnapshots() override;

virtual OpTime getCurrentCommittedSnapshotOpTime() override;

private:
AtomicUInt64 _snapshotNameGenerator;
const ReplSettings _settings;
Expand Down
1 change: 1 addition & 0 deletions src/mongo/db/repl/topology_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,7 @@ class TopologyCoordinator {
* Prepares a BSONObj describing the current term, primary, and lastOp information.
*/
virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
const OpTime& lastVisibleOpTime,
const OpTime& lastCommittedOpTime) const = 0;

/**
Expand Down
3 changes: 2 additions & 1 deletion src/mongo/db/repl/topology_coordinator_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2163,11 +2163,12 @@ bool TopologyCoordinatorImpl::shouldChangeSyncSource(const HostAndPort& currentS
}

void TopologyCoordinatorImpl::prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
const OpTime& lastVisibleOpTime,
const OpTime& lastCommittedOpTime) const {
// TODO(dannenberg): change the third arg to be the lastOpTimeVisible.
*metadata = rpc::ReplSetMetadata(_term,
lastCommittedOpTime,
lastCommittedOpTime,
lastVisibleOpTime,
_rsConfig.getConfigVersion(),
_currentPrimaryIndex);
}
Expand Down
1 change: 1 addition & 0 deletions src/mongo/db/repl/topology_coordinator_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ class TopologyCoordinatorImpl : public TopologyCoordinator {
virtual bool stepDownIfPending();
virtual Date_t getStepDownTime() const;
virtual void prepareReplResponseMetadata(rpc::ReplSetMetadata* metadata,
const OpTime& lastVisibleOpTime,
const OpTime& lastCommitttedOpTime) const;
Status processReplSetDeclareElectionWinner(const ReplSetDeclareElectionWinnerArgs& args,
long long* responseTerm);
Expand Down

0 comments on commit b77f580

Please sign in to comment.