Skip to content

Commit

Permalink
feat(*): improve performance
Browse files Browse the repository at this point in the history
  1. add second-cache for revoking db
  2. optimize pack trx
  • Loading branch information
halibobo1205 committed Oct 29, 2022
1 parent 2d88f6e commit e26b1af
Show file tree
Hide file tree
Showing 13 changed files with 365 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,13 @@ public void addTransaction(TransactionCapsule pendingTrx) {
getTransactions().add(pendingTrx);
}

public void addAllTransactions(List<TransactionCapsule> pendingTrxs) {
List<Transaction> list = pendingTrxs.stream().map(TransactionCapsule::getInstance).collect(
Collectors.toList());
this.block = this.block.toBuilder().addAllTransactions(list).build();
getTransactions().addAll(pendingTrxs);
}

public List<TransactionCapsule> getTransactions() {
return transactions;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.tron.core.capsule;

import com.google.protobuf.InvalidProtocolBufferException;
import java.util.List;
import java.util.Objects;
import lombok.extern.slf4j.Slf4j;
import org.tron.core.exception.BadItemException;
Expand Down Expand Up @@ -29,7 +30,7 @@ public TransactionRetCapsule() {

public TransactionRetCapsule(byte[] data) throws BadItemException {
try {
this.transactionRet = transactionRet.parseFrom(data);
this.transactionRet = TransactionRet.parseFrom(data);
} catch (InvalidProtocolBufferException e) {
throw new BadItemException("TransactionInfoCapsule proto data parse exception");
}
Expand All @@ -39,6 +40,10 @@ public void addTransactionInfo(TransactionInfo result) {
this.transactionRet = this.transactionRet.toBuilder().addTransactioninfo(result).build();
}

public void addAllTransactionInfos(List<TransactionInfo> results) {
this.transactionRet = this.transactionRet.toBuilder().addAllTransactioninfo(results).build();
}

@Override
public byte[] getData() {
if (Objects.isNull(transactionRet)) {
Expand Down
2 changes: 1 addition & 1 deletion chainbase/src/main/java/org/tron/core/db2/common/DB.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface DB<K, V> extends Iterable<Map.Entry<K, V>>, Instance<DB<K, V>>

void remove(K k);

Iterator iterator();
Iterator<Map.Entry<K, V>> iterator();

void close();

Expand Down
73 changes: 68 additions & 5 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotRoot.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
package org.tron.core.db2.core;

import ch.qos.logback.core.encoder.ByteArrayUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Getter;
import org.tron.common.cache.CacheManager;
import org.tron.common.cache.TronCache;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.core.ChainBaseManager;
import org.tron.core.capsule.AccountCapsule;
Expand All @@ -23,11 +26,18 @@ public class SnapshotRoot extends AbstractSnapshot<byte[], byte[]> {
private Snapshot solidity;
private boolean isAccountDB;

private TronCache<WrappedByteArray, WrappedByteArray> cache;
private static final List<String> CACHE_DBS = CommonParameter.getInstance()
.getStorage().getCacheDbs();

public SnapshotRoot(DB<byte[], byte[]> db) {
this.db = db;
solidity = this;
isOptimized = "properties".equalsIgnoreCase(db.getDbName());
isAccountDB = "account".equalsIgnoreCase(db.getDbName());
if (CACHE_DBS.contains(this.db.getDbName())) {
this.cache = CacheManager.allocate(this.db.getDbName());
}
}

private boolean needOptAsset() {
Expand All @@ -37,11 +47,18 @@ private boolean needOptAsset() {

@Override
public byte[] get(byte[] key) {
return db.get(key);
WrappedByteArray cache = getCache(key);
if (cache != null) {
return cache.getBytes();
}
byte[] value = db.get(key);
putCache(key, value);
return value;
}

@Override
public void put(byte[] key, byte[] value) {
byte[] v = value;
if (needOptAsset()) {
if (ByteArray.isEmpty(value)) {
remove(key);
Expand All @@ -56,10 +73,10 @@ public void put(byte[] key, byte[] value) {
}
assetStore.putAccount(item.getInstance());
item.clearAsset();
db.put(key, item.getData());
} else {
db.put(key, value);
v = item.getData();
}
db.put(key, v);
putCache(key, v);
}

@Override
Expand All @@ -68,6 +85,7 @@ public void remove(byte[] key) {
ChainBaseManager.getInstance().getAccountAssetStore().deleteAccount(key);
}
db.remove(key);
putCache(key, null);
}

@Override
Expand All @@ -81,6 +99,7 @@ public void merge(Snapshot from) {
processAccount(batch);
} else {
((Flusher) db).flush(batch);
putCache(batch);
}
}

Expand All @@ -97,6 +116,7 @@ public void merge(List<Snapshot> snapshots) {
processAccount(batch);
} else {
((Flusher) db).flush(batch);
putCache(batch);
}
}

Expand All @@ -120,11 +140,52 @@ private void processAccount(Map<WrappedByteArray, WrappedByteArray> batch) {
}
});
((Flusher) db).flush(accounts);
putCache(accounts);
if (assets.size() > 0) {
assetStore.updateByBatch(AccountAssetStore.convert(assets));
}
}

private boolean cached() {
return Objects.nonNull(this.cache);
}

private void putCache(byte[] key, byte[] value) {
if (cached()) {
if (key == null) {
return;
}
cache.put(WrappedByteArray.of(key), WrappedByteArray.of(value));
}
}

private void putCache(WrappedByteArray key, WrappedByteArray value) {
if (cached()) {
if (key == null || value == null) {
return;
}
cache.put(key, value);
}
}

private void putCache(Map<WrappedByteArray, WrappedByteArray> values) {
if (cached()) {
if (values == null || values.isEmpty()) {
return;
}
values.forEach(this::putCache);
}
}

private WrappedByteArray getCache(byte[] key) {
if (cached()) {
return cache.getIfPresent(WrappedByteArray.of(key));
}
return null;
}

// second cache

@Override
public Snapshot retreat() {
return this;
Expand All @@ -142,11 +203,13 @@ public Iterator<Map.Entry<byte[], byte[]>> iterator() {

@Override
public void close() {
CacheManager.release(this.getDbName());
((Flusher) db).close();
}

@Override
public void reset() {
CacheManager.release(this.getDbName());
((Flusher) db).reset();
}

Expand Down
45 changes: 45 additions & 0 deletions common/src/main/java/org/tron/common/cache/CacheManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package org.tron.common.cache;

import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.tron.common.parameter.CommonParameter;

public class CacheManager {

private static final Map<String, TronCache<?, ?>> CACHES = Maps.newConcurrentMap();

public static <K, V> TronCache<K, V> allocate(String name) {
TronCache<K, V> cache = new TronCache<>(name, CommonParameter.getInstance()
.getStorage().getCacheStrategy(name));
CACHES.put(name, cache);
return cache;
}

public static <K, V> TronCache<K, V> allocate(String name, String strategy) {
TronCache<K, V> cache = new TronCache<>(name, strategy);
CACHES.put(name, cache);
return cache;
}

public static <K, V> TronCache<K, V> allocate(String name, String strategy,
CacheLoader<K, V> loader) {
TronCache<K, V> cache = new TronCache<>(name, strategy, loader);
CACHES.put(name, cache);
return cache;
}

public static void release(String name) {
TronCache cache = CACHES.remove(name);
if (cache != null) {
cache.invalidateAll();
}
}

public static Map<String, CacheStats> stats() {
return CACHES.values().stream().collect(Collectors.toMap(TronCache::getName, TronCache::stats));
}

}
66 changes: 66 additions & 0 deletions common/src/main/java/org/tron/common/cache/TronCache.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package org.tron.common.cache;

import com.google.common.base.Objects;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import lombok.Getter;

public class TronCache<K, V> {

private static final int CPUS = Runtime.getRuntime().availableProcessors();

@Getter
private final String name;
private final Cache<K, V> cache;

TronCache(String name, String strategy) {
this.name = name;
this.cache = CacheBuilder.from(strategy).concurrencyLevel(CPUS).recordStats().build();
}

TronCache(String name, String strategy, CacheLoader<K, V> loader) {
this.name = name;
this.cache = CacheBuilder.from(strategy).concurrencyLevel(CPUS).recordStats().build(loader);
}

public void put(K k, V v) {
this.cache.put(k, v);
}

public V getIfPresent(K k) {
return this.cache.getIfPresent(k);
}

public V get(K k, Callable<? extends V> loader) throws ExecutionException {
return this.cache.get(k, loader);
}

public CacheStats stats() {
return this.cache.stats();
}

public void invalidateAll() {
this.cache.invalidateAll();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
TronCache<?, ?> tronCache = (TronCache<?, ?>) o;
return Objects.equal(name, tronCache.name);
}

@Override
public int hashCode() {
return Objects.hashCode(name);
}
}
Loading

0 comments on commit e26b1af

Please sign in to comment.