integer_encoding/
reader.rs

1use std::io;
2use std::io::{Read, Result};
3
4use crate::fixed::FixedInt;
5use crate::varint::{VarInt, VarIntMaxSize, MSB};
6
7#[cfg(feature = "tokio_async")]
8use tokio::io::{AsyncRead, AsyncReadExt};
9
10#[cfg(feature = "futures_async")]
11use futures_util::{io::AsyncRead, io::AsyncReadExt};
12
13/// A trait for reading VarInts from any other `Reader`.
14///
15/// It's recommended to use a buffered reader, as many small reads will happen.
16pub trait VarIntReader {
17    /// Returns either the decoded integer, or an error.
18    ///
19    /// In general, this always reads a whole varint. If the encoded varint's value is bigger
20    /// than the valid value range of `VI`, then the value is truncated.
21    ///
22    /// On EOF, an io::Error with io::ErrorKind::UnexpectedEof is returned.
23    fn read_varint<VI: VarInt>(&mut self) -> Result<VI>;
24}
25
26#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
27/// Like a VarIntReader, but returns a future.
28#[async_trait::async_trait]
29pub trait VarIntAsyncReader {
30    async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI>;
31}
32
33/// VarIntProcessor encapsulates the logic for decoding a VarInt byte-by-byte.
34#[derive(Default)]
35pub struct VarIntProcessor {
36    buf: [u8; 10],
37    maxsize: usize,
38    i: usize,
39}
40
41impl VarIntProcessor {
42    fn new<VI: VarIntMaxSize>() -> VarIntProcessor {
43        VarIntProcessor {
44            maxsize: VI::varint_max_size(),
45            ..VarIntProcessor::default()
46        }
47    }
48    fn push(&mut self, b: u8) -> Result<()> {
49        if self.i >= self.maxsize {
50            return Err(io::Error::new(
51                io::ErrorKind::InvalidData,
52                "Unterminated varint",
53            ));
54        }
55        self.buf[self.i] = b;
56        self.i += 1;
57        Ok(())
58    }
59    fn finished(&self) -> bool {
60        self.i > 0 && (self.buf[self.i - 1] & MSB == 0)
61    }
62    fn decode<VI: VarInt>(&self) -> Option<VI> {
63        Some(VI::decode_var(&self.buf[0..self.i])?.0)
64    }
65}
66
67#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
68#[async_trait::async_trait]
69impl<AR: AsyncRead + Unpin + Send> VarIntAsyncReader for AR {
70    async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI> {
71        let mut buf = [0 as u8; 1];
72        let mut p = VarIntProcessor::new::<VI>();
73
74        while !p.finished() {
75            let read = self.read(&mut buf).await?;
76
77            // EOF
78            if read == 0 && p.i == 0 {
79                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
80            }
81            if read == 0 {
82                break;
83            }
84
85            p.push(buf[0])?;
86        }
87
88        p.decode()
89            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"))
90    }
91}
92
93impl<R: Read> VarIntReader for R {
94    fn read_varint<VI: VarInt>(&mut self) -> Result<VI> {
95        let mut buf = [0 as u8; 1];
96        let mut p = VarIntProcessor::new::<VI>();
97
98        while !p.finished() {
99            let read = self.read(&mut buf)?;
100
101            // EOF
102            if read == 0 && p.i == 0 {
103                return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
104            }
105            if read == 0 {
106                break;
107            }
108
109            p.push(buf[0])?;
110        }
111
112        p.decode()
113            .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"))
114    }
115}
116
117/// A trait for reading FixedInts from any other `Reader`.
118pub trait FixedIntReader {
119    /// Read a fixed integer from a reader. How many bytes are read depends on `FI`.
120    ///
121    /// On EOF, an io::Error with io::ErrorKind::UnexpectedEof is returned.
122    fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI>;
123}
124
125/// Like FixedIntReader, but returns a future.
126#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
127#[async_trait::async_trait]
128pub trait FixedIntAsyncReader {
129    async fn read_fixedint_async<FI: FixedInt>(&mut self) -> Result<FI>;
130}
131
132#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
133#[async_trait::async_trait]
134impl<AR: AsyncRead + Unpin + Send> FixedIntAsyncReader for AR {
135    async fn read_fixedint_async<FI: FixedInt>(&mut self) -> Result<FI> {
136        let mut buf = [0 as u8; 8];
137        self.read_exact(&mut buf[0..FI::required_space()]).await?;
138        Ok(FI::decode_fixed(&buf[0..FI::required_space()]))
139    }
140}
141
142impl<R: Read> FixedIntReader for R {
143    fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI> {
144        let mut buf = [0 as u8; 8];
145        self.read_exact(&mut buf[0..FI::required_space()])?;
146        Ok(FI::decode_fixed(&buf[0..FI::required_space()]))
147    }
148}