Skip to content

Commit

Permalink
Use a thread in PersistenceManager for reading persisted files instea…
Browse files Browse the repository at this point in the history
…d of using thread in client code

Before we use a thread in readFromResources and readAllPersisted. To avoid that client code need to deal with
threading we moved that to the PersistenceManager and changed the API accordingly so it will not return the persisted object but calls a consumer once it is completed with reading.
  • Loading branch information
chimp1984 committed Oct 12, 2020
1 parent 3687a03 commit fa2163f
Show file tree
Hide file tree
Showing 34 changed files with 387 additions and 334 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -213,14 +214,41 @@ public void shutdown() {
// Reading file
///////////////////////////////////////////////////////////////////////////////////////////

/**
* Read persisted file in a thread.
*
* @param resultHandler Consumer of persisted data once it was read from disk.
* @param orElse Called if no file exists or reading of file failed.
*/
public void readPersisted(Consumer<T> resultHandler, Runnable orElse) {
readPersisted(checkNotNull(fileName), resultHandler, orElse);
}

/**
* Read persisted file in a thread.
*
* @param fileName File name of our persisted data.
* @param resultHandler Consumer of persisted data once it was read from disk.
* @param orElse Called if no file exists or reading of file failed.
*/
public void readPersisted(String fileName, Consumer<T> resultHandler, Runnable orElse) {
new Thread(() -> {
T persisted = getPersisted(fileName);
if (persisted != null) {
resultHandler.accept(persisted);
} else {
orElse.run();
}
}, "BisqExecutable-read-" + fileName).start();
}

// API for synchronous reading of data. Not recommended to be used in application code.
// Currently used by tests and monitor. Should be converted to the threaded API as well.
@Nullable
public T getPersisted() {
return getPersisted(checkNotNull(fileName));
}

//TODO use threading here instead in the clients
// We get called at startup either by readAllPersisted or readFromResources. Both are wrapped in a thread so we
// are not on the user thread.
@Nullable
public T getPersisted(String fileName) {
File storageFile = new File(dir, fileName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,6 @@

package bisq.common.proto.persistable;

import java.util.List;

public interface PersistedDataHost {
void readPersisted();

static void apply(List<PersistedDataHost> persistedDataHosts) {
persistedDataHosts.forEach(PersistedDataHost::readPersisted);
}
void readPersisted(Runnable completeHandler);
}
9 changes: 3 additions & 6 deletions core/src/main/java/bisq/core/app/BisqExecutable.java
Original file line number Diff line number Diff line change
Expand Up @@ -169,14 +169,11 @@ protected void readAllPersisted(@Nullable List<PersistedDataHost> additionalHost

AtomicInteger remaining = new AtomicInteger(hosts.size());
hosts.forEach(e -> {
new Thread(() -> {
e.readPersisted();
remaining.decrementAndGet();
if (remaining.get() == 0) {
e.readPersisted(() -> {
if (remaining.decrementAndGet() == 0) {
UserThread.execute(completeHandler);
}

}, "BisqExecutable-read-" + e.getClass().getSimpleName()).start();
});
});
}

Expand Down
8 changes: 3 additions & 5 deletions core/src/main/java/bisq/core/app/BisqSetup.java
Original file line number Diff line number Diff line change
Expand Up @@ -317,11 +317,9 @@ private void maybeShowTac(Runnable nextStep) {
}
}

private void readMapsFromResources(Runnable nextStep) {
SetupUtils.readFromResources(p2PService.getP2PDataStorage(), config).addListener((observable, oldValue, newValue) -> {
if (newValue)
nextStep.run();
});
private void readMapsFromResources(Runnable completeHandler) {
String postFix = "_" + config.baseCurrencyNetwork.name();
p2PService.getP2PDataStorage().readFromResources(postFix, completeHandler);
}

private void startP2pNetworkAndWallet(Runnable nextStep) {
Expand Down
49 changes: 0 additions & 49 deletions core/src/main/java/bisq/core/app/SetupUtils.java

This file was deleted.

10 changes: 4 additions & 6 deletions core/src/main/java/bisq/core/app/misc/AppSetupWithP2P.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import bisq.core.account.sign.SignedWitnessService;
import bisq.core.account.witness.AccountAgeWitnessService;
import bisq.core.app.SetupUtils;
import bisq.core.app.TorSetup;
import bisq.core.filter.FilterManager;
import bisq.core.trade.statistics.TradeStatisticsManager;
Expand Down Expand Up @@ -79,7 +78,8 @@ public void initPersistedDataHosts() {
// we apply at startup the reading of persisted data but don't want to get it triggered in the constructor
persistedDataHosts.forEach(e -> {
try {
e.readPersisted();
e.readPersisted(() -> {
});
} catch (Throwable e1) {
log.error("readPersisted error", e1);
}
Expand All @@ -88,10 +88,8 @@ public void initPersistedDataHosts() {

@Override
protected void initBasicServices() {
SetupUtils.readFromResources(p2PService.getP2PDataStorage(), config).addListener((observable, oldValue, newValue) -> {
if (newValue)
startInitP2PNetwork();
});
String postFix = "_" + config.baseCurrencyNetwork.name();
p2PService.getP2PDataStorage().readFromResources(postFix, this::startInitP2PNetwork);
}

private void startInitP2PNetwork() {
Expand Down
25 changes: 13 additions & 12 deletions core/src/main/java/bisq/core/btc/model/AddressEntryList.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,13 @@ public AddressEntryList(PersistenceManager<AddressEntryList> persistenceManager)
}

@Override
public void readPersisted() {
AddressEntryList persisted = persistenceManager.getPersisted();
if (persisted != null) {
entrySet.clear();
entrySet.addAll(persisted.entrySet);
}
public void readPersisted(Runnable completeHandler) {
persistenceManager.readPersisted(persisted -> {
entrySet.clear();
entrySet.addAll(persisted.entrySet);
completeHandler.run();
},
completeHandler);
}


Expand Down Expand Up @@ -110,12 +111,12 @@ public void onWalletReady(Wallet wallet) {
Set<AddressEntry> toBeRemoved = new HashSet<>();
entrySet.forEach(addressEntry -> {
Script.ScriptType scriptType = addressEntry.isSegwit() ? Script.ScriptType.P2WPKH
: Script.ScriptType.P2PKH;
: Script.ScriptType.P2PKH;
DeterministicKey keyFromPubHash = (DeterministicKey) wallet.findKeyFromPubKeyHash(
addressEntry.getPubKeyHash(), scriptType);
addressEntry.getPubKeyHash(), scriptType);
if (keyFromPubHash != null) {
Address addressFromKey = Address.fromKey(Config.baseCurrencyNetworkParameters(), keyFromPubHash,
scriptType);
scriptType);
// We want to ensure key and address matches in case we have address in entry available already
if (addressEntry.getAddress() == null || addressFromKey.equals(addressEntry.getAddress())) {
addressEntry.setDeterministicKey(keyFromPubHash);
Expand Down Expand Up @@ -197,8 +198,8 @@ public void addAddressEntry(AddressEntry addressEntry) {
public void swapToAvailable(AddressEntry addressEntry) {
boolean setChangedByRemove = entrySet.remove(addressEntry);
boolean setChangedByAdd = entrySet.add(new AddressEntry(addressEntry.getKeyPair(),
AddressEntry.Context.AVAILABLE,
addressEntry.isSegwit()));
AddressEntry.Context.AVAILABLE,
addressEntry.isSegwit()));
if (setChangedByRemove || setChangedByAdd) {
requestPersistence();
}
Expand Down Expand Up @@ -234,7 +235,7 @@ private void maybeAddNewAddressEntry(Transaction tx) {
.map(address -> Pair.of(address, (DeterministicKey) wallet.findKeyFromAddress(address)))
.filter(pair -> pair.getRight() != null)
.map(pair -> new AddressEntry(pair.getRight(), AddressEntry.Context.AVAILABLE,
pair.getLeft() instanceof SegwitAddress))
pair.getLeft() instanceof SegwitAddress))
.forEach(this::addAddressEntry);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,13 +129,16 @@ public void start() {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void readPersisted() {
public void readPersisted(Runnable completeHandler) {
if (DevEnv.isDaoActivated()) {
BallotList persisted = persistenceManager.getPersisted();
if (persisted != null) {
ballotList.setAll(persisted.getList());
listeners.forEach(l -> l.onListChanged(ballotList.getList()));
}
persistenceManager.readPersisted(persisted -> {
ballotList.setAll(persisted.getList());
listeners.forEach(l -> l.onListChanged(ballotList.getList()));
completeHandler.run();
},
completeHandler);
} else {
completeHandler.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,12 +162,15 @@ public void start() {
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void readPersisted() {
public void readPersisted(Runnable completeHandler) {
if (DevEnv.isDaoActivated()) {
MyBlindVoteList persisted = persistenceManager.getPersisted();
if (persisted != null) {
myBlindVoteList.setAll(persisted.getList());
}
persistenceManager.readPersisted(persisted -> {
myBlindVoteList.setAll(persisted.getList());
completeHandler.run();
},
completeHandler);
} else {
completeHandler.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ public MyReputationListService(PersistenceManager<MyReputationList> persistenceM
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void readPersisted() {
public void readPersisted(Runnable completeHandler) {
if (DevEnv.isDaoActivated()) {
MyReputationList persisted = persistenceManager.getPersisted();
if (persisted != null) {
myReputationList.setAll(persisted.getList());
}
persistenceManager.readPersisted(persisted -> {
myReputationList.setAll(persisted.getList());
completeHandler.run();
},
completeHandler);
} else {
completeHandler.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,12 +69,15 @@ public MyVoteListService(DaoStateService daoStateService,
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void readPersisted() {
public void readPersisted(Runnable completeHandler) {
if (DevEnv.isDaoActivated()) {
MyVoteList persisted = persistenceManager.getPersisted();
if (persisted != null) {
this.myVoteList.setAll(persisted.getList());
}
persistenceManager.readPersisted(persisted -> {
myVoteList.setAll(persisted.getList());
completeHandler.run();
},
completeHandler);
} else {
completeHandler.run();
}
}

Expand All @@ -97,7 +100,8 @@ public void applyRevealTxId(MyVote myVote, String voteRevealTxId) {
requestPersistence();
}

public Tuple2<Long, Long> getMeritAndStakeForProposal(String proposalTxId, MyBlindVoteListService myBlindVoteListService) {
public Tuple2<Long, Long> getMeritAndStakeForProposal(String proposalTxId,
MyBlindVoteListService myBlindVoteListService) {
long merit = 0;
long stake = 0;
List<MyVote> list = new ArrayList<>(myVoteList.getList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,15 @@ public MyProofOfBurnListService(PersistenceManager<MyProofOfBurnList> persistenc
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void readPersisted() {
public void readPersisted(Runnable completeHandler) {
if (DevEnv.isDaoActivated()) {
MyProofOfBurnList persisted = persistenceManager.getPersisted();
if (persisted != null) {
myProofOfBurnList.setAll(persisted.getList());
}
persistenceManager.readPersisted(persisted -> {
myProofOfBurnList.setAll(persisted.getList());
completeHandler.run();
},
completeHandler);
} else {
completeHandler.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,13 +107,16 @@ public MyProposalListService(P2PService p2PService,
///////////////////////////////////////////////////////////////////////////////////////////

@Override
public void readPersisted() {
public void readPersisted(Runnable completeHandler) {
if (DevEnv.isDaoActivated()) {
MyProposalList persisted = persistenceManager.getPersisted();
if (persisted != null) {
myProposalList.setAll(persisted.getList());
listeners.forEach(l -> l.onListChanged(getList()));
}
persistenceManager.readPersisted(persisted -> {
myProposalList.setAll(persisted.getList());
listeners.forEach(l -> l.onListChanged(getList()));
completeHandler.run();
},
completeHandler);
} else {
completeHandler.run();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,9 @@ public boolean canHandle(ProtectedStorageEntry entry) {
}

@Override
protected void readFromResources(String postFix) {
protected void readFromResources(String postFix, Runnable completeHandler) {
// We do not have a resource file for that store, so we just call the readStore method instead.
readStore();
readStore(persisted -> completeHandler.run());
}


Expand Down
Loading

0 comments on commit fa2163f

Please sign in to comment.