Skip to content

Commit

Permalink
Add MariaDB history collector
Browse files Browse the repository at this point in the history
Signed-off-by: draco <dracode01@gmail.com>
  • Loading branch information
dracoooooo committed Mar 23, 2024
1 parent 303b49d commit a105fb7
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 0 deletions.
11 changes: 11 additions & 0 deletions src/main/java/collector/maria/MariaClient.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package collector.maria;

import collector.DBClient;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MariaClient extends DBClient {
public MariaClient(String url, String username, String password) {
super(url, username, password);
}
}
79 changes: 79 additions & 0 deletions src/main/java/collector/maria/MariaCollector.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
package collector.maria;

import collector.Collector;
import history.History;
import lombok.SneakyThrows;

import java.sql.Statement;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MariaCollector extends Collector<Long, Long> {
public static final String NAME = "MARIA";

static {
try {
Class.forName("com.mysql.cj.jdbc.Driver");
} catch (ClassNotFoundException e) {
throw new RuntimeException(e);
}
}

@SneakyThrows
public MariaCollector(Properties config) {
super(config);
}

@Override
@SneakyThrows
public History<Long, Long> collect(History<Long, Long> history) {
createTable();
createVariables(nKey);
ExecutorService executor = Executors.newFixedThreadPool(history.getSessions().size());
var todo = new ArrayList<Callable<Void>>();
history.getSessions().values().forEach(session -> {
Callable<Void> task = () -> {
var node = new MariaClient(url, username, password);
node.execSession(session, isolation);
node.close();
session.getTransactions().removeIf((txn) -> !txn.isSuccess());
return null;
};
todo.add(task);
});
executor.invokeAll(todo);
dropDatabase();
history.getTransactions().entrySet().removeIf((entry) -> !entry.getValue().isSuccess());
return history;
}

@Override
@SneakyThrows
protected void createTable() {
var statement = connection.createStatement();
statement.executeUpdate("CREATE DATABASE IF NOT EXISTS dbtest");
statement.executeUpdate("DROP TABLE IF EXISTS dbtest.variables");
statement.executeUpdate("CREATE TABLE IF NOT EXISTS dbtest.variables (var BIGINT(64) UNSIGNED NOT NULL PRIMARY KEY, val BIGINT(64) UNSIGNED NOT NULL)");
}

@Override
@SneakyThrows
protected void createVariables(long nKey) {
var insertStmt = connection.prepareStatement("INSERT INTO dbtest.variables (var, val) values (?, 0)");
for (long k = 1; k <= nKey; k++) {
insertStmt.setLong(1, k);
insertStmt.addBatch();
}
insertStmt.executeBatch();
}

@Override
@SneakyThrows
protected void dropDatabase() {
Statement statement = connection.createStatement();
statement.executeUpdate("DROP DATABASE dbtest");
}
}

0 comments on commit a105fb7

Please sign in to comment.