Skip to content

Commit

Permalink
feat(db): improve performance for db query
Browse files Browse the repository at this point in the history
  1. add cache for db
  • Loading branch information
halibobo1205 committed Nov 14, 2022
1 parent 2d88f6e commit 937f445
Show file tree
Hide file tree
Showing 19 changed files with 461 additions and 74 deletions.
2 changes: 1 addition & 1 deletion chainbase/src/main/java/org/tron/core/db2/common/DB.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface DB<K, V> extends Iterable<Map.Entry<K, V>>, Instance<DB<K, V>>

void remove(K k);

Iterator iterator();
Iterator<Map.Entry<K, V>> iterator();

void close();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ public abstract class AbstractSnapshot<K, V> implements Snapshot {

protected WeakReference<Snapshot> next;

protected boolean isOptimized;

@Override
public Snapshot advance() {
return new SnapshotImpl(this);
Expand All @@ -36,9 +34,4 @@ public void setNext(Snapshot next) {
public String getDbName() {
return db.getDbName();
}

@Override
public boolean isOptimized(){
return isOptimized;
}
}
2 changes: 0 additions & 2 deletions chainbase/src/main/java/org/tron/core/db2/core/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,4 @@ static boolean isImpl(Snapshot snapshot) {
void updateSolidity();

String getDbName();

boolean isOptimized();
}
14 changes: 0 additions & 14 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,6 @@ public class SnapshotImpl extends AbstractSnapshot<Key, Value> {
}
previous = snapshot;
snapshot.setNext(this);
// inherit
isOptimized = snapshot.isOptimized();
// merge for DynamicPropertiesStore,about 100 keys
if (isOptimized) {
if (root == previous ){
Streams.stream(root.iterator()).forEach( e -> put(e.getKey(),e.getValue()));
}else {
merge(previous);
}
}
}

@Override
Expand All @@ -50,10 +40,6 @@ public byte[] get(byte[] key) {
private byte[] get(Snapshot head, byte[] key) {
Snapshot snapshot = head;
Value value;
if (isOptimized) {
value = db.get(Key.of(key));
return value == null ? null: value.getBytes();
}
while (Snapshot.isImpl(snapshot)) {
if ((value = ((SnapshotImpl) snapshot).db.get(Key.of(key))) != null) {
return value.getBytes();
Expand Down
62 changes: 56 additions & 6 deletions chainbase/src/main/java/org/tron/core/db2/core/SnapshotRoot.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package org.tron.core.db2.core;

import ch.qos.logback.core.encoder.ByteArrayUtil;
import com.google.common.collect.Maps;
import com.google.common.collect.Streams;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import lombok.Getter;
import org.tron.common.cache.CacheManager;
import org.tron.common.cache.CacheType;
import org.tron.common.cache.TronCache;
import org.tron.common.parameter.CommonParameter;
import org.tron.common.utils.ByteArray;
import org.tron.core.ChainBaseManager;
import org.tron.core.capsule.AccountCapsule;
Expand All @@ -23,11 +27,17 @@ public class SnapshotRoot extends AbstractSnapshot<byte[], byte[]> {
private Snapshot solidity;
private boolean isAccountDB;

private TronCache<WrappedByteArray, WrappedByteArray> cache;
private static final List<String> CACHE_DBS = CommonParameter.getInstance()
.getStorage().getCacheDbs();

public SnapshotRoot(DB<byte[], byte[]> db) {
this.db = db;
solidity = this;
isOptimized = "properties".equalsIgnoreCase(db.getDbName());
isAccountDB = "account".equalsIgnoreCase(db.getDbName());
if (CACHE_DBS.contains(this.db.getDbName())) {
this.cache = CacheManager.allocate(CacheType.findByType(this.db.getDbName()));
}
}

private boolean needOptAsset() {
Expand All @@ -37,11 +47,18 @@ private boolean needOptAsset() {

@Override
public byte[] get(byte[] key) {
return db.get(key);
WrappedByteArray cache = getCache(key);
if (cache != null) {
return cache.getBytes();
}
byte[] value = db.get(key);
putCache(key, value);
return value;
}

@Override
public void put(byte[] key, byte[] value) {
byte[] v = value;
if (needOptAsset()) {
if (ByteArray.isEmpty(value)) {
remove(key);
Expand All @@ -56,10 +73,10 @@ public void put(byte[] key, byte[] value) {
}
assetStore.putAccount(item.getInstance());
item.clearAsset();
db.put(key, item.getData());
} else {
db.put(key, value);
v = item.getData();
}
db.put(key, v);
putCache(key, v);
}

@Override
Expand All @@ -68,6 +85,7 @@ public void remove(byte[] key) {
ChainBaseManager.getInstance().getAccountAssetStore().deleteAccount(key);
}
db.remove(key);
putCache(key, null);
}

@Override
Expand All @@ -81,6 +99,7 @@ public void merge(Snapshot from) {
processAccount(batch);
} else {
((Flusher) db).flush(batch);
putCache(batch);
}
}

Expand All @@ -97,6 +116,7 @@ public void merge(List<Snapshot> snapshots) {
processAccount(batch);
} else {
((Flusher) db).flush(batch);
putCache(batch);
}
}

Expand All @@ -120,11 +140,37 @@ private void processAccount(Map<WrappedByteArray, WrappedByteArray> batch) {
}
});
((Flusher) db).flush(accounts);
putCache(accounts);
if (assets.size() > 0) {
assetStore.updateByBatch(AccountAssetStore.convert(assets));
}
}

private boolean cached() {
return Objects.nonNull(this.cache);
}

private void putCache(byte[] key, byte[] value) {
if (cached()) {
cache.put(WrappedByteArray.of(key), WrappedByteArray.of(value));
}
}

private void putCache(Map<WrappedByteArray, WrappedByteArray> values) {
if (cached()) {
values.forEach(cache::put);
}
}

private WrappedByteArray getCache(byte[] key) {
if (cached()) {
return cache.getIfPresent(WrappedByteArray.of(key));
}
return null;
}

// second cache

@Override
public Snapshot retreat() {
return this;
Expand All @@ -142,11 +188,15 @@ public Iterator<Map.Entry<byte[], byte[]>> iterator() {

@Override
public void close() {
if (cached()) {
CacheManager.release(cache);
}
((Flusher) db).close();
}

@Override
public void reset() {
CacheManager.release(cache);
((Flusher) db).reset();
}

Expand Down
38 changes: 9 additions & 29 deletions chainbase/src/main/java/org/tron/core/service/MortgageService.java
Original file line number Diff line number Diff line change
@@ -1,14 +1,9 @@
package org.tron.core.service;

import com.google.protobuf.ByteString;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
Expand All @@ -18,7 +13,6 @@
import org.tron.common.utils.StringUtil;
import org.tron.core.capsule.AccountCapsule;
import org.tron.core.capsule.WitnessCapsule;
import org.tron.core.config.Parameter.ChainConstant;
import org.tron.core.exception.BalanceInsufficientException;
import org.tron.core.store.AccountStore;
import org.tron.core.store.DelegationStore;
Expand Down Expand Up @@ -52,31 +46,17 @@ public void initStore(WitnessStore witnessStore, DelegationStore delegationStore
}

public void payStandbyWitness() {
List<WitnessCapsule> witnessCapsules = witnessStore.getAllWitnesses();
Map<ByteString, WitnessCapsule> witnessCapsuleMap = new HashMap<>();
List<ByteString> witnessAddressList = new ArrayList<>();
for (WitnessCapsule witnessCapsule : witnessCapsules) {
witnessAddressList.add(witnessCapsule.getAddress());
witnessCapsuleMap.put(witnessCapsule.getAddress(), witnessCapsule);
}
witnessAddressList.sort(Comparator.comparingLong((ByteString b) -> witnessCapsuleMap.get(b).getVoteCount())
.reversed().thenComparing(Comparator.comparingInt(ByteString::hashCode).reversed()));
if (witnessAddressList.size() > ChainConstant.WITNESS_STANDBY_LENGTH) {
witnessAddressList = witnessAddressList.subList(0, ChainConstant.WITNESS_STANDBY_LENGTH);
List<WitnessCapsule> witnessStandbys = witnessStore.getWitnessStandby();
long voteSum = witnessStandbys.stream().mapToLong(WitnessCapsule::getVoteCount).sum();
if (voteSum < 1) {
return;
}
long voteSum = 0;
long totalPay = dynamicPropertiesStore.getWitness127PayPerBlock();
for (ByteString b : witnessAddressList) {
voteSum += witnessCapsuleMap.get(b).getVoteCount();
}

if (voteSum > 0) {
for (ByteString b : witnessAddressList) {
double eachVotePay = (double) totalPay / voteSum;
long pay = (long) (witnessCapsuleMap.get(b).getVoteCount() * eachVotePay);
logger.debug("pay {} stand reward {}", Hex.toHexString(b.toByteArray()), pay);
payReward(b.toByteArray(), pay);
}
double eachVotePay = (double) totalPay / voteSum;
for (WitnessCapsule w : witnessStandbys) {
long pay = (long) (w.getVoteCount() * eachVotePay);
payReward(w.getAddress().toByteArray(), pay);
logger.debug("pay {} stand reward {}", Hex.toHexString(w.getAddress().toByteArray()), pay);
}
}

Expand Down
40 changes: 40 additions & 0 deletions chainbase/src/main/java/org/tron/core/store/WitnessStore.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package org.tron.core.store;

import com.google.common.collect.Streams;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map.Entry;
import java.util.stream.Collectors;
Expand All @@ -9,16 +11,25 @@
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.tron.common.cache.CacheManager;
import org.tron.common.cache.CacheStrategies;
import org.tron.common.cache.CacheType;
import org.tron.common.cache.TronCache;
import org.tron.core.capsule.WitnessCapsule;
import org.tron.core.config.Parameter;
import org.tron.core.db.TronStoreWithRevoking;

@Slf4j(topic = "DB")
@Component
public class WitnessStore extends TronStoreWithRevoking<WitnessCapsule> {

private final TronCache<Integer, List<WitnessCapsule>> witnessStandby;

@Autowired
protected WitnessStore(@Value("witness") String dbName) {
super(dbName);
String strategy = String.format(CacheStrategies.PATTERNS, 1, 1, "30s", 1);
witnessStandby = CacheManager.allocate(CacheType.witnessStandby, strategy);
}

/**
Expand All @@ -35,4 +46,33 @@ public WitnessCapsule get(byte[] key) {
byte[] value = revokingDB.getUnchecked(key);
return ArrayUtils.isEmpty(value) ? null : new WitnessCapsule(value);
}

public List<WitnessCapsule> getWitnessStandby() {
List<WitnessCapsule> list =
witnessStandby.getIfPresent(Parameter.ChainConstant.WITNESS_STANDBY_LENGTH);
if (list != null) {
return list;
}
return updateWitnessStandby(null);
}

public List<WitnessCapsule> updateWitnessStandby(List<WitnessCapsule> all) {
List<WitnessCapsule> ret;
if (all == null) {
all = getAllWitnesses();
}
all.sort(Comparator.comparingLong(WitnessCapsule::getVoteCount)
.reversed().thenComparing(Comparator.comparingInt(
(WitnessCapsule w) -> w.getAddress().hashCode()).reversed()));
if (all.size() > Parameter.ChainConstant.WITNESS_STANDBY_LENGTH) {
ret = new ArrayList<>(all.subList(0, Parameter.ChainConstant.WITNESS_STANDBY_LENGTH));
} else {
ret = new ArrayList<>(all);
}
// trim voteCount = 0
ret.removeIf(w -> w.getVoteCount() < 1);
witnessStandby.put(Parameter.ChainConstant.WITNESS_STANDBY_LENGTH, ret);
return ret;
}

}
44 changes: 44 additions & 0 deletions common/src/main/java/org/tron/common/cache/CacheManager.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package org.tron.common.cache;

import com.google.common.cache.CacheLoader;
import com.google.common.cache.CacheStats;
import com.google.common.collect.Maps;
import java.util.Map;
import java.util.stream.Collectors;
import org.tron.common.parameter.CommonParameter;

public class CacheManager {

private static final Map<CacheType, TronCache<?, ?>> CACHES = Maps.newConcurrentMap();

public static <K, V> TronCache<K, V> allocate(CacheType name) {
TronCache<K, V> cache = new TronCache<>(name, CommonParameter.getInstance()
.getStorage().getCacheStrategy(name));
CACHES.put(name, cache);
return cache;
}

public static <K, V> TronCache<K, V> allocate(CacheType name, String strategy) {
TronCache<K, V> cache = new TronCache<>(name, strategy);
CACHES.put(name, cache);
return cache;
}

public static <K, V> TronCache<K, V> allocate(CacheType name, String strategy,
CacheLoader<K, V> loader) {
TronCache<K, V> cache = new TronCache<>(name, strategy, loader);
CACHES.put(name, cache);
return cache;
}


public static void release(TronCache cache) {
cache.invalidateAll();
}

public static Map<String, CacheStats> stats() {
return CACHES.values().stream().collect(Collectors.toMap(c -> c.getName().toString(),
TronCache::stats));
}

}
Loading

0 comments on commit 937f445

Please sign in to comment.