Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature/REST] Pagination by index for address|scripthash txs/chain endpoint #50

Open
wants to merge 12 commits into
base: new-index
Choose a base branch
from
22 changes: 14 additions & 8 deletions src/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use base64;
use bitcoin::hashes::hex::{FromHex, ToHex};
use glob;
use hex;
use itertools::Itertools;
use serde_json::{from_str, from_value, Value};

#[cfg(not(feature = "liquid"))]
Expand Down Expand Up @@ -378,19 +379,24 @@ impl Daemon {

fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
let id = self.message_id.next();
let reqs = params_list
let chunks = params_list
.iter()
.map(|params| json!({"method": method, "params": params, "id": id}))
.collect();
.chunks(50_000); // Max Amount of batched requests
let mut results = vec![];
let mut replies = self.call_jsonrpc(method, &reqs)?;
if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec {
results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
for chunk in &chunks {
let reqs = chunk.collect();
let mut replies = self.call_jsonrpc(method, &reqs)?;
if let Some(replies_vec) = replies.as_array_mut() {
for reply in replies_vec {
results.push(parse_jsonrpc_reply(reply.take(), method, id)?)
}
} else {
bail!("non-array replies: {:?}", replies);
}
return Ok(results);
}
bail!("non-array replies: {:?}", replies);

Ok(results)
}

fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result<Vec<Value>> {
Expand Down
2 changes: 1 addition & 1 deletion src/new_index/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl DB {
db_opts.create_if_missing(true);
db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly
db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level);
db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy);
db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4);
db_opts.set_target_file_size_base(1_073_741_824);
db_opts.set_write_buffer_size(256 << 20);
db_opts.set_disable_auto_compactions(true); // for initial bulk load
Expand Down
16 changes: 10 additions & 6 deletions src/new_index/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl Mempool {
}
};
// Add new transactions
self.add(to_add);
self.add(to_add)?;
// Remove missing transactions
self.remove(to_remove);

Expand All @@ -314,15 +314,17 @@ impl Mempool {
Ok(())
}

pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) {
pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> {
if self.txstore.get(txid).is_none() {
if let Ok(tx) = daemon.getmempooltx(&txid) {
self.add(vec![tx])
self.add(vec![tx])?;
}
}

Ok(())
}

fn add(&mut self, txs: Vec<Transaction>) {
fn add(&mut self, txs: Vec<Transaction>) -> Result<()> {
self.delta
.with_label_values(&["add"])
.observe(txs.len() as f64);
Expand All @@ -341,13 +343,13 @@ impl Mempool {
Err(err) => {
warn!("lookup txouts failed: {}", err);
// TODO: should we remove txids from txstore?
return;
return Ok(());
}
};
for txid in txids {
let tx = self.txstore.get(&txid).expect("missing mempool tx");
let txid_bytes = full_hash(&txid[..]);
let prevouts = extract_tx_prevouts(&tx, &txos, false);
let prevouts = extract_tx_prevouts(&tx, &txos, false)?;

// Get feeinfo for caching and recent tx overview
let feeinfo = TxFeeInfo::new(&tx, &prevouts, self.config.network_type);
Expand Down Expand Up @@ -418,6 +420,8 @@ impl Mempool {
&mut self.asset_issuance,
);
}

Ok(())
}

pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result<TxOut> {
Expand Down
8 changes: 3 additions & 5 deletions src/new_index/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Query {
self.mempool
.write()
.unwrap()
.add_by_txid(&self.daemon, &txid);
.add_by_txid(&self.daemon, &txid)?;
Ok(txid)
}

Expand Down Expand Up @@ -118,11 +118,9 @@ impl Query {
.or_else(|| self.mempool().lookup_raw_txn(txid))
}

pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> HashMap<OutPoint, TxOut> {
pub fn lookup_txos(&self, outpoints: &BTreeSet<OutPoint>) -> Result<HashMap<OutPoint, TxOut>> {
// the mempool lookup_txos() internally looks up confirmed txos as well
self.mempool()
.lookup_txos(outpoints)
.expect("failed loading txos")
self.mempool().lookup_txos(outpoints)
}

pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option<SpendingInput> {
Expand Down
19 changes: 12 additions & 7 deletions src/new_index/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@ use std::collections::{BTreeSet, HashMap, HashSet};
use std::path::Path;
use std::sync::{Arc, RwLock};

use crate::chain::{
use crate::{chain::{
BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value,
};
}, rest::AddressPaginator};
use crate::config::Config;
use crate::daemon::Daemon;
use crate::errors::*;
Expand Down Expand Up @@ -461,21 +461,25 @@ impl ChainQuery {
pub fn history(
&self,
scripthash: &[u8],
last_seen_txid: Option<&Txid>,
address_paginator: Option<&AddressPaginator>,
limit: usize,
) -> Vec<(Transaction, BlockId)> {
// scripthash lookup
self._history(b'H', scripthash, last_seen_txid, limit)
self._history(b'H', scripthash, address_paginator, limit)
}

fn _history(
&self,
code: u8,
hash: &[u8],
last_seen_txid: Option<&Txid>,
address_paginator: Option<&AddressPaginator>,
limit: usize,
) -> Vec<(Transaction, BlockId)> {
let _timer_scan = self.start_timer("history");
let last_seen_txid = address_paginator.and_then(|ap| match ap {
AddressPaginator::Txid(txid) => Some(txid),
_ => None,
});
let txs_conf = self
.history_iter_scan_reverse(code, hash)
.map(|row| TxHistoryRow::from_row(row).get_txid())
Expand All @@ -486,8 +490,9 @@ impl ChainQuery {
// skip until we reach the last_seen_txid
last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid)
})
.skip(match last_seen_txid {
Some(_) => 1, // skip the last_seen_txid itself
.skip(match address_paginator {
Some(AddressPaginator::Skip(skip)) => *skip,
Some(AddressPaginator::Txid(_)) => 1, // skip the last_seen_txid itself
None => 0,
})
.filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b)))
Expand Down
Loading