From 5dfb93df7108d109154727a0ba3dc5b0d7680784 Mon Sep 17 00:00:00 2001 From: Riccardo Casatta Date: Thu, 21 Oct 2021 11:03:47 +0200 Subject: [PATCH] Deprecate StreamReader StreamReader before this commit is trying to repeatedly parse big object like blocks at every read, causing useless overhead. consensus_encode deal with partial data by simply blocking. After this changes it doesn't look what remain of the StreamReader is really giving value, so it's deprecated --- examples/handshake.rs | 9 ++-- src/network/stream_reader.rs | 86 ++++++++++++------------------------ 2 files changed, 33 insertions(+), 62 deletions(-) diff --git a/examples/handshake.rs b/examples/handshake.rs index 036d4223..9c4cf046 100644 --- a/examples/handshake.rs +++ b/examples/handshake.rs @@ -3,11 +3,10 @@ extern crate bitcoin; use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream}; use std::time::{SystemTime, UNIX_EPOCH}; use std::{env, process}; -use std::io::Write; +use std::io::{Write, BufReader}; -use bitcoin::consensus::encode; +use bitcoin::consensus::{encode, Decodable}; use bitcoin::network::{address, constants, message, message_network}; -use bitcoin::network::stream_reader::StreamReader; use bitcoin::secp256k1; use bitcoin::secp256k1::rand::Rng; @@ -41,10 +40,10 @@ fn main() { // Setup StreamReader let read_stream = stream.try_clone().unwrap(); - let mut stream_reader = StreamReader::new(read_stream, None); + let mut stream_reader = BufReader::new(read_stream); loop { // Loop an retrieve new messages - let reply: message::RawNetworkMessage = stream_reader.read_next().unwrap(); + let reply = message::RawNetworkMessage::consensus_decode(&mut stream_reader).unwrap(); match reply.payload { message::NetworkMessage::Version(_) => { println!("Received version message: {:?}", reply.payload); diff --git a/src/network/stream_reader.rs b/src/network/stream_reader.rs index 4b5d61d3..3becfabc 100644 --- a/src/network/stream_reader.rs +++ b/src/network/stream_reader.rs @@ -14,16 +14,16 @@ //! Stream reader. //! -//! This module defines the `StreamReader` struct and its implementation which -//! is used for parsing an incoming stream into separate `RawNetworkMessage`s, -//! handling assembling messages from multiple packets, or dealing with partial -//! or multiple messages in the stream (e.g. when reading from a TCP socket). +//! Deprecated +//! +//! This module defines `StreamReader` struct and its implementation which is used +//! for parsing incoming stream into separate `Decodable`s, handling assembling +//! messages from multiple packets or dealing with partial or multiple messages in the stream +//! (like can happen with reading from TCP socket) //! -use prelude::*; - use core::fmt; -use io::{self, Read}; +use io::Read; use consensus::{encode, Decodable}; @@ -31,61 +31,36 @@ use consensus::{encode, Decodable}; pub struct StreamReader { /// Stream to read from pub stream: R, - /// I/O buffer - data: Vec, - /// Buffer containing unparsed message part - unparsed: Vec } impl fmt::Debug for StreamReader { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "StreamReader with buffer_size={} and unparsed content {:?}", - self.data.capacity(), self.unparsed) + write!(f, "StreamReader") } } impl StreamReader { - /// Constructs new stream reader for a given input stream `stream` with - /// optional parameter `buffer_size` determining reading buffer size - pub fn new(stream: R, buffer_size: Option) -> StreamReader { + /// Constructs new stream reader for a given input stream `stream` + #[deprecated(since="0.28.0", note="wrap you stream into a buffered reader if necessary and use consensus_encode directly")] + pub fn new(stream: R, _buffer_size: Option) -> StreamReader { StreamReader { stream, - data: vec![0u8; buffer_size.unwrap_or(64 * 1024)], - unparsed: vec![] } } - /// Reads stream and parses next message from its current input, - /// also taking into account previously unparsed partial message (if there was such). + /// Reads stream and parses next message from its current input + #[deprecated(since="0.28.0", note="wrap you stream into a buffered reader if necessary and use consensus_encode directly")] pub fn read_next(&mut self) -> Result { - loop { - match encode::deserialize_partial::(&self.unparsed) { - // In this case we just have an incomplete data, so we need to read more - Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => { - let count = self.stream.read(&mut self.data)?; - if count > 0 { - self.unparsed.extend(self.data[0..count].iter()); - } - else { - return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof))); - } - }, - Err(err) => return Err(err), - // We have successfully read from the buffer - Ok((message, index)) => { - self.unparsed.drain(..index); - return Ok(message) - }, - } - } + Decodable::consensus_decode(&mut self.stream) } } +#[allow(deprecated)] #[cfg(test)] mod test { use std::thread; use std::time::Duration; - use io::{self, BufReader, Write}; + use io::{BufReader, Write}; use std::net::{TcpListener, TcpStream, Shutdown}; use std::thread::JoinHandle; use network::constants::ServiceFlags; @@ -193,22 +168,6 @@ mod test { } } - #[test] - fn parse_multipartmsg_test() { - let stream = io::empty(); - let mut reader = StreamReader::new(stream, None); - reader.unparsed = MSG_ALERT[..24].to_vec(); - let message: Result = reader.read_next(); - assert!(message.is_err()); - assert_eq!(reader.unparsed.len(), 24); - - reader.unparsed = MSG_ALERT.to_vec(); - let message = reader.read_next().unwrap(); - assert_eq!(reader.unparsed.len(), 0); - - check_alert_msg(&message); - } - #[test] fn read_singlemsg_test() { let stream = MSG_VERSION[..].to_vec(); @@ -346,4 +305,17 @@ mod test { // should be also ok for a non-witness block as commitment is optional in that case assert!(block.check_witness_commitment()); } + + #[test] + fn parse_multipartmsg_test() { + let mut multi = MSG_ALERT.to_vec(); + multi.extend(&MSG_ALERT[..]); + let mut reader = StreamReader::new(&multi[..], None); + let message: Result = reader.read_next(); + assert!(message.is_ok()); + check_alert_msg(&message.unwrap()); + let message: Result = reader.read_next(); + assert!(message.is_ok()); + check_alert_msg(&message.unwrap()); + } }