Better RawNewtorkMessage deserealization from IO stream (#231)
Follow-up to https://github.com/rust-bitcoin/rust-bitcoin/pull/229 While working with remote peers over the network it is required to deserealize RawNetworkMessage from `TCPStream` to read the incoming messages. These messages can be partial – or one TCP packet can contain few of them. To make the library usable for such use cases, I have implemented the required functionality and covered it with unit tests. Sample usage: ```rust fn run() -> Result<(), Error> { // Opening stream to the remote bitcoind peer let mut stream = TcpStream::connect(SocketAddr::from(([37, 187, 0, 47], 8333)); let start = SystemTime::now(); // Constructing and sending `version` message to get some messages back from the remote peer let since_the_epoch = start.duration_since(UNIX_EPOCH) .expect("Time went backwards"); let version_msg = message::RawNetworkMessage { magic: constants::Network::Bitcoin.magic(), payload: message::NetworkMessage::Version(message_network::VersionMessage::new( 0, since_the_epoch.as_secs() as i64, address::Address::new(receiver, 0), address::Address::new(receiver, 0), 0, String::from("macx0r"), 0 )) }; stream.write(encode::serialize(&version_msg).as_slice())?; // Receiving incoming messages let mut buffer = vec![]; loop { let result = StreamReader::new(&mut stream, None).read_messages(); if let Err(err) = result { stream.shutdown(Shutdown::Both)?; return Err(Error::DataError(err)) } for msg in result.unwrap() { println!("Received message: {:?}", msg.payload); } } } ``` Sample output is the following: ``` Received message: Version(VersionMessage { version: 70015, services: 1037, timestamp: 1548637162, receiver: Address {services: 0, address: [0, 0, 0, 0, 0, 65535, 23536, 35968], port: 33716}, sender: Address {services: 1037, address: [0, 0, 0, 0, 0, 0, 0, 0], port: 0}, nonce: 1370726880972892633, user_agent: "/Satoshi:0.17.99/", start_height: 560412, relay: true }) Received message: Verack Received message: Alert([1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 127, 0, 0, 0, 0, 255, 255, 255, 127, 254, 255, 255, 127, 1, 255, 255, 255, 127, 0, 0, 0, 0, 255, 255, 255, 127, 0, 255, 255, 255, 127, 0, 47, 85, 82, 71, 69, 78, 84, 58, 32, 65, 108, 101, 114, 116, 32, 107, 101, 121, 32, 99, 111, 109, 112, 114, 111, 109, 105, 115, 101, 100, 44, 32, 117, 112, 103, 114, 97, 100, 101, 32, 114, 101, 113, 117, 105, 114, 101, 100, 0]) ``` Working sample code can be found here: https://github.com/dr-orlovsky/bitcoinbigdata-netlistener
This commit is contained in:
parent
4bf99e79f8
commit
3c21e301aa
|
@ -27,6 +27,9 @@ rand = "0.3"
|
||||||
bitcoin_hashes = "0.3"
|
bitcoin_hashes = "0.3"
|
||||||
bitcoinconsensus = { version = "0.16", optional = true }
|
bitcoinconsensus = { version = "0.16", optional = true }
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tempfile = "3"
|
||||||
|
|
||||||
[dependencies.serde]
|
[dependencies.serde]
|
||||||
version = "1"
|
version = "1"
|
||||||
optional = true
|
optional = true
|
||||||
|
|
|
@ -204,17 +204,29 @@ pub fn serialize_hex<T: ?Sized>(data: &T) -> String
|
||||||
pub fn deserialize<'a, T>(data: &'a [u8]) -> Result<T, Error>
|
pub fn deserialize<'a, T>(data: &'a [u8]) -> Result<T, Error>
|
||||||
where T: Decodable<Cursor<&'a [u8]>>
|
where T: Decodable<Cursor<&'a [u8]>>
|
||||||
{
|
{
|
||||||
let mut decoder = Cursor::new(data);
|
let (rv, consumed) = deserialize_partial(data)?;
|
||||||
let rv = Decodable::consensus_decode(&mut decoder)?;
|
|
||||||
|
|
||||||
// Fail if data is not consumed entirely.
|
// Fail if data are not consumed entirely.
|
||||||
if decoder.position() == data.len() as u64 {
|
if consumed == data.len() {
|
||||||
Ok(rv)
|
Ok(rv)
|
||||||
} else {
|
} else {
|
||||||
Err(Error::ParseFailed("data not consumed entirely when explicitly deserializing"))
|
Err(Error::ParseFailed("data not consumed entirely when explicitly deserializing"))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Deserialize an object from a vector, but will not report an error if said deserialization
|
||||||
|
/// doesn't consume the entire vector.
|
||||||
|
pub fn deserialize_partial<'a, T>(data: &'a [u8]) -> Result<(T, usize), Error>
|
||||||
|
where T: Decodable<Cursor<&'a [u8]>>
|
||||||
|
{
|
||||||
|
let mut decoder = Cursor::new(data);
|
||||||
|
let rv = Decodable::consensus_decode(&mut decoder)?;
|
||||||
|
let consumed = decoder.position() as usize;
|
||||||
|
|
||||||
|
Ok((rv, consumed))
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
/// A simple Encoder trait
|
/// A simple Encoder trait
|
||||||
pub trait Encoder {
|
pub trait Encoder {
|
||||||
/// Output a 64-bit uint
|
/// Output a 64-bit uint
|
||||||
|
|
|
@ -21,5 +21,6 @@
|
||||||
pub mod encode;
|
pub mod encode;
|
||||||
pub mod params;
|
pub mod params;
|
||||||
|
|
||||||
pub use self::encode::{Encodable, Decodable, Encoder, Decoder, serialize, deserialize};
|
pub use self::encode::{Encodable, Decodable, Encoder, Decoder,
|
||||||
|
serialize, deserialize, deserialize_partial};
|
||||||
pub use self::params::Params;
|
pub use self::params::Params;
|
||||||
|
|
|
@ -21,7 +21,6 @@
|
||||||
|
|
||||||
use std::iter;
|
use std::iter;
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
use std::sync::mpsc::Sender;
|
|
||||||
|
|
||||||
use blockdata::block;
|
use blockdata::block;
|
||||||
use blockdata::transaction;
|
use blockdata::transaction;
|
||||||
|
@ -32,7 +31,6 @@ use network::message_filter;
|
||||||
use consensus::encode::{Decodable, Encodable};
|
use consensus::encode::{Decodable, Encodable};
|
||||||
use consensus::encode::CheckedData;
|
use consensus::encode::CheckedData;
|
||||||
use consensus::encode::{self, serialize, Encoder, Decoder};
|
use consensus::encode::{self, serialize, Encoder, Decoder};
|
||||||
use util;
|
|
||||||
|
|
||||||
/// Serializer for command string
|
/// Serializer for command string
|
||||||
#[derive(PartialEq, Eq, Clone, Debug)]
|
#[derive(PartialEq, Eq, Clone, Debug)]
|
||||||
|
@ -63,6 +61,7 @@ impl<D: Decoder> Decodable<D> for CommandString {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
/// A Network message
|
/// A Network message
|
||||||
pub struct RawNetworkMessage {
|
pub struct RawNetworkMessage {
|
||||||
/// Magic bytes to identify the network these messages are meant for
|
/// Magic bytes to identify the network these messages are meant for
|
||||||
|
@ -71,14 +70,6 @@ pub struct RawNetworkMessage {
|
||||||
pub payload: NetworkMessage
|
pub payload: NetworkMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A response from the peer-connected socket
|
|
||||||
pub enum SocketResponse {
|
|
||||||
/// A message was received
|
|
||||||
MessageReceived(NetworkMessage),
|
|
||||||
/// An error occurred and the socket needs to close
|
|
||||||
ConnectionFailed(util::Error, Sender<()>)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||||
/// A Network message payload. Proper documentation is available on at
|
/// A Network message payload. Proper documentation is available on at
|
||||||
/// [Bitcoin Wiki: Protocol Specification](https://en.bitcoin.it/wiki/Protocol_specification)
|
/// [Bitcoin Wiki: Protocol Specification](https://en.bitcoin.it/wiki/Protocol_specification)
|
||||||
|
@ -237,8 +228,7 @@ impl<D: Decoder> Decodable<D> for RawNetworkMessage {
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::{RawNetworkMessage, NetworkMessage, CommandString};
|
use super::{RawNetworkMessage, NetworkMessage, CommandString};
|
||||||
|
use consensus::encode::{deserialize, deserialize_partial, serialize};
|
||||||
use consensus::encode::{deserialize, serialize};
|
|
||||||
|
|
||||||
#[test]
|
#[test]
|
||||||
fn serialize_commandstring_test() {
|
fn serialize_commandstring_test() {
|
||||||
|
@ -290,4 +280,89 @@ mod test {
|
||||||
0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2]);
|
0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn deserialize_getaddr_test() {
|
||||||
|
let msg = deserialize(
|
||||||
|
&[0xf9, 0xbe, 0xb4, 0xd9, 0x67, 0x65, 0x74, 0x61,
|
||||||
|
0x64, 0x64, 0x72, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2]);
|
||||||
|
let preimage = RawNetworkMessage { magic: 0xd9b4bef9, payload: NetworkMessage::GetAddr };
|
||||||
|
assert!(msg.is_ok());
|
||||||
|
let msg : RawNetworkMessage = msg.unwrap();
|
||||||
|
assert_eq!(preimage.magic, msg.magic);
|
||||||
|
assert_eq!(preimage.payload, msg.payload);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn deserialize_version_test() {
|
||||||
|
let msg = deserialize::<RawNetworkMessage>(
|
||||||
|
&[ 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73,
|
||||||
|
0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27,
|
||||||
|
0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
|
||||||
|
0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
|
||||||
|
0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
|
||||||
|
0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
|
||||||
|
0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01 ]);
|
||||||
|
|
||||||
|
assert!(msg.is_ok());
|
||||||
|
let msg = msg.unwrap();
|
||||||
|
assert_eq!(msg.magic, 0xd9b4bef9);
|
||||||
|
if let NetworkMessage::Version(version_msg) = msg.payload {
|
||||||
|
assert_eq!(version_msg.version, 70015);
|
||||||
|
assert_eq!(version_msg.services, 1037);
|
||||||
|
assert_eq!(version_msg.timestamp, 1548554224);
|
||||||
|
assert_eq!(version_msg.nonce, 13952548347456104954);
|
||||||
|
assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
|
||||||
|
assert_eq!(version_msg.start_height, 560275);
|
||||||
|
assert_eq!(version_msg.relay, true);
|
||||||
|
} else {
|
||||||
|
panic!("Wrong message type");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn deserialize_partial_message_test() {
|
||||||
|
let data = [ 0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73,
|
||||||
|
0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27,
|
||||||
|
0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
|
||||||
|
0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
|
||||||
|
0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
|
||||||
|
0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
|
||||||
|
0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01, 0, 0 ];
|
||||||
|
let msg = deserialize_partial::<RawNetworkMessage>(&data);
|
||||||
|
assert!(msg.is_ok());
|
||||||
|
|
||||||
|
let (msg, consumed) = msg.unwrap();
|
||||||
|
assert_eq!(consumed, data.to_vec().len() - 2);
|
||||||
|
assert_eq!(msg.magic, 0xd9b4bef9);
|
||||||
|
if let NetworkMessage::Version(version_msg) = msg.payload {
|
||||||
|
assert_eq!(version_msg.version, 70015);
|
||||||
|
assert_eq!(version_msg.services, 1037);
|
||||||
|
assert_eq!(version_msg.timestamp, 1548554224);
|
||||||
|
assert_eq!(version_msg.nonce, 13952548347456104954);
|
||||||
|
assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
|
||||||
|
assert_eq!(version_msg.start_height, 560275);
|
||||||
|
assert_eq!(version_msg.relay, true);
|
||||||
|
} else {
|
||||||
|
panic!("Wrong message type");
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ pub mod message;
|
||||||
pub mod message_blockdata;
|
pub mod message_blockdata;
|
||||||
pub mod message_network;
|
pub mod message_network;
|
||||||
pub mod message_filter;
|
pub mod message_filter;
|
||||||
|
pub mod stream_reader;
|
||||||
|
|
||||||
/// Network error
|
/// Network error
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
|
@ -50,14 +51,14 @@ impl fmt::Display for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl error::Error for Error {
|
#[doc(hidden)]
|
||||||
fn cause(&self) -> Option<&error::Error> {
|
impl From<io::Error> for Error {
|
||||||
match *self {
|
fn from(err: io::Error) -> Self {
|
||||||
Error::Io(ref e) => Some(e),
|
Error::Io(err)
|
||||||
Error::SocketMutexPoisoned | Error::SocketNotConnectedToPeer => None,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl error::Error for Error {
|
||||||
fn description(&self) -> &str {
|
fn description(&self) -> &str {
|
||||||
match *self {
|
match *self {
|
||||||
Error::Io(ref e) => e.description(),
|
Error::Io(ref e) => e.description(),
|
||||||
|
@ -65,4 +66,11 @@ impl error::Error for Error {
|
||||||
Error::SocketNotConnectedToPeer => "not connected to peer",
|
Error::SocketNotConnectedToPeer => "not connected to peer",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn cause(&self) -> Option<&error::Error> {
|
||||||
|
match *self {
|
||||||
|
Error::Io(ref e) => Some(e),
|
||||||
|
Error::SocketMutexPoisoned | Error::SocketNotConnectedToPeer => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,327 @@
|
||||||
|
// Rust Bitcoin Library
|
||||||
|
// Written in 2014 by
|
||||||
|
// Andrew Poelstra <apoelstra@wpsoftware.net>
|
||||||
|
//
|
||||||
|
// To the extent possible under law, the author(s) have dedicated all
|
||||||
|
// copyright and related and neighboring rights to this software to
|
||||||
|
// the public domain worldwide. This software is distributed without
|
||||||
|
// any warranty.
|
||||||
|
//
|
||||||
|
// You should have received a copy of the CC0 Public Domain Dedication
|
||||||
|
// along with this software.
|
||||||
|
// If not, see <http://creativecommons.org/publicdomain/zero/1.0/>.
|
||||||
|
//
|
||||||
|
|
||||||
|
//! Stream reader
|
||||||
|
//!
|
||||||
|
//! This module defines `StreamReader` struct and its implementation which is used
|
||||||
|
//! for parsing incoming stream into separate `RawNetworkMessage`s, handling assembling
|
||||||
|
//! messages from multiple packets or dealing with partial or multiple messages in the stream
|
||||||
|
//! (like can happen with reading from TCP socket)
|
||||||
|
//!
|
||||||
|
|
||||||
|
use std::fmt;
|
||||||
|
use std::io;
|
||||||
|
use std::io::Read;
|
||||||
|
|
||||||
|
use network::message::RawNetworkMessage;
|
||||||
|
use consensus::encode;
|
||||||
|
|
||||||
|
/// Struct used to configure stream reader function
|
||||||
|
pub struct StreamReader<'a> {
|
||||||
|
/// Stream to read from
|
||||||
|
pub stream: &'a mut Read,
|
||||||
|
/// I/O buffer
|
||||||
|
data: Vec<u8>,
|
||||||
|
/// Buffer containing unparsed message part
|
||||||
|
unparsed: Vec<u8>
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> fmt::Debug for StreamReader<'a> {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||||
|
write!(f, "StreamReader with buffer_size={} and unparsed content {:?}",
|
||||||
|
self.data.capacity(), self.unparsed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a> StreamReader<'a> {
|
||||||
|
/// Constructs new stream reader for a given input stream `stream` with
|
||||||
|
/// optional parameter `buffer_size` determining reading buffer size
|
||||||
|
pub fn new(stream: &mut Read, buffer_size: Option<usize>) -> StreamReader {
|
||||||
|
StreamReader {
|
||||||
|
stream,
|
||||||
|
data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
|
||||||
|
unparsed: vec![]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reads stream and parses next message from its current input,
|
||||||
|
/// also taking into account previously unparsed partial message (if there was such).
|
||||||
|
///
|
||||||
|
pub fn next_message(&mut self) -> Result<RawNetworkMessage, encode::Error> {
|
||||||
|
loop {
|
||||||
|
match encode::deserialize_partial::<RawNetworkMessage>(&self.unparsed) {
|
||||||
|
// In this case we just have an incomplete data, so we need to read more
|
||||||
|
Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => {
|
||||||
|
let count = self.stream.read(&mut self.data)?;
|
||||||
|
if count > 0 {
|
||||||
|
self.unparsed.extend(self.data[0..count].iter());
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof)));
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
// We have successfully read from the buffer
|
||||||
|
Ok((message, index)) => {
|
||||||
|
self.unparsed.drain(..index);
|
||||||
|
return Ok(message)
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod test {
|
||||||
|
extern crate tempfile;
|
||||||
|
|
||||||
|
use std::thread;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::time::Duration;
|
||||||
|
use std::io::{Write, Seek, SeekFrom};
|
||||||
|
use std::net::{TcpListener, TcpStream, Shutdown};
|
||||||
|
use std::thread::JoinHandle;
|
||||||
|
|
||||||
|
use super::StreamReader;
|
||||||
|
use network::message::{NetworkMessage, RawNetworkMessage};
|
||||||
|
|
||||||
|
// First, let's define some byte arrays for sample messages - dumps are taken from live
|
||||||
|
// Bitcoin Core node v0.17.1 with Wireshark
|
||||||
|
const MSG_VERSION: [u8; 126] = [
|
||||||
|
0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x73,
|
||||||
|
0x69, 0x6f, 0x6e, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x66, 0x00, 0x00, 0x00, 0xbe, 0x61, 0xb8, 0x27,
|
||||||
|
0x7f, 0x11, 0x01, 0x00, 0x0d, 0x04, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0xf0, 0x0f, 0x4d, 0x5c,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
|
||||||
|
0x5b, 0xf0, 0x8c, 0x80, 0xb4, 0xbd, 0x0d, 0x04,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0xfa, 0xa9, 0x95, 0x59, 0xcc, 0x68, 0xa1, 0xc1,
|
||||||
|
0x10, 0x2f, 0x53, 0x61, 0x74, 0x6f, 0x73, 0x68,
|
||||||
|
0x69, 0x3a, 0x30, 0x2e, 0x31, 0x37, 0x2e, 0x31,
|
||||||
|
0x2f, 0x93, 0x8c, 0x08, 0x00, 0x01
|
||||||
|
];
|
||||||
|
|
||||||
|
const MSG_VERACK: [u8; 24] = [
|
||||||
|
0xf9, 0xbe, 0xb4, 0xd9, 0x76, 0x65, 0x72, 0x61,
|
||||||
|
0x63, 0x6b, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x5d, 0xf6, 0xe0, 0xe2
|
||||||
|
];
|
||||||
|
|
||||||
|
const MSG_PING: [u8; 32] = [
|
||||||
|
0xf9, 0xbe, 0xb4, 0xd9, 0x70, 0x69, 0x6e, 0x67,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x08, 0x00, 0x00, 0x00, 0x24, 0x67, 0xf1, 0x1d,
|
||||||
|
0x64, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
];
|
||||||
|
|
||||||
|
const MSG_ALERT: [u8; 192] = [
|
||||||
|
0xf9, 0xbe, 0xb4, 0xd9, 0x61, 0x6c, 0x65, 0x72,
|
||||||
|
0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0xa8, 0x00, 0x00, 0x00, 0x1b, 0xf9, 0xaa, 0xea,
|
||||||
|
0x60, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
|
||||||
|
0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
|
||||||
|
0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
|
||||||
|
0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
|
||||||
|
0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
|
||||||
|
0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
|
||||||
|
0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
|
||||||
|
0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
|
||||||
|
0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
|
||||||
|
0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
|
||||||
|
0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
|
||||||
|
0x00, 0x46, 0x30, 0x44, 0x02, 0x20, 0x65, 0x3f,
|
||||||
|
0xeb, 0xd6, 0x41, 0x0f, 0x47, 0x0f, 0x6b, 0xae,
|
||||||
|
0x11, 0xca, 0xd1, 0x9c, 0x48, 0x41, 0x3b, 0xec,
|
||||||
|
0xb1, 0xac, 0x2c, 0x17, 0xf9, 0x08, 0xfd, 0x0f,
|
||||||
|
0xd5, 0x3b, 0xdc, 0x3a, 0xbd, 0x52, 0x02, 0x20,
|
||||||
|
0x6d, 0x0e, 0x9c, 0x96, 0xfe, 0x88, 0xd4, 0xa0,
|
||||||
|
0xf0, 0x1e, 0xd9, 0xde, 0xda, 0xe2, 0xb6, 0xf9,
|
||||||
|
0xe0, 0x0d, 0xa9, 0x4c, 0xad, 0x0f, 0xec, 0xaa,
|
||||||
|
0xe6, 0x6e, 0xcf, 0x68, 0x9b, 0xf7, 0x1b, 0x50
|
||||||
|
];
|
||||||
|
|
||||||
|
// Helper functions that checks parsed versions of the messages from the byte arrays above
|
||||||
|
fn check_version_msg(msg: &RawNetworkMessage) {
|
||||||
|
assert_eq!(msg.magic, 0xd9b4bef9);
|
||||||
|
if let NetworkMessage::Version(ref version_msg) = msg.payload {
|
||||||
|
assert_eq!(version_msg.version, 70015);
|
||||||
|
assert_eq!(version_msg.services, 1037);
|
||||||
|
assert_eq!(version_msg.timestamp, 1548554224);
|
||||||
|
assert_eq!(version_msg.nonce, 13952548347456104954);
|
||||||
|
assert_eq!(version_msg.user_agent, "/Satoshi:0.17.1/");
|
||||||
|
assert_eq!(version_msg.start_height, 560275);
|
||||||
|
assert_eq!(version_msg.relay, true);
|
||||||
|
} else {
|
||||||
|
panic!("Wrong message type: expected VersionMessage");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn check_alert_msg(msg: &RawNetworkMessage) {
|
||||||
|
assert_eq!(msg.magic, 0xd9b4bef9);
|
||||||
|
if let NetworkMessage::Alert(ref alert) = msg.payload {
|
||||||
|
assert_eq!(alert.clone(), [
|
||||||
|
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
|
0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
|
||||||
|
0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff, 0xff,
|
||||||
|
0x7f, 0xfe, 0xff, 0xff, 0x7f, 0x01, 0xff, 0xff,
|
||||||
|
0xff, 0x7f, 0x00, 0x00, 0x00, 0x00, 0xff, 0xff,
|
||||||
|
0xff, 0x7f, 0x00, 0xff, 0xff, 0xff, 0x7f, 0x00,
|
||||||
|
0x2f, 0x55, 0x52, 0x47, 0x45, 0x4e, 0x54, 0x3a,
|
||||||
|
0x20, 0x41, 0x6c, 0x65, 0x72, 0x74, 0x20, 0x6b,
|
||||||
|
0x65, 0x79, 0x20, 0x63, 0x6f, 0x6d, 0x70, 0x72,
|
||||||
|
0x6f, 0x6d, 0x69, 0x73, 0x65, 0x64, 0x2c, 0x20,
|
||||||
|
0x75, 0x70, 0x67, 0x72, 0x61, 0x64, 0x65, 0x20,
|
||||||
|
0x72, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64,
|
||||||
|
0x00,
|
||||||
|
].to_vec());
|
||||||
|
} else {
|
||||||
|
panic!("Wrong message type: expected AlertMessage");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_multipartmsg_test() {
|
||||||
|
let mut tmpfile: File = tempfile::tempfile().unwrap();
|
||||||
|
let mut reader = StreamReader::new(&mut tmpfile, None);
|
||||||
|
reader.unparsed = MSG_ALERT[..24].to_vec();
|
||||||
|
assert!(reader.next_message().is_err());
|
||||||
|
assert_eq!(reader.unparsed.len(), 24);
|
||||||
|
|
||||||
|
reader.unparsed = MSG_ALERT.to_vec();
|
||||||
|
let message = reader.next_message().unwrap();
|
||||||
|
assert_eq!(reader.unparsed.len(), 0);
|
||||||
|
|
||||||
|
check_alert_msg(&message);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn init_stream(buf: &[u8]) -> File {
|
||||||
|
let mut tmpfile: File = tempfile::tempfile().unwrap();
|
||||||
|
write_file(&mut tmpfile, &buf);
|
||||||
|
tmpfile
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_file(tmpfile: &mut File, buf: &[u8]) {
|
||||||
|
tmpfile.seek(SeekFrom::End(0)).unwrap();
|
||||||
|
tmpfile.write(&buf).unwrap();
|
||||||
|
tmpfile.flush().unwrap();
|
||||||
|
tmpfile.seek(SeekFrom::Start(0)).unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_singlemsg_test() {
|
||||||
|
let mut stream = init_stream(&MSG_VERSION);
|
||||||
|
let message = StreamReader::new(&mut stream, None).next_message().unwrap();
|
||||||
|
|
||||||
|
check_version_msg(&message);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_doublemsgs_test() {
|
||||||
|
let mut stream = init_stream(&MSG_VERSION);
|
||||||
|
write_file(&mut stream, &MSG_PING);
|
||||||
|
|
||||||
|
let mut reader = StreamReader::new(&mut stream, None);
|
||||||
|
let message = reader.next_message().unwrap();
|
||||||
|
check_version_msg(&message);
|
||||||
|
|
||||||
|
let msg = reader.next_message().unwrap();
|
||||||
|
assert_eq!(msg.magic, 0xd9b4bef9);
|
||||||
|
if let NetworkMessage::Ping(nonce) = msg.payload {
|
||||||
|
assert_eq!(nonce, 100);
|
||||||
|
} else {
|
||||||
|
panic!("Wrong message type, expected PingMessage");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper function that set ups emulation of client-server TCP connection for
|
||||||
|
// testing message transfer via TCP packets
|
||||||
|
fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, TcpStream) {
|
||||||
|
// 1. Creating server part (emulating Bitcoin Core node)
|
||||||
|
let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).unwrap();
|
||||||
|
let port = listener.local_addr().unwrap().port();
|
||||||
|
// 2. Spawning thread that will be writing our messages to the TCP Stream at the server side
|
||||||
|
// in async mode
|
||||||
|
let handle = thread::spawn(move || {
|
||||||
|
for ostream in listener.incoming() {
|
||||||
|
let mut ostream = ostream.unwrap();
|
||||||
|
|
||||||
|
for piece in pieces {
|
||||||
|
ostream.write(&piece[..]).unwrap();
|
||||||
|
ostream.flush().unwrap();
|
||||||
|
thread::sleep(Duration::from_secs(1));
|
||||||
|
}
|
||||||
|
|
||||||
|
ostream.shutdown(Shutdown::Both).unwrap();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// 3. Creating client side of the TCP socket connection
|
||||||
|
thread::sleep(Duration::from_secs(1));
|
||||||
|
let istream = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
|
||||||
|
|
||||||
|
return (handle, istream)
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_multipartmsg_test() {
|
||||||
|
// Setting up TCP connection emulation
|
||||||
|
let (handle, mut istream) = serve_tcp(vec![
|
||||||
|
// single message split in two parts to emulate real network conditions
|
||||||
|
MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec()
|
||||||
|
]);
|
||||||
|
let mut reader = StreamReader::new(&mut istream, None);
|
||||||
|
|
||||||
|
// Reading and checking the whole message back
|
||||||
|
let message = reader.next_message().unwrap();
|
||||||
|
check_version_msg(&message);
|
||||||
|
|
||||||
|
// Waiting TCP server thread to terminate
|
||||||
|
handle.join().unwrap();
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn read_sequencemsg_test() {
|
||||||
|
// Setting up TCP connection emulation
|
||||||
|
let (handle, mut istream) = serve_tcp(vec![
|
||||||
|
// Real-world Bitcoin core communication case for /Satoshi:0.17.1/
|
||||||
|
MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(),
|
||||||
|
MSG_VERACK.to_vec(),
|
||||||
|
MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec()
|
||||||
|
]);
|
||||||
|
let mut reader = StreamReader::new(&mut istream, None);
|
||||||
|
|
||||||
|
// Reading and checking the first message (Version)
|
||||||
|
let message = reader.next_message().unwrap();
|
||||||
|
check_version_msg(&message);
|
||||||
|
|
||||||
|
// Reading and checking the second message (Verack)
|
||||||
|
let msg = reader.next_message().unwrap();
|
||||||
|
assert_eq!(msg.magic, 0xd9b4bef9);
|
||||||
|
assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage");
|
||||||
|
|
||||||
|
// Reading and checking the third message (Alert)
|
||||||
|
let msg = reader.next_message().unwrap();
|
||||||
|
check_alert_msg(&msg);
|
||||||
|
|
||||||
|
// Waiting TCP server thread to terminate
|
||||||
|
handle.join().unwrap();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue