Replaced Read trait with a generic over Read (#307)

Removed tempfile usage from stream_reader
This commit is contained in:
Elichai Turkel 2019-08-07 11:35:22 -04:00 committed by Tamás Blummer
parent db8c8b497b
commit 4a1830c423
2 changed files with 15 additions and 34 deletions

View File

@ -41,4 +41,3 @@ features = [ "rand" ]
serde_derive = "1" serde_derive = "1"
serde_json = "1" serde_json = "1"
serde_test = "1" serde_test = "1"
tempfile = "3"

View File

@ -28,26 +28,26 @@ use network::message::RawNetworkMessage;
use consensus::encode; use consensus::encode;
/// Struct used to configure stream reader function /// Struct used to configure stream reader function
pub struct StreamReader<'a> { pub struct StreamReader<R> {
/// Stream to read from /// Stream to read from
pub stream: &'a mut Read, pub stream: R,
/// I/O buffer /// I/O buffer
data: Vec<u8>, data: Vec<u8>,
/// Buffer containing unparsed message part /// Buffer containing unparsed message part
unparsed: Vec<u8> unparsed: Vec<u8>
} }
impl<'a> fmt::Debug for StreamReader<'a> { impl<R> fmt::Debug for StreamReader<R> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(f, "StreamReader with buffer_size={} and unparsed content {:?}", write!(f, "StreamReader with buffer_size={} and unparsed content {:?}",
self.data.capacity(), self.unparsed) self.data.capacity(), self.unparsed)
} }
} }
impl<'a> StreamReader<'a> { impl<R: Read> StreamReader<R> {
/// Constructs new stream reader for a given input stream `stream` with /// Constructs new stream reader for a given input stream `stream` with
/// optional parameter `buffer_size` determining reading buffer size /// optional parameter `buffer_size` determining reading buffer size
pub fn new(stream: &mut Read, buffer_size: Option<usize>) -> StreamReader { pub fn new(stream: R, buffer_size: Option<usize>) -> StreamReader<R> {
StreamReader { StreamReader {
stream, stream,
data: vec![0u8; buffer_size.unwrap_or(64 * 1024)], data: vec![0u8; buffer_size.unwrap_or(64 * 1024)],
@ -84,12 +84,10 @@ impl<'a> StreamReader<'a> {
#[cfg(test)] #[cfg(test)]
mod test { mod test {
extern crate tempfile;
use std::thread; use std::thread;
use std::fs::File;
use std::time::Duration; use std::time::Duration;
use std::io::{Write, Seek, SeekFrom}; use std::io::{self, Write};
use std::net::{TcpListener, TcpStream, Shutdown}; use std::net::{TcpListener, TcpStream, Shutdown};
use std::thread::JoinHandle; use std::thread::JoinHandle;
@ -198,8 +196,7 @@ mod test {
#[test] #[test]
fn parse_multipartmsg_test() { fn parse_multipartmsg_test() {
let mut tmpfile: File = tempfile::tempfile().unwrap(); let mut reader = StreamReader::new(io::empty(), None);
let mut reader = StreamReader::new(&mut tmpfile, None);
reader.unparsed = MSG_ALERT[..24].to_vec(); reader.unparsed = MSG_ALERT[..24].to_vec();
assert!(reader.next_message().is_err()); assert!(reader.next_message().is_err());
assert_eq!(reader.unparsed.len(), 24); assert_eq!(reader.unparsed.len(), 24);
@ -211,33 +208,18 @@ mod test {
check_alert_msg(&message); check_alert_msg(&message);
} }
fn init_stream(buf: &[u8]) -> File {
let mut tmpfile: File = tempfile::tempfile().unwrap();
write_file(&mut tmpfile, &buf);
tmpfile
}
fn write_file(tmpfile: &mut File, buf: &[u8]) {
tmpfile.seek(SeekFrom::End(0)).unwrap();
tmpfile.write(&buf).unwrap();
tmpfile.flush().unwrap();
tmpfile.seek(SeekFrom::Start(0)).unwrap();
}
#[test] #[test]
fn read_singlemsg_test() { fn read_singlemsg_test() {
let mut stream = init_stream(&MSG_VERSION); let message = StreamReader::new(&MSG_VERSION[..], None).next_message().unwrap();
let message = StreamReader::new(&mut stream, None).next_message().unwrap();
check_version_msg(&message); check_version_msg(&message);
} }
#[test] #[test]
fn read_doublemsgs_test() { fn read_doublemsgs_test() {
let mut stream = init_stream(&MSG_VERSION); let mut stream = MSG_VERSION.to_vec();
write_file(&mut stream, &MSG_PING); stream.extend(&MSG_PING);
let mut reader = StreamReader::new(&mut stream, None); let mut reader = StreamReader::new(&stream[..], None);
let message = reader.next_message().unwrap(); let message = reader.next_message().unwrap();
check_version_msg(&message); check_version_msg(&message);
@ -283,11 +265,11 @@ mod test {
#[test] #[test]
fn read_multipartmsg_test() { fn read_multipartmsg_test() {
// Setting up TCP connection emulation // Setting up TCP connection emulation
let (handle, mut istream) = serve_tcp(vec![ let (handle, istream) = serve_tcp(vec![
// single message split in two parts to emulate real network conditions // single message split in two parts to emulate real network conditions
MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec() MSG_VERSION[..24].to_vec(), MSG_VERSION[24..].to_vec()
]); ]);
let mut reader = StreamReader::new(&mut istream, None); let mut reader = StreamReader::new(istream, None);
// Reading and checking the whole message back // Reading and checking the whole message back
let message = reader.next_message().unwrap(); let message = reader.next_message().unwrap();
@ -300,13 +282,13 @@ mod test {
#[test] #[test]
fn read_sequencemsg_test() { fn read_sequencemsg_test() {
// Setting up TCP connection emulation // Setting up TCP connection emulation
let (handle, mut istream) = serve_tcp(vec![ let (handle, istream) = serve_tcp(vec![
// Real-world Bitcoin core communication case for /Satoshi:0.17.1/ // Real-world Bitcoin core communication case for /Satoshi:0.17.1/
MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(), MSG_VERSION[..23].to_vec(), MSG_VERSION[23..].to_vec(),
MSG_VERACK.to_vec(), MSG_VERACK.to_vec(),
MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec() MSG_ALERT[..24].to_vec(), MSG_ALERT[24..].to_vec()
]); ]);
let mut reader = StreamReader::new(&mut istream, None); let mut reader = StreamReader::new(istream, None);
// Reading and checking the first message (Version) // Reading and checking the first message (Version)
let message = reader.next_message().unwrap(); let message = reader.next_message().unwrap();