bytes_utils/
segmented.rs

1#![forbid(unsafe_code)]
2
3use std::cmp;
4use std::collections::VecDeque;
5use std::io::IoSlice;
6use std::iter::FromIterator;
7
8use bytes::{Buf, BufMut, Bytes, BytesMut};
9
10fn chunks_vectored<'s, B, I>(bufs: I, dst: &mut [IoSlice<'s>]) -> usize
11where
12    I: Iterator<Item = &'s B>,
13    B: Buf + 's,
14{
15    let mut filled = 0;
16    for buf in bufs {
17        if filled == dst.len() {
18            break;
19        }
20        filled += buf.chunks_vectored(&mut dst[filled..]);
21    }
22    filled
23}
24
25/// A consumable view of a sequence of buffers.
26///
27/// This allows viewing a sequence of buffers as one buffer, without copying the bytes over. Unlike
28/// the [SegmentedBuf], this doesn't allow for appending more buffers and doesn't drop the buffers
29/// as they are exhausted (though they all get exhausted, no leftovers are kept in them as the
30/// caller advances through it). On the other hand, it doesn't require an internal allocation in
31/// the form of VecDeque and can be based on any kind of slice.
32///
33/// # Example
34///
35/// ```rust
36/// # use bytes_utils::SegmentedSlice;
37/// # use bytes::Buf;
38/// # use std::io::Read;
39/// let mut buffers = [b"Hello" as &[_], b"", b" ", b"", b"World"];
40/// let buf = SegmentedSlice::new(&mut buffers);
41///
42/// assert_eq!(11, buf.remaining());
43/// assert_eq!(b"Hello", buf.chunk());
44///
45/// let mut out = String::new();
46/// buf.reader().read_to_string(&mut out).expect("Doesn't cause IO errors");
47/// assert_eq!("Hello World", out);
48/// ```
49///
50/// # Optimizations
51///
52/// The [copy_to_bytes][SegmentedSlice::copy_to_bytes] method tries to avoid copies by delegating
53/// into the underlying buffer if possible (if the whole request can be fulfilled using only a
54/// single buffer). If that one is optimized (for example, the [Bytes] returns a shared instance
55/// instead of making a copy), the copying is avoided. If the request is across a buffer boundary,
56/// a copy is made.
57///
58/// The [chunks_vectored][SegmentedSlice::chunks_vectored] will properly output as many slices as
59/// possible, not just 1 as the default implementation does.
#[derive(Debug, Default)]
pub struct SegmentedSlice<'a, B> {
    // Pre-computed number of bytes left across all buffers from `idx` onwards;
    // kept in sync by `advance` and `copy_to_bytes`.
    remaining: usize,
    // Index of the first buffer that still has data; everything before it is
    // exhausted (maintained by `clean_empty`).
    idx: usize,
    // The underlying buffers; they are advanced in place, never dropped.
    bufs: &'a mut [B],
}
66
67impl<'a, B: Buf> SegmentedSlice<'a, B> {
68    /// Creates a new buffer out of a slice of buffers.
69    ///
70    /// The buffers will then be taken in order to form one bigger buffer.
71    ///
72    /// Each of the buffers in turn will be exhausted using its [advance][Buf::advance] before
73    /// proceeding to the next one. Note that the buffers are not dropped (unlike with
74    /// [SegmentedBuf]).
75    pub fn new(bufs: &'a mut [B]) -> Self {
76        let remaining = bufs.iter().map(Buf::remaining).sum();
77        let mut me = Self {
78            remaining,
79            idx: 0,
80            bufs,
81        };
82        me.clean_empty();
83        me
84    }
85
86    fn clean_empty(&mut self) {
87        while self.idx < self.bufs.len() && !self.bufs[self.idx].has_remaining() {
88            self.idx += 1;
89        }
90    }
91}
92
93impl<'a, B: Buf> Buf for SegmentedSlice<'a, B> {
94    fn remaining(&self) -> usize {
95        self.remaining
96    }
97
98    fn chunk(&self) -> &[u8] {
99        self.bufs.get(self.idx).map(Buf::chunk).unwrap_or_default()
100    }
101
102    fn advance(&mut self, mut cnt: usize) {
103        self.remaining -= cnt;
104        while cnt > 0 {
105            let first = &mut self.bufs[self.idx];
106            let rem = first.remaining();
107            let segment = cmp::min(rem, cnt);
108            first.advance(segment);
109            cnt -= segment;
110            self.clean_empty();
111        }
112    }
113
114    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
115        assert!(len <= self.remaining(), "`len` greater than remaining");
116        match self.bufs.get_mut(self.idx) {
117            // Special optimized case. The whole request comes from the front buffer. That one may
118            // be optimized to do something more efficient, like slice the Bytes (if B == Bytes)
119            // instead of copying, so we take the opportunity if it offers itself.
120            Some(front) if front.remaining() >= len => {
121                self.remaining -= len;
122                let res = front.copy_to_bytes(len);
123                self.clean_empty();
124                res
125            }
126            // The general case, borrowed from the default implementation (there's no way to
127            // delegate to it, is there?)
128            _ => {
129                let mut res = BytesMut::with_capacity(len);
130                res.put(self.take(len));
131                res.freeze()
132            }
133        }
134    }
135
136    fn chunks_vectored<'s>(&'s self, dst: &mut [IoSlice<'s>]) -> usize {
137        let bufs = self.bufs.get(self.idx..).unwrap_or_default();
138        chunks_vectored(bufs.iter(), dst)
139    }
140}
141
142/// A concatenation of multiple buffers into a large one, without copying the bytes over.
143///
144/// Note that this doesn't provide a continuous slice view into them, it is split into the segments
145/// of the original smaller buffers.
146///
/// This variant drops the inner buffers as they are exhausted and new ones can be added. But it
148/// internally keeps a [VecDeque], therefore needs a heap allocation. If you don't need the
149/// extending behaviour, but want to avoid the allocation, the [SegmentedSlice] can be used instead.
150///
151/// # Why
152///
153/// This can be used, for example, if data of unknown length is coming over the network (for
154/// example, the bodies in [hyper] act a bit like this, it returns a stream of [Bytes] buffers).
155/// One might want to accumulate the whole body before acting on it, possibly by parsing it through
156/// [serde] or [prost]. Options would include:
157///
/// * Have a `Vec<u8>` and extend it with each chunk. This needlessly copies the bytes every time
159///   reallocates if the vector grows too large.
160/// * Repeatedly use [chain][Buf::chain], but this changes the type of the whole buffer, therefore
161///   needs to be boxed.
162/// * Use [hyper::body::aggregate] to create a [Buf] implementation that concatenates all of them
163///   together, but lacks any kind of flexibility (like protecting against loading too much data
164///   into memory).
165///
166/// This type allows for concatenating multiple buffers, either all at once, or by incrementally
167/// pushing more buffers to the end.
168///
169/// # Heterogeneous buffers
170///
171/// This expects all the buffers are of the same type. If different-typed buffers are needed, one
172/// needs to use dynamic dispatch, either something like `SegmentedBuf<Box<Buf>>` or
173/// `SegmentedBuf<&mut Buf>`.
174///
175/// # Example
176///
177/// ```rust
178/// # use std::io::Read;
179/// # use bytes::{Bytes, Buf};
180/// # use bytes_utils::SegmentedBuf;
181/// let mut buf = SegmentedBuf::new();
182/// buf.push(Bytes::from("Hello"));
183/// buf.push(Bytes::from(" "));
184/// buf.push(Bytes::from("World"));
185///
186/// assert_eq!(3, buf.segments());
187/// assert_eq!(11, buf.remaining());
188/// assert_eq!(b"Hello", buf.chunk());
189///
190/// let mut out = String::new();
191/// buf.reader().read_to_string(&mut out).expect("Doesn't cause IO errors");
192/// assert_eq!("Hello World", out);
193/// ```
194///
195/// # FIFO behaviour
196///
197/// The buffers are dropped once their data are completely consumed. Additionally, it is possible
198/// to add more buffers to the end, even while some of the previous buffers were partially or fully
199/// consumed. That makes it usable as kind of a queue (that operates on the buffers, not individual
200/// bytes).
201///
202/// ```rust
203/// # use bytes::{Bytes, Buf};
204/// # use bytes_utils::SegmentedBuf;
205/// let mut buf = SegmentedBuf::new();
206/// buf.push(Bytes::from("Hello"));
207/// assert_eq!(1, buf.segments());
208///
209/// let mut out = [0; 3];
210/// buf.copy_to_slice(&mut out);
211/// assert_eq!(&out, b"Hel");
212/// assert_eq!(2, buf.remaining());
213/// assert_eq!(1, buf.segments());
214///
215/// buf.push(Bytes::from("World"));
216/// assert_eq!(7, buf.remaining());
217/// assert_eq!(2, buf.segments());
218///
219/// buf.copy_to_slice(&mut out);
220/// assert_eq!(&out, b"loW");
221/// assert_eq!(4, buf.remaining());
222/// assert_eq!(1, buf.segments());
223/// ```
224///
225/// # Optimizations
226///
227/// The [copy_to_bytes][SegmentedBuf::copy_to_bytes] method tries to avoid copies by delegating
228/// into the underlying buffer if possible (if the whole request can be fulfilled using only a
229/// single buffer). If that one is optimized (for example, the [Bytes] returns a shared instance
230/// instead of making a copy), the copying is avoided. If the request is across a buffer boundary,
231/// a copy is made.
232///
233/// The [chunks_vectored][SegmentedBuf::chunks_vectored] will properly output as many slices as
234/// possible, not just 1 as the default implementation does.
235///
236/// [hyper]: https://docs.rs/hyper
237/// [serde]: https://docs.rs/serde
238/// [prost]: https://docs.rs/prost
239/// [hyper::body::aggregate]: https://docs.rs/hyper/0.14.2/hyper/body/fn.aggregate.html
#[derive(Clone, Debug)]
pub struct SegmentedBuf<B> {
    // FIFO of the not-yet-exhausted buffers; fully consumed ones are popped off
    // the front by `clean_empty`.
    bufs: VecDeque<B>,
    // Pre-computed sum of the total remaining bytes across all buffers in `bufs`;
    // kept in sync by `push`, `advance`, `copy_to_bytes` and `update_remaining`.
    remaining: usize,
}
246
247impl<B> SegmentedBuf<B> {
248    /// Creates a new empty instance.
249    ///
250    /// The instance can be [pushed][SegmentedBuf::push] or [extended][Extend] later.
251    ///
252    /// Alternatively, one may create it directly from an iterator, a [Vec] or a [VecDeque] of
253    /// buffers.
254    pub fn new() -> Self {
255        Self::default()
256    }
257
258    /// Returns the yet unconsumed sequence of buffers.
259    pub fn into_inner(self) -> VecDeque<B> {
260        self.into()
261    }
262
263    /// Returns the number of segments (buffers) this contains.
264    pub fn segments(&self) -> usize {
265        self.bufs.len()
266    }
267}
268
269impl<B: Buf> SegmentedBuf<B> {
270    /// Extends the buffer by another segment.
271    ///
272    /// The newly added segment is added to the end of the buffer (the buffer works as a FIFO).
273    pub fn push(&mut self, buf: B) {
274        self.remaining += buf.remaining();
275        self.bufs.push_back(buf);
276        self.clean_empty();
277    }
278    fn update_remaining(&mut self) {
279        self.remaining = self.bufs.iter().map(Buf::remaining).sum();
280    }
281    fn clean_empty(&mut self) {
282        loop {
283            match self.bufs.front() {
284                Some(b) if !b.has_remaining() => {
285                    self.bufs.pop_front();
286                }
287                _ => break,
288            }
289        }
290    }
291}
292
293impl<B> Default for SegmentedBuf<B> {
294    fn default() -> Self {
295        Self {
296            bufs: VecDeque::new(),
297            remaining: 0,
298        }
299    }
300}
301
302impl<B: Buf> From<Vec<B>> for SegmentedBuf<B> {
303    fn from(bufs: Vec<B>) -> Self {
304        Self::from(VecDeque::from(bufs))
305    }
306}
307
308impl<B: Buf> From<VecDeque<B>> for SegmentedBuf<B> {
309    fn from(bufs: VecDeque<B>) -> Self {
310        let mut me = Self { bufs, remaining: 0 };
311        me.clean_empty();
312        me.update_remaining();
313        me
314    }
315}
316
317impl<B> From<SegmentedBuf<B>> for VecDeque<B> {
318    fn from(me: SegmentedBuf<B>) -> Self {
319        me.bufs
320    }
321}
322
323impl<B: Buf> Extend<B> for SegmentedBuf<B> {
324    fn extend<T: IntoIterator<Item = B>>(&mut self, iter: T) {
325        self.bufs.extend(iter);
326        self.clean_empty();
327        self.update_remaining();
328    }
329}
330
331impl<B: Buf> FromIterator<B> for SegmentedBuf<B> {
332    fn from_iter<T: IntoIterator<Item = B>>(iter: T) -> Self {
333        let mut me = Self {
334            bufs: VecDeque::from_iter(iter),
335            remaining: 0,
336        };
337        me.clean_empty();
338        me.update_remaining();
339        me
340    }
341}
342
impl<B: Buf> Buf for SegmentedBuf<B> {
    fn remaining(&self) -> usize {
        self.remaining
    }

    fn chunk(&self) -> &[u8] {
        // Empty queue -> report an empty chunk.
        self.bufs.front().map(Buf::chunk).unwrap_or_default()
    }

    fn advance(&mut self, mut cnt: usize) {
        assert!(cnt <= self.remaining, "Advance past the end of buffer");
        // Keep the cached total in sync; the loop below only touches the
        // individual buffers.
        self.remaining -= cnt;
        while cnt > 0 {
            // `remaining >= cnt` was asserted above, so a front buffer must exist.
            let front = self
                .bufs
                .front_mut()
                .expect("Missing buffers to provide remaining");
            let front_remaining = front.remaining();
            if front_remaining >= cnt {
                front.advance(cnt);
                break;
            } else {
                // We advance past the whole front buffer
                cnt -= front_remaining;
                self.bufs.pop_front();
            }
        }
        // Drop any buffer that ended up exactly exhausted so `chunk` never
        // returns an empty slice while data remains.
        self.clean_empty();
    }

    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
        assert!(len <= self.remaining(), "`len` greater than remaining");
        match self.bufs.front_mut() {
            // Special optimized case. The whole request comes from the front buffer. That one may
            // be optimized to do something more efficient, like slice the Bytes (if B == Bytes)
            // instead of copying, so we take the opportunity if it offers itself.
            Some(front) if front.remaining() >= len => {
                self.remaining -= len;
                let res = front.copy_to_bytes(len);
                self.clean_empty();
                res
            }
            // The general case, borrowed from the default implementation (there's no way to
            // delegate to it, is there?)
            _ => {
                // `take(len)` advances `self` (updating `remaining`) as it is copied.
                let mut res = BytesMut::with_capacity(len);
                res.put(self.take(len));
                res.freeze()
            }
        }
    }

    fn chunks_vectored<'a>(&'a self, dst: &mut [IoSlice<'a>]) -> usize {
        // Exhausted buffers are already popped, so every queued buffer contributes.
        chunks_vectored(self.bufs.iter(), dst)
    }
}
399
#[cfg(test)]
mod tests {
    use std::io::Read;
    use std::ops::Deref;

    use proptest::prelude::*;

    use super::*;

    // An empty SegmentedBuf reports zero everywhere and tolerates no-op operations.
    #[test]
    fn empty() {
        let mut b = SegmentedBuf::<Bytes>::new();

        assert!(!b.has_remaining());
        assert_eq!(0, b.remaining());
        assert!(b.chunk().is_empty());
        assert_eq!(0, b.segments());

        b.copy_to_slice(&mut []);
        b.advance(0);
        assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
    }

    // Same as `empty`, but for the slice-backed variant.
    #[test]
    fn empty_slices() {
        let mut b = SegmentedSlice::<&[u8]>::default();

        assert!(!b.has_remaining());
        assert_eq!(0, b.remaining());
        assert!(b.chunk().is_empty());

        b.copy_to_slice(&mut []);
        b.advance(0);
        assert_eq!(0, b.reader().read(&mut [0; 10]).unwrap());
    }

    // Shared fixture: "Hello" + " " + "" + "World" -> 11 bytes in 4 segments
    // (the empty segment survives because it is not at the front).
    fn segmented() -> SegmentedBuf<Bytes> {
        vec![
            Bytes::from("Hello"),
            Bytes::from(" "),
            Bytes::new(),
            Bytes::from("World"),
        ]
        .into()
    }

    #[test]
    fn segments() {
        let mut b = segmented();
        assert_eq!(11, b.remaining());
        assert_eq!(b"Hello", b.chunk());
        assert_eq!(4, b.segments());
        b.advance(3);
        assert_eq!(8, b.remaining());
        assert_eq!(b"lo", b.chunk());
        // Advancing inside the first segment does not drop any segment.
        assert_eq!(4, b.segments());
    }

    #[test]
    fn to_bytes_all() {
        let mut b = segmented();
        let bytes = b.copy_to_bytes(11);
        assert_eq!("Hello World", &bytes);
    }

    // Advance that stays within the first segment.
    #[test]
    fn advance_within() {
        let mut b = segmented();
        b.advance(2);
        assert_eq!(4, b.segments());
        assert_eq!(9, b.remaining());
        assert_eq!(b"llo", b.chunk());
    }

    // Advance that lands exactly on a segment boundary drops the consumed segment.
    #[test]
    fn advance_border() {
        let mut b = segmented();
        b.advance(5);
        assert_eq!(3, b.segments());
        assert_eq!(6, b.remaining());
        assert_eq!(b" ", b.chunk());
    }

    // Advance that crosses segment boundaries (also skips the empty segment).
    #[test]
    fn advance_across() {
        let mut b = segmented();
        b.advance(7);
        assert_eq!(1, b.segments());
        assert_eq!(4, b.remaining());
        assert_eq!(b"orld", b.chunk());
    }

    // The empty segment right after the consumed ones is dropped too.
    #[test]
    fn empty_at_border() {
        let mut b = segmented();
        b.advance(6);
        assert_eq!(1, b.segments());
        assert_eq!(5, b.remaining());
        assert_eq!(b"World", b.chunk());
    }

    // Empty buffers never accumulate, no matter how they are inserted.
    #[test]
    fn empty_bufs() {
        fn is_empty(b: &SegmentedBuf<Bytes>) {
            assert_eq!(0, b.segments());
            assert_eq!(0, b.remaining());
            assert_eq!(b"", b.chunk());
        }

        is_empty(&vec![].into());
        is_empty(&vec![Bytes::new(), Bytes::new()].into());
        is_empty(&vec![Bytes::new(), Bytes::new()].into_iter().collect());

        let mut b = SegmentedBuf::new();
        is_empty(&b);
        b.push(Bytes::new());
        is_empty(&b);
        b.extend(vec![Bytes::new(), Bytes::new()]);
        is_empty(&b);
    }

    // Mirrors the SegmentedSlice doc example, including interleaved empty slices.
    #[test]
    fn sliced_hello() {
        let mut buffers = [b"Hello" as &[_], b"", b" ", b"", b"World"];
        let buf = SegmentedSlice::new(&mut buffers);

        assert_eq!(11, buf.remaining());
        assert_eq!(b"Hello", buf.chunk());

        let mut out = String::new();
        buf.reader()
            .read_to_string(&mut out)
            .expect("Doesn't cause IO errors");
        assert_eq!("Hello World", out);
    }

    // chunks_vectored yields one IoSlice per non-empty segment (empty ones are skipped).
    #[test]
    fn chunk_vectored() {
        let mut b = segmented();
        assert_eq!(b.chunks_vectored(&mut []), 0);
        let mut slices = [IoSlice::new(&[]); 5];
        assert_eq!(b.segments(), 4);
        assert_eq!(b.chunks_vectored(&mut slices), 3);
        assert_eq!(&*slices[0], b"Hello");
        assert_eq!(&*slices[1], b" ");
        assert_eq!(&*slices[2], b"World");
        b.advance(2);
        let mut slices = [IoSlice::new(&[]); 1];
        assert_eq!(b.chunks_vectored(&mut slices), 1);
        assert_eq!(&*slices[0], b"llo");
    }

    // A SegmentedSlice of SegmentedBufs recurses into the inner chunks.
    #[test]
    fn chunk_vectored_nested() {
        let mut bufs = [segmented(), segmented()];
        let mut bufs = SegmentedSlice::new(&mut bufs);
        let mut slices = [IoSlice::new(&[]); 10];
        assert_eq!(bufs.chunks_vectored(&mut slices), 6);
        assert_eq!(&*slices[0], b"Hello");
        assert_eq!(&*slices[1], b" ");
        assert_eq!(&*slices[2], b"World");
        assert_eq!(&*slices[3], b"Hello");
        assert_eq!(&*slices[4], b" ");
        assert_eq!(&*slices[5], b"World");
        bufs.advance(2);
        let mut slices = [IoSlice::new(&[]); 1];
        assert_eq!(bufs.chunks_vectored(&mut slices), 1);
        assert_eq!(&*slices[0], b"llo");
    }

    proptest! {
        // Cross-checks SegmentedBuf, SegmentedSlice and a FIFO-built SegmentedBuf
        // against a plain concatenated Vec<u8> under random split points.
        #[test]
        fn random(bufs: Vec<Vec<u8>>, splits in proptest::collection::vec(0..10usize, 1..10)) {
            let concat: Vec<u8> = bufs.iter().flat_map(|b| b.iter()).copied().collect();
            let mut segmented = bufs.iter()
                .map(|b| &b[..])
                .collect::<SegmentedBuf<_>>();
            assert_eq!(concat.len(), segmented.remaining());
            assert!(segmented.segments() <= bufs.len());
            assert!(concat.starts_with(segmented.chunk()));
            let mut bytes = segmented.clone().copy_to_bytes(segmented.remaining());
            assert_eq!(&concat[..], &bytes[..]);
            let mut sliced = bufs.iter().map(Deref::deref).collect::<Vec<&[u8]>>();
            let mut sliced = SegmentedSlice::new(&mut sliced);

            let mut fifo = SegmentedBuf::new();
            let mut buf_pos = bufs.iter();

            for split in splits {
                if !bytes.has_remaining() {
                    break;
                }
                let split = cmp::min(bytes.remaining(), split);
                while fifo.remaining() < split {
                    fifo.push(&buf_pos.next().unwrap()[..]);
                }
                let c1 = bytes.copy_to_bytes(split);
                let c2 = segmented.copy_to_bytes(split);
                let c3 = sliced.copy_to_bytes(split);
                assert_eq!(c1, c2);
                assert_eq!(c1, c3);
                assert_eq!(bytes.remaining(), segmented.remaining());
                assert_eq!(bytes.remaining(), sliced.remaining());
            }
        }
    }
}
606}