diff --git a/src/daemon.rs b/src/daemon.rs index eb05680b3..780ced933 100644 --- a/src/daemon.rs +++ b/src/daemon.rs @@ -9,6 +9,7 @@ use base64; use bitcoin::hashes::hex::{FromHex, ToHex}; use glob; use hex; +use itertools::Itertools; use serde_json::{from_str, from_value, Value}; #[cfg(not(feature = "liquid"))] @@ -378,19 +379,24 @@ impl Daemon { fn handle_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { let id = self.message_id.next(); - let reqs = params_list + let chunks = params_list .iter() .map(|params| json!({"method": method, "params": params, "id": id})) - .collect(); + .chunks(50_000); // Max Amount of batched requests let mut results = vec![]; - let mut replies = self.call_jsonrpc(method, &reqs)?; - if let Some(replies_vec) = replies.as_array_mut() { - for reply in replies_vec { - results.push(parse_jsonrpc_reply(reply.take(), method, id)?) + for chunk in &chunks { + let reqs = chunk.collect(); + let mut replies = self.call_jsonrpc(method, &reqs)?; + if let Some(replies_vec) = replies.as_array_mut() { + for reply in replies_vec { + results.push(parse_jsonrpc_reply(reply.take(), method, id)?) + } + } else { + bail!("non-array replies: {:?}", replies); } - return Ok(results); } - bail!("non-array replies: {:?}", replies); + + Ok(results) } fn retry_request_batch(&self, method: &str, params_list: &[Value]) -> Result> { diff --git a/src/new_index/db.rs b/src/new_index/db.rs index ef226d34b..dda7fdb97 100644 --- a/src/new_index/db.rs +++ b/src/new_index/db.rs @@ -87,7 +87,7 @@ impl DB { db_opts.create_if_missing(true); db_opts.set_max_open_files(100_000); // TODO: make sure to `ulimit -n` this process correctly db_opts.set_compaction_style(rocksdb::DBCompactionStyle::Level); - db_opts.set_compression_type(rocksdb::DBCompressionType::Snappy); + db_opts.set_compression_type(rocksdb::DBCompressionType::Lz4); db_opts.set_target_file_size_base(1_073_741_824); db_opts.set_write_buffer_size(256 << 20); db_opts.set_disable_auto_compactions(true); // for initial bulk load diff --git a/src/new_index/mempool.rs b/src/new_index/mempool.rs index 9586c5168..188c93cd4 100644 --- a/src/new_index/mempool.rs +++ b/src/new_index/mempool.rs @@ -294,7 +294,7 @@ impl Mempool { } }; // Add new transactions - self.add(to_add); + self.add(to_add)?; // Remove missing transactions self.remove(to_remove); @@ -314,15 +314,17 @@ impl Mempool { Ok(()) } - pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) { + pub fn add_by_txid(&mut self, daemon: &Daemon, txid: &Txid) -> Result<()> { if self.txstore.get(txid).is_none() { if let Ok(tx) = daemon.getmempooltx(&txid) { - self.add(vec![tx]) + self.add(vec![tx])?; } } + + Ok(()) } - fn add(&mut self, txs: Vec) { + fn add(&mut self, txs: Vec) -> Result<()> { self.delta .with_label_values(&["add"]) .observe(txs.len() as f64); @@ -341,13 +343,13 @@ impl Mempool { Err(err) => { warn!("lookup txouts failed: {}", err); // TODO: should we remove txids from txstore? - return; + return Ok(()); } }; for txid in txids { let tx = self.txstore.get(&txid).expect("missing mempool tx"); let txid_bytes = full_hash(&txid[..]); - let prevouts = extract_tx_prevouts(&tx, &txos, false); + let prevouts = extract_tx_prevouts(&tx, &txos, false)?; // Get feeinfo for caching and recent tx overview let feeinfo = TxFeeInfo::new(&tx, &prevouts, self.config.network_type); @@ -418,6 +420,8 @@ impl Mempool { &mut self.asset_issuance, ); } + + Ok(()) } pub fn lookup_txo(&self, outpoint: &OutPoint) -> Result { diff --git a/src/new_index/query.rs b/src/new_index/query.rs index 1e621ac0d..d69ffe5b8 100644 --- a/src/new_index/query.rs +++ b/src/new_index/query.rs @@ -74,7 +74,7 @@ impl Query { self.mempool .write() .unwrap() - .add_by_txid(&self.daemon, &txid); + .add_by_txid(&self.daemon, &txid)?; Ok(txid) } @@ -118,11 +118,9 @@ impl Query { .or_else(|| self.mempool().lookup_raw_txn(txid)) } - pub fn lookup_txos(&self, outpoints: &BTreeSet) -> HashMap { + pub fn lookup_txos(&self, outpoints: &BTreeSet) -> Result> { // the mempool lookup_txos() internally looks up confirmed txos as well - self.mempool() - .lookup_txos(outpoints) - .expect("failed loading txos") + self.mempool().lookup_txos(outpoints) } pub fn lookup_spend(&self, outpoint: &OutPoint) -> Option { diff --git a/src/new_index/schema.rs b/src/new_index/schema.rs index 35e49f8de..392cfb226 100644 --- a/src/new_index/schema.rs +++ b/src/new_index/schema.rs @@ -19,9 +19,9 @@ use std::collections::{BTreeSet, HashMap, HashSet}; use std::path::Path; use std::sync::{Arc, RwLock}; -use crate::chain::{ +use crate::{chain::{ BlockHash, BlockHeader, Network, OutPoint, Script, Transaction, TxOut, Txid, Value, -}; +}, rest::AddressPaginator}; use crate::config::Config; use crate::daemon::Daemon; use crate::errors::*; @@ -461,21 +461,25 @@ impl ChainQuery { pub fn history( &self, scripthash: &[u8], - last_seen_txid: Option<&Txid>, + address_paginator: Option<&AddressPaginator>, limit: usize, ) -> Vec<(Transaction, BlockId)> { // scripthash lookup - self._history(b'H', scripthash, last_seen_txid, limit) + self._history(b'H', scripthash, address_paginator, limit) } fn _history( &self, code: u8, hash: &[u8], - last_seen_txid: Option<&Txid>, + address_paginator: Option<&AddressPaginator>, limit: usize, ) -> Vec<(Transaction, BlockId)> { let _timer_scan = self.start_timer("history"); + let last_seen_txid = address_paginator.and_then(|ap| match ap { + AddressPaginator::Txid(txid) => Some(txid), + _ => None, + }); let txs_conf = self .history_iter_scan_reverse(code, hash) .map(|row| TxHistoryRow::from_row(row).get_txid()) @@ -486,8 +490,9 @@ impl ChainQuery { // skip until we reach the last_seen_txid last_seen_txid.map_or(false, |last_seen_txid| last_seen_txid != txid) }) - .skip(match last_seen_txid { - Some(_) => 1, // skip the last_seen_txid itself + .skip(match address_paginator { + Some(AddressPaginator::Skip(skip)) => *skip, + Some(AddressPaginator::Txid(_)) => 1, // skip the last_seen_txid itself None => 0, }) .filter_map(|txid| self.tx_confirming_block(&txid).map(|b| (txid, b))) diff --git a/src/rest.rs b/src/rest.rs index 108215634..cbf946862 100644 --- a/src/rest.rs +++ b/src/rest.rs @@ -133,7 +133,8 @@ impl TransactionValue { txos: &HashMap, config: &Config, ) -> Self { - let prevouts = extract_tx_prevouts(&tx, &txos, true); + let prevouts = + extract_tx_prevouts(&tx, &txos, true).expect("Cannot Err when allow_missing is true"); let vins: Vec = tx .input .iter() @@ -490,7 +491,7 @@ fn prepare_txs( txs: Vec<(Transaction, Option)>, query: &Query, config: &Config, -) -> Vec { +) -> Result, errors::Error> { let outpoints = txs .iter() .flat_map(|(tx, _)| { @@ -501,11 +502,12 @@ fn prepare_txs( }) .collect(); - let prevouts = query.lookup_txos(&outpoints); + let prevouts = query.lookup_txos(&outpoints)?; - txs.into_iter() + Ok(txs + .into_iter() .map(|(tx, blockid)| TransactionValue::new(tx, blockid, &prevouts, config)) - .collect() + .collect()) } #[tokio::main] @@ -600,6 +602,38 @@ pub fn start(config: Arc, query: Arc) -> Handle { } } +/// This enum is used to discern between a txid or a usize that is used +/// for pagination on the /api/address|scripthash/:address/txs/chain +/// endpoint. +#[cfg_attr(test, derive(Debug, PartialEq))] +pub enum AddressPaginator { + Txid(Txid), + Skip(usize), +} + +impl FromStr for AddressPaginator { + type Err = String; + + fn from_str(s: &str) -> Result { + // 1) Deal with Options in if else statement + // to utilize filter for usize. + // 2) 64 length usize doesn't exist, + // and from_hex is expensive. + if s.len() == 64 { + Txid::from_hex(s) + .ok() + .and_then(|txid| Some(Self::Txid(txid))) + } else { + s.parse::() + .ok() + .filter(|&skip| skip != 0) // Don't allow 0 for Skip + .and_then(|skip| Some(Self::Skip(skip))) + } + // 3) Convert the return value of the if else statement into a Result. + .ok_or("Invalid AddressPaginator".to_string()) + } +} + pub struct Handle { tx: oneshot::Sender<()>, thread: thread::JoinHandle<()>, @@ -759,7 +793,7 @@ fn handle_request( // XXX orphraned blocks alway get TTL_SHORT let ttl = ttl_by_depth(confirmed_blockid.map(|b| b.height), query); - json_response(prepare_txs(txs, query, config), ttl) + json_maybe_error_response(prepare_txs(txs, query, config), ttl) } (&Method::GET, Some(script_type @ &"address"), Some(script_str), None, None, None) | (&Method::GET, Some(script_type @ &"scripthash"), Some(script_str), None, None, None) => { @@ -810,7 +844,7 @@ fn handle_request( .map(|(tx, blockid)| (tx, Some(blockid))), ); - json_response(prepare_txs(txs, query, config), TTL_SHORT) + json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) } ( @@ -819,7 +853,7 @@ fn handle_request( Some(script_str), Some(&"txs"), Some(&"chain"), - last_seen_txid, + address_paginator, ) | ( &Method::GET, @@ -827,23 +861,23 @@ fn handle_request( Some(script_str), Some(&"txs"), Some(&"chain"), - last_seen_txid, + address_paginator, ) => { let script_hash = to_scripthash(script_type, script_str, config.network_type)?; - let last_seen_txid = last_seen_txid.and_then(|txid| Txid::from_hex(txid).ok()); + let address_paginator = address_paginator.and_then(|ap| ap.parse::().ok()); let txs = query .chain() .history( &script_hash[..], - last_seen_txid.as_ref(), + address_paginator.as_ref(), CHAIN_TXS_PER_PAGE, ) .into_iter() .map(|(tx, blockid)| (tx, Some(blockid))) .collect(); - json_response(prepare_txs(txs, query, config), TTL_SHORT) + json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) } ( &Method::GET, @@ -870,7 +904,7 @@ fn handle_request( .map(|tx| (tx, None)) .collect(); - json_response(prepare_txs(txs, query, config), TTL_SHORT) + json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) } ( @@ -913,9 +947,9 @@ fn handle_request( let blockid = query.chain().tx_confirming_block(&hash); let ttl = ttl_by_depth(blockid.as_ref().map(|b| b.height), query); - let tx = prepare_txs(vec![(tx, blockid)], query, config).remove(0); + let tx = prepare_txs(vec![(tx, blockid)], query, config).map(|mut v| v.remove(0)); - json_response(tx, ttl) + json_maybe_error_response(tx, ttl) } (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"hex"), None, None) | (&Method::GET, Some(&"tx"), Some(hash), Some(out_type @ &"raw"), None, None) => { @@ -1100,7 +1134,7 @@ fn handle_request( .map(|(tx, blockid)| (tx, Some(blockid))), ); - json_response(prepare_txs(txs, query, config), TTL_SHORT) + json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) } #[cfg(feature = "liquid")] @@ -1122,7 +1156,7 @@ fn handle_request( .map(|(tx, blockid)| (tx, Some(blockid))) .collect(); - json_response(prepare_txs(txs, query, config), TTL_SHORT) + json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) } #[cfg(feature = "liquid")] @@ -1136,7 +1170,7 @@ fn handle_request( .map(|tx| (tx, None)) .collect(); - json_response(prepare_txs(txs, query, config), TTL_SHORT) + json_maybe_error_response(prepare_txs(txs, query, config), TTL_SHORT) } #[cfg(feature = "liquid")] @@ -1187,6 +1221,26 @@ fn json_response(value: T, ttl: u32) -> Result, Htt .unwrap()) } +fn json_maybe_error_response( + value: Result, + ttl: u32, +) -> Result, HttpError> { + let response = Response::builder() + .header("Content-Type", "application/json") + .header("Cache-Control", format!("public, max-age={:}", ttl)); + Ok(match value { + Ok(v) => response + .body(Body::from(serde_json::to_string(&v)?)) + .expect("Valid http response"), + Err(e) => response + .status(500) + .body(Body::from(serde_json::to_string( + &json!({ "error": e.to_string() }), + )?)) + .expect("Valid http response"), + }) +} + fn blocks( query: &Query, config: &Config, @@ -1354,7 +1408,8 @@ impl From for HttpError { #[cfg(test)] mod tests { - use crate::rest::HttpError; + use crate::rest::{AddressPaginator, HttpError}; + use bitcoin::{Txid, hashes::hex::FromHex}; use serde_json::Value; use std::collections::HashMap; @@ -1419,4 +1474,67 @@ mod tests { assert!(err.is_err()); } + + #[test] + fn test_address_paginator() { + // Each vector is (result, expected, assert_reason) + let vectors = [ + ( + "".parse::(), + Err("Invalid AddressPaginator".to_string()), + "fails both Txid from_hex and usize parse", + ), + ( + "0".parse::(), + Err("Invalid AddressPaginator".to_string()), + "skipping 0 is pointless, so fails", + ), + ( + "18446744073709551615".parse::(), + Ok(AddressPaginator::Skip(18446744073709551615)), + "valid usize (u64::MAX)", + ), + ( + "1".parse::(), + Ok(AddressPaginator::Skip(1)), + "valid usize", + ), + ( + "0000000000000000000000000000000000000000000000000000000000000001" + .parse::(), + Ok(AddressPaginator::Txid( + Txid::from_hex("0000000000000000000000000000000000000000000000000000000000000001") + .unwrap(), + )), + "64 length is always treated as Txid", + ), + ( + "0000000000000000000000000000000000000000000018446744073709551615" + .parse::(), + Ok(AddressPaginator::Txid( + Txid::from_hex("0000000000000000000000000000000000000000000018446744073709551615") + .unwrap(), + )), + "64 length is always treated as Txid", + ), + ( + "ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + .parse::(), + Ok(AddressPaginator::Txid( + Txid::from_hex("ffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff") + .unwrap(), + )), + "valid Txid", + ), + ( + "ffffffxfffffffffffffffffffffffffffffffffffffffffffffffffffffffff" + .parse::(), + Err("Invalid AddressPaginator".to_string()), + "fails both Txid from_hex and usize parse", + ), + ]; + for vector in vectors { + assert_eq!(vector.0, vector.1, "{}", vector.2); + } + } } diff --git a/src/util/transaction.rs b/src/util/transaction.rs index ec0c28a6b..0179fe8c0 100644 --- a/src/util/transaction.rs +++ b/src/util/transaction.rs @@ -1,4 +1,5 @@ use crate::chain::{BlockHash, OutPoint, Transaction, TxIn, TxOut, Txid}; +use crate::errors; use crate::util::BlockId; use std::collections::HashMap; @@ -74,23 +75,35 @@ pub fn is_spendable(txout: &TxOut) -> bool { return !txout.is_fee() && !txout.script_pubkey.is_provably_unspendable(); } +/// Extract the previous TxOuts of a Transaction's TxIns +/// +/// # Errors +/// +/// This function MUST NOT return an error variant when allow_missing is true. +/// If allow_missing is false, it will return an error when any Outpoint is +/// missing from the keys of the txos argument's HashMap. pub fn extract_tx_prevouts<'a>( tx: &Transaction, txos: &'a HashMap, allow_missing: bool, -) -> HashMap { +) -> Result, errors::Error> { tx.input .iter() .enumerate() .filter(|(_, txi)| has_prevout(txi)) .filter_map(|(index, txi)| { - Some(( + Some(Ok(( index as u32, - txos.get(&txi.previous_output).or_else(|| { - assert!(allow_missing, "missing outpoint {:?}", txi.previous_output); - None - })?, - )) + match (allow_missing, txos.get(&txi.previous_output)) { + (_, Some(txo)) => txo, + (true, None) => return None, + (false, None) => { + return Some(Err( + format!("missing outpoint {:?}", txi.previous_output).into() + )); + } + }, + ))) }) .collect() }