Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert changes for json export of dao data #4689

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/src/main/java/bisq/core/dao/node/BsqNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ protected void onParseBlockChainComplete() {
parseBlockchainComplete = true;
daoStateService.onParseBlockChainComplete();

exportJsonFilesService.onParseBlockChainComplete();
maybeExportToJson();
}

@SuppressWarnings("WeakerAccess")
Expand Down Expand Up @@ -291,7 +291,7 @@ protected Optional<Block> doParseBlock(RawBlock rawBlock) throws RequiredReorgFr
return Optional.empty();
}

protected void maybeExportNewBlockToJson(Block block) {
exportJsonFilesService.onNewBlock(block);
protected void maybeExportToJson() {
exportJsonFilesService.maybeExportToJson();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,15 @@

import bisq.core.dao.DaoSetupService;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.model.DaoState;
import bisq.core.dao.state.model.blockchain.Block;
import bisq.core.dao.state.model.blockchain.PubKeyScript;
import bisq.core.dao.state.model.blockchain.Tx;
import bisq.core.dao.state.model.blockchain.TxOutput;
import bisq.core.dao.state.model.blockchain.TxType;

import bisq.common.config.Config;
import bisq.common.file.FileUtil;
import bisq.common.file.JsonFileManager;
import bisq.common.util.Utilities;

Expand All @@ -35,25 +37,36 @@

import javax.inject.Named;

import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.nio.file.Paths;

import java.io.File;
import java.io.IOException;

import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;

import org.jetbrains.annotations.NotNull;

@Slf4j
public class ExportJsonFilesService implements DaoSetupService {
private final DaoStateService daoStateService;
private final File storageDir;
private boolean dumpBlockchainData;
private JsonFileManager blockFileManager, txFileManager, txOutputFileManager, bsqStateFileManager;
private File blockDir;
private final boolean dumpBlockchainData;

private final ListeningExecutorService executor = Utilities.getListeningExecutorService("JsonExporter",
1, 1, 1200);
private JsonFileManager txFileManager, txOutputFileManager, bsqStateFileManager;

@Inject
public ExportJsonFilesService(DaoStateService daoStateService,
Expand All @@ -75,135 +88,88 @@ public void addListeners() {

@Override
public void start() {
if (!dumpBlockchainData) {
return;
}

File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString());
blockDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "block").toString());
File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString());
File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString());
File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString());

if (!jsonDir.mkdir())
log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath());

if (!blockDir.mkdir())
log.warn("make blockDir failed.\njsonDir=" + blockDir.getAbsolutePath());

if (!txDir.mkdir())
log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath());

if (!txOutputDir.mkdir())
log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath());
if (dumpBlockchainData) {
File jsonDir = new File(Paths.get(storageDir.getAbsolutePath(), "json").toString());
File txDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "tx").toString());
File txOutputDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "txo").toString());
File bsqStateDir = new File(Paths.get(storageDir.getAbsolutePath(), "json", "all").toString());
try {
if (txDir.exists())
FileUtil.deleteDirectory(txDir);
if (txOutputDir.exists())
FileUtil.deleteDirectory(txOutputDir);
if (bsqStateDir.exists())
FileUtil.deleteDirectory(bsqStateDir);
if (jsonDir.exists())
FileUtil.deleteDirectory(jsonDir);
} catch (IOException e) {
log.error(e.toString());
e.printStackTrace();
}

if (!bsqStateDir.mkdir())
log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath());
if (!jsonDir.mkdir())
log.warn("make jsonDir failed.\njsonDir=" + jsonDir.getAbsolutePath());

blockFileManager = new JsonFileManager(blockDir);
txFileManager = new JsonFileManager(txDir);
txOutputFileManager = new JsonFileManager(txOutputDir);
bsqStateFileManager = new JsonFileManager(bsqStateDir);
}
if (!txDir.mkdir())
log.warn("make txDir failed.\ntxDir=" + txDir.getAbsolutePath());

public void shutDown() {
if (!dumpBlockchainData) {
return;
}
if (!txOutputDir.mkdir())
log.warn("make txOutputDir failed.\ntxOutputDir=" + txOutputDir.getAbsolutePath());

blockFileManager.shutDown();
txFileManager.shutDown();
txOutputFileManager.shutDown();
bsqStateFileManager.shutDown();
dumpBlockchainData = false;
}
if (!bsqStateDir.mkdir())
log.warn("make bsqStateDir failed.\nbsqStateDir=" + bsqStateDir.getAbsolutePath());

public void onNewBlock(Block block) {
if (!dumpBlockchainData) {
return;
txFileManager = new JsonFileManager(txDir);
txOutputFileManager = new JsonFileManager(txOutputDir);
bsqStateFileManager = new JsonFileManager(bsqStateDir);
}

// We do write the block on the main thread as the overhead to create a thread and risk for inconsistency is not
// worth the potential performance gain.
processBlock(block, true);
}

private void processBlock(Block block, boolean doDumpDaoState) {
int lastPersistedBlock = getLastPersistedBlock();
if (block.getHeight() <= lastPersistedBlock) {
return;
}

long ts = System.currentTimeMillis();
JsonBlock jsonBlock = getJsonBlock(block);
blockFileManager.writeToDisc(Utilities.objectToJson(jsonBlock), String.valueOf(jsonBlock.getHeight()));

jsonBlock.getTxs().forEach(jsonTx -> {
txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId());

jsonTx.getOutputs().forEach(jsonTxOutput ->
txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId()));
});

log.info("Write json data for block {} took {} ms", block.getHeight(), System.currentTimeMillis() - ts);

if (doDumpDaoState) {
dumpDaoState();
public void shutDown() {
if (dumpBlockchainData && txFileManager != null) {
txFileManager.shutDown();
txOutputFileManager.shutDown();
bsqStateFileManager.shutDown();
}
}

public void onParseBlockChainComplete() {
if (!dumpBlockchainData) {
return;
}

int lastPersistedBlock = getLastPersistedBlock();
List<Block> blocks = daoStateService.getBlocksFromBlockHeight(lastPersistedBlock + 1, Integer.MAX_VALUE);

// We use a thread here to write all past blocks to avoid that the main thread gets blocked for too long.
new Thread(() -> {
Thread.currentThread().setName("Write all blocks to json");
blocks.forEach(e -> processBlock(e, false));
}).start();

dumpDaoState();
}
public void maybeExportToJson() {
if (dumpBlockchainData &&
daoStateService.isParseBlockChainComplete()) {
// We store the data we need once we write the data to disk (in the thread) locally.
// Access to daoStateService is single threaded, we must not access daoStateService from the thread.
List<JsonTxOutput> allJsonTxOutputs = new ArrayList<>();

List<JsonTx> jsonTxs = daoStateService.getUnorderedTxStream()
.map(tx -> {
JsonTx jsonTx = getJsonTx(tx);
allJsonTxOutputs.addAll(jsonTx.getOutputs());
return jsonTx;
}).collect(Collectors.toList());

DaoState daoState = daoStateService.getClone();
List<JsonBlock> jsonBlockList = daoState.getBlocks().stream()
.map(this::getJsonBlock)
.collect(Collectors.toList());
JsonBlocks jsonBlocks = new JsonBlocks(daoState.getChainHeight(), jsonBlockList);

private void dumpDaoState() {
// TODO we should get rid of that data structure and use the individual jsonBlocks instead as we cannot cache data
// here and re-write each time the full blockchain which is already > 200 MB
// Once the webapp has impl the changes we can delete that here.
long ts = System.currentTimeMillis();
List<JsonBlock> jsonBlockList = daoStateService.getBlocks().stream()
.map(this::getJsonBlock)
.collect(Collectors.toList());
JsonBlocks jsonBlocks = new JsonBlocks(daoStateService.getChainHeight(), jsonBlockList);
ListenableFuture<Void> future = executor.submit(() -> {
bsqStateFileManager.writeToDisc(Utilities.objectToJson(jsonBlocks), "blocks");
allJsonTxOutputs.forEach(jsonTxOutput -> txOutputFileManager.writeToDisc(Utilities.objectToJson(jsonTxOutput), jsonTxOutput.getId()));
jsonTxs.forEach(jsonTx -> txFileManager.writeToDisc(Utilities.objectToJson(jsonTx), jsonTx.getId()));
return null;
});

// We use here the thread write method as the data is quite large and write can take a bit
bsqStateFileManager.writeToDiscThreaded(Utilities.objectToJson(jsonBlocks), "blocks");
log.info("Dumping full bsqState with {} blocks took {} ms",
jsonBlocks.getBlocks().size(), System.currentTimeMillis() - ts);
}
Futures.addCallback(future, new FutureCallback<>() {
public void onSuccess(Void ignore) {
}

private int getLastPersistedBlock() {
// At start we use one block before genesis
int result = daoStateService.getGenesisBlockHeight() - 1;
String[] list = blockDir.list();
if (list != null && list.length > 0) {
List<Integer> blocks = Arrays.stream(list)
.filter(e -> !e.endsWith(".tmp"))
.map(e -> e.replace(".json", ""))
.map(Integer::valueOf)
.sorted()
.collect(Collectors.toList());
if (!blocks.isEmpty()) {
Integer lastBlockHeight = blocks.get(blocks.size() - 1);
if (lastBlockHeight > result) {
result = lastBlockHeight;
public void onFailure(@NotNull Throwable throwable) {
log.error(throwable.toString());
throwable.printStackTrace();
}
}
}, MoreExecutors.directExecutor());
}
return result;
}

private JsonBlock getJsonBlock(Block block) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/bisq/core/dao/node/full/FullNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ private void addBlockHandler() {
}

private void onNewBlock(Block block) {
maybeExportNewBlockToJson(block);
maybeExportToJson();

if (p2pNetworkReady && parseBlockchainComplete)
fullNodeNetworkService.publishNewBlock(block);
Expand Down
13 changes: 6 additions & 7 deletions core/src/main/java/bisq/core/dao/node/lite/LiteNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
import bisq.core.dao.node.parser.exceptions.RequiredReorgFromSnapshotException;
import bisq.core.dao.state.DaoStateService;
import bisq.core.dao.state.DaoStateSnapshotService;
import bisq.core.dao.state.model.blockchain.Block;

import bisq.network.p2p.P2PService;
import bisq.network.p2p.network.Connection;
Expand All @@ -40,7 +39,6 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

import lombok.extern.slf4j.Slf4j;

Expand Down Expand Up @@ -229,18 +227,19 @@ private void runDelayedBatchProcessing(List<RawBlock> blocks, Runnable resultHan
}

// We received a new block
private void onNewBlockReceived(RawBlock rawBlock) {
int blockHeight = rawBlock.getHeight();
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, rawBlock.getHash());
private void onNewBlockReceived(RawBlock block) {
int blockHeight = block.getHeight();
log.debug("onNewBlockReceived: block at height {}, hash={}", blockHeight, block.getHash());

// We only update chainTipHeight if we get a newer block
if (blockHeight > chainTipHeight)
chainTipHeight = blockHeight;

try {
Optional<Block> optionalBlock = doParseBlock(rawBlock);
optionalBlock.ifPresent(this::maybeExportNewBlockToJson);
doParseBlock(block);
} catch (RequiredReorgFromSnapshotException ignore) {
}

maybeExportToJson();
}
}