Skip to content

Commit

Permalink
feat: efficient bulk inserts on indirect_incoming, and account_balances
Browse files Browse the repository at this point in the history
  • Loading branch information
ohager committed May 5, 2024
1 parent cb0b3ee commit 05609be
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 30 deletions.
43 changes: 32 additions & 11 deletions src/brs/db/sql/SqlAccountStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import brs.db.store.DerivedTableManager;
import brs.fluxcapacitor.FluxValues;
import brs.props.Props;
import brs.schema.tables.records.AccountBalanceRecord;
import brs.util.Convert;
import signumj.crypto.SignumCrypto;

Expand Down Expand Up @@ -169,19 +170,39 @@ protected Account.Balance load(DSLContext ctx, Record rs) {

@Override
protected void bulkInsert(DSLContext ctx, Collection<Account.Balance> accounts) {
List<Query> accountQueries = new ArrayList<>();
int height = Signum.getBlockchain().getHeight();
for (Account.Balance account : accounts) {
if (account == null) continue;
accountQueries.add(
ctx.insertInto(
ACCOUNT_BALANCE, ACCOUNT_BALANCE.ID, ACCOUNT_BALANCE.HEIGHT,
ACCOUNT_BALANCE.BALANCE, ACCOUNT_BALANCE.UNCONFIRMED_BALANCE, ACCOUNT_BALANCE.FORGED_BALANCE, ACCOUNT.LATEST)
.values(account.getId(), height,
account.getBalanceNqt(), account.getUnconfirmedBalanceNqt(), account.getForgedBalanceNqt(), true)
);
Iterator<Account.Balance> iterator = accounts.iterator();
List<Record6<Long, Integer, Long, Long, Long, Boolean>> rows = new ArrayList<>();
while (iterator.hasNext()) {
Account.Balance balance = iterator.next();
if(balance == null) {
continue;
}

rows.add(ctx.newRecord(ACCOUNT_BALANCE.ID, ACCOUNT_BALANCE.HEIGHT,
ACCOUNT_BALANCE.BALANCE, ACCOUNT_BALANCE.UNCONFIRMED_BALANCE,
ACCOUNT_BALANCE.FORGED_BALANCE, ACCOUNT.LATEST)
.values(balance.getId(), height,
balance.getBalanceNqt(), balance.getUnconfirmedBalanceNqt(),
balance.getForgedBalanceNqt(), true));

if(rows.size() >= 250000){
ctx.insertInto(ACCOUNT_BALANCE, ACCOUNT_BALANCE.ID, ACCOUNT_BALANCE.HEIGHT,
ACCOUNT_BALANCE.BALANCE, ACCOUNT_BALANCE.UNCONFIRMED_BALANCE,
ACCOUNT_BALANCE.FORGED_BALANCE, ACCOUNT.LATEST)
.valuesOfRecords(rows)
.execute();
rows.clear();
}
}

if(!rows.isEmpty()){
ctx.insertInto(ACCOUNT_BALANCE, ACCOUNT_BALANCE.ID, ACCOUNT_BALANCE.HEIGHT,
ACCOUNT_BALANCE.BALANCE, ACCOUNT_BALANCE.UNCONFIRMED_BALANCE,
ACCOUNT_BALANCE.FORGED_BALANCE, ACCOUNT.LATEST)
.valuesOfRecords(rows)
.execute();
}
ctx.batch(accountQueries).execute();
}
};
}
Expand Down
39 changes: 20 additions & 19 deletions src/brs/db/sql/SqlIndirectIncomingStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,13 @@
import brs.db.SignumKey;
import brs.db.store.DerivedTableManager;
import brs.db.store.IndirectIncomingStore;
import org.jooq.BatchBindStep;
import org.jooq.DSLContext;
import org.jooq.Query;
import org.jooq.Record;
import org.jooq.*;
import org.jooq.exception.DataAccessException;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;

import static brs.schema.Tables.INDIRECT_INCOMING;
Expand Down Expand Up @@ -58,30 +57,32 @@ void save(DSLContext ctx, IndirectIncoming indirectIncoming) {
@Override
void save(DSLContext ctx, Collection<IndirectIncoming> indirectIncomings) {
Iterator<IndirectIncoming> iterator = indirectIncomings.iterator();
Query insertStatement = ctx.insertInto(
INDIRECT_INCOMING,
INDIRECT_INCOMING.ACCOUNT_ID,
INDIRECT_INCOMING.TRANSACTION_ID,
INDIRECT_INCOMING.AMOUNT,
INDIRECT_INCOMING.QUANTITY,
INDIRECT_INCOMING.HEIGHT)
.values(0L, 0L, 0L, 0L, 0);
while (iterator.hasNext()) {
BatchBindStep bindStep = ctx.batch(insertStatement);
// break into batches of 50k queries max
for (int i = 0; i < 50000 && iterator.hasNext(); i++) {
List<Record5<Long, Long, Long, Long, Integer>> rows = new ArrayList<>();
// break into batches
for (int i = 0; i < 250000 && iterator.hasNext(); i++) {
IndirectIncoming indirectIncoming = iterator.next();
bindStep.bind(
rows.add(ctx.newRecord(INDIRECT_INCOMING.ACCOUNT_ID,
INDIRECT_INCOMING.TRANSACTION_ID,
INDIRECT_INCOMING.AMOUNT,
INDIRECT_INCOMING.QUANTITY,
INDIRECT_INCOMING.HEIGHT).values(
indirectIncoming.getAccountId(),
indirectIncoming.getTransactionId(),
indirectIncoming.getAmount(),
indirectIncoming.getQuantity(),
indirectIncoming.getHeight()
);
));
}
try {
bindStep.execute();
} catch (DataAccessException e) {
ctx.insertInto(INDIRECT_INCOMING, INDIRECT_INCOMING.ACCOUNT_ID,
INDIRECT_INCOMING.TRANSACTION_ID,
INDIRECT_INCOMING.AMOUNT,
INDIRECT_INCOMING.QUANTITY,
INDIRECT_INCOMING.HEIGHT)
.valuesOfRecords(rows)
.execute();
} catch (Exception e) {
// TODO: remove this catch after better handling of indirects and forks
}
}
Expand Down

0 comments on commit 05609be

Please sign in to comment.