Reconnect on network errors; use Mutexes to control Socket access more precisely

This commit is contained in:
Andrew Poelstra 2014-07-24 08:27:40 -07:00
parent 9be493eeaf
commit 809bad5ff5
2 changed files with 58 additions and 22 deletions

View File

@ -77,8 +77,9 @@ pub trait Listener {
recv_tx.send(payload); recv_tx.send(payload);
} }
Err(e) => { Err(e) => {
println!("Received error {:} when decoding message. Pausing for 1sec.", e); println!("Received error {:} when decoding message. Pausing for 5 seconds then reconnecting.", e);
timer::sleep(1000); timer::sleep(5000);
sock.reconnect();
} }
} }
} }

View File

@ -20,8 +20,10 @@
use time::now; use time::now;
use std::rand::task_rng; use std::rand::task_rng;
use rand::Rng; use rand::Rng;
use std::io::{BufferedReader, BufferedWriter};
use std::io::{IoError, IoResult, NotConnected, OtherIoError, standard_error}; use std::io::{IoError, IoResult, NotConnected, OtherIoError, standard_error};
use std::io::net::{ip, tcp}; use std::io::net::{ip, tcp};
use std::sync::{Arc, Mutex};
use network::constants; use network::constants;
use network::address::Address; use network::address::Address;
@ -47,8 +49,14 @@ fn ipaddr_to_bitcoin_addr(ipaddr: &ip::IpAddr) -> [u8, ..16] {
/// A network socket along with information about the peer /// A network socket along with information about the peer
#[deriving(Clone)] #[deriving(Clone)]
pub struct Socket { pub struct Socket {
/// The underlying network data stream /// The underlying socket, which is only used directly to (a) get
stream: Option<tcp::TcpStream>, /// information about the socket, and (b) to close down the socket,
/// quickly cancelling any read/writes and unlocking the Mutexes.
socket: Option<tcp::TcpStream>,
/// The underlying network data stream read buffer
buffered_reader: Arc<Mutex<Option<BufferedReader<tcp::TcpStream>>>>,
/// The underlying network data stream write buffer
buffered_writer: Arc<Mutex<Option<BufferedWriter<tcp::TcpStream>>>>,
/// Services supported by us /// Services supported by us
pub services: u64, pub services: u64,
/// Our user agent /// Our user agent
@ -65,7 +73,9 @@ impl Socket {
pub fn new(network: constants::Network) -> Socket { pub fn new(network: constants::Network) -> Socket {
let mut rng = task_rng(); let mut rng = task_rng();
Socket { Socket {
stream: None, socket: None,
buffered_reader: Arc::new(Mutex::new(None)),
buffered_writer: Arc::new(Mutex::new(None)),
services: 0, services: 0,
version_nonce: rng.gen(), version_nonce: rng.gen(),
user_agent: String::from_str(constants::USER_AGENT), user_agent: String::from_str(constants::USER_AGENT),
@ -75,18 +85,40 @@ impl Socket {
/// Connect to the peer /// Connect to the peer
pub fn connect(&mut self, host: &str, port: u16) -> IoResult<()> { pub fn connect(&mut self, host: &str, port: u16) -> IoResult<()> {
// Boot off any lingering readers or writers
if self.socket.is_some() {
let _ = self.socket.get_mut_ref().close_read();
let _ = self.socket.get_mut_ref().close_write();
}
// These locks should just pop open now
let mut reader_lock = self.buffered_reader.lock();
let mut writer_lock = self.buffered_writer.lock();
match tcp::TcpStream::connect(host, port) { match tcp::TcpStream::connect(host, port) {
Ok(s) => { Ok(s) => {
self.stream = Some(s); *reader_lock = Some(BufferedReader::new(s.clone()));
*writer_lock = Some(BufferedWriter::new(s.clone()));
self.socket = Some(s);
Ok(()) Ok(())
} }
Err(e) => Err(e) Err(e) => Err(e)
} }
} }
/// Reset the connection to the peer
pub fn reconnect(&mut self) -> IoResult<()> {
let (host, port) = match self.socket {
Some(ref mut s) => match s.peer_name() {
Ok(addr) => (format!("{}", addr.ip), addr.port),
Err(e) => { return Err(e); }
},
None => { return Err(standard_error(NotConnected)); }
};
self.connect(host.as_slice(), port)
}
/// Peer address /// Peer address
pub fn receiver_address(&mut self) -> IoResult<Address> { pub fn receiver_address(&mut self) -> IoResult<Address> {
match self.stream { match self.socket {
Some(ref mut s) => match s.peer_name() { Some(ref mut s) => match s.peer_name() {
Ok(addr) => { Ok(addr) => {
Ok(Address { Ok(Address {
@ -103,7 +135,7 @@ impl Socket {
/// Our own address /// Our own address
pub fn sender_address(&mut self) -> IoResult<Address> { pub fn sender_address(&mut self) -> IoResult<Address> {
match self.stream { match self.socket {
Some(ref mut s) => match s.socket_name() { Some(ref mut s) => match s.socket_name() {
Ok(addr) => { Ok(addr) => {
Ok(Address { Ok(Address {
@ -148,15 +180,13 @@ impl Socket {
/// Send a general message across the line /// Send a general message across the line
pub fn send_message(&mut self, payload: NetworkMessage) -> IoResult<()> { pub fn send_message(&mut self, payload: NetworkMessage) -> IoResult<()> {
if self.stream.is_none() { let mut writer_lock = self.buffered_writer.lock();
Err(standard_error(NotConnected)) match *writer_lock.deref_mut() {
} None => Err(standard_error(NotConnected)),
else { Some(ref mut writer) => {
let stream = self.stream.get_mut_ref();
let message = RawNetworkMessage { magic: self.magic, payload: payload }; let message = RawNetworkMessage { magic: self.magic, payload: payload };
match stream.write(message.serialize().as_slice()) { try!(writer.write(message.serialize().as_slice()));
Ok(_) => Ok(()), writer.flush()
Err(e) => Err(e)
} }
} }
} }
@ -164,15 +194,16 @@ impl Socket {
/// Receive the next message from the peer, decoding the network header /// Receive the next message from the peer, decoding the network header
/// and verifying its correctness. Returns the undecoded payload. /// and verifying its correctness. Returns the undecoded payload.
pub fn receive_message(&mut self) -> IoResult<NetworkMessage> { pub fn receive_message(&mut self) -> IoResult<NetworkMessage> {
match self.stream { let mut reader_lock = self.buffered_reader.lock();
match *reader_lock.deref_mut() {
None => Err(standard_error(NotConnected)), None => Err(standard_error(NotConnected)),
Some(ref mut s) => { Some(ref mut buf) => {
let mut read_err = None; let mut read_err = None;
// We need a new scope since the closure in here borrows read_err, // We need a new scope since the closure in here borrows read_err,
// and we try to read it afterward. Letting `iter` go out fixes it. // and we try to read it afterward. Letting `iter` go out fixes it.
let ret: IoResult<RawNetworkMessage> = { let ret: IoResult<RawNetworkMessage> = {
// Set up iterator so we will catch network errors properly // Set up iterator so we will catch network errors properly
let iter = s.bytes().filter_map(|res| let iter = buf.bytes().filter_map(|res|
match res { match res {
Ok(ch) => Some(ch), Ok(ch) => Some(ch),
Err(e) => { read_err = Some(e); None } Err(e) => { read_err = Some(e); None }
@ -183,11 +214,15 @@ impl Socket {
// Return // Return
match read_err { match read_err {
// Network errors get priority since they are probably more meaningful // Network errors get priority since they are probably more meaningful
Some(e) => prepend_err("network", Err(e)), Some(e) => {
prepend_err("network", Err(e))
},
_ => { _ => {
match ret { match ret {
// Next come parse errors // Next come parse errors
Err(e) => prepend_err("decode", Err(e)), Err(e) => {
prepend_err("decode", Err(e))
},
Ok(ret) => { Ok(ret) => {
// Finally magic (this should come before parse error, but we can't // Finally magic (this should come before parse error, but we can't
// get to it if the deserialization failed). TODO restructure this // get to it if the deserialization failed). TODO restructure this