diff --git a/src/rpc.rs b/src/rpc.rs index 7cee9d5f0..cc545c946 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,14 +7,14 @@ use serde_json::{from_str, Value}; use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; -use std::sync::mpsc::{Sender, SyncSender, TrySendError}; +use std::sync::mpsc::{self, Sender, SyncSender, Receiver, TrySendError}; use std::sync::{Arc, Mutex}; use std::thread; use crate::errors::*; use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use crate::query::{Query, Status}; -use crate::util::{spawn_thread, Channel, HeaderEntry, SyncChannel}; +use crate::util::{spawn_thread, Channel, HeaderEntry}; const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION"); const PROTOCOL_VERSION: &str = "1.4"; @@ -74,7 +74,7 @@ struct Connection { status_hashes: HashMap, // ScriptHash -> StatusHash stream: TcpStream, addr: SocketAddr, - chan: SyncChannel, + chan: SyncSender, stats: Arc, relayfee: f64, } @@ -86,6 +86,7 @@ impl Connection { addr: SocketAddr, stats: Arc, relayfee: f64, + chan: SyncSender, ) -> Connection { Connection { query, @@ -93,7 +94,7 @@ impl Connection { status_hashes: HashMap::new(), stream, addr, - chan: SyncChannel::new(10), + chan, stats, relayfee, } @@ -251,7 +252,7 @@ impl Connection { let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?; let txid = self.query.broadcast(&tx)?; self.query.update_mempool()?; - if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) { + if let Err(e) = self.chan.try_send(Message::PeriodicUpdate) { warn!("failed to issue PeriodicUpdate after broadcast: {}", e); } Ok(json!(txid.to_hex())) @@ -390,10 +391,10 @@ impl Connection { Ok(()) } - fn handle_replies(&mut self) -> Result<()> { + fn handle_replies(&mut self, receiver: Receiver) -> Result<()> { let empty_params = json!([]); loop { - let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?; + let msg = receiver.recv().chain_err(|| "channel closed")?; trace!("RPC {:?}", msg); match msg { Message::Request(line) => { @@ -451,11 +452,11 @@ impl Connection { } } - pub fn run(mut self) { + pub fn run(mut self, receiver: Receiver) { let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); - let tx = self.chan.sender(); + let tx = self.chan.clone(); let child = spawn_thread("reader", || Connection::handle_requests(reader, tx)); - if let Err(e) = self.handle_replies() { + if let Err(e) = self.handle_replies(receiver) { error!( "[{}] connection handling failed: {}", self.addr, @@ -571,15 +572,16 @@ impl RPC { while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { // explicitely scope the shadowed variables for the new thread let query = Arc::clone(&query); - let senders = Arc::clone(&senders); let stats = Arc::clone(&stats); let garbage_sender = garbage_sender.clone(); + let (sender, receiver) = mpsc::sync_channel(10); + + senders.lock().unwrap().push(sender.clone()); let spawned = spawn_thread("peer", move || { info!("[{}] connected peer", addr); - let conn = Connection::new(query, stream, addr, stats, relayfee); - senders.lock().unwrap().push(conn.chan.sender()); - conn.run(); + let conn = Connection::new(query, stream, addr, stats, relayfee, sender); + conn.run(receiver); info!("[{}] disconnected peer", addr); let _ = garbage_sender.send(std::thread::current().id()); });