Skip to content

Commit

Permalink
feat: broadcast claim after timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
michael1011 committed Feb 15, 2024
1 parent 4359331 commit e055199
Show file tree
Hide file tree
Showing 12 changed files with 281 additions and 54 deletions.
5 changes: 3 additions & 2 deletions .env
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
DATABASE_URL=sqlite://./db.sqlite

SWEEP_TIME=120
SWEEP_INTERVAL=30

NETWORK=regtest

API_HOST=127.0.0.1
API_PORT=1234

ELEMENTS_HOST=127.0.0.1
# ELEMENTS_PORT=7041
# ELEMENTS_COOKIE=/media/michael/990/Liquid/liquidv1/.cookie
ELEMENTS_PORT=18884
ELEMENTS_COOKIE=/home/michael/Git/TypeScript/boltz-backend/docker/regtest/data/core/cookies/.elements-cookie
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ panic = "abort"
[dependencies]
tokio = { version = "1.36.0", features = ["full"] }
axum = "0.7.4"
diesel = { version = "2.1.4", features = ["postgres", "sqlite", "r2d2"] }
diesel = { version = "2.1.4", features = ["sqlite", "r2d2", "chrono"] }
dotenvy = "0.15.7"
env_logger = "0.11.2"
log = "0.4.20"
Expand Down
2 changes: 2 additions & 0 deletions migrations/2024-02-14-014006_setup_schema/up.sql
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ CREATE TABLE pending_covenants (
swap_tree VARCHAR NOT NULL,
address BLOB NOT NULL,
blinding_key BLOB,
tx_id BLOB,
tx_time DATETIME,
created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP
);

Expand Down
2 changes: 2 additions & 0 deletions src/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ pub async fn post_covenant_claim(
.address(body.internal_key, &state.address_params)
.script_pubkey(),
),
tx_id: None,
tx_time: None,
},
) {
Ok(_) => {
Expand Down
32 changes: 22 additions & 10 deletions src/chain/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,16 +102,15 @@ impl ChainClient {

let block_hex = self.request_params::<String>("getblock", params).await?;

match elements::encode::deserialize(
match hex::decode(block_hex) {
Ok(res) => res,
Err(err) => return Err(Box::new(err)),
}
.as_ref(),
) {
Ok(block) => Ok(block),
Err(e) => Err(Box::new(e)),
}
Self::parse_hex(block_hex)
}

pub async fn get_transaction(self, hash: String) -> Result<Transaction, Box<dyn Error>> {
let tx_hex = self
.request_params::<String>("getrawtransaction", vec![hash])
.await?;

Self::parse_hex(tx_hex)
}

pub async fn get_network_info(self) -> Result<NetworkInfo, Box<dyn Error>> {
Expand Down Expand Up @@ -169,4 +168,17 @@ impl ChainClient {

Ok(res.result.unwrap())
}

fn parse_hex<T: elements::encode::Decodable>(hex_str: String) -> Result<T, Box<dyn Error>> {
match elements::encode::deserialize(
match hex::decode(hex_str) {
Ok(res) => res,
Err(err) => return Err(Box::new(err)),
}
.as_ref(),
) {
Ok(block) => Ok(block),
Err(e) => Err(Box::new(e)),
}
}
}
166 changes: 155 additions & 11 deletions src/claimer/constructor.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
use diesel::internal::derives::multiconnection::chrono::{TimeDelta, Utc};
use std::error::Error;
use std::ops::Sub;

use elements::bitcoin::Witness;
use elements::confidential::{Asset, AssetBlindingFactor, Nonce, Value, ValueBlindingFactor};
use elements::script::Builder;
use elements::secp256k1_zkp::rand::rngs::OsRng;
use elements::secp256k1_zkp::SecretKey;
use elements::{
opcodes, LockTime, OutPoint, Script, Sequence, Transaction, TxIn, TxInWitness, TxOut,
TxOutWitness,
opcodes, AddressParams, LockTime, OutPoint, Script, Sequence, Transaction, TxIn, TxInWitness,
TxOut, TxOutWitness,
};
use log::{debug, trace};
use log::{debug, error, info, trace, warn};
use tokio::time;

use crate::chain::client::ChainClient;
use crate::claimer::tree::SwapTree;
Expand All @@ -20,26 +23,163 @@ use crate::db::models::PendingCovenant;
pub struct Constructor {
db: db::Pool,
chain_client: ChainClient,
sweep_time: u64,
sweep_interval: u64,
address_params: &'static AddressParams,
}

impl Constructor {
pub fn new(db: db::Pool, chain_client: ChainClient) -> Constructor {
Constructor { db, chain_client }
pub fn new(
db: db::Pool,
chain_client: ChainClient,
sweep_time: u64,
sweep_interval: u64,
address_params: &'static AddressParams,
) -> Constructor {
Constructor {
db,
sweep_time,
chain_client,
address_params,
sweep_interval,
}
}

pub async fn start_interval(self) {
if self.clone().claim_instantly() {
info!("Broadcasting sweeps instantly");
return;
}

info!(
"Broadcasting claims {} seconds after lockup transactions and checking on interval of {} seconds",
self.sweep_time,
self.sweep_interval
);
let mut interval = time::interval(time::Duration::from_secs(self.sweep_interval));

self.clone().broadcast().await;

loop {
interval.tick().await;

trace!("Checking for claims to broadcast");
self.clone().broadcast().await;
}
}

pub async fn schedule_broadcast(self, covenant: PendingCovenant, lockup_tx: Transaction) {
if self.clone().claim_instantly() {
self.broadcast_covenant(covenant, lockup_tx).await;
return;
}

debug!(
"Scheduling claim of {}",
hex::encode(covenant.output_script.clone())
);
match db::helpers::set_covenant_transaction(
self.db,
covenant.output_script,
hex::decode(lockup_tx.txid().to_string()).unwrap(),
Utc::now().naive_utc(),
) {
Ok(_) => {}
Err(err) => {
warn!("Could not schedule covenant claim: {}", err);
return;
}
};
}

async fn broadcast(self) {
let covenants = match db::helpers::get_covenants_to_claim(
self.clone().db,
Utc::now()
.sub(TimeDelta::seconds(self.sweep_time as i64))
.naive_utc(),
) {
Ok(res) => res,
Err(err) => {
warn!("Could not fetch covenants to claim: {}", err);
return;
}
};

if covenants.len() == 0 {
return;
}

debug!("Broadcasting {} claims", covenants.len());

let self_clone = self.clone();
for cov in covenants {
let tx = match self_clone
.clone()
.chain_client
.get_transaction(hex::encode(cov.tx_id.clone().unwrap()))
.await
{
Ok(res) => res,
Err(err) => {
error!(
"Could not fetch transaction for {}: {}",
hex::encode(cov.clone().output_script),
err
);
return;
}
};

self_clone.clone().broadcast_covenant(cov, tx).await;
}
}

async fn broadcast_covenant(self, cov: PendingCovenant, tx: Transaction) {
match self.clone().broadcast_tx(cov.clone(), tx).await {
Ok(tx) => {
info!(
"Broadcast claim for {}: {}",
hex::encode(cov.clone().output_script),
tx.txid().to_string(),
)
}
Err(err) => {
error!(
"Could not broadcast claim for {}: {}",
hex::encode(cov.clone().output_script),
err
)
}
}
}

pub async fn broadcast_claim(
async fn broadcast_tx(
self,
covenant: PendingCovenant,
lockup_tx: Transaction,
vout: u32,
prevout: &TxOut,
) -> Result<Transaction, Box<dyn Error>> {
) -> Result<Transaction, Box<dyn Error + Send + Sync>> {
let tree = serde_json::from_str::<SwapTree>(covenant.swap_tree.as_str()).unwrap();
let (prevout, vout) = match tree.clone().find_output(
lockup_tx.clone(),
covenant.clone().internal_key,
self.address_params,
) {
Some(res) => res,
None => {
return Err(format!(
"could not find swap output for {}",
hex::encode(covenant.output_script)
)
.into());
}
};

debug!(
"Broadcasting claim for: {}",
hex::encode(covenant.clone().output_script)
);

let tree = serde_json::from_str::<SwapTree>(covenant.swap_tree.as_str()).unwrap();
let cov_details = tree.clone().covenant_details().unwrap();

let mut witness = Witness::new();
Expand Down Expand Up @@ -195,9 +335,13 @@ impl Constructor {
{
Ok(tx)
} else {
Err(err)
Err(err.to_string().into())
}
}
}
}

fn claim_instantly(self) -> bool {
self.sweep_interval == 0
}
}
47 changes: 24 additions & 23 deletions src/claimer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use crossbeam_channel::Receiver;
use std::cmp;
use std::error::Error;

use elements::Transaction;
use elements::{AddressParams, Transaction};
use log::{debug, error, info, trace, warn};
use rayon::iter::IntoParallelRefIterator;
use rayon::iter::ParallelIterator;
Expand All @@ -26,16 +26,33 @@ pub struct Claimer {
}

impl Claimer {
pub fn new(db: db::Pool, chain_client: ChainClient) -> Claimer {
pub fn new(
db: db::Pool,
chain_client: ChainClient,
sweep_time: u64,
sweep_interval: u64,
address_param: &'static AddressParams,
) -> Claimer {
Claimer {
constructor: Constructor::new(db.clone(), chain_client.clone()),
constructor: Constructor::new(
db.clone(),
chain_client.clone(),
sweep_time,
sweep_interval,
address_param,
),
db,
chain_client,
}
}

pub fn start(self) {
debug!("Starting claimer");
let constructor_clone = self.constructor.clone();
tokio::spawn(async move {
constructor_clone.start_interval().await;
});

let tx_clone = self.clone();
let tx_receiver = self.clone().chain_client.get_tx_receiver();
tokio::spawn(async move {
Expand Down Expand Up @@ -200,27 +217,11 @@ impl Claimer {
tx.txid().to_string(),
vout
);
match self

self.clone()
.constructor
.clone()
.broadcast_claim(covenant.clone(), tx.clone(), vout as u32, out)
.await
{
Ok(tx) => {
info!(
"Broadcasted claim for {}: {}",
hex::encode(covenant.clone().output_script),
tx.txid().to_string(),
)
}
Err(err) => {
error!(
"Could not broadcast claim for {}: {}",
hex::encode(covenant.clone().output_script),
err
)
}
};
.schedule_broadcast(covenant, tx.clone())
.await;
}
None => {}
};
Expand Down
Loading

0 comments on commit e055199

Please sign in to comment.