Skip to content

Commit

Permalink
feat: Added Lettuce command metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Julien Ruaux committed Jul 21, 2022
1 parent d5a922f commit 88c60ad
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ public class ByteArrayResultSetCodec implements RedisCodec<String, ResultSet> {
public ByteArrayResultSetCodec(RowSetFactory rowSetFactory, int maxBufferCapacityMB, MeterRegistry meterRegistry) {
this.rowSetFactory = rowSetFactory;
this.maxByteBufferCapacity = maxBufferCapacityMB * MEGA;
this.encodeTimer = meterRegistry.timer("encoding", "codec", "ByteArrayResultSetCodec");
this.decodeTimer = meterRegistry.timer("decoding", "codec", "ByteArrayResultSetCodec");
this.encodeTimer = meterRegistry.timer("encoding", "codec", "byte");
this.decodeTimer = meterRegistry.timer("decoding", "codec", "byte");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,21 @@
import com.redis.micrometer.RedisTimeSeriesConfig;
import com.redis.micrometer.RedisTimeSeriesMeterRegistry;

import io.lettuce.core.AbstractRedisClient;
import io.micrometer.core.instrument.Clock;
import io.micrometer.core.instrument.MeterRegistry;

public class MeterRegistryManager {

private final Map<AbstractRedisClient, MeterRegistry> registries = new HashMap<>();
private final Map<String, MeterRegistry> registries = new HashMap<>();

public MeterRegistry getRegistry(AbstractRedisClient redisClient, Config config) {
if (!registries.containsKey(redisClient)) {
registries.put(redisClient, registry(redisClient, config));
public MeterRegistry getRegistry(Config config) {
if (!registries.containsKey(config.getRedis().getUri())) {
registries.put(config.getRedis().getUri(), registry(config));
}
return registries.get(redisClient);
return registries.get(config.getRedis().getUri());
}

private MeterRegistry registry(AbstractRedisClient redisClient, Config config) {
private MeterRegistry registry(Config config) {
return new RedisTimeSeriesMeterRegistry(new RedisTimeSeriesConfig() {

@Override
Expand Down Expand Up @@ -50,7 +49,7 @@ public Duration step() {
return Duration.ofSeconds(config.getMetricsStep());
}

}, Clock.SYSTEM, redisClient);
}, Clock.SYSTEM);
}

public void clear() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ public Connection connect(String url, Properties info) throws SQLException {
throw new SQLException("Invalid connection URL: " + url);
}
config.getRedis().setUri(matcher.group(1));
AbstractRedisClient redisClient = redisManager.getClient(config.getRedis());
MeterRegistry meterRegistry = meterRegistryManager.getRegistry(config);
AbstractRedisClient redisClient = redisManager.getClient(config.getRedis(), meterRegistry);
Connection backendConnection = backendConnection(config, info);
RowSetFactory rowSetFactory = RowSetProvider.newFactory();
try {
Expand All @@ -72,7 +73,6 @@ public Connection connect(String url, Properties info) throws SQLException {
} catch (JsonProcessingException e) {
throw new SQLException("Could not initialize config object", e);
}
MeterRegistry meterRegistry = meterRegistryManager.getRegistry(redisClient, config);
ByteArrayResultSetCodec codec = new ByteArrayResultSetCodec(RowSetProvider.newFactory(),
config.getRedis().getBufferSize(), meterRegistry);
GenericObjectPool<StatefulConnection<String, ResultSet>> pool = redisManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,46 @@
import io.lettuce.core.RedisURI;
import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.metrics.MicrometerCommandLatencyRecorder;
import io.lettuce.core.metrics.MicrometerOptions;
import io.lettuce.core.resource.ClientResources;
import io.lettuce.core.support.ConnectionPoolSupport;
import io.micrometer.core.instrument.MeterRegistry;

public class RedisManager {

private final Map<RedisURI, AbstractRedisClient> clients = new HashMap<>();
private final Map<RedisURI, GenericObjectPool<StatefulConnection<String, ResultSet>>> pools = new HashMap<>();

public AbstractRedisClient getClient(Redis redis) {
RedisURI uri = redis.redisURI();
if (!clients.containsKey(uri)) {
clients.put(uri,
redis.isCluster() ? RedisModulesClusterClient.create(uri) : RedisModulesClient.create(uri));
public AbstractRedisClient getClient(Redis redis, MeterRegistry meterRegistry) {
if (!clients.containsKey(redis.redisURI())) {
clients.put(redis.redisURI(), client(redis, meterRegistry));
}
return clients.get(redis.redisURI());
}

private AbstractRedisClient client(Redis redis, MeterRegistry meterRegistry) {
MicrometerOptions options = MicrometerOptions.create();
ClientResources resources = ClientResources.builder()
.commandLatencyRecorder(new MicrometerCommandLatencyRecorder(meterRegistry, options)).build();
if (redis.isCluster()) {
return RedisModulesClusterClient.create(resources, redis.redisURI());
}
return RedisModulesClient.create(resources, redis.redisURI());
}

public AbstractRedisClient client(Redis redis) {
if (redis.isCluster()) {
return RedisModulesClusterClient.create(redis.redisURI());
}
return clients.get(uri);
return RedisModulesClient.create(redis.redisURI());
}

public GenericObjectPool<StatefulConnection<String, ResultSet>> getConnectionPool(Redis redis,
RedisCodec<String, ResultSet> codec) {
RedisURI uri = redis.redisURI();
if (!pools.containsKey(uri)) {
AbstractRedisClient client = getClient(redis);
AbstractRedisClient client = clients.get(redis.redisURI());
boolean cluster = redis.isCluster();
GenericObjectPool<StatefulConnection<String, ResultSet>> pool = ConnectionPoolSupport
.createGenericObjectPool(() -> cluster ? ((RedisModulesClusterClient) client).connect(codec)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import java.sql.SQLException;
import java.sql.SQLWarning;
import java.sql.Statement;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.logging.Level;
Expand All @@ -18,6 +19,7 @@
import io.micrometer.core.instrument.Timer;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.util.TablesNamesFinder;

public class SidecarStatement implements Statement {
Expand Down Expand Up @@ -117,15 +119,13 @@ private ResultSet doExecuteQuery(QueryExecutable executable) throws SQLException
if (cachedResultSet.isPresent()) {
return cachedResultSet.get();
}
ResultSet backendResultSet = executable.execute();
if (isCachingEnabled()) {
return cache(backendResultSet);
}
return backendResultSet;

return cache(executable.execute());
}

private CachedRowSet cache(ResultSet resultSet) throws SQLException {
private ResultSet cache(ResultSet resultSet) throws SQLException {
if (resultSet == null || !isCachingEnabled()) {
return resultSet;
}
CachedRowSet rowSet = connection.createCachedRowSet();
rowSet.populate(resultSet);
connection.getCache().put(key(), ttl, rowSet);
Expand All @@ -143,15 +143,7 @@ protected String key() {

private Optional<ResultSet> getCachedResultSet() {
String key = key();
List<String> tables;
try {
net.sf.jsqlparser.statement.Statement parsedStatement = CCJSqlParserUtil.parse(sql);
TablesNamesFinder tablesNamesFinder = new TablesNamesFinder();
tables = tablesNamesFinder.getTableList(parsedStatement);
} catch (JSQLParserException e) {
log.log(Level.FINE, String.format("Could not parse SQL: %s", sql), e);
return Optional.empty();
}
List<String> tables = tables(sql);
if (tables.isEmpty()) {
return Optional.empty();
}
Expand All @@ -167,6 +159,19 @@ private Optional<ResultSet> getCachedResultSet() {
return Optional.empty();
}

private List<String> tables(String sql) {
try {
net.sf.jsqlparser.statement.Statement parsedStatement = CCJSqlParserUtil.parse(sql);
if (parsedStatement instanceof Select) {
TablesNamesFinder tablesNamesFinder = new TablesNamesFinder();
return tablesNamesFinder.getTableList(parsedStatement);
}
} catch (JSQLParserException e) {
log.log(Level.FINE, String.format("Could not parse SQL: %s", sql), e);
}
return Collections.emptyList();
}

@Override
public int executeUpdate(String sql) throws SQLException {
return statement.executeUpdate(sql);
Expand Down Expand Up @@ -242,11 +247,7 @@ public ResultSet getResultSet() throws SQLException {
if (resultSet.isPresent()) {
return resultSet.get();
}
ResultSet backendResultSet = statement.getResultSet();
if (isCachingEnabled()) {
return cache(backendResultSet);
}
return backendResultSet;
return cache(statement.getResultSet());
}

@Override
Expand Down

0 comments on commit 88c60ad

Please sign in to comment.