Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARSN-345: support batching and optimize #2130

Draft
wants to merge 4 commits into
base: development/8.1
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions lib/storage/metadata/MetadataWrapper.js
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,19 @@ class MetadataWrapper {
});
}

getObjectsMD(bucketName, objNamesWithParams, log, cb) {
log.debug('getting object from metadata');
this.client.getObjects(bucketName, objNamesWithParams, log, (err, data) => {
if (err) {
log.debug('error from metadata', { implName: this.implName,
err });
return cb(err);
}
log.debug('object retrieved from metadata');
return cb(err, data);
});
}

deleteObjectMD(bucketName, objName, params, log, cb, originOp = 's3:ObjectRemoved:Delete') {
log.debug('deleting object from metadata');
this.client.deleteObject(bucketName, objName, params, log, err => {
Expand Down
115 changes: 103 additions & 12 deletions lib/storage/metadata/mongoclient/MongoClientInterface.js
Original file line number Diff line number Diff line change
Expand Up @@ -1041,6 +1041,73 @@ class MongoClientInterface {
], cb);
}

getObjects(bucketName, objects, log, callback) {
const c = this.getCollection(bucketName);
let vFormat = null;
// Function to process each document
const processDoc = (doc, objName, params, key, cb) => {
if (!doc && params && params.versionId) {
return cb(null, {
doc: null,
versionId: params.versionId,
key,
});
}
// If no master found then object is either non existent
// or last version is delete marker
if (!doc || doc.value.isPHD) {
return this.getLatestVersion(c, objName, vFormat, log, (err, doc) => {
if (err && !err.is.NoSuchKey) {
return cb(err);
}
return cb(null, {
doc: doc || null,
versionId: params.versionId,
key,
});
});
}
MongoUtils.unserialize(doc.value);
return cb(null, {
doc: doc.value,
versionId: params.versionId,
key,
});
};
this.getBucketVFormat(bucketName, log, (err, _vFormat) => {
if (err) {
return callback(err);
}
vFormat = _vFormat;
// Create keys, maintaining the context with each key
const keysAndObjects = objects.map(({ key: objName, params }) => {
const _key = params && params.versionId
? formatVersionKey(objName, params.versionId, vFormat)
: formatMasterKey(objName, vFormat);
return { key: _key, objName, params };
});
// Extract keys and find documents
const keys = keysAndObjects.map(o => o.key);
return c.find({
_id: { $in: keys },
$or: [
{ 'value.deleted': { $exists: false } },
{ 'value.deleted': { $eq: false } },
],
}).toArray().then(docs => {
// Create a Map to quickly find docs by their keys
const docByKey = new Map(docs.map(doc => [doc._id, doc]));
// Process each document using associated context (objName, params)
async.mapLimit(keysAndObjects, 5, ({ key, objName, params }, cb) => {
const doc = docByKey.get(key);
processDoc(doc, objName, params, key, cb);
}, callback);
}).catch(err => {
callback(err);
});
});
}

/**
* This function return the latest version of an object
* by getting all keys related to an object's versions, ordering them
Expand Down Expand Up @@ -1279,7 +1346,7 @@ class MongoClientInterface {
return next(null);
}
return next(err);
}, originOp),
}, originOp, params),
], err => {
if (err) {
log.error(
Expand Down Expand Up @@ -1321,7 +1388,7 @@ class MongoClientInterface {
return cb(errors.InternalError);
}
return cb(null);
}, originOp);
}, originOp, params);
}

/**
Expand Down Expand Up @@ -1410,7 +1477,7 @@ class MongoClientInterface {
return cb(errors.InternalError);
}
return cb(null);
}, originOp);
}, originOp, params);
}

/**
Expand All @@ -1425,23 +1492,42 @@ class MongoClientInterface {
* @param {Function} cb callback containing error
* and BulkWriteResult
* @param {String} [originOp=s3:ObjectRemoved:Delete] origin operation
* @param {object} [params] request params
* @return {undefined}
*/
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete') {
// filter used when finding and updating object
internalDeleteObject(collection, bucketName, key, filter, log, cb, originOp = 's3:ObjectRemoved:Delete',
params = null) {
// filter used when deleting object
const deleteFilter = Object.assign({
_id: key,
}, filter);

if (params?.shouldOnlyDelete) {
// If flag is true, directly delete object
return collection.deleteOne(deleteFilter)
.then(() => cb(null))
.catch(err => {
log.error('internalDeleteObject: error deleting object',
{ bucket: bucketName, object: key, error: err.message });
return cb(errors.InternalError);
});
}

// If flag is false, proceed with normal operations
const findFilter = Object.assign({
_id: key,
$or: [
{ 'value.deleted': { $exists: false } },
{ 'value.deleted': { $eq: false } },
],
}, filter);
// filter used when deleting object

const updateDeleteFilter = Object.assign({
'_id': key,
'value.deleted': true,
}, filter);
async.waterfall([

return async.waterfall([
// Adding delete flag when getting the object
// to avoid having race conditions.
next => collection.findOneAndUpdate(findFilter, {
Expand Down Expand Up @@ -1469,21 +1555,26 @@ class MongoClientInterface {
}),
// We update the full object to get the whole object metadata
// in the oplog update event
(objMetadata, next) => collection.bulkWrite([
{
(objMetadata, next) => {
const operations = [{
updateOne: {
filter: updateDeleteFilter,
update: {
$set: { _id: key, value: objMetadata },
},
upsert: false,
},
}, {
}];

// Add the delete operation if the shouldDelete flag is true
operations.push({
deleteOne: {
filter: updateDeleteFilter,
},
},
], { ordered: true }).then(() => next(null)).catch(() => next()),
});

collection.bulkWrite(operations, { ordered: true }).then(() => next(null)).catch(() => next());
},
], (err, res) => {
if (err) {
if (err.is.NoSuchKey) {
Expand Down