Deprecate StreamReader

StreamReader before this commit is trying to repeatedly parse big object like
blocks at every read, causing useless overhead.
consensus_encode deal with partial data by simply blocking.

After this changes it doesn't look what remain of the StreamReader is really giving
value, so it's deprecated
This commit is contained in:
Riccardo Casatta 2021-10-21 11:03:47 +02:00
parent 9ca6c75b18
commit 5dfb93df71
No known key found for this signature in database
GPG Key ID: FD986A969E450397
2 changed files with 33 additions and 62 deletions

View File

@ -3,11 +3,10 @@ extern crate bitcoin;
use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream}; use std::net::{IpAddr, Ipv4Addr, Shutdown, SocketAddr, TcpStream};
use std::time::{SystemTime, UNIX_EPOCH}; use std::time::{SystemTime, UNIX_EPOCH};
use std::{env, process}; use std::{env, process};
use std::io::Write; use std::io::{Write, BufReader};
use bitcoin::consensus::encode; use bitcoin::consensus::{encode, Decodable};
use bitcoin::network::{address, constants, message, message_network}; use bitcoin::network::{address, constants, message, message_network};
use bitcoin::network::stream_reader::StreamReader;
use bitcoin::secp256k1; use bitcoin::secp256k1;
use bitcoin::secp256k1::rand::Rng; use bitcoin::secp256k1::rand::Rng;
@ -41,10 +40,10 @@ fn main() {
// Setup StreamReader // Setup StreamReader
let read_stream = stream.try_clone().unwrap(); let read_stream = stream.try_clone().unwrap();
let mut stream_reader = StreamReader::new(read_stream, None); let mut stream_reader = BufReader::new(read_stream);
loop { loop {
// Loop an retrieve new messages // Loop an retrieve new messages
let reply: message::RawNetworkMessage = stream_reader.read_next().unwrap(); let reply = message::RawNetworkMessage::consensus_decode(&mut stream_reader).unwrap();
match reply.payload { match reply.payload {
message::NetworkMessage::Version(_) => { message::NetworkMessage::Version(_) => {
println!("Received version message: {:?}", reply.payload); println!("Received version message: {:?}", reply.payload);

View File

@ -14,16 +14,16 @@
//! Stream reader. //! Stream reader.
//! //!
//! This module defines the `StreamReader` struct and its implementation which //! Deprecated
//! is used for parsing an incoming stream into separate `RawNetworkMessage`s, //!
//! handling assembling messages from multiple packets, or dealing with partial //! This module defines `StreamReader` struct and its implementation which is used
//! or multiple messages in the stream (e.g. when reading from a TCP socket). //! for parsing incoming stream into separate `Decodable`s, handling assembling
//! messages from multiple packets or dealing with partial or multiple messages in the stream
//! (like can happen with reading from TCP socket)
//! //!
use prelude::*;
use core::fmt; use core::fmt;
use io::{self, Read}; use io::Read;
use consensus::{encode, Decodable}; use consensus::{encode, Decodable};
@ -31,61 +31,36 @@ use consensus::{encode, Decodable};
pub struct StreamReader<R: Read> { pub struct StreamReader<R: Read> {
/// Stream to read from /// Stream to read from
pub stream: R, pub stream: R,
/// I/O buffer
data: Vec<u8>,
/// Buffer containing unparsed message part
unparsed: Vec<u8>
} }
impl<R: Read> fmt::Debug for StreamReader<R> { impl<R: Read> fmt::Debug for StreamReader<R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "StreamReader with buffer_size={} and unparsed content {:?}", write!(f, "StreamReader")
self.data.capacity(), self.unparsed)
} }
} }
impl<R: Read> StreamReader<R> { impl<R: Read> StreamReader<R> {
/// Constructs new stream reader for a given input stream `stream` with /// Constructs new stream reader for a given input stream `stream`
/// optional parameter `buffer_size` determining reading buffer size #[deprecated(since="0.28.0", note="wrap you stream into a buffered reader if necessary and use consensus_encode directly")]
pub fn new(stream: R, buffer_size: Option<usize>) -> StreamReader<R> { pub fn new(stream: R, _buffer_size: Option<usize>) -> StreamReader<R> {
StreamReader { StreamReader {
stream, stream,
data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
unparsed: vec![]
} }
} }
/// Reads stream and parses next message from its current input, /// Reads stream and parses next message from its current input
/// also taking into account previously unparsed partial message (if there was such). #[deprecated(since="0.28.0", note="wrap you stream into a buffered reader if necessary and use consensus_encode directly")]
pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> { pub fn read_next<D: Decodable>(&mut self) -> Result<D, encode::Error> {
loop { Decodable::consensus_decode(&mut self.stream)
match encode::deserialize_partial::<D>(&self.unparsed) {
// In this case we just have an incomplete data, so we need to read more
Err(encode::Error::Io(ref err)) if err.kind () == io::ErrorKind::UnexpectedEof => {
let count = self.stream.read(&mut self.data)?;
if count > 0 {
self.unparsed.extend(self.data[0..count].iter());
}
else {
return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof)));
}
},
Err(err) => return Err(err),
// We have successfully read from the buffer
Ok((message, index)) => {
self.unparsed.drain(..index);
return Ok(message)
},
}
}
} }
} }
#[allow(deprecated)]
#[cfg(test)] #[cfg(test)]
mod test { mod test {
use std::thread; use std::thread;
use std::time::Duration; use std::time::Duration;
use io::{self, BufReader, Write}; use io::{BufReader, Write};
use std::net::{TcpListener, TcpStream, Shutdown}; use std::net::{TcpListener, TcpStream, Shutdown};
use std::thread::JoinHandle; use std::thread::JoinHandle;
use network::constants::ServiceFlags; use network::constants::ServiceFlags;
@ -193,22 +168,6 @@ mod test {
} }
} }
#[test]
fn parse_multipartmsg_test() {
let stream = io::empty();
let mut reader = StreamReader::new(stream, None);
reader.unparsed = MSG_ALERT[..24].to_vec();
let message: Result<RawNetworkMessage, _> = reader.read_next();
assert!(message.is_err());
assert_eq!(reader.unparsed.len(), 24);
reader.unparsed = MSG_ALERT.to_vec();
let message = reader.read_next().unwrap();
assert_eq!(reader.unparsed.len(), 0);
check_alert_msg(&message);
}
#[test] #[test]
fn read_singlemsg_test() { fn read_singlemsg_test() {
let stream = MSG_VERSION[..].to_vec(); let stream = MSG_VERSION[..].to_vec();
@ -346,4 +305,17 @@ mod test {
// should be also ok for a non-witness block as commitment is optional in that case // should be also ok for a non-witness block as commitment is optional in that case
assert!(block.check_witness_commitment()); assert!(block.check_witness_commitment());
} }
#[test]
fn parse_multipartmsg_test() {
let mut multi = MSG_ALERT.to_vec();
multi.extend(&MSG_ALERT[..]);
let mut reader = StreamReader::new(&multi[..], None);
let message: Result<RawNetworkMessage, _> = reader.read_next();
assert!(message.is_ok());
check_alert_msg(&message.unwrap());
let message: Result<RawNetworkMessage, _> = reader.read_next();
assert!(message.is_ok());
check_alert_msg(&message.unwrap());
}
} }