From 4764dccbbe4cd04a6dc79771a686847d8e6e2edf Mon Sep 17 00:00:00 2001 From: Martin Habovstiak Date: Tue, 28 Jul 2020 18:43:08 +0200 Subject: [PATCH] Drop receiver in handle_replies When `Message::Done` was received before the cleanup loop executed it led to a deadlock since the loop didn't see the receiving end being disconnected and it also had strong reference via `Arc` preventing it from dropping. This change solves the problem by splitting the channel and only keeping the sender in `Connection`. The reciver is passed to `handle_replies` as an extra argument, making sure it will be used exactly once. The main advantage of this method over trying to keep the receiver inside `Connection` is that it hanles cases when the thread panicks and possibly forgetting explicit close in future refactors. Another advantage is being able to remove one `Arc::clone` and I have an idea for removing locking on `senders` in the future too. Closes #283 --- src/rpc.rs | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/rpc.rs b/src/rpc.rs index 7cee9d5f0..cc545c946 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -7,14 +7,14 @@ use serde_json::{from_str, Value}; use std::collections::HashMap; use std::io::{BufRead, BufReader, Write}; use std::net::{Shutdown, SocketAddr, TcpListener, TcpStream}; -use std::sync::mpsc::{Sender, SyncSender, TrySendError}; +use std::sync::mpsc::{self, Sender, SyncSender, Receiver, TrySendError}; use std::sync::{Arc, Mutex}; use std::thread; use crate::errors::*; use crate::metrics::{Gauge, HistogramOpts, HistogramVec, MetricOpts, Metrics}; use crate::query::{Query, Status}; -use crate::util::{spawn_thread, Channel, HeaderEntry, SyncChannel}; +use crate::util::{spawn_thread, Channel, HeaderEntry}; const ELECTRS_VERSION: &str = env!("CARGO_PKG_VERSION"); const PROTOCOL_VERSION: &str = "1.4"; @@ -74,7 +74,7 @@ struct Connection { status_hashes: HashMap, // ScriptHash -> StatusHash stream: TcpStream, addr: SocketAddr, - chan: SyncChannel, + chan: SyncSender, stats: Arc, relayfee: f64, } @@ -86,6 +86,7 @@ impl Connection { addr: SocketAddr, stats: Arc, relayfee: f64, + chan: SyncSender, ) -> Connection { Connection { query, @@ -93,7 +94,7 @@ impl Connection { status_hashes: HashMap::new(), stream, addr, - chan: SyncChannel::new(10), + chan, stats, relayfee, } @@ -251,7 +252,7 @@ impl Connection { let tx: Transaction = deserialize(&tx).chain_err(|| "failed to parse tx")?; let txid = self.query.broadcast(&tx)?; self.query.update_mempool()?; - if let Err(e) = self.chan.sender().try_send(Message::PeriodicUpdate) { + if let Err(e) = self.chan.try_send(Message::PeriodicUpdate) { warn!("failed to issue PeriodicUpdate after broadcast: {}", e); } Ok(json!(txid.to_hex())) @@ -390,10 +391,10 @@ impl Connection { Ok(()) } - fn handle_replies(&mut self) -> Result<()> { + fn handle_replies(&mut self, receiver: Receiver) -> Result<()> { let empty_params = json!([]); loop { - let msg = self.chan.receiver().recv().chain_err(|| "channel closed")?; + let msg = receiver.recv().chain_err(|| "channel closed")?; trace!("RPC {:?}", msg); match msg { Message::Request(line) => { @@ -451,11 +452,11 @@ impl Connection { } } - pub fn run(mut self) { + pub fn run(mut self, receiver: Receiver) { let reader = BufReader::new(self.stream.try_clone().expect("failed to clone TcpStream")); - let tx = self.chan.sender(); + let tx = self.chan.clone(); let child = spawn_thread("reader", || Connection::handle_requests(reader, tx)); - if let Err(e) = self.handle_replies() { + if let Err(e) = self.handle_replies(receiver) { error!( "[{}] connection handling failed: {}", self.addr, @@ -571,15 +572,16 @@ impl RPC { while let Some((stream, addr)) = acceptor.receiver().recv().unwrap() { // explicitely scope the shadowed variables for the new thread let query = Arc::clone(&query); - let senders = Arc::clone(&senders); let stats = Arc::clone(&stats); let garbage_sender = garbage_sender.clone(); + let (sender, receiver) = mpsc::sync_channel(10); + + senders.lock().unwrap().push(sender.clone()); let spawned = spawn_thread("peer", move || { info!("[{}] connected peer", addr); - let conn = Connection::new(query, stream, addr, stats, relayfee); - senders.lock().unwrap().push(conn.chan.sender()); - conn.run(); + let conn = Connection::new(query, stream, addr, stats, relayfee, sender); + conn.run(receiver); info!("[{}] disconnected peer", addr); let _ = garbage_sender.send(std::thread::current().id()); });