Skip to content

Commit

Permalink
feat: incremental AT Processing Cache works
Browse files Browse the repository at this point in the history
  • Loading branch information
ohager committed Apr 30, 2024
1 parent bc6c4d6 commit 0ba6751
Show file tree
Hide file tree
Showing 8 changed files with 645 additions and 507 deletions.
6 changes: 6 additions & 0 deletions conf/node-default.properties
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,12 @@

# node.checkPointHeight = -1

## Number of past blocks for AT processor to load into memory/cache
## Put -1, if you want to disable the cache, which may slow down AT/smart contract processing significantly
## Do not put too high values as this may cause significant memory occupation and cause even negative impact on processing times.
# node.atProcessorCacheBlockCount = 1000


#### API SERVER ####

## Accept http/json API requests.
Expand Down
8 changes: 4 additions & 4 deletions src/brs/BlockchainProcessorImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
import static brs.Constants.FEE_QUANT_SIP3;
import static brs.Constants.ONE_SIGNA;

import brs.at.AT;
import brs.at.AtBlock;
import brs.at.AtController;
import brs.at.AtException;
import brs.at.*;
import brs.crypto.Crypto;
import brs.db.BlockDb;
import brs.db.DerivedTable;
Expand Down Expand Up @@ -124,6 +121,7 @@ public final class BlockchainProcessorImpl implements BlockchainProcessor {
private final boolean autoPopOffEnabled;
private int autoPopOffLastStuckHeight = 0;
private int autoPopOffNumberOfBlocks = 0;
private ATProcessorCache atProcessorCache = ATProcessorCache.getInstance();

public final void setOclVerify(Boolean b) {
oclVerify = b;
Expand Down Expand Up @@ -1170,6 +1168,7 @@ private void pushBlock(final Block block) throws BlockNotAcceptedException {
stores.rollbackTransaction();
blockchain.setLastBlock(previousLastBlock);
downloadCache.resetCache();
atProcessorCache.reset();
throw e;
} finally {
stores.endTransaction();
Expand Down Expand Up @@ -1301,6 +1300,7 @@ private List<Block> popOffTo(Block commonBlock, List<Block> forkBlocks) {
dbCacheManager.flushCache();
stores.commitTransaction();
downloadCache.resetCache();
atProcessorCache.reset();;
} catch (RuntimeException e) {
stores.rollbackTransaction();
logger.debug("Error popping off to {}", commonBlock.getHeight(), e);
Expand Down
176 changes: 137 additions & 39 deletions src/brs/at/ATProcessorCache.java
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package brs.at;

import brs.Signum;
import brs.SignumException;
import brs.Transaction;
import brs.db.TransactionDb;
import brs.db.sql.Db;
import brs.props.PropertyService;
import brs.props.Props;
import brs.schema.tables.records.TransactionRecord;
import org.jooq.Result;
import org.slf4j.Logger;
Expand All @@ -16,6 +19,10 @@

import static brs.schema.Tables.TRANSACTION;

/**
* This class is used to cache the transactions of past x (Props.BRS_AT_PROCESSOR_CACHE_BLOCK_COUNT) blocks
* to reduce database access as much as possible while AT processing.
*/
public final class ATProcessorCache {

public static class CacheMissException extends Exception {
Expand All @@ -27,65 +34,84 @@ public static class CacheMissException extends Exception {
private static final int CostOfOneAT = AtConstants.AT_ID_SIZE + 16;
private final LinkedHashMap<Long, ATContext> atMap = new LinkedHashMap<>();
private int currentBlockHeight = Integer.MIN_VALUE;
private int lowestBlockHeight = Integer.MAX_VALUE;
private final ArrayList<Long> currentBlockAtIds = new ArrayList<>();
private int startBlockHeight = Integer.MAX_VALUE;
private long minimumActivationAmount = Long.MAX_VALUE;
private final int numberOfBlocksToCache;
private int lastLoadedBlockHeight = 0;

public static class ATContext {
public byte[] md5;
public AT at;
public ArrayList<Transaction> transactions = new ArrayList<>();
public LinkedList<Transaction> transactions = new LinkedList<>();
}

private ATProcessorCache() {

private ATProcessorCache(PropertyService propertyService) {
this.numberOfBlocksToCache = propertyService.getInt(Props.BRS_AT_PROCESSOR_CACHE_BLOCK_COUNT);
}

public int getLowestBlockHeight() {
return lowestBlockHeight;
public boolean isEnabled() {
return numberOfBlocksToCache > 0;
}

public HashMap<Long, ATContext> getAtMap() {
return this.atMap;
}


public static ATProcessorCache getInstance() {
if (instance == null) {
instance = new ATProcessorCache();
instance = new ATProcessorCache(Signum.getPropertyService());
}
return instance;
}

public void reset() {
logger.debug("Resetting AT Processor Cache");
atMap.clear();
currentBlockHeight = Integer.MIN_VALUE;
lowestBlockHeight = Integer.MAX_VALUE;
startBlockHeight = Integer.MAX_VALUE;
minimumActivationAmount = Long.MAX_VALUE;
lastLoadedBlockHeight = 0;
}

public ArrayList<Long> getCurrentBlockAtIds() {
return this.currentBlockAtIds;
}

public LinkedHashMap<Long, ATContext> load(byte[] ats, int blockHeight) throws AtException {
public ATContext getATContext(Long atId) {
return this.atMap.get(atId);
}

this.reset();
public void loadBlock(byte[] ats, int blockHeight) throws AtException {
this.currentBlockHeight = blockHeight;
this.lowestBlockHeight = blockHeight - 1000;
this.startBlockHeight = blockHeight - this.numberOfBlocksToCache;
this.currentBlockAtIds.clear();
if (ats == null || ats.length == 0) {
return this.atMap;
return;
}
long startTime = System.nanoTime();
parseATBytes(ats);
// if this is impacts on db access, we might add a multi AT fetch -> getATs
this.atMap.forEach((atId, proxy) -> {
AT at = AT.getAT(atId);
logger.debug("Cached AT {}", atId);
loadAtBytesIntoAtMap(ats);
loadATsforBlock(blockHeight);
if (isEnabled()) {
loadTransactions();
}
long executionTime = (System.nanoTime() - startTime) / 1000000;
logger.debug("Cache Duration: {} milliseconds", executionTime);
}

private void loadATsforBlock(int blockHeight) {
logger.debug("Loading {} ATs for block height {}", getCurrentBlockAtIds().size(), blockHeight);
Signum.getStores().getAtStore().getATs(getCurrentBlockAtIds()).forEach(at -> {
Long atId = AtApiHelper.getLong(at.getId());
this.minimumActivationAmount = Math.min(this.minimumActivationAmount, at.minActivationAmount());
proxy.at = at;
ATContext atContext = atMap.get(atId);
atContext.at = at;
logger.debug("Cached AT {}", atId);
});
loadRelevantTransactions();
long executionTime = (System.nanoTime() - startTime) / 1000000;
logger.debug("Cache Duration for {} ATs: {} milliseconds", atMap.size(), executionTime);
return this.atMap;
}

private void parseATBytes(byte[] ats) throws AtException {
private void loadAtBytesIntoAtMap(byte[] ats) throws AtException {
if (ats.length % (CostOfOneAT) != 0) {
throw new AtException("ATs must be a multiple of cost of one AT ( " + CostOfOneAT + " )");
}
Expand All @@ -100,22 +126,33 @@ private void parseATBytes(byte[] ats) throws AtException {
b.get(atId);
b.get(md5);
long atIdLong = AtApiHelper.getLong(atId);
if (atMap.containsKey(atIdLong)) {
throw new AtException("AT included in block multiple times");
ATContext existingAtContext = atMap.get(atIdLong);
if (existingAtContext == null) {
ATContext atContext = new ATContext();
atContext.md5 = md5.clone();
atMap.put(atIdLong, atContext);
} else {
existingAtContext.md5 = md5.clone();
}
ATContext atContext = new ATContext();
atContext.md5 = md5.clone();
atMap.put(atIdLong, atContext);
this.currentBlockAtIds.add(atIdLong);
}
}

private void loadTransactions() {
if (lastLoadedBlockHeight == 0) {
loadTransactionsFromHeightUntilCurrentBlock(startBlockHeight, false);
} else {
loadTransactionsFromHeightUntilCurrentBlock(lastLoadedBlockHeight, true);
}
lastLoadedBlockHeight = currentBlockHeight;
}

// TODO: make this smarter, instead of loading them all
private void loadRelevantTransactions() {
logger.debug("Loading tx for lo: {}, hi: {}, amount: {}, no ATs: {}", lowestBlockHeight, currentBlockHeight, minimumActivationAmount, getAtMap().size());
private void loadTransactionsPerATs() {
logger.debug("Loading tx for lo: {}, hi: {}, amount: {}, no ATs: {}", startBlockHeight, currentBlockHeight, minimumActivationAmount, getAtMap().size());
Result<TransactionRecord> result = Db.useDSLContext(ctx -> {
return ctx.selectFrom(TRANSACTION)
.where(TRANSACTION.HEIGHT.between(lowestBlockHeight, currentBlockHeight))
.and(TRANSACTION.RECIPIENT_ID.in(getAtMap().keySet()))
.where(TRANSACTION.HEIGHT.between(startBlockHeight, currentBlockHeight))
.and(TRANSACTION.RECIPIENT_ID.in(getCurrentBlockAtIds()))
.and(TRANSACTION.AMOUNT.greaterOrEqual(minimumActivationAmount))
.orderBy(TRANSACTION.HEIGHT, TRANSACTION.ID)
.fetch();
Expand All @@ -127,15 +164,74 @@ private void loadRelevantTransactions() {
try {
ATContext context = this.atMap.get(r.getRecipientId());
if (context != null) {
Transaction tx = db.loadTransaction(r);
context.transactions.add(tx);
context.transactions.add(db.loadTransaction(r));
}
} catch (SignumException.ValidationException e) {
throw new RuntimeException(e);
}
}
}

private void loadTransactionsFromHeightUntilCurrentBlock(int startHeight, boolean shallRemoveOldest) {
logger.debug("Loading all tx for heights from {} to {}", startHeight, currentBlockHeight - 1);
Result<TransactionRecord> result = Db.useDSLContext(ctx -> {
return ctx.selectFrom(TRANSACTION)
.where(TRANSACTION.HEIGHT.between(startHeight, currentBlockHeight - 1))
.and(TRANSACTION.RECIPIENT_ID.isNotNull())
.orderBy(TRANSACTION.HEIGHT, TRANSACTION.ID)
.fetch();
});

HashSet<Long> processedRecipients = new HashSet<>();
TransactionDb db = Db.getDbsByDatabaseType().getTransactionDb();
for (TransactionRecord r : result) {
Long recipientId = r.getRecipientId();
try {
ATContext context = this.atMap.get(recipientId);
if (context != null) {
//defensive: avoid double adding - maybe not the most elegant way...
if (context.transactions.stream().anyMatch(t -> t.getId() == r.getId())) {
logger.debug("Doubled Tx ({}) found for Recipient {}", r.getId(), recipientId);
} else {
context.transactions.add(db.loadTransaction(r));
}
} else {
ATContext newContext = new ATContext();
newContext.transactions.add(db.loadTransaction(r));
this.atMap.put(recipientId, newContext);
}
processedRecipients.add(recipientId);
} catch (SignumException.ValidationException e) {
throw new RuntimeException(e);
}
}

if (shallRemoveOldest) {
processedRecipients.forEach(this::pruneTransactionList);
}

}

private void pruneTransactionList(long recipientId) {
ATContext context = this.atMap.get(recipientId);
if (context == null || context.transactions.isEmpty()) {
return;
}

int minimumHeightToKeep = currentBlockHeight - numberOfBlocksToCache - 1;
Transaction oldest = context.transactions.peekFirst();
int count = 0;
while(!context.transactions.isEmpty() && oldest.getHeight() < minimumHeightToKeep){
context.transactions.removeFirst();
logger.debug("Removed tx {}", oldest.getId());
oldest = context.transactions.peekFirst();
++count;
}
if(count > 0){
logger.debug("Removed {} old transactions lower than height {} for recipient {}", count, minimumHeightToKeep, recipientId);
}
}

public Long findTransactionId(int startHeight, int endHeight, Long atID, int numOfTx, long minAmount) throws CacheMissException {
long startTime = System.nanoTime();

Expand All @@ -150,8 +246,8 @@ public Long findTransactionId(int startHeight, int endHeight, Long atID, int num
startHeight = atContext.at.getCreationBlockHeight();
}

if (startHeight < lowestBlockHeight || endHeight > currentBlockHeight) {
logger.debug("Out of range (start: {}, end: {} - wanted block: {})", lowestBlockHeight, currentBlockHeight, startHeight);
if (startHeight < startBlockHeight || endHeight > currentBlockHeight) {
logger.debug("Out of range (start: {}, end: {} - wanted block: {})", startBlockHeight, currentBlockHeight, startHeight);
throw new CacheMissException();
}

Expand All @@ -161,7 +257,8 @@ public Long findTransactionId(int startHeight, int endHeight, Long atID, int num
.filter(t ->
t.getHeight() >= finalStartHeight &&
t.getHeight() < endHeight &&
t.getAmountNqt() >= minAmount)
t.getAmountNqt() >= minAmount
)
.collect(Collectors.toList());

if (collected.size() > numOfTx) {
Expand All @@ -182,8 +279,9 @@ public int findTransactionHeight(Long transactionId, int height, Long atID, long
throw new CacheMissException();
}

// TODO: there must be another way how this works. This is very fragile
int count = 0;
ArrayList<Transaction> transactions = atContext.transactions;
Collection<Transaction> transactions = atContext.transactions;
for (Transaction t : transactions) {
if (t.getHeight() == height && t.getAmountNqt() >= minAmount) {
++count;
Expand Down
37 changes: 27 additions & 10 deletions src/brs/at/AtApiPlatformImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,26 @@ public static AtApiPlatformImpl getInstance() {
return instance;
}

// Version 1 - ok
// Pro Block die letzten 500 blocke der aktuell relevanten ATs in den Mem laden - 40 ms

// Version 2 - 80% ok
// Bei start alle tx der letzen 500 Bloecke in den Speicher laden - und (check) - 400ms (1x)
// Jeden weiteren Block inkrementell die neuesten Tx an den Speicher anhaengen (check) - 2m (Nx)
// --> Entferne alte Bloecke (aelter als 500 Bloecke)


private static Long findTransaction(int startHeight, int endHeight, Long atID, int numOfTx, long minAmount) {
try {
return ATProcessorCache.getInstance().findTransactionId(startHeight, endHeight, atID, numOfTx, minAmount);
} catch (ATProcessorCache.CacheMissException e) {
return Signum.getStores().getAtStore().findTransaction(startHeight, endHeight, atID, numOfTx, minAmount);
ATProcessorCache cache = ATProcessorCache.getInstance();
if (cache.isEnabled()) {
try {
return ATProcessorCache.getInstance().findTransactionId(startHeight, endHeight, atID, numOfTx, minAmount);
} catch (ATProcessorCache.CacheMissException e) {
// no op
}
}
////
return Signum.getStores().getAtStore().findTransaction(startHeight, endHeight, atID, numOfTx, minAmount);
//
// long id = 0;
// long idOrig = Signum.getStores().getAtStore().findTransaction(startHeight, endHeight, atID, numOfTx, minAmount);
// try {
Expand All @@ -56,15 +69,19 @@ private static Long findTransaction(int startHeight, int endHeight, Long atID, i
// logger.error("Cache mismatch: {} x {}", id, idOrig );
// }
//
// return id;
// return idOrig;
}

private static int findTransactionHeight(Long transactionId, int height, Long atID, long minAmount) {
try {
return ATProcessorCache.getInstance().findTransactionHeight(transactionId, height, atID, minAmount);
} catch (ATProcessorCache.CacheMissException e) {
return Signum.getStores().getAtStore().findTransactionHeight(transactionId, height, atID, minAmount);
ATProcessorCache cache = ATProcessorCache.getInstance();
if (cache.isEnabled()) {
try {
return ATProcessorCache.getInstance().findTransactionHeight(transactionId, height, atID, minAmount);
} catch (ATProcessorCache.CacheMissException e) {
// no op
}
}
return Signum.getStores().getAtStore().findTransactionHeight(transactionId, height, atID, minAmount);
// int h;
// int hOrig =
// Signum.getStores().getAtStore().findTransactionHeight(transactionId, height, atID, minAmount);
Expand Down
Loading

0 comments on commit 0ba6751

Please sign in to comment.