integer_encoding/
reader.rs
1use std::io;
2use std::io::{Read, Result};
3
4use crate::fixed::FixedInt;
5use crate::varint::{VarInt, VarIntMaxSize, MSB};
6
7#[cfg(feature = "tokio_async")]
8use tokio::io::{AsyncRead, AsyncReadExt};
9
10#[cfg(feature = "futures_async")]
11use futures_util::{io::AsyncRead, io::AsyncReadExt};
12
13pub trait VarIntReader {
17 fn read_varint<VI: VarInt>(&mut self) -> Result<VI>;
24}
25
/// Asynchronous counterpart of `VarIntReader`; only compiled when one of the
/// `tokio_async` / `futures_async` features is enabled.
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait]
pub trait VarIntAsyncReader {
    /// Reads one varint from `self` without blocking and decodes it as `VI`.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` when the bytes cannot be read or do not decode
    /// to a `VI`.
    async fn read_varint_async<VI>(&mut self) -> io::Result<VI>
    where
        VI: VarInt;
}
32
/// Scratch state used while collecting the bytes of a single varint, one byte
/// at a time, before decoding them.
///
/// The `Default` value has `maxsize == 0`, so it accepts no bytes; use
/// `VarIntProcessor::new` to obtain a usable instance.
#[derive(Debug, Default)]
pub struct VarIntProcessor {
    /// Bytes collected so far; only `buf[..i]` is meaningful.
    buf: [u8; 10],
    /// Maximum number of bytes accepted before `push` reports an error.
    maxsize: usize,
    /// Number of bytes currently stored in `buf`.
    i: usize,
}
40
41impl VarIntProcessor {
42 fn new<VI: VarIntMaxSize>() -> VarIntProcessor {
43 VarIntProcessor {
44 maxsize: VI::varint_max_size(),
45 ..VarIntProcessor::default()
46 }
47 }
48 fn push(&mut self, b: u8) -> Result<()> {
49 if self.i >= self.maxsize {
50 return Err(io::Error::new(
51 io::ErrorKind::InvalidData,
52 "Unterminated varint",
53 ));
54 }
55 self.buf[self.i] = b;
56 self.i += 1;
57 Ok(())
58 }
59 fn finished(&self) -> bool {
60 self.i > 0 && (self.buf[self.i - 1] & MSB == 0)
61 }
62 fn decode<VI: VarInt>(&self) -> Option<VI> {
63 Some(VI::decode_var(&self.buf[0..self.i])?.0)
64 }
65}
66
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait]
impl<AR: AsyncRead + Unpin + Send> VarIntAsyncReader for AR {
    /// Reads a varint one byte at a time from the async reader.
    ///
    /// # Errors
    ///
    /// * `UnexpectedEof` ("Reached EOF") if the reader yields no bytes at all,
    ///   or if the bytes gathered before EOF do not decode.
    /// * `InvalidData` if more bytes arrive than `VI` allows without the
    ///   varint terminating.
    /// * Any error returned by the underlying `read`.
    async fn read_varint_async<VI: VarInt>(&mut self) -> Result<VI> {
        let mut byte = [0u8; 1];
        let mut processor = VarIntProcessor::new::<VI>();

        loop {
            if processor.finished() {
                break;
            }

            let count = self.read(&mut byte).await?;
            match count {
                // EOF before a single byte was seen.
                0 if processor.i == 0 => {
                    return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
                }
                // EOF in the middle of a varint: let `decode` judge what we have.
                0 => break,
                _ => processor.push(byte[0])?,
            }
        }

        match processor.decode::<VI>() {
            Some(value) => Ok(value),
            None => Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF")),
        }
    }
}
92
93impl<R: Read> VarIntReader for R {
94 fn read_varint<VI: VarInt>(&mut self) -> Result<VI> {
95 let mut buf = [0 as u8; 1];
96 let mut p = VarIntProcessor::new::<VI>();
97
98 while !p.finished() {
99 let read = self.read(&mut buf)?;
100
101 if read == 0 && p.i == 0 {
103 return Err(io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"));
104 }
105 if read == 0 {
106 break;
107 }
108
109 p.push(buf[0])?;
110 }
111
112 p.decode()
113 .ok_or_else(|| io::Error::new(io::ErrorKind::UnexpectedEof, "Reached EOF"))
114 }
115}
116
117pub trait FixedIntReader {
119 fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI>;
123}
124
/// Asynchronous counterpart of `FixedIntReader`; only compiled when one of the
/// `tokio_async` / `futures_async` features is enabled.
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait]
pub trait FixedIntAsyncReader {
    /// Reads the bytes of one `FI` from `self` without blocking and decodes them.
    ///
    /// # Errors
    ///
    /// Returns an `io::Error` when the required bytes cannot be read.
    async fn read_fixedint_async<FI>(&mut self) -> io::Result<FI>
    where
        FI: FixedInt;
}
131
#[cfg(any(feature = "tokio_async", feature = "futures_async"))]
#[async_trait::async_trait]
impl<AR: AsyncRead + Unpin + Send> FixedIntAsyncReader for AR {
    /// Reads exactly `FI::required_space()` bytes from the async reader and
    /// decodes them with `FI::decode_fixed`.
    ///
    /// # Errors
    ///
    /// Propagates any error from `read_exact`.
    async fn read_fixedint_async<FI: FixedInt>(&mut self) -> Result<FI> {
        // NOTE(review): the scratch buffer holds 8 bytes; this assumes
        // `FI::required_space()` never exceeds 8, otherwise the slice below
        // panics. Confirm against the `FixedInt` implementors.
        let mut scratch = [0u8; 8];
        let width = FI::required_space();

        self.read_exact(&mut scratch[..width]).await?;

        let value = FI::decode_fixed(&scratch[..width]);
        Ok(value)
    }
}
141
142impl<R: Read> FixedIntReader for R {
143 fn read_fixedint<FI: FixedInt>(&mut self) -> Result<FI> {
144 let mut buf = [0 as u8; 8];
145 self.read_exact(&mut buf[0..FI::required_space()])?;
146 Ok(FI::decode_fixed(&buf[0..FI::required_space()]))
147 }
148}