Improvements to `StreamReader` (#318)

* Generalizing StreamReader to support arbitrary data structures

* Using Read trait and adding test cases
This commit is contained in:
Dr. Maxim Orlovsky 2019-09-23 08:31:52 +02:00 committed by Tamás Blummer
parent 0b08978af2
commit 4b1d4edc14
1 changed files with 64 additions and 27 deletions

View File

@ -21,14 +21,12 @@
//! //!
use std::fmt; use std::fmt;
use std::io; use std::io::{self, Read};
use std::io::Read;
use network::message::RawNetworkMessage; use consensus::{encode, Decodable};
use consensus::encode;
/// Struct used to configure stream reader function /// Struct used to configure stream reader function
pub struct StreamReader<R> { pub struct StreamReader<R: Read> {
/// Stream to read from /// Stream to read from
pub stream: R, pub stream: R,
/// I/O buffer /// I/O buffer
@ -37,7 +35,7 @@ pub struct StreamReader<R> {
unparsed: Vec<u8> unparsed: Vec<u8>
} }
impl<R> fmt::Debug for StreamReader<R> { impl<R: Read> fmt::Debug for StreamReader<R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "StreamReader with buffer_size={} and unparsed content {:?}", write!(f, "StreamReader with buffer_size={} and unparsed content {:?}",
self.data.capacity(), self.unparsed) self.data.capacity(), self.unparsed)
@ -57,10 +55,9 @@ impl<R: Read> StreamReader<R> {
/// Reads stream and parses next message from its current input, /// Reads stream and parses next message from its current input,
/// also taking into account previously unparsed partial message (if there was such). /// also taking into account previously unparsed partial message (if there was such).
/// pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
pub fn next_message(&mut self) -> Result<RawNetworkMessage, encode::Error> {
loop { loop {
match encode::deserialize_partial::<RawNetworkMessage>(&self.unparsed) { match encode::deserialize_partial::<D>(&self.unparsed) {
// In this case we just have an incomplete data, so we need to read more // In this case we just have an incomplete data, so we need to read more
Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => { Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => {
let count = self.stream.read(&mut self.data)?; let count = self.stream.read(&mut self.data)?;
@ -84,10 +81,9 @@ impl<R: Read> StreamReader<R> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use std::io::{self, Write}; use std::io::{self, BufReader, Write};
use std::net::{TcpListener, TcpStream, Shutdown}; use std::net::{TcpListener, TcpStream, Shutdown};
use std::thread::JoinHandle; use std::thread::JoinHandle;
@ -196,13 +192,15 @@ mod test {
#[test] #[test]
fn parse_multipartmsg_test() { fn parse_multipartmsg_test() {
let mut reader = StreamReader::new(io::empty(), None); let stream = io::empty();
let mut reader = StreamReader::new(stream, None);
reader.unparsed = MSG_ALERT[..24].to_vec(); reader.unparsed = MSG_ALERT[..24].to_vec();
assert!(reader.next_message().is_err()); let message: Result<RawNetworkMessage, _> = reader.read_next();
assert!(message.is_err());
assert_eq!(reader.unparsed.len(), 24); assert_eq!(reader.unparsed.len(), 24);
reader.unparsed = MSG_ALERT.to_vec(); reader.unparsed = MSG_ALERT.to_vec();
let message = reader.next_message().unwrap(); let message = reader.read_next().unwrap();
assert_eq!(reader.unparsed.len(), 0); assert_eq!(reader.unparsed.len(), 0);
check_alert_msg(&message); check_alert_msg(&message);
@ -210,7 +208,9 @@ mod test {
#[test] #[test]
fn read_singlemsg_test() { fn read_singlemsg_test() {
let message = StreamReader::new(&MSG_VERSION[..], None).next_message().unwrap(); let stream = MSG_VERSION[..].to_vec();
let stream = stream.as_slice();
let message = StreamReader::new(stream, None).read_next().unwrap();
check_version_msg(&message); check_version_msg(&message);
} }
@ -218,12 +218,12 @@ mod test {
fn read_doublemsgs_test() { fn read_doublemsgs_test() {
let mut stream = MSG_VERSION.to_vec(); let mut stream = MSG_VERSION.to_vec();
stream.extend(&MSG_PING); stream.extend(&MSG_PING);
let stream = stream.as_slice();
let mut reader = StreamReader::new(&stream[..], None); let mut reader = StreamReader::new(stream, None);
let message = reader.next_message().unwrap(); let message = reader.read_next().unwrap();
check_version_msg(&message); check_version_msg(&message);
let msg = reader.next_message().unwrap(); let msg: RawNetworkMessage = reader.read_next().unwrap();
assert_eq!(msg.magic, 0xd9b4bef9); assert_eq!(msg.magic, 0xd9b4bef9);
if let NetworkMessage::Ping(nonce) = msg.payload { if let NetworkMessage::Ping(nonce) = msg.payload {
assert_eq!(nonce, 100); assert_eq!(nonce, 100);
@ -234,7 +234,7 @@ mod test {
// Helper function that set ups emulation of client-server TCP connection for // Helper function that set ups emulation of client-server TCP connection for
// testing message transfer via TCP packets // testing message transfer via TCP packets
fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, TcpStream) { fn serve_tcp(pieces: Vec<Vec<u8>>) -> (JoinHandle<()>, BufReader<TcpStream>) {
// 1. Creating server part (emulating Bitcoin Core node) // 1. Creating server part (emulating Bitcoin Core node)
let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).unwrap(); let listener = TcpListener::bind(format!("127.0.0.1:{}", 0)).unwrap();
let port = listener.local_addr().unwrap().port(); let port = listener.local_addr().unwrap().port();
@ -258,8 +258,9 @@ mod test {
// 3. Creating client side of the TCP socket connection // 3. Creating client side of the TCP socket connection
thread::sleep(Duration::from_secs(1)); thread::sleep(Duration::from_secs(1));
let istream = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap(); let istream = TcpStream::connect(format!("127.0.0.1:{}", port)).unwrap();
let reader = BufReader::new(istream);
return (handle, istream) return (handle, reader)
} }
#[test] #[test]
@ -269,10 +270,11 @@ mod test {
// single message split in two parts to emulate real network conditions // single message split in two parts to emulate real network conditions
MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec() MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec()
]); ]);
let mut reader = StreamReader::new(istream, None); let stream = istream;
let mut reader = StreamReader::new(stream, None);
// Reading and checking the whole message back // Reading and checking the whole message back
let message = reader.next_message().unwrap(); let message = reader.read_next().unwrap();
check_version_msg(&message); check_version_msg(&message);
// Waiting TCP server thread to terminate // Waiting TCP server thread to terminate
@ -288,22 +290,57 @@ mod test {
MSG_VERACK.to_vec(), MSG_VERACK.to_vec(),
MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec() MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec()
]); ]);
let mut reader = StreamReader::new(istream, None); let stream = istream;
let mut reader = StreamReader::new(stream, None);
// Reading and checking the first message (Version) // Reading and checking the first message (Version)
let message = reader.next_message().unwrap(); let message = reader.read_next().unwrap();
check_version_msg(&message); check_version_msg(&message);
// Reading and checking the second message (Verack) // Reading and checking the second message (Verack)
let msg = reader.next_message().unwrap(); let msg: RawNetworkMessage = reader.read_next().unwrap();
assert_eq!(msg.magic, 0xd9b4bef9); assert_eq!(msg.magic, 0xd9b4bef9);
assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage"); assert_eq!(msg.payload, NetworkMessage::Verack, "Wrong message type, expected VerackMessage");
// Reading and checking the third message (Alert) // Reading and checking the third message (Alert)
let msg = reader.next_message().unwrap(); let msg = reader.read_next().unwrap();
check_alert_msg(&msg); check_alert_msg(&msg);
// Waiting TCP server thread to terminate // Waiting TCP server thread to terminate
handle.join().unwrap(); handle.join().unwrap();
} }
#[test]
fn read_block_from_file_test() {
use std::io;
use consensus::serialize;
use hex::decode as hex_decode;
use Block;
let normal_data = hex_decode("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac00000000").unwrap();
let cutoff_data = hex_decode("010000004ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914cd74d6e49ffff001d323b3a7b0201000000010000000000000000000000000000000000000000000000000000000000000000ffffffff0804ffff001d026e04ffffffff0100f2052a0100000043410446ef0102d1ec5240f0d061a4246c1bdef63fc3dbab7733052fbbf0ecd8f41fc26bf049ebb4f9527f374280259e7cfa99c48b0e3f39c51347a19a5819651503a5ac00000000010000000321f75f3139a013f50f315b23b0c9a2b6eac31e2bec98e5891c924664889942260000000049483045022100cb2c6b346a978ab8c61b18b5e9397755cbd17d6eb2fe0083ef32e067fa6c785a02206ce44e613f31d9a6b0517e46f3db1576e9812cc98d159bfdaf759a5014081b5c01ffffffff79cda0945903627c3da1f85fc95d0b8ee3e76ae0cfdc9a65d09744b1f8fc85430000000049483045022047957cdd957cfd0becd642f6b84d82f49b6cb4c51a91f49246908af7c3cfdf4a022100e96b46621f1bffcf5ea5982f88cef651e9354f5791602369bf5a82a6cd61a62501fffffffffe09f5fe3ffbf5ee97a54eb5e5069e9da6b4856ee86fc52938c2f979b0f38e82000000004847304402204165be9a4cbab8049e1af9723b96199bfd3e85f44c6b4c0177e3962686b26073022028f638da23fc003760861ad481ead4099312c60030d4cb57820ce4d33812a5ce01ffffffff01009d966b01000000434104ea1feff861b51fe3f5f8a3b12d0f4712db80e919548a80839fc47c6a21e66d957e9c5d8cd108c7a2d2324bad71f9904ac0ae7336507d785b17a2c115e427a32fac").unwrap();
let prevhash = hex_decode("4ddccd549d28f385ab457e98d1b11ce80bfea2c5ab93015ade4973e400000000").unwrap();
let merkle = hex_decode("bf4473e53794beae34e64fccc471dace6ae544180816f89591894e0f417a914c").unwrap();
let stream = io::BufReader::new(&normal_data[..]);
let mut reader = StreamReader::new(stream, None);
let normal_block = reader.read_next::<Block>();
let stream = io::BufReader::new(&cutoff_data[..]);
let mut reader = StreamReader::new(stream, None);
let cutoff_block = reader.read_next::<Block>();
assert!(normal_block.is_ok());
assert!(cutoff_block.is_err());
let block = normal_block.unwrap();
assert_eq!(block.header.version, 1);
assert_eq!(serialize(&block.header.prev_blockhash), prevhash);
assert_eq!(serialize(&block.header.merkle_root), merkle);
assert_eq!(block.header.time, 1231965655);
assert_eq!(block.header.bits, 486604799);
assert_eq!(block.header.nonce, 2067413810);
// should be also ok for a non-witness block as commitment is optional in that case
assert!(block.check_witness_commitment());
}
} }