diff --git a/Cargo.toml b/Cargo.toml index 92c78923..68f4586f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,9 @@ rand = "0.3" bitcoin_hashes = "0.3" bitcoinconsensus = { version = "0.16", optional = true } +[dev-dependencies] +tempfile = "3" + [dependencies.serde] version = "1" optional = true diff --git a/src/consensus/encode.rs b/src/consensus/encode.rs index c7788140..4a5fd543 100644 --- a/src/consensus/encode.rs +++ b/src/consensus/encode.rs @@ -204,17 +204,29 @@ pub fn serialize_hex(data: &T) -> String pub fn deserialize<'a, T>(data: &'a [u8]) -> Result where T: Decodable> { - let mut decoder = Cursor::new(data); - let rv = Decodable::consensus_decode(&mut decoder)?; + let (rv, consumed) = deserialize_partial(data)?; - // Fail if data is not consumed entirely. - if decoder.position() == data.len() as u64 { + // Fail if data are not consumed entirely. + if consumed == data.len() { Ok(rv) } else { Err(Error::ParseFailed("data not consumed entirely when explicitly deserializing")) } } +/// Deserialize an object from a vector, but will not report an error if said deserialization +/// doesn't consume the entire vector. +pub fn deserialize_partial<'a, T>(data: &'a [u8]) -> Result<(T, usize), Error> + where T: Decodable> +{ + let mut decoder = Cursor::new(data); + let rv = Decodable::consensus_decode(&mut decoder)?; + let consumed = decoder.position() as usize; + + Ok((rv, consumed)) +} + + /// A simple Encoder trait pub trait Encoder { /// Output a 64-bit uint diff --git a/src/consensus/mod.rs b/src/consensus/mod.rs index db59ce40..634b23ca 100644 --- a/src/consensus/mod.rs +++ b/src/consensus/mod.rs @@ -21,5 +21,6 @@ pub mod encode; pub mod params; -pub use self::encode::{Encodable, Decodable, Encoder, Decoder, serialize, deserialize}; +pub use self::encode::{Encodable, Decodable, Encoder, Decoder, + serialize, deserialize, deserialize_partial}; pub use self::params::Params; diff --git a/src/network/message.rs b/src/network/message.rs index 6066617b..e5969215 100644 --- a/src/network/message.rs +++ b/src/network/message.rs @@ -21,7 +21,6 @@ use std::iter; use std::io::Cursor; -use std::sync::mpsc::Sender; use blockdata::block; use blockdata::transaction; @@ -32,7 +31,6 @@ use network::message_filter; use consensus::encode::{Decodable, Encodable}; use consensus::encode::CheckedData; use consensus::encode::{self, serialize, Encoder, Decoder}; -use util; /// Serializer for command string #[derive(PartialEq, Eq, Clone, Debug)] @@ -63,6 +61,7 @@ impl Decodable for CommandString { } } +#[derive(Debug)] /// A Network message pub struct RawNetworkMessage { /// Magic bytes to identify the network these messages are meant for @@ -71,14 +70,6 @@ pub struct RawNetworkMessage { pub payload: NetworkMessage } -/// A response from the peer-connected socket -pub enum SocketResponse { - /// A message was received - MessageReceived(NetworkMessage), - /// An error occurred and the socket needs to close - ConnectionFailed(util::Error, Sender<()>) -} - #[derive(Clone, PartialEq, Eq, Debug)] /// A Network message payload. Proper documentation is available on at /// [Bitcoin Wiki: Protocol Specification](https://en.bitcoin.it/wiki/Protocol_specification) @@ -237,8 +228,7 @@ impl Decodable for RawNetworkMessage { #[cfg(test)] mod test { use super::{RawNetworkMessage, NetworkMessage, CommandString}; - - use consensus::encode::{deserialize, serialize}; + use consensus::encode::{deserialize, deserialize_partial, serialize}; #[test] fn serialize_commandstring_test() { @@ -290,4 +280,89 @@ mod test { 0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2]); } + #[test] + fn deserialize_getaddr_test() { + let msg = deserialize( + &[0xf9, 0xbe, 0xb4, 0xd9, 0x67, 0x65, 0x74, 0x61, + 0x64, 0x64, 0x72, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2]); + let preimage = RawNetworkMessage { magic: 0xd9b4bef9, payload: NetworkMessage::GetAddr }; + assert!(msg.is_ok()); + let msg : RawNetworkMessage = msg.unwrap(); + assert_eq!(preimage.magic, msg.magic); + assert_eq!(preimage.payload, msg.payload); + } + + #[test] + fn deserialize_version_test() { + let msg = deserialize::( + &[ 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27, + 0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, + 0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1, + 0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68, + 0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31, + 0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01 ]); + + assert!(msg.is_ok()); + let msg = msg.unwrap(); + assert_eq!(msg.magic, 0xd9b4bef9); + if let NetworkMessage::Version(version_msg) = msg.payload { + assert_eq!(version_msg.version, 70015); + assert_eq!(version_msg.services, 1037); + assert_eq!(version_msg.timestamp, 1548554224); + assert_eq!(version_msg.nonce, 13952548347456104954); + assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/"); + assert_eq!(version_msg.start_height, 560275); + assert_eq!(version_msg.relay, true); + } else { + panic!("Wrong message type"); + } + } + + #[test] + fn deserialize_partial_message_test() { + let data = [ 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27, + 0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, + 0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1, + 0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68, + 0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31, + 0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01, 0, 0 ]; + let msg = deserialize_partial::(&data); + assert!(msg.is_ok()); + + let (msg, consumed) = msg.unwrap(); + assert_eq!(consumed, data.to_vec().len() - 2); + assert_eq!(msg.magic, 0xd9b4bef9); + if let NetworkMessage::Version(version_msg) = msg.payload { + assert_eq!(version_msg.version, 70015); + assert_eq!(version_msg.services, 1037); + assert_eq!(version_msg.timestamp, 1548554224); + assert_eq!(version_msg.nonce, 13952548347456104954); + assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/"); + assert_eq!(version_msg.start_height, 560275); + assert_eq!(version_msg.relay, true); + } else { + panic!("Wrong message type"); + } + } } diff --git a/src/network/mod.rs b/src/network/mod.rs index 21aaeaba..d8fd9539 100644 --- a/src/network/mod.rs +++ b/src/network/mod.rs @@ -29,6 +29,7 @@ pub mod message; pub mod message_blockdata; pub mod message_network; pub mod message_filter; +pub mod stream_reader; /// Network error #[derive(Debug)] @@ -50,14 +51,14 @@ impl fmt::Display for Error { } } -impl error::Error for Error { - fn cause(&self) -> Option<&error::Error> { - match *self { - Error::Io(ref e) => Some(e), - Error::SocketMutexPoisoned | Error::SocketNotConnectedToPeer => None, - } +#[doc(hidden)] +impl From for Error { + fn from(err: io::Error) -> Self { + Error::Io(err) } +} +impl error::Error for Error { fn description(&self) -> &str { match *self { Error::Io(ref e) => e.description(), @@ -65,4 +66,11 @@ impl error::Error for Error { Error::SocketNotConnectedToPeer => "not connected to peer", } } + + fn cause(&self) -> Option<&error::Error> { + match *self { + Error::Io(ref e) => Some(e), + Error::SocketMutexPoisoned | Error::SocketNotConnectedToPeer => None, + } + } } diff --git a/src/network/stream_reader.rs b/src/network/stream_reader.rs new file mode 100644 index 00000000..23de2f0e --- /dev/null +++ b/src/network/stream_reader.rs @@ -0,0 +1,327 @@ +// Rust Bitcoin Library +// Written in 2014 by +// Andrew Poelstra +// +// To the extent possible under law, the author(s) have dedicated all +// copyright and related and neighboring rights to this software to +// the public domain worldwide. This software is distributed without +// any warranty. +// +// You should have received a copy of the CC0 Public Domain Dedication +// along with this software. +// If not, see . +// + +//! Stream reader +//! +//! This module defines `StreamReader` struct and its implementation which is used +//! for parsing incoming stream into separate `RawNetworkMessage`s, handling assembling +//! messages from multiple packets or dealing with partial or multiple messages in the stream +//! (like can happen with reading from TCP socket) +//! + +use std::fmt; +use std::io; +use std::io::Read; + +use network::message::RawNetworkMessage; +use consensus::encode; + +/// Struct used to configure stream reader function +pub struct StreamReader<'a> { + /// Stream to read from + pub stream: &'a mut Read, + /// I/O buffer + data: Vec, + /// Buffer containing unparsed message part + unparsed: Vec +} + +impl<'a> fmt::Debug for StreamReader<'a> { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + write!(f, "StreamReader with buffer_size={} and unparsed content {:?}", + self.data.capacity(), self.unparsed) + } +} + +impl<'a> StreamReader<'a> { + /// Constructs new stream reader for a given input stream `stream` with + /// optional parameter `buffer_size` determining reading buffer size + pub fn new(stream: &mut Read, buffer_size: Option) -> StreamReader { + StreamReader { + stream, + data: vec![0u8; buffer_size.unwrap_or(64 * 1024)], + unparsed: vec![] + } + } + + /// Reads stream and parses next message from its current input, + /// also taking into account previously unparsed partial message (if there was such). + /// + pub fn next_message(&mut self) -> Result { + loop { + match encode::deserialize_partial::(&self.unparsed) { + // In this case we just have an incomplete data, so we need to read more + Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => { + let count = self.stream.read(&mut self.data)?; + if count > 0 { + self.unparsed.extend(self.data[0..count].iter()); + } + else { + return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof))); + } + }, + Err(err) => return Err(err), + // We have successfully read from the buffer + Ok((message, index)) => { + self.unparsed.drain(..index); + return Ok(message) + }, + } + } + } +} + +#[cfg(test)] +mod test { + extern crate tempfile; + + use std::thread; + use std::fs::File; + use std::time::Duration; + use std::io::{Write, Seek, SeekFrom}; + use std::net::{TcpListener, TcpStream, Shutdown}; + use std::thread::JoinHandle; + + use super::StreamReader; + use network::message::{NetworkMessage, RawNetworkMessage}; + + // First, let's define some byte arrays for sample messages - dumps are taken from live + // Bitcoin Core node v0.17.1 with Wireshark + const MSG_VERSION: [u8; 126] = [ + 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27, + 0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, + 0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1, + 0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68, + 0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31, + 0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01 + ]; + + const MSG_VERACK: [u8; 24] = [ + 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x61, + 0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2 + ]; + + const MSG_PING: [u8; 32] = [ + 0xf9, 0xbe, 0xb4, 0xd9, 0x70, 0x69, 0x6e, 0x67, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x08, 0x00, 0x00, 0x00, 0x24, 0x67, 0xf1, 0x1d, + 0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + ]; + + const MSG_ALERT: [u8; 192] = [ + 0xf9, 0xbe, 0xb4, 0xd9, 0x61, 0x6c, 0x65, 0x72, + 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0xa8, 0x00, 0x00, 0x00, 0x1b, 0xf9, 0xaa, 0xea, + 0x60, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, + 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, + 0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff, + 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, + 0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00, + 0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a, + 0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b, + 0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20, + 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20, + 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, + 0x00, 0x46, 0x30, 0x44, 0x02, 0x20, 0x65, 0x3f, + 0xeb, 0xd6, 0x41, 0x0f, 0x47, 0x0f, 0x6b, 0xae, + 0x11, 0xca, 0xd1, 0x9c, 0x48, 0x41, 0x3b, 0xec, + 0xb1, 0xac, 0x2c, 0x17, 0xf9, 0x08, 0xfd, 0x0f, + 0xd5, 0x3b, 0xdc, 0x3a, 0xbd, 0x52, 0x02, 0x20, + 0x6d, 0x0e, 0x9c, 0x96, 0xfe, 0x88, 0xd4, 0xa0, + 0xf0, 0x1e, 0xd9, 0xde, 0xda, 0xe2, 0xb6, 0xf9, + 0xe0, 0x0d, 0xa9, 0x4c, 0xad, 0x0f, 0xec, 0xaa, + 0xe6, 0x6e, 0xcf, 0x68, 0x9b, 0xf7, 0x1b, 0x50 + ]; + + // Helper functions that checks parsed versions of the messages from the byte arrays above + fn check_version_msg(msg: &RawNetworkMessage) { + assert_eq!(msg.magic, 0xd9b4bef9); + if let NetworkMessage::Version(ref version_msg) = msg.payload { + assert_eq!(version_msg.version, 70015); + assert_eq!(version_msg.services, 1037); + assert_eq!(version_msg.timestamp, 1548554224); + assert_eq!(version_msg.nonce, 13952548347456104954); + assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/"); + assert_eq!(version_msg.start_height, 560275); + assert_eq!(version_msg.relay, true); + } else { + panic!("Wrong message type: expected VersionMessage"); + } + } + + fn check_alert_msg(msg: &RawNetworkMessage) { + assert_eq!(msg.magic, 0xd9b4bef9); + if let NetworkMessage::Alert(ref alert) = msg.payload { + assert_eq!(alert.clone(), [ + 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, + 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, + 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff, + 0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff, + 0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, + 0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00, + 0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a, + 0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b, + 0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72, + 0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20, + 0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20, + 0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, + 0x00, + ].to_vec()); + } else { + panic!("Wrong message type: expected AlertMessage"); + } + } + + #[test] + fn parse_multipartmsg_test() { + let mut tmpfile: File = tempfile::tempfile().unwrap(); + let mut reader = StreamReader::new(&mut tmpfile, None); + reader.unparsed = MSG_ALERT[..24].to_vec(); + assert!(reader.next_message().is_err()); + assert_eq!(reader.unparsed.len(), 24); + + reader.unparsed = MSG_ALERT.to_vec(); + let message = reader.next_message().unwrap(); + assert_eq!(reader.unparsed.len(), 0); + + check_alert_msg(&message); + } + + fn init_stream(buf: &[u8]) -> File { + let mut tmpfile: File = tempfile::tempfile().unwrap(); + write_file(&mut tmpfile, &buf); + tmpfile + } + + fn write_file(tmpfile: &mut File, buf: &[u8]) { + tmpfile.seek(SeekFrom::End(0)).unwrap(); + tmpfile.write(&buf).unwrap(); + tmpfile.flush().unwrap(); + tmpfile.seek(SeekFrom::Start(0)).unwrap(); + } + + #[test] + fn read_singlemsg_test() { + let mut stream = init_stream(&MSG_VERSION); + let message = StreamReader::new(&mut stream, None).next_message().unwrap(); + + check_version_msg(&message); + } + + #[test] + fn read_doublemsgs_test() { + let mut stream = init_stream(&MSG_VERSION); + write_file(&mut stream, &MSG_PING); + + let mut reader = StreamReader::new(&mut stream, None); + let message = reader.next_message().unwrap(); + check_version_msg(&message); + + let msg = reader.next_message().unwrap(); + assert_eq!(msg.magic, 0xd9b4bef9); + if let NetworkMessage::Ping(nonce) = msg.payload { + assert_eq!(nonce, 100); + } else { + panic!("Wrong message type, expected PingMessage"); + } + } + + // Helper function that set ups emulation of client-server TCP connection for + // testing message transfer via TCP packets + fn serve_tcp(pieces: Vec>) -> (JoinHandle<()>, TcpStream) { + // 1. Creating server part (emulating Bitcoin Core node) + let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).unwrap(); + let port = listener.local_addr().unwrap().port(); + // 2. Spawning thread that will be writing our messages to the TCP Stream at the server side + // in async mode + let handle = thread::spawn(move || { + for ostream in listener.incoming() { + let mut ostream = ostream.unwrap(); + + for piece in pieces { + ostream.write(&piece[..]).unwrap(); + ostream.flush().unwrap(); + thread::sleep(Duration::from_secs(1)); + } + + ostream.shutdown(Shutdown::Both).unwrap(); + break; + } + }); + + // 3. Creating client side of the TCP socket connection + thread::sleep(Duration::from_secs(1)); + let istream = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap(); + + return (handle, istream) + } + + #[test] + fn read_multipartmsg_test() { + // Setting up TCP connection emulation + let (handle, mut istream) = serve_tcp(vec![ + // single message split in two parts to emulate real network conditions + MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec() + ]); + let mut reader = StreamReader::new(&mut istream, None); + + // Reading and checking the whole message back + let message = reader.next_message().unwrap(); + check_version_msg(&message); + + // Waiting TCP server thread to terminate + handle.join().unwrap(); + } + + #[test] + fn read_sequencemsg_test() { + // Setting up TCP connection emulation + let (handle, mut istream) = serve_tcp(vec![ + // Real-world Bitcoin core communication case for /Satoshi:0.17.1/ + MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(), + MSG_VERACK.to_vec(), + MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec() + ]); + let mut reader = StreamReader::new(&mut istream, None); + + // Reading and checking the first message (Version) + let message = reader.next_message().unwrap(); + check_version_msg(&message); + + // Reading and checking the second message (Verack) + let msg = reader.next_message().unwrap(); + assert_eq!(msg.magic, 0xd9b4bef9); + assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage"); + + // Reading and checking the third message (Alert) + let msg = reader.next_message().unwrap(); + check_alert_msg(&msg); + + // Waiting TCP server thread to terminate + handle.join().unwrap(); + } +}