From 373f355b5a22a94b5ca477bb2f28aa719e74bcc3 Mon Sep 17 00:00:00 2001 From: Alekos Filini Date: Thu, 3 Dec 2020 12:25:08 +0100 Subject: [PATCH] Flush unrecognized network messages from the read buffer Currently whenever an unrecognized network message is received, it is never flushed from the read buffer, meaning that unless the stream is closed and recreated it will keep returning the same error every time `read_next()` is called. This commit adds the length of the message to `UnrecognizedNetworkCommand`, so that the `StreamReader` can flush those bytes before returning the error to the caller. --- src/consensus/encode.rs | 12 ++++++++---- src/network/message.rs | 5 +++-- src/network/stream_reader.rs | 4 ++++ 3 files changed, 15 insertions(+), 6 deletions(-) diff --git a/src/consensus/encode.rs b/src/consensus/encode.rs index 07b9853c..163eeda1 100644 --- a/src/consensus/encode.rs +++ b/src/consensus/encode.rs @@ -80,10 +80,12 @@ pub enum Error { ParseFailed(&'static str), /// Unsupported Segwit flag UnsupportedSegwitFlag(u8), - /// Unrecognized network command - UnrecognizedNetworkCommand(String), + /// Unrecognized network command with its length + UnrecognizedNetworkCommand(String, usize), /// Invalid Inventory type UnknownInventoryType(u32), + /// The network command is longer than the maximum allowed (12 chars) + NetworkCommandTooLong(String), } impl fmt::Display for Error { @@ -102,9 +104,10 @@ impl fmt::Display for Error { Error::ParseFailed(ref e) => write!(f, "parse failed: {}", e), Error::UnsupportedSegwitFlag(ref swflag) => write!(f, "unsupported segwit version: {}", swflag), - Error::UnrecognizedNetworkCommand(ref nwcmd) => write!(f, + Error::UnrecognizedNetworkCommand(ref nwcmd, _) => write!(f, "unrecognized network command: {}", nwcmd), Error::UnknownInventoryType(ref tp) => write!(f, "Unknown Inventory type: {}", tp), + Error::NetworkCommandTooLong(ref cmd) => write!(f, "Network Command too long: {}", cmd), } } } @@ -122,7 +125,8 @@ impl error::Error for Error { | Error::ParseFailed(..) | Error::UnsupportedSegwitFlag(..) | Error::UnrecognizedNetworkCommand(..) - | Error::UnknownInventoryType(..) => None, + | Error::UnknownInventoryType(..) + | Error::NetworkCommandTooLong(..) => None, } } } diff --git a/src/network/message.rs b/src/network/message.rs index 5b045cef..b0f399fa 100644 --- a/src/network/message.rs +++ b/src/network/message.rs @@ -70,7 +70,7 @@ impl Encodable for CommandString { let mut rawbytes = [0u8; 12]; let strbytes = self.0.as_bytes(); if strbytes.len() > 12 { - return Err(encode::Error::UnrecognizedNetworkCommand(self.0.clone().into_owned())); + return Err(encode::Error::NetworkCommandTooLong(self.0.clone().into_owned())); } rawbytes[..strbytes.len()].clone_from_slice(&strbytes[..]); rawbytes.consensus_encode(s) @@ -306,6 +306,7 @@ impl Decodable for RawNetworkMessage { let magic = Decodable::consensus_decode(&mut d)?; let cmd = CommandString::consensus_decode(&mut d)?.0; let raw_payload = CheckedData::consensus_decode(&mut d)?.0; + let raw_payload_len = raw_payload.len(); let mut mem_d = Cursor::new(raw_payload); let payload = match &cmd[..] { @@ -339,7 +340,7 @@ impl Decodable for RawNetworkMessage { "wtxidrelay" => NetworkMessage::WtxidRelay, "addrv2" => NetworkMessage::AddrV2(Decodable::consensus_decode(&mut mem_d)?), "sendaddrv2" => NetworkMessage::SendAddrV2, - _ => return Err(encode::Error::UnrecognizedNetworkCommand(cmd.into_owned())), + _ => return Err(encode::Error::UnrecognizedNetworkCommand(cmd.into_owned(), 4 + 12 + 4 + 4 + raw_payload_len)), // magic + msg str + payload len + checksum + payload }; Ok(RawNetworkMessage { magic: magic, diff --git a/src/network/stream_reader.rs b/src/network/stream_reader.rs index 501bf607..3ab347ec 100644 --- a/src/network/stream_reader.rs +++ b/src/network/stream_reader.rs @@ -68,6 +68,10 @@ impl StreamReader { return Err(encode::Error::Io(io::Error::from(io::ErrorKind::UnexpectedEof))); } }, + Err(encode::Error::UnrecognizedNetworkCommand(message, len)) => { + self.unparsed.drain(..len); + return Err(encode::Error::UnrecognizedNetworkCommand(message, len)) + }, Err(err) => return Err(err), // We have successfully read from the buffer Ok((message, index)) => {