diff --git a/src/network/listener.rs b/src/network/listener.rs index 910b01d1..6bdb2d01 100644 --- a/src/network/listener.rs +++ b/src/network/listener.rs @@ -77,8 +77,9 @@ pub trait Listener { recv_tx.send(payload); } Err(e) => { - println!("Received error {:} when decoding message. Pausing for 1sec.", e); - timer::sleep(1000); + println!("Received error {:} when decoding message. Pausing for 5 seconds then reconnecting.", e); + timer::sleep(5000); + sock.reconnect(); } } } diff --git a/src/network/socket.rs b/src/network/socket.rs index 01d74802..1040e3d5 100644 --- a/src/network/socket.rs +++ b/src/network/socket.rs @@ -20,8 +20,10 @@ use time::now; use std::rand::task_rng; use rand::Rng; +use std::io::{BufferedReader, BufferedWriter}; use std::io::{IoError, IoResult, NotConnected, OtherIoError, standard_error}; use std::io::net::{ip, tcp}; +use std::sync::{Arc, Mutex}; use network::constants; use network::address::Address; @@ -47,8 +49,14 @@ fn ipaddr_to_bitcoin_addr(ipaddr: &ip::IpAddr) -> [u8, ..16] { /// A network socket along with information about the peer #[deriving(Clone)] pub struct Socket { - /// The underlying network data stream - stream: Option, + /// The underlying socket, which is only used directly to (a) get + /// information about the socket, and (b) to close down the socket, + /// quickly cancelling any read/writes and unlocking the Mutexes. + socket: Option, + /// The underlying network data stream read buffer + buffered_reader: Arc>>>, + /// The underlying network data stream write buffer + buffered_writer: Arc>>>, /// Services supported by us pub services: u64, /// Our user agent @@ -65,7 +73,9 @@ impl Socket { pub fn new(network: constants::Network) -> Socket { let mut rng = task_rng(); Socket { - stream: None, + socket: None, + buffered_reader: Arc::new(Mutex::new(None)), + buffered_writer: Arc::new(Mutex::new(None)), services: 0, version_nonce: rng.gen(), user_agent: String::from_str(constants::USER_AGENT), @@ -75,18 +85,40 @@ impl Socket { /// Connect to the peer pub fn connect(&mut self, host: &str, port: u16) -> IoResult<()> { + // Boot off any lingering readers or writers + if self.socket.is_some() { + let _ = self.socket.get_mut_ref().close_read(); + let _ = self.socket.get_mut_ref().close_write(); + } + // These locks should just pop open now + let mut reader_lock = self.buffered_reader.lock(); + let mut writer_lock = self.buffered_writer.lock(); match tcp::TcpStream::connect(host, port) { Ok(s) => { - self.stream = Some(s); + *reader_lock = Some(BufferedReader::new(s.clone())); + *writer_lock = Some(BufferedWriter::new(s.clone())); + self.socket = Some(s); Ok(()) } Err(e) => Err(e) } } + /// Reset the connection to the peer + pub fn reconnect(&mut self) -> IoResult<()> { + let (host, port) = match self.socket { + Some(ref mut s) => match s.peer_name() { + Ok(addr) => (format!("{}", addr.ip), addr.port), + Err(e) => { return Err(e); } + }, + None => { return Err(standard_error(NotConnected)); } + }; + self.connect(host.as_slice(), port) + } + /// Peer address pub fn receiver_address(&mut self) -> IoResult
{ - match self.stream { + match self.socket { Some(ref mut s) => match s.peer_name() { Ok(addr) => { Ok(Address { @@ -103,7 +135,7 @@ impl Socket { /// Our own address pub fn sender_address(&mut self) -> IoResult
{ - match self.stream { + match self.socket { Some(ref mut s) => match s.socket_name() { Ok(addr) => { Ok(Address { @@ -148,15 +180,13 @@ impl Socket { /// Send a general message across the line pub fn send_message(&mut self, payload: NetworkMessage) -> IoResult<()> { - if self.stream.is_none() { - Err(standard_error(NotConnected)) - } - else { - let stream = self.stream.get_mut_ref(); - let message = RawNetworkMessage { magic: self.magic, payload: payload }; - match stream.write(message.serialize().as_slice()) { - Ok(_) => Ok(()), - Err(e) => Err(e) + let mut writer_lock = self.buffered_writer.lock(); + match *writer_lock.deref_mut() { + None => Err(standard_error(NotConnected)), + Some(ref mut writer) => { + let message = RawNetworkMessage { magic: self.magic, payload: payload }; + try!(writer.write(message.serialize().as_slice())); + writer.flush() } } } @@ -164,15 +194,16 @@ impl Socket { /// Receive the next message from the peer, decoding the network header /// and verifying its correctness. Returns the undecoded payload. pub fn receive_message(&mut self) -> IoResult { - match self.stream { + let mut reader_lock = self.buffered_reader.lock(); + match *reader_lock.deref_mut() { None => Err(standard_error(NotConnected)), - Some(ref mut s) => { + Some(ref mut buf) => { let mut read_err = None; // We need a new scope since the closure in here borrows read_err, // and we try to read it afterward. Letting `iter` go out fixes it. let ret: IoResult = { // Set up iterator so we will catch network errors properly - let iter = s.bytes().filter_map(|res| + let iter = buf.bytes().filter_map(|res| match res { Ok(ch) => Some(ch), Err(e) => { read_err = Some(e); None } @@ -183,11 +214,15 @@ impl Socket { // Return match read_err { // Network errors get priority since they are probably more meaningful - Some(e) => prepend_err("network", Err(e)), + Some(e) => { + prepend_err("network", Err(e)) + }, _ => { match ret { // Next come parse errors - Err(e) => prepend_err("decode", Err(e)), + Err(e) => { + prepend_err("decode", Err(e)) + }, Ok(ret) => { // Finally magic (this should come before parse error, but we can't // get to it if the deserialization failed). TODO restructure this