procClass) {
- return (T)(this.class_procedures.get(procClass));
+ public final P getProcedure(Class
procClass) {
+ return (P) (this.class_procedures.get(procClass));
}
-
+
public final Histogram getTransactionSuccessHistogram() {
return (this.txnSuccess);
}
+
public final Histogram getTransactionRetryHistogram() {
return (this.txnRetry);
}
+
public final Histogram getTransactionAbortHistogram() {
return (this.txnAbort);
}
+
public final Histogram getTransactionErrorHistogram() {
return (this.txnErrors);
}
+
public final Map> getTransactionAbortMessageHistogram() {
return (this.txnAbortMessages);
}
-
+
synchronized public void setCurrStatement(Statement s) {
this.currStatement = s;
}
@@ -159,44 +197,36 @@ synchronized public void cancelStatement() {
try {
if (this.currStatement != null)
this.currStatement.cancel();
- } catch(SQLException e) {
+ } catch (SQLException e) {
LOG.error("Failed to cancel statement: " + e.getMessage());
}
}
- /**
- * Get unique name for this worker's thread
- */
- public final String getName() {
- return String.format("worker%03d", this.getId());
- }
-
- @Override
- public final void run() {
- Thread t = Thread.currentThread();
+ @Override
+ public final void run() {
+ Thread t = Thread.currentThread();
SubmittedProcedure pieceOfWork;
- t.setName(this.getName());
-
- // In case of reuse reset the measurements
- latencies = new LatencyRecord(wrkldState.getTestStartNs());
-
- // Invoke the initialize callback
- try {
- this.initialize();
- } catch (Throwable ex) {
- throw new RuntimeException("Unexpected error when initializing " + this.getName(), ex);
- }
-
- // wait for start
- wrkldState.blockForStart();
+ t.setName(this.toString());
+
+ // In case of reuse reset the measurements
+ latencies = new LatencyRecord(wrkldState.getTestStartNs());
+
+ // Invoke the initialize callback
+ try {
+ this.initialize();
+ } catch (Throwable ex) {
+ throw new RuntimeException("Unexpected error when initializing " + this, ex);
+ }
+
+ // wait for start
+ wrkldState.blockForStart();
State preState, postState;
Phase phase;
-
- TransactionType invalidTT = TransactionType.INVALID;
- assert(invalidTT != null);
-
-work:
- while (true) {
+
+ TransactionType invalidTT = TransactionType.INVALID;
+ assert (invalidTT != null);
+
+ work: while (true) {
// PART 1: Init and check if done
@@ -209,17 +239,19 @@ public final void run() {
// This is the first time we have observed that the
// test is done notify the global test state, then
// continue applying load
- seenDone = true;
- wrkldState.signalDone();
+ seenDone = true;
+ wrkldState.signalDone();
break work;
}
- break;
- }
+ break;
+ default:
+ // Do nothing
+ }
// PART 2: Wait for work
// Sleep if there's nothing to do.
- wrkldState.stayAwake();
+ wrkldState.stayAwake();
phase = this.wrkldState.getCurrentPhase();
if (phase == null)
continue work;
@@ -240,18 +272,20 @@ public final void run() {
// Once a latency run is complete, we wait until the next
// phase or until DONE.
continue work;
- }
+ default:
+ // Do nothing
+ }
// PART 3: Execute work
- // TODO: Measuring latency when not rate limited is ... a little
+ // TODO: Measuring latency when not rate limited is ... a little
// weird because if you add more simultaneous clients, you will
// increase latency (queue delay) but we do this anyway since it is
// useful sometimes
long start = pieceOfWork.getStartTime();
- TransactionType type = invalidTT;
+ TransactionType type = invalidTT;
try {
type = doWork(preState == State.MEASURE, pieceOfWork);
} catch (IndexOutOfBoundsException e) {
@@ -262,15 +296,14 @@ public final void run() {
if (phase.id == this.wrkldState.getCurrentPhase().id) {
switch (preState) {
case WARMUP:
- // Don't quit yet: we haven't even begun!
+ // Don't quit yet: we haven't even begun!
phase.resetSerial();
break;
case COLD_QUERY:
case MEASURE:
// The serial phase is over. Finish the run early.
wrkldState.signalLatencyComplete();
- LOG.info("[Serial] Serial execution of all"
- + " transactions complete.");
+ LOG.info("[Serial] Serial execution of all" + " transactions complete.");
break;
default:
throw e;
@@ -279,21 +312,20 @@ public final void run() {
}
// PART 4: Record results
-
- long end = System.nanoTime();
+
+ long end = System.nanoTime();
postState = wrkldState.getGlobalState();
- switch(postState) {
+ switch (postState) {
case MEASURE:
// Non-serial measurement. Only measure if the state both
// before and after was MEASURE, and the phase hasn't
// changed, otherwise we're recording results for a query
// that either started during the warmup phase or ended
// after the timer went off.
- if (preState == State.MEASURE && type != null
- && this.wrkldState.getCurrentPhase().id == phase.id) {
- latencies.addLatency(type.getId(), start, end, this.id
- , phase.id);
+ if (preState == State.MEASURE && type != null && this.wrkldState.getCurrentPhase().id == phase.id) {
+ latencies.addLatency(type.getId(), start, end, this.id, phase.id);
+ intervalRequests.incrementAndGet();
}
if (phase.isLatencyRun())
this.wrkldState.startColdQuery();
@@ -304,51 +336,55 @@ public final void run() {
if (preState == State.COLD_QUERY)
this.wrkldState.startHotQuery();
break;
- }
+ default:
+ // Do nothing
+ }
wrkldState.finishedWork();
- }
-
- tearDown(false);
- }
-
- /**
- * Called in a loop in the thread to exercise the system under test.
- * Each implementing worker should return the TransactionType handle that
- * was executed.
- *
- * @param llr
- */
+ }
+
+ tearDown(false);
+ }
+
+ /**
+ * Called in a loop in the thread to exercise the system under test. Each
+ * implementing worker should return the TransactionType handle that was
+ * executed.
+ *
+ * @param llr
+ */
protected final TransactionType doWork(boolean measure, SubmittedProcedure pieceOfWork) {
- TransactionType next = null;
- TransactionStatus status = TransactionStatus.RETRY;
- Savepoint savepoint = null;
- final DatabaseType dbType = wrkld.getDBType();
- final boolean recordAbortMessages = wrkld.getRecordAbortMessages();
-
- try {
- while (status == TransactionStatus.RETRY && this.wrkldState.getGlobalState() != State.DONE) {
+ TransactionType next = null;
+ TransactionStatus status = TransactionStatus.RETRY;
+ Savepoint savepoint = null;
+ final DatabaseType dbType = wrkld.getDBType();
+ final boolean recordAbortMessages = wrkld.getRecordAbortMessages();
+
+ try {
+ while (status == TransactionStatus.RETRY && this.wrkldState.getGlobalState() != State.DONE) {
if (next == null) {
next = transactionTypes.getType(pieceOfWork.getType());
}
- assert(next.isSupplemental() == false) :
- "Trying to select a supplemental transaction " + next;
-
- try {
- // For Postgres, we have to create a savepoint in order
- // to rollback a user aborted transaction
-// if (dbType == DatabaseType.POSTGRES) {
-// savepoint = this.conn.setSavepoint();
-// // if (LOG.isDebugEnabled())
-// LOG.info("Created SavePoint: " + savepoint);
-// }
-
- status = this.executeWork(next);
- // User Abort Handling
- // These are not errors
- } catch (UserAbortException ex) {
- if (LOG.isDebugEnabled()) LOG.debug(next + " Aborted", ex);
-
+ assert (next.isSupplemental() == false) : "Trying to select a supplemental transaction " + next;
+
+ try {
+ // For Postgres, we have to create a savepoint in order
+ // to rollback a user aborted transaction
+ // if (dbType == DatabaseType.POSTGRES) {
+ // savepoint = this.conn.setSavepoint();
+ // // if (LOG.isDebugEnabled())
+ // LOG.info("Created SavePoint: " + savepoint);
+ // }
+
+ status = TransactionStatus.UNKNOWN;
+ status = this.executeWork(next);
+
+ // User Abort Handling
+ // These are not errors
+ } catch (UserAbortException ex) {
+ if (LOG.isDebugEnabled())
+ LOG.trace(next + " Aborted", ex);
+
/* PAVLO */
if (recordAbortMessages) {
Histogram error_h = this.txnAbortMessages.get(next);
@@ -358,23 +394,27 @@ protected final TransactionType doWork(boolean measure, SubmittedProcedure piece
}
error_h.put(StringUtil.abbrv(ex.getMessage(), 20));
}
-
+
if (savepoint != null) {
this.conn.rollback(savepoint);
} else {
this.conn.rollback();
}
- this.txnAbort.put(next);
- break;
+ status = TransactionStatus.USER_ABORTED;
+ break;
+
// Database System Specific Exception Handling
} catch (SQLException ex) {
-
- //TODO: Handle acceptable error codes for every DBMS
- LOG.debug(next+ " " + ex.getMessage()+" "+ex.getErrorCode()+ " - " +ex.getSQLState());
+ // TODO: Handle acceptable error codes for every DBMS
+ if (LOG.isDebugEnabled())
+ LOG.warn(String.format("%s thrown when executing '%s' on '%s' " +
+ "[Message='%s', ErrorCode='%d', SQLState='%s']",
+ ex.getClass().getSimpleName(), next, this.toString(),
+ ex.getMessage(), ex.getErrorCode(), ex.getSQLState()), ex);
this.txnErrors.put(next);
-
+
if (savepoint != null) {
this.conn.rollback(savepoint);
} else {
@@ -383,113 +423,156 @@ protected final TransactionType doWork(boolean measure, SubmittedProcedure piece
if (ex.getSQLState() == null) {
continue;
- }
- else if (ex.getErrorCode() == 1213 && ex.getSQLState().equals("40001")) {
+ // ------------------
+ // MYSQL
+ // ------------------
+ } else if (ex.getErrorCode() == 1213 && ex.getSQLState().equals("40001")) {
// MySQLTransactionRollbackException
continue;
- }
- else if (ex.getErrorCode() == 1205 && ex.getSQLState().equals("41000")) {
+ } else if (ex.getErrorCode() == 1205 && ex.getSQLState().equals("41000")) {
// MySQL Lock timeout
continue;
- }
- else if (ex.getErrorCode() == 1205 && ex.getSQLState().equals("40001")) {
+
+ // ------------------
+ // SQL SERVER
+ // ------------------
+ } else if (ex.getErrorCode() == 1205 && ex.getSQLState().equals("40001")) {
// SQLServerException Deadlock
continue;
- }
- else if (ex.getErrorCode() == -911 && ex.getSQLState().equals("40001")) {
- // DB2Exception Deadlock
- continue;
- }
- else if (ex.getErrorCode() == 0 && ex.getSQLState() != null && ex.getSQLState().equals("40001")) {
+
+ // ------------------
+ // POSTGRES
+ // ------------------
+ } else if (ex.getErrorCode() == 0 && ex.getSQLState() != null && ex.getSQLState().equals("40001")) {
// Postgres serialization
continue;
- }
- else if (ex.getErrorCode() == 8177 && ex.getSQLState().equals("72000")) {
+ } else if (ex.getErrorCode() == 0 && ex.getSQLState() != null && ex.getSQLState().equals("53200")) {
+ // Postgres OOM error
+ throw ex;
+
+ // ------------------
+ // ORACLE
+ // ------------------
+ } else if (ex.getErrorCode() == 8177 && ex.getSQLState().equals("72000")) {
// ORA-08177: Oracle Serialization
continue;
- }
- else if ( (ex.getErrorCode() == 0 && ex.getSQLState().equals("57014"))
- || (ex.getErrorCode() == -952 && ex.getSQLState().equals("57014")) // DB2
- )
- {
+
+ // ------------------
+ // DB2
+ // ------------------
+ } else if (ex.getErrorCode() == -911 && ex.getSQLState().equals("40001")) {
+ // DB2Exception Deadlock
+ continue;
+ } else if ((ex.getErrorCode() == 0 && ex.getSQLState().equals("57014")) ||
+ (ex.getErrorCode() == -952 && ex.getSQLState().equals("57014"))) {
// Query cancelled by benchmark because we changed
// state. That's fine! We expected/caused this.
status = TransactionStatus.RETRY_DIFFERENT;
continue;
- }
- else if (ex.getErrorCode() == 0 && ex.getSQLState().equals("02000")) {
+ } else if (ex.getErrorCode() == 0 && ex.getSQLState().equals("02000")) {
// No results returned. That's okay, we can proceed to
// a different query. But we should send out a warning,
// too, since this is unusual.
status = TransactionStatus.RETRY_DIFFERENT;
continue;
- }
- else {
+
+ // ------------------
+ // UNKNOWN!
+ // ------------------
+ } else {
// UNKNOWN: In this case .. Retry as well!
+ LOG.warn("The DBMS rejected the transaction without an error code", ex);
continue;
- //FIXME Disable this for now
+ // FIXME Disable this for now
// throw ex;
}
- }
- finally {
+ // Assertion Error
+ } catch (Error ex) {
+ LOG.error("Fatal error when invoking " + next, ex);
+ throw ex;
+ // Random Error
+ } catch (Exception ex) {
+ LOG.error("Fatal error when invoking " + next, ex);
+ throw new RuntimeException(ex);
+
+ } finally {
+ if (LOG.isDebugEnabled())
+ LOG.debug(String.format("%s %s Result: %s", this, next, status));
+
switch (status) {
case SUCCESS:
this.txnSuccess.put(next);
- LOG.debug("Executed a new invocation of " + next);
break;
case RETRY_DIFFERENT:
this.txnRetry.put(next);
return null;
+ case USER_ABORTED:
+ this.txnAbort.put(next);
+ break;
case RETRY:
- LOG.debug("Retrying transaction...");
continue;
default:
- assert(false) :
- String.format("Unexpected status '%s' for %s", status, next);
+ assert (false) : String.format("Unexpected status '%s' for %s", status, next);
} // SWITCH
}
- } // WHILE
- } catch (SQLException ex) {
- throw new RuntimeException(String.format("Unexpected error in %s when executing %s [%s]",
- this.getName(), next, dbType), ex);
- }
-
+ } // WHILE
+ } catch (SQLException ex) {
+ String msg = String.format("Unexpected fatal, error in '%s' when executing '%s' [%s]",
+ this, next, dbType);
+ // FIXME: PAVLO 2016-12-29
+ // Right now our DBMS throws an exception when the txn gets aborted
+ // due to a conflict, so for now we have to not kill ourselves.
+ // This *does not* incorrectly inflate our performance numbers.
+ // It's more of a workaround for now until I can figure out how to do
+ // this correctly in JDBC.
+ if (dbType == DatabaseType.PELOTON) {
+ msg += "\nBut we are not stopping because " + dbType + " cannot handle this correctly";
+ LOG.warn(msg);
+ } else {
+ throw new RuntimeException(msg, ex);
+ }
+ }
+
return (next);
- }
-
- /**
- * Optional callback that can be used to initialize the Worker
- * right before the benchmark execution begins
- */
- protected void initialize() {
- // The default is to do nothing
- }
-
+ }
+
+ /**
+ * Optional callback that can be used to initialize the Worker right before
+ * the benchmark execution begins
+ */
+ protected void initialize() {
+ // The default is to do nothing
+ }
+
/**
* Invoke a single transaction for the given TransactionType
+ *
* @param txnType
* @return TODO
- * @throws UserAbortException TODO
- * @throws SQLException TODO
+ * @throws UserAbortException
+ * TODO
+ * @throws SQLException
+ * TODO
+ */
+ protected abstract TransactionStatus executeWork(TransactionType txnType) throws UserAbortException, SQLException;
+
+ /**
+ * Called at the end of the test to do any clean up that may be required.
+ *
+ * @param error
+ * TODO
*/
- protected abstract TransactionStatus executeWork(TransactionType txnType) throws UserAbortException, SQLException;
-
- /**
- * Called at the end of the test to do any clean up that may be
- * required.
- * @param error TODO
- */
- public void tearDown(boolean error) {
- try {
- conn.close();
- } catch (SQLException e) {
- LOG.warn("No connection to close");
- }
- }
-
- public void initializeState() {
- assert (this.wrkldState == null);
- this.wrkldState = this.wrkld.getWorkloadState();
- }
+ public void tearDown(boolean error) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ LOG.warn("No connection to close");
+ }
+ }
+
+ public void initializeState() {
+ assert (this.wrkldState == null);
+ this.wrkldState = this.wrkld.getWorkloadState();
+ }
}
diff --git a/src/com/oltpbenchmark/api/collectors/DBCollector.java b/src/com/oltpbenchmark/api/collectors/DBCollector.java
new file mode 100644
index 000000000..8d93cad00
--- /dev/null
+++ b/src/com/oltpbenchmark/api/collectors/DBCollector.java
@@ -0,0 +1,60 @@
+/******************************************************************************
+ * Copyright 2015 by OLTPBenchmark Project *
+ * *
+ * Licensed under the Apache License, Version 2.0 (the "License"); *
+ * you may not use this file except in compliance with the License. *
+ * You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, software *
+ * distributed under the License is distributed on an "AS IS" BASIS, *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
+ * See the License for the specific language governing permissions and *
+ * limitations under the License. *
+ ******************************************************************************/
+
+package com.oltpbenchmark.api.collectors;
+
+import org.apache.log4j.Logger;
+
+import com.oltpbenchmark.util.JSONUtil;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+public class DBCollector implements DBParameterCollector {
+
+ private static final Logger LOG = Logger.getLogger(DBCollector.class);
+
+ protected final Map dbParameters = new TreeMap();
+
+ protected final Map dbMetrics = new TreeMap();
+
+ protected final StringBuilder version = new StringBuilder();
+
+ @Override
+ public boolean hasParameters() {
+ return (dbParameters.isEmpty() == false);
+ }
+
+ @Override
+ public boolean hasMetrics() {
+ return (dbMetrics.isEmpty() == false);
+ }
+
+ @Override
+ public String collectParameters() {
+ return JSONUtil.format(JSONUtil.toJSONString(dbParameters));
+ }
+
+ @Override
+ public String collectMetrics() {
+ return JSONUtil.format(JSONUtil.toJSONString(dbMetrics));
+ }
+
+ @Override
+ public String collectVersion() {
+ return version.toString();
+ }
+}
diff --git a/src/com/oltpbenchmark/api/collectors/DBParameterCollector.java b/src/com/oltpbenchmark/api/collectors/DBParameterCollector.java
new file mode 100644
index 000000000..2ab2e7835
--- /dev/null
+++ b/src/com/oltpbenchmark/api/collectors/DBParameterCollector.java
@@ -0,0 +1,25 @@
+/******************************************************************************
+ * Copyright 2015 by OLTPBenchmark Project *
+ * *
+ * Licensed under the Apache License, Version 2.0 (the "License"); *
+ * you may not use this file except in compliance with the License. *
+ * You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, software *
+ * distributed under the License is distributed on an "AS IS" BASIS, *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
+ * See the License for the specific language governing permissions and *
+ * limitations under the License. *
+ ******************************************************************************/
+
+package com.oltpbenchmark.api.collectors;
+
+public interface DBParameterCollector {
+ boolean hasParameters();
+ boolean hasMetrics();
+ String collectParameters();
+ String collectMetrics();
+ String collectVersion();
+}
diff --git a/src/com/oltpbenchmark/api/collectors/DBParameterCollectorGen.java b/src/com/oltpbenchmark/api/collectors/DBParameterCollectorGen.java
new file mode 100644
index 000000000..1a5762513
--- /dev/null
+++ b/src/com/oltpbenchmark/api/collectors/DBParameterCollectorGen.java
@@ -0,0 +1,30 @@
+/******************************************************************************
+ * Copyright 2015 by OLTPBenchmark Project *
+ * *
+ * Licensed under the Apache License, Version 2.0 (the "License"); *
+ * you may not use this file except in compliance with the License. *
+ * You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, software *
+ * distributed under the License is distributed on an "AS IS" BASIS, *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
+ * See the License for the specific language governing permissions and *
+ * limitations under the License. *
+ ******************************************************************************/
+
+package com.oltpbenchmark.api.collectors;
+
+public class DBParameterCollectorGen {
+ public static DBParameterCollector getCollector(String dbType, String dbUrl, String username, String password) {
+ String db = dbType.toLowerCase();
+ if (db.equals("mysql")) {
+ return new MySQLCollector(dbUrl, username, password);
+ } else if (db.equals("postgres")) {
+ return new PostgresCollector(dbUrl, username, password);
+ } else {
+ return new DBCollector();
+ }
+ }
+}
diff --git a/src/com/oltpbenchmark/api/collectors/MySQLCollector.java b/src/com/oltpbenchmark/api/collectors/MySQLCollector.java
new file mode 100644
index 000000000..d9608cb3e
--- /dev/null
+++ b/src/com/oltpbenchmark/api/collectors/MySQLCollector.java
@@ -0,0 +1,65 @@
+/******************************************************************************
+ * Copyright 2015 by OLTPBenchmark Project *
+ * *
+ * Licensed under the Apache License, Version 2.0 (the "License"); *
+ * you may not use this file except in compliance with the License. *
+ * You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, software *
+ * distributed under the License is distributed on an "AS IS" BASIS, *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
+ * See the License for the specific language governing permissions and *
+ * limitations under the License. *
+ ******************************************************************************/
+
+package com.oltpbenchmark.api.collectors;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.apache.log4j.Logger;
+
+import com.oltpbenchmark.catalog.Catalog;
+
+public class MySQLCollector extends DBCollector {
+ private static final Logger LOG = Logger.getLogger(MySQLCollector.class);
+
+ private static final String VERSION_SQL = "SELECT @@GLOBAL.version;";
+
+ private static final String PARAMETERS_SQL = "SHOW VARIABLES;";
+
+ private static final String METRICS_SQL = "SHOW STATUS";
+
+ public MySQLCollector(String oriDBUrl, String username, String password) {
+ try {
+ Connection conn = DriverManager.getConnection(oriDBUrl, username, password);
+ Catalog.setSeparator(conn);
+ Statement s = conn.createStatement();
+
+ // Collect DBMS version
+ ResultSet out = s.executeQuery(VERSION_SQL);
+ if (out.next()) {
+ this.version.append(out.getString(1));
+ }
+
+ // Collect DBMS parameters
+ out = s.executeQuery(PARAMETERS_SQL);
+ while(out.next()) {
+ dbParameters.put(out.getString(1).toLowerCase(), out.getString(2));
+ }
+
+ // Collect DBMS internal metrics
+ out = s.executeQuery(METRICS_SQL);
+ while (out.next()) {
+ dbMetrics.put(out.getString(1).toLowerCase(), out.getString(2));
+ }
+ } catch (SQLException e) {
+ LOG.error("Error while collecting DB parameters: " + e.getMessage());
+ }
+ }
+}
diff --git a/src/com/oltpbenchmark/api/collectors/PostgresCollector.java b/src/com/oltpbenchmark/api/collectors/PostgresCollector.java
new file mode 100644
index 000000000..18fa2378b
--- /dev/null
+++ b/src/com/oltpbenchmark/api/collectors/PostgresCollector.java
@@ -0,0 +1,109 @@
+/******************************************************************************
+ * Copyright 2015 by OLTPBenchmark Project *
+ * *
+ * Licensed under the Apache License, Version 2.0 (the "License"); *
+ * you may not use this file except in compliance with the License. *
+ * You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, software *
+ * distributed under the License is distributed on an "AS IS" BASIS, *
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. *
+ * See the License for the specific language governing permissions and *
+ * limitations under the License. *
+ ******************************************************************************/
+
+package com.oltpbenchmark.api.collectors;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.log4j.Logger;
+
+import com.oltpbenchmark.catalog.Catalog;
+import com.oltpbenchmark.util.JSONUtil;
+
+public class PostgresCollector extends DBCollector {
+ private static final Logger LOG = Logger.getLogger(PostgresCollector.class);
+
+ private static final String VERSION_SQL = "SELECT version();";
+
+ private static final String PARAMETERS_SQL = "SHOW ALL;";
+
+ private static final String[] PG_STAT_VIEWS = {
+ "pg_stat_archiver", "pg_stat_bgwriter", "pg_stat_database",
+ "pg_stat_database_conflicts", "pg_stat_user_tables", "pg_statio_user_tables",
+ "pg_stat_user_indexes", "pg_statio_user_indexes"
+ };
+
+ private final Map>> pgMetrics;
+
+ public PostgresCollector(String oriDBUrl, String username, String password) {
+ pgMetrics = new HashMap>>();
+ try {
+ Connection conn = DriverManager.getConnection(oriDBUrl, username, password);
+ Catalog.setSeparator(conn);
+ Statement s = conn.createStatement();
+
+ // Collect DBMS version
+ ResultSet out = s.executeQuery(VERSION_SQL);
+ if (out.next()) {
+ this.version.append(out.getString(1));
+ }
+
+ // Collect DBMS parameters
+ out = s.executeQuery(PARAMETERS_SQL);
+ while (out.next()) {
+ dbParameters.put(out.getString("name"), out.getString("setting"));
+ }
+
+ // Collect DBMS internal metrics
+ for (String viewName : PG_STAT_VIEWS) {
+ out = s.executeQuery("SELECT * FROM " + viewName);
+ pgMetrics.put(viewName, getMetrics(out));
+ }
+ } catch (SQLException e) {
+ LOG.error("Error while collecting DB parameters: " + e.getMessage());
+ }
+ }
+
+ @Override
+ public boolean hasMetrics() {
+ return (pgMetrics.isEmpty() == false);
+ }
+
+ @Override
+ public String collectMetrics() {
+ return JSONUtil.format(JSONUtil.toJSONString(pgMetrics));
+ }
+
+ private static List