Skip to content

Commit

Permalink
SERVER-19466 TTLMonitor::doTTLForIndex() use IXSCAN => FETCH => DELETE
Browse files Browse the repository at this point in the history
  • Loading branch information
coollog committed Aug 13, 2015
1 parent 9eadefd commit 7c80890
Show file tree
Hide file tree
Showing 17 changed files with 270 additions and 148 deletions.
4 changes: 2 additions & 2 deletions src/mongo/db/catalog/capped_utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ Status cloneCollectionAsCapped(OperationContext* txn,

long long excessSize = fromCollection->dataSize(txn) - allocatedSpaceGuess;

std::unique_ptr<PlanExecutor> exec(
InternalPlanner::collectionScan(txn, fromNs, fromCollection, InternalPlanner::FORWARD));
std::unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
txn, fromNs, fromCollection, PlanExecutor::YIELD_MANUAL, InternalPlanner::FORWARD));

exec->setYieldPolicy(PlanExecutor::WRITE_CONFLICT_RETRY_ONLY);

Expand Down
4 changes: 2 additions & 2 deletions src/mongo/db/catalog/index_create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -230,8 +230,8 @@ Status MultiIndexBlock::insertAllDocumentsInCollection(std::set<RecordId>* dupsO

unsigned long long n = 0;

unique_ptr<PlanExecutor> exec(
InternalPlanner::collectionScan(_txn, _collection->ns().ns(), _collection));
unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
_txn, _collection->ns().ns(), _collection, PlanExecutor::YIELD_MANUAL));
if (_buildInBackground) {
invariant(_allowInterruption);
exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
Expand Down
6 changes: 4 additions & 2 deletions src/mongo/db/commands/dbhash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,13 @@ std::string DBHashCmd::hashCollection(OperationContext* opCtx,
desc,
BSONObj(),
BSONObj(),
false,
false, // endKeyInclusive
PlanExecutor::YIELD_MANUAL,
InternalPlanner::FORWARD,
InternalPlanner::IXSCAN_FETCH);
} else if (collection->isCapped()) {
exec = InternalPlanner::collectionScan(opCtx, fullCollectionName, collection);
exec = InternalPlanner::collectionScan(
opCtx, fullCollectionName, collection, PlanExecutor::YIELD_MANUAL);
} else {
log() << "can't find _id index for: " << fullCollectionName << endl;
return "no _id _index";
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/commands/test_commands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ class CapTrunc : public Command {
RecordId end;
{
std::unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
txn, nss.ns(), collection, InternalPlanner::BACKWARD));
txn, nss.ns(), collection, PlanExecutor::YIELD_MANUAL, InternalPlanner::BACKWARD));
// We remove 'n' elements so the start is one past that
for (int i = 0; i < n + 1; ++i) {
PlanExecutor::ExecState state = exec->getNext(NULL, &end);
Expand Down
3 changes: 2 additions & 1 deletion src/mongo/db/db.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@ static void repairDatabasesAndCheckVersion() {
const string systemIndexes = db->name() + ".system.indexes";

Collection* coll = db->getCollection(systemIndexes);
unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(&txn, systemIndexes, coll));
unique_ptr<PlanExecutor> exec(
InternalPlanner::collectionScan(&txn, systemIndexes, coll, PlanExecutor::YIELD_MANUAL));

BSONObj index;
PlanExecutor::ExecState state;
Expand Down
10 changes: 8 additions & 2 deletions src/mongo/db/dbcommands.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -766,7 +766,7 @@ class CmdDatasize : public Command {
result.append("millis", timer.millis());
return 1;
}
exec = InternalPlanner::collectionScan(txn, ns, collection);
exec = InternalPlanner::collectionScan(txn, ns, collection, PlanExecutor::YIELD_MANUAL);
} else if (min.isEmpty() || max.isEmpty()) {
errmsg = "only one of min or max specified";
return false;
Expand All @@ -790,7 +790,13 @@ class CmdDatasize : public Command {
min = Helpers::toKeyFormat(kp.extendRangeBound(min, false));
max = Helpers::toKeyFormat(kp.extendRangeBound(max, false));

exec = InternalPlanner::indexScan(txn, collection, idx, min, max, false);
exec = InternalPlanner::indexScan(txn,
collection,
idx,
min,
max,
false, // endKeyInclusive
PlanExecutor::YIELD_MANUAL);
}

long long avgObjSize = collection->dataSize(txn) / collection->numRecords(txn);
Expand Down
15 changes: 11 additions & 4 deletions src/mongo/db/dbhelpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,8 @@ RecordId Helpers::findById(OperationContext* txn, Collection* collection, const

bool Helpers::getSingleton(OperationContext* txn, const char* ns, BSONObj& result) {
AutoGetCollectionForRead ctx(txn, ns);
unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(txn, ns, ctx.getCollection()));
unique_ptr<PlanExecutor> exec(
InternalPlanner::collectionScan(txn, ns, ctx.getCollection(), PlanExecutor::YIELD_MANUAL));
PlanExecutor::ExecState state = exec->getNext(&result, NULL);

CurOp::get(txn)->done();
Expand All @@ -208,7 +209,7 @@ bool Helpers::getSingleton(OperationContext* txn, const char* ns, BSONObj& resul
bool Helpers::getLast(OperationContext* txn, const char* ns, BSONObj& result) {
AutoGetCollectionForRead autoColl(txn, ns);
unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
txn, ns, autoColl.getCollection(), InternalPlanner::BACKWARD));
txn, ns, autoColl.getCollection(), PlanExecutor::YIELD_MANUAL, InternalPlanner::BACKWARD));
PlanExecutor::ExecState state = exec->getNext(&result, NULL);

if (PlanExecutor::ADVANCED == state) {
Expand Down Expand Up @@ -353,6 +354,7 @@ long long Helpers::removeRange(OperationContext* txn,
min,
max,
maxInclusive,
PlanExecutor::YIELD_MANUAL,
InternalPlanner::FORWARD,
InternalPlanner::IXSCAN_FETCH));
exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
Expand Down Expand Up @@ -519,8 +521,13 @@ Status Helpers::getLocsInRange(OperationContext* txn,
bool isLargeChunk = false;
long long docCount = 0;

unique_ptr<PlanExecutor> exec(
InternalPlanner::indexScan(txn, collection, idx, min, max, false));
unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn,
collection,
idx,
min,
max,
false, // endKeyInclusive
PlanExecutor::YIELD_MANUAL));
// we can afford to yield here because any change to the base data that we might miss is
// already being queued and will be migrated in the 'transferMods' stage
exec->setYieldPolicy(PlanExecutor::YIELD_AUTO);
Expand Down
9 changes: 7 additions & 2 deletions src/mongo/db/index/haystack_access_method.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,13 @@ void HaystackAccessMethod::searchCommand(OperationContext* txn,
unordered_set<RecordId, RecordId::Hasher> thisPass;


unique_ptr<PlanExecutor> exec(
InternalPlanner::indexScan(txn, collection, _descriptor, key, key, true));
unique_ptr<PlanExecutor> exec(InternalPlanner::indexScan(txn,
collection,
_descriptor,
key,
key,
true, // endKeyInclusive
PlanExecutor::YIELD_MANUAL));
PlanExecutor::ExecState state;
RecordId loc;
while (PlanExecutor::ADVANCED == (state = exec->getNext(NULL, &loc))) {
Expand Down
85 changes: 68 additions & 17 deletions src/mongo/db/query/internal_plans.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,27 +33,27 @@
#include "mongo/db/catalog/database.h"
#include "mongo/db/client.h"
#include "mongo/db/exec/collection_scan.h"
#include "mongo/db/exec/delete.h"
#include "mongo/db/exec/eof.h"
#include "mongo/db/exec/fetch.h"
#include "mongo/db/exec/index_scan.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/stdx/memory.h"

namespace mongo {

// static
std::unique_ptr<PlanExecutor> InternalPlanner::collectionScan(OperationContext* txn,
StringData ns,
Collection* collection,
PlanExecutor::YieldPolicy yieldPolicy,
const Direction direction,
const RecordId startLoc) {
std::unique_ptr<WorkingSet> ws = stdx::make_unique<WorkingSet>();

if (NULL == collection) {
auto eof = stdx::make_unique<EOFStage>(txn);
// Takes ownership of 'ws' and 'eof'.
auto statusWithPlanExecutor = PlanExecutor::make(
txn, std::move(ws), std::move(eof), ns.toString(), PlanExecutor::YIELD_MANUAL);
auto statusWithPlanExecutor =
PlanExecutor::make(txn, std::move(ws), std::move(eof), ns.toString(), yieldPolicy);
invariant(statusWithPlanExecutor.isOK());
return std::move(statusWithPlanExecutor.getValue());
}
Expand All @@ -73,21 +73,78 @@ std::unique_ptr<PlanExecutor> InternalPlanner::collectionScan(OperationContext*
std::unique_ptr<CollectionScan> cs =
stdx::make_unique<CollectionScan>(txn, params, ws.get(), nullptr);
// Takes ownership of 'ws' and 'cs'.
auto statusWithPlanExecutor = PlanExecutor::make(
txn, std::move(ws), std::move(cs), collection, PlanExecutor::YIELD_MANUAL);
auto statusWithPlanExecutor =
PlanExecutor::make(txn, std::move(ws), std::move(cs), collection, yieldPolicy);
invariant(statusWithPlanExecutor.isOK());
return std::move(statusWithPlanExecutor.getValue());
}

// static
std::unique_ptr<PlanExecutor> InternalPlanner::indexScan(OperationContext* txn,
const Collection* collection,
const IndexDescriptor* descriptor,
const BSONObj& startKey,
const BSONObj& endKey,
bool endKeyInclusive,
PlanExecutor::YieldPolicy yieldPolicy,
Direction direction,
int options) {
auto ws = stdx::make_unique<WorkingSet>();

std::unique_ptr<PlanStage> root = _indexScan(txn,
ws.get(),
collection,
descriptor,
startKey,
endKey,
endKeyInclusive,
direction,
options);

auto executor =
PlanExecutor::make(txn, std::move(ws), std::move(root), collection, yieldPolicy);
invariantOK(executor.getStatus());
return std::move(executor.getValue());
}

std::unique_ptr<PlanExecutor> InternalPlanner::deleteWithIndexScan(
OperationContext* txn,
Collection* collection,
const DeleteStageParams& params,
const IndexDescriptor* descriptor,
const BSONObj& startKey,
const BSONObj& endKey,
bool endKeyInclusive,
PlanExecutor::YieldPolicy yieldPolicy,
Direction direction) {
auto ws = stdx::make_unique<WorkingSet>();

std::unique_ptr<PlanStage> root = _indexScan(txn,
ws.get(),
collection,
descriptor,
startKey,
endKey,
endKeyInclusive,
direction,
InternalPlanner::IXSCAN_FETCH);

root = stdx::make_unique<DeleteStage>(txn, params, ws.get(), collection, root.release());

auto executor =
PlanExecutor::make(txn, std::move(ws), std::move(root), collection, yieldPolicy);
invariantOK(executor.getStatus());
return std::move(executor.getValue());
}

std::unique_ptr<PlanStage> InternalPlanner::_indexScan(OperationContext* txn,
WorkingSet* ws,
const Collection* collection,
const IndexDescriptor* descriptor,
const BSONObj& startKey,
const BSONObj& endKey,
bool endKeyInclusive,
Direction direction,
int options) {
invariant(collection);
invariant(descriptor);

Expand All @@ -99,19 +156,13 @@ std::unique_ptr<PlanExecutor> InternalPlanner::indexScan(OperationContext* txn,
params.bounds.endKey = endKey;
params.bounds.endKeyInclusive = endKeyInclusive;

std::unique_ptr<WorkingSet> ws = stdx::make_unique<WorkingSet>();

std::unique_ptr<PlanStage> root = stdx::make_unique<IndexScan>(txn, params, ws.get(), nullptr);
std::unique_ptr<PlanStage> root = stdx::make_unique<IndexScan>(txn, params, ws, nullptr);

if (IXSCAN_FETCH & options) {
root = stdx::make_unique<FetchStage>(txn, ws.get(), root.release(), nullptr, collection);
if (InternalPlanner::IXSCAN_FETCH & options) {
root = stdx::make_unique<FetchStage>(txn, ws, root.release(), nullptr, collection);
}

// Takes ownership of 'ws' and 'root'.
auto statusWithPlanExecutor = PlanExecutor::make(
txn, std::move(ws), std::move(root), collection, PlanExecutor::YIELD_MANUAL);
invariant(statusWithPlanExecutor.isOK());
return std::move(statusWithPlanExecutor.getValue());
return root;
}

} // namespace mongo
42 changes: 38 additions & 4 deletions src/mongo/db/query/internal_plans.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
#pragma once

#include "mongo/base/string_data.h"
#include "mongo/db/query/plan_executor.h"
#include "mongo/db/record_id.h"

namespace mongo {
Expand All @@ -37,7 +38,9 @@ class BSONObj;
class Collection;
class IndexDescriptor;
class OperationContext;
class PlanExecutor;
class PlanStage;
class WorkingSet;
struct DeleteStageParams;

/**
* The internal planner is a one-stop shop for "off-the-shelf" plans. Most internal procedures
Expand All @@ -61,25 +64,56 @@ class InternalPlanner {
};

/**
* Return a collection scan. Caller owns pointer.
* Returns a collection scan. Caller owns pointer.
*/
static std::unique_ptr<PlanExecutor> collectionScan(OperationContext* txn,
StringData ns,
Collection* collection,
PlanExecutor::YieldPolicy yieldPolicy,
const Direction direction = FORWARD,
const RecordId startLoc = RecordId());

/**
* Return an index scan. Caller owns returned pointer.
* Returns an index scan. Caller owns returned pointer.
*/
static std::unique_ptr<PlanExecutor> indexScan(OperationContext* txn,
const Collection* collection,
const IndexDescriptor* descriptor,
const BSONObj& startKey,
const BSONObj& endKey,
bool endKeyInclusive,
PlanExecutor::YieldPolicy yieldPolicy,
Direction direction = FORWARD,
int options = 0);
int options = IXSCAN_DEFAULT);

/**
* Returns an IXSCAN => FETCH => DELETE plan.
*/
static std::unique_ptr<PlanExecutor> deleteWithIndexScan(OperationContext* txn,
Collection* collection,
const DeleteStageParams& params,
const IndexDescriptor* descriptor,
const BSONObj& startKey,
const BSONObj& endKey,
bool endKeyInclusive,
PlanExecutor::YieldPolicy yieldPolicy,
Direction direction = FORWARD);

private:
/**
* Returns a plan stage that is either an index scan or an index scan with a fetch stage.
*
* Used as a helper for indexScan() and deleteWithIndexScan().
*/
static std::unique_ptr<PlanStage> _indexScan(OperationContext* txn,
WorkingSet* ws,
const Collection* collection,
const IndexDescriptor* descriptor,
const BSONObj& startKey,
const BSONObj& endKey,
bool endKeyInclusive,
Direction direction = FORWARD,
int options = IXSCAN_DEFAULT);
};

} // namespace mongo
6 changes: 3 additions & 3 deletions src/mongo/db/repl/master_slave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ void ReplSource::loadAll(OperationContext* txn, SourceVector& v) {
// add if missing
int n = 0;
unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
txn, localSources, ctx.db()->getCollection(localSources)));
txn, localSources, ctx.db()->getCollection(localSources), PlanExecutor::YIELD_MANUAL));
BSONObj obj;
PlanExecutor::ExecState state;
while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
Expand Down Expand Up @@ -309,8 +309,8 @@ void ReplSource::loadAll(OperationContext* txn, SourceVector& v) {
}
}

unique_ptr<PlanExecutor> exec(
InternalPlanner::collectionScan(txn, localSources, ctx.db()->getCollection(localSources)));
unique_ptr<PlanExecutor> exec(InternalPlanner::collectionScan(
txn, localSources, ctx.db()->getCollection(localSources), PlanExecutor::YIELD_MANUAL));
BSONObj obj;
PlanExecutor::ExecState state;
while (PlanExecutor::ADVANCED == (state = exec->getNext(&obj, NULL))) {
Expand Down
1 change: 1 addition & 0 deletions src/mongo/db/repl/oplog_interface_local.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ OplogIteratorLocal::OplogIteratorLocal(OperationContext* txn, const std::string&
_exec(InternalPlanner::collectionScan(txn,
collectionName,
_ctx.db()->getCollection(collectionName),
PlanExecutor::YIELD_MANUAL,
InternalPlanner::BACKWARD)) {}

StatusWith<OplogInterface::Iterator::Value> OplogIteratorLocal::next() {
Expand Down
Loading

0 comments on commit 7c80890

Please sign in to comment.