Skip to content

Commit

Permalink
MongoDB client submitted by Yen Pai
Browse files Browse the repository at this point in the history
  • Loading branch information
Brian Frank Cooper committed May 12, 2010
1 parent 6d2b0b6 commit ea11a8d
Show file tree
Hide file tree
Showing 4 changed files with 282 additions and 1 deletion.
5 changes: 5 additions & 0 deletions build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,11 @@
<antcall target="dbcompile"/>
</target>

<target name="dbcompile-mongodb" depends="compile">
<property name="db.dir" value="db/mongodb"/>
<antcall target="dbcompile"/>
</target>

<target name="compile">
<mkdir dir="${classes.dir}"/>
<javac srcdir="${src.dir}" destdir="${classes.dir}" excludes="com/yahoo/ycsb/db/**" deprecation="on">
Expand Down
1 change: 1 addition & 0 deletions db/mongodb/lib/README
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This directory should contain jars for building and running MongoDB
267 changes: 267 additions & 0 deletions db/mongodb/src/com/yahoo/ycsb/db/MongoDbClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,267 @@
/**
* MongoDB client binding for YCSB.
*
* Submitted by Yen Pai on 5/11/2010.
*
* https://gist.github.com/000a66b8db2caf42467b#file_mongo_db.java
*
*/

package com.yahoo.ycsb.db;

import java.util.HashMap;
import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.mongodb.BasicDBObject;
import com.mongodb.DBAddress;
import com.mongodb.DBCollection;
import com.mongodb.DBCursor;
import com.mongodb.DBObject;
import com.mongodb.Mongo;
import com.mongodb.DB.WriteConcern;
import com.yahoo.ycsb.DB;
import com.yahoo.ycsb.DBException;

/**
* MongoDB client for YCSB framework.
*
* Properties to set:
*
* mongodb.url=mongodb://localhost:27017
* mongodb.database=ycsb
*
* @author ypai
*
*/
public class MongoDbClient extends DB {

private static final Logger logger = LoggerFactory.getLogger(MongoDbClient.class);

private Mongo mongo;
private WriteConcern writeConcern;
private String database;

/**
* Initialize any state for this DB. Called once per DB instance; there is
* one DB instance per client thread.
*/
public void init() throws DBException {
// initialize MongoDb driver
Properties props = getProperties();
String url = props.getProperty("mongodb.url");
database = props.getProperty("mongodb.database");
String writeConcernType = props.getProperty("mongodb.writeConcern");

if ("none".equals(writeConcernType)) {
writeConcern = WriteConcern.NONE;
} else if ("strict".equals(writeConcernType)) {
writeConcern = WriteConcern.STRICT;
} else if ("normal".equals(writeConcernType)) {
writeConcern = WriteConcern.NORMAL;
}

try {
// strip out prefix since Java driver doesn't currently support
// standard
// connection format URL yet
// http://www.mongodb.org/display/DOCS/Connections
if (url.startsWith("mongodb://")) {
url = url.substring(10);
}

mongo = new Mongo(new DBAddress(url));
} catch (Exception e1) {
logger.error(
"Could not initialize MongoDB connection pool for Loader: "
+ e1, e1);
return;
}

}

@Override
/**
* Delete a record from the database.
*
* @param table The name of the table
* @param key The record key of the record to delete.
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
*/
public int delete(String table, String key) {
try {
com.mongodb.DB db = mongo.getDB(database);
DBCollection collection = db.getCollection(table);
DBObject q = new BasicDBObject().append("_id", key);
if (writeConcern.equals(WriteConcern.STRICT)) {
q.put("$atomic", true);
}
collection.remove(q);

// see if record was deleted
DBObject errors = db.getLastError();

return (Long) errors.get("n") == 1 ? 0 : 1;
} catch (Exception e) {
logger.error(e + "", e);
return 1;
}

}

@Override
/**
* Insert a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified
* record key.
*
* @param table The name of the table
* @param key The record key of the record to insert.
* @param values A HashMap of field/value pairs to insert in the record
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
*/
public int insert(String table, String key, HashMap<String, String> values) {
try {
com.mongodb.DB db = mongo.getDB(database);
DBCollection collection = db.getCollection(table);
DBObject r = new BasicDBObject().append("_id", key);
r.putAll(values);

collection.setWriteConcern(writeConcern);

collection.insert(r);

// determine if record was inserted
DBObject errors = db.getLastError();
return (Long) errors.get("n") == 1 ? 0 : 1;
} catch (Exception e) {
logger.error(e + "", e);
return 1;
}

}

@Override
@SuppressWarnings ("unchecked")
/**
* Read a record from the database. Each field/value pair from the result will be stored in a HashMap.
*
* @param table The name of the table
* @param key The record key of the record to read.
* @param fields The list of fields to read, or null for all of them
* @param result A HashMap of field/value pairs for the result
* @return Zero on success, a non-zero error code on error or "not found".
*/
public int read(String table, String key, Set<String> fields,
HashMap<String, String> result) {
try {
com.mongodb.DB db = mongo.getDB(database);
DBCollection collection = db.getCollection(table);
DBObject q = new BasicDBObject().append("_id", key);
DBObject fieldsToReturn = new BasicDBObject();
boolean returnAllFields = fields == null;

DBObject queryResult = null;
if (!returnAllFields) {
Iterator<String> iter = fields.iterator();
while (iter.hasNext()) {
fieldsToReturn.put(iter.next(), 1);
}
queryResult = collection.findOne(q, fieldsToReturn);
} else {
queryResult = collection.findOne(q);
}

if (queryResult != null) {
//toMap() returns a Map, but result.putAll() expects a Map<String,String>. Hence, the suppress warnings.
result.putAll(queryResult.toMap());
}
return queryResult != null ? 0 : 1;
} catch (Exception e) {
logger.error(e + "", e);
return 1;
}

}

@Override
@SuppressWarnings("unchecked")
/**
* Perform a range scan for a set of records in the database. Each field/value pair from the result will be stored in a HashMap.
*
* @param table The name of the table
* @param startkey The record key of the first record to read.
* @param recordcount The number of records to read
* @param fields The list of fields to read, or null for all of them
* @param result A Vector of HashMaps, where each HashMap is a set field/value pairs for one record
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
*/
public int scan(String table, String startkey, int recordcount,
Set<String> fields, Vector<HashMap<String, String>> result) {
try {
com.mongodb.DB db = mongo.getDB(database);
DBCollection collection = db.getCollection(table);
// { "_id":{"$gte":startKey, "$lte":{"appId":key+"\uFFFF"}} }
DBObject scanRange = new BasicDBObject().append("$gte", startkey);
DBObject q = new BasicDBObject().append("_id", scanRange);
DBCursor cursor = collection.find(q).limit(recordcount);
while (cursor.hasNext()) {
//toMap() returns a Map, but result.add() expects a Map<String,String>. Hence, the suppress warnings.
result.add((HashMap<String, String>) cursor.next().toMap());
}
return 0;
} catch (Exception e) {
logger.error(e + "", e);
return 1;
}

}

@Override
/**
* Update a record in the database. Any field/value pairs in the specified values HashMap will be written into the record with the specified
* record key, overwriting any existing values with the same field name.
*
* @param table The name of the table
* @param key The record key of the record to write.
* @param values A HashMap of field/value pairs to update in the record
* @return Zero on success, a non-zero error code on error. See this class's description for a discussion of error codes.
*/
public int update(String table, String key, HashMap<String, String> values) {
try {
com.mongodb.DB db = mongo.getDB(database);
DBCollection collection = db.getCollection(table);
DBObject q = new BasicDBObject().append("_id", key);
DBObject u = new BasicDBObject();
DBObject fieldsToSet = new BasicDBObject();
Iterator<String> keys = values.keySet().iterator();
String tmpKey = null, tmpVal = null;
while (keys.hasNext()) {
tmpKey = keys.next();
tmpVal = values.get(tmpKey);
fieldsToSet.put(tmpKey, tmpVal);

}
u.put("$set", fieldsToSet);

collection.setWriteConcern(writeConcern);

collection.update(q, u);

// determine if record was inserted
DBObject errors = db.getLastError();

return (Long) errors.get("n") == 1 ? 0 : 1;
} catch (Exception e) {
logger.error(e + "", e);
return 1;
}

}
}


10 changes: 9 additions & 1 deletion src/com/yahoo/ycsb/CommandLine.java
Original file line number Diff line number Diff line change
Expand Up @@ -326,9 +326,17 @@ else if (tokens[0].compareTo("scan")==0)
int ret=db.scan(table,tokens[1],Integer.parseInt(tokens[2]),fields,results);
System.out.println("Return code: "+ret);
int record=0;
if (results.size()==0)
{
System.out.println("0 records");
}
else
{
System.out.println("--------------------------------");
}
for (HashMap<String,String> result : results)
{
System.out.println("Record"+(record++));
System.out.println("Record "+(record++));
for (Map.Entry<String,String> ent : result.entrySet())
{
System.out.println(ent.getKey()+"="+ent.getValue());
Expand Down

0 comments on commit ea11a8d

Please sign in to comment.