mz_ore/
bytes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! One bytes type to rule them all!
17//!
18//! TODO(parkertimmerman): Ideally we don't implement this "bytes type" on our own
//! and use something else, e.g. `SegmentedBuf` from the `bytes-utils` crate. Currently
//! neither that type nor anything else implements `std::io::Read` and `std::io::Seek`,
//! which we need. We have an open issue with the `bytes-utils` crate,
//! <https://github.com/vorner/bytes-utils/issues/16>, to add these trait impls.
23//!
24
25#[cfg(feature = "parquet")]
26use std::io::Seek;
27
28use bytes::{Buf, BufMut, Bytes, BytesMut};
29use internal::SegmentedReader;
30#[cfg(feature = "parquet")]
31use parquet::errors::ParquetError;
32use smallvec::SmallVec;
33
34#[cfg(feature = "parquet")]
35use crate::cast::CastFrom;
36
/// A cheaply cloneable collection of possibly non-contiguous bytes.
///
/// `Vec<u8>` or `Bytes` are contiguous chunks of memory, which are fast (e.g. better cache
/// locality) and easier to work with (e.g. can take a slice), but can cause problems (e.g.
/// memory fragmentation) if you try to allocate a single very large chunk. Depending on the
/// application, you probably don't need a contiguous chunk of memory, just a way to store and
/// iterate over a collection of bytes.
///
/// Note: [`SegmentedBytes`] is generic over a `const N: usize`. Internally we use a
/// [`smallvec::SmallVec`] to store our [`Bytes`] segments, and `N` is how many `Bytes` we'll
/// store inline before spilling to the heap. We default `N = 1`, so in the case of a single
/// `Bytes` segment, we avoid one layer of indirection.
///
/// Note: the derived [`PartialEq`]/[`Eq`] compare the segments themselves, so two values
/// holding the same bytes split at different segment boundaries compare as unequal.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SegmentedBytes<const N: usize = 1> {
    /// Collection of non-contiguous segments, each segment is guaranteed to be non-empty.
    ///
    /// The second tuple element is [`Padding`]: it is always `PADDING_DEFAULT` while stored
    /// here, and is only rewritten (to accumulated lengths) when converted into a
    /// [`SegmentedReader`].
    segments: SmallVec<[(Bytes, Padding); N]>,
    /// Pre-computed length of all the segments, i.e. the sum of every segment's length.
    len: usize,
}
56
/// We add [`Padding`] to segments to prevent needing to re-allocate our
/// collection when creating a [`SegmentedReader`].
///
/// While a segment lives in a [`SegmentedBytes`] this value is unused; the reader's
/// constructor overwrites it in place with the accumulated length of all segments up to
/// and including that segment.
type Padding = usize;
60
/// Default value used for [`Padding`], stored alongside every newly pushed segment.
const PADDING_DEFAULT: usize = 0;
63
64impl Default for SegmentedBytes {
65    fn default() -> Self {
66        SegmentedBytes {
67            segments: SmallVec::new(),
68            len: 0,
69        }
70    }
71}
72
73impl SegmentedBytes {
74    /// Creates a new empty [`SegmentedBytes`], reserving space inline for `N` **segments**.
75    ///
76    /// Note: If you don't know how many segments you have, you should use [`SegmentedBytes::default`].
77    pub fn new<const N: usize>() -> SegmentedBytes<N> {
78        SegmentedBytes {
79            segments: SmallVec::new(),
80            len: 0,
81        }
82    }
83}
84
85impl<const N: usize> SegmentedBytes<N> {
86    /// Creates a new empty [`SegmentedBytes`] with space for `capacity` **segments**.
87    pub fn with_capacity(capacity: usize) -> SegmentedBytes<N> {
88        SegmentedBytes {
89            segments: SmallVec::with_capacity(capacity),
90            len: 0,
91        }
92    }
93
94    /// Returns the number of bytes contained in this [`SegmentedBytes`].
95    pub fn len(&self) -> usize {
96        self.len
97    }
98
99    /// Returns if this [`SegmentedBytes`] is empty.
100    pub fn is_empty(&self) -> bool {
101        self.len() == 0
102    }
103
104    /// Consumes `self` returning an [`Iterator`] over all of the non-contiguous segments
105    /// that make up this buffer.
106    pub fn into_segments(self) -> impl Iterator<Item = Bytes> {
107        self.segments.into_iter().map(|(bytes, _len)| bytes)
108    }
109
110    /// Copies all of the bytes from `self` returning one contiguous blob.
111    pub fn into_contiguous(mut self) -> Vec<u8> {
112        self.copy_to_bytes(self.remaining()).into()
113    }
114
115    /// Extends the buffer by one more segment of [`Bytes`].
116    ///
117    /// If the provided [`Bytes`] is empty, we skip appending it.
118    #[inline]
119    pub fn push<B: Into<Bytes>>(&mut self, b: B) {
120        let b: Bytes = b.into();
121        if !b.is_empty() {
122            self.len += b.len();
123            self.segments.push((b, PADDING_DEFAULT));
124        }
125    }
126
127    /// Consumes `self` returning a type that implements [`io::Read`] and [`io::Seek`].
128    ///
129    /// Note: [`Clone`]-ing a [`SegmentedBytes`] is cheap, so if you need to retain the original
130    /// [`SegmentedBytes`] you should clone it.
131    ///
132    /// [`io::Read`]: std::io::Read
133    /// [`io::Seek`]: std::io::Seek
134    pub fn reader(self) -> SegmentedReader<N> {
135        SegmentedReader::new(self)
136    }
137}
138
139impl<const N: usize> Buf for SegmentedBytes<N> {
140    fn remaining(&self) -> usize {
141        self.len()
142    }
143
144    fn chunk(&self) -> &[u8] {
145        // Return the first non-empty segment.
146        self.segments
147            .iter()
148            .filter(|(c, _len)| !c.is_empty())
149            .map(|(c, _len)| Buf::chunk(c))
150            .next()
151            .unwrap_or_default()
152    }
153
154    fn advance(&mut self, mut cnt: usize) {
155        assert!(cnt <= self.len, "Advance past the end of buffer");
156        self.len -= cnt;
157
158        while cnt > 0 {
159            if let Some((seg, _len)) = self.segments.first_mut() {
160                if seg.remaining() > cnt {
161                    seg.advance(cnt);
162                    // We advanced `cnt` bytes, so no more need to advance.
163                    cnt = 0;
164                } else {
165                    // Remove the whole first buffer.
166                    cnt = cnt.saturating_sub(seg.remaining());
167                    self.segments.remove(0);
168                }
169            }
170        }
171    }
172
173    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
174        // If possible, use the zero-copy implementation on individual segments.
175        if let Some((seg, _len)) = self.segments.first_mut() {
176            if len <= seg.len() {
177                self.len -= len;
178                return seg.copy_to_bytes(len);
179            }
180        }
181        // Otherwise, fall back to a generic implementation.
182        assert!(
183            len <= self.len(),
184            "tried to copy {len} bytes with {} remaining",
185            self.len()
186        );
187        let mut out = BytesMut::with_capacity(len);
188        out.put(self.take(len));
189        out.freeze()
190    }
191}
192
#[cfg(feature = "parquet")]
impl parquet::file::reader::Length for SegmentedBytes {
    /// Total number of bytes, widened to the `u64` that parquet expects.
    fn len(&self) -> u64 {
        let total: usize = self.len;
        u64::cast_from(total)
    }
}
199
#[cfg(feature = "parquet")]
impl parquet::file::reader::ChunkReader for SegmentedBytes {
    type T = internal::SegmentedReader;

    /// Returns a reader over a cheap clone of these bytes, positioned at `start`.
    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        let position = std::io::SeekFrom::Start(start);
        let mut reader = SegmentedBytes::reader(self.clone());
        reader.seek(position)?;
        Ok(reader)
    }

    /// Returns `length` bytes starting at byte offset `start`.
    ///
    /// Errors with [`ParquetError::EOF`] if `start` or `start + length` lies past the end.
    fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result<Bytes> {
        let start = usize::cast_from(start);
        let mut cursor = self.clone();

        if cursor.remaining() < start {
            return Err(ParquetError::EOF(format!(
                "seeking {start} bytes ahead, but only {} remaining",
                cursor.remaining()
            )));
        }
        cursor.advance(start);

        if cursor.remaining() < length {
            return Err(ParquetError::EOF(format!(
                "copying {length} bytes, but only {} remaining",
                cursor.remaining()
            )));
        }
        Ok(cursor.copy_to_bytes(length))
    }
}
230
231impl From<Bytes> for SegmentedBytes {
232    fn from(value: Bytes) -> Self {
233        let mut s = SegmentedBytes::default();
234        s.push(value);
235        s
236    }
237}
238
239impl From<Vec<u8>> for SegmentedBytes {
240    fn from(value: Vec<u8>) -> Self {
241        let b = Bytes::from(value);
242        SegmentedBytes::from(b)
243    }
244}
245
246impl From<Vec<Bytes>> for SegmentedBytes {
247    fn from(value: Vec<Bytes>) -> Self {
248        let mut s = SegmentedBytes::with_capacity(value.len());
249        for segment in value {
250            s.push(segment);
251        }
252        s
253    }
254}
255
256impl<const N: usize> FromIterator<Bytes> for SegmentedBytes<N> {
257    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
258        let mut s = SegmentedBytes::new();
259        for segment in iter {
260            s.push(segment);
261        }
262        s
263    }
264}
265
266impl<const N: usize> FromIterator<Vec<u8>> for SegmentedBytes<N> {
267    fn from_iter<T: IntoIterator<Item = Vec<u8>>>(iter: T) -> Self {
268        iter.into_iter().map(Bytes::from).collect()
269    }
270}
271
272mod internal {
273    use std::io;
274
275    use smallvec::SmallVec;
276
277    use crate::bytes::{Bytes, SegmentedBytes};
278    use crate::cast::CastFrom;
279
280    /// Provides efficient reading and seeking across a collection of segmented bytes.
281    #[derive(Debug)]
282    pub struct SegmentedReader<const N: usize = 1> {
283        segments: SmallVec<[(Bytes, usize); N]>,
284        /// Total length of all segments.
285        len: usize,
286        // Overall byte position we're currently pointing at.
287        overall_ptr: usize,
288        /// Current segement that we'd read from.
289        segment_ptr: usize,
290    }
291
292    impl<const N: usize> SegmentedReader<N> {
293        pub fn new(mut bytes: SegmentedBytes<N>) -> Self {
294            // Re-adjust our accumlated lengths.
295            //
296            // Note: `SegmentedBytes` could track the accumulated lengths, but
297            // it's complicated by the impl of `Buf::advance`.
298            let mut accum_length = 0;
299            for (segment, len) in &mut bytes.segments {
300                accum_length += segment.len();
301                *len = accum_length;
302            }
303
304            SegmentedReader {
305                segments: bytes.segments,
306                len: bytes.len,
307                overall_ptr: 0,
308                segment_ptr: 0,
309            }
310        }
311
312        /// The total number of bytes this [`SegmentedReader`] is mapped over.
313        pub fn len(&self) -> usize {
314            self.len
315        }
316
317        /// The current position of the internal cursor for this [`SegmentedReader`].
318        ///
319        /// Note: It's possible for the current position to be greater than the length,
320        /// as [`std::io::Seek`] allows you to seek past the end of the stream.
321        pub fn position(&self) -> usize {
322            self.overall_ptr
323        }
324    }
325
326    impl<const N: usize> io::Read for SegmentedReader<N> {
327        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
328            // We've seeked past the end, return nothing.
329            if self.overall_ptr >= self.len {
330                return Ok(0);
331            }
332
333            let (segment, accum_length) = &self.segments[self.segment_ptr];
334
335            // How many bytes we have left in this segment.
336            let remaining_len = accum_length.checked_sub(self.overall_ptr).unwrap();
337            // Position within the segment to begin reading.
338            let segment_pos = segment.len().checked_sub(remaining_len).unwrap();
339
340            // How many bytes we'll read.
341            let len = core::cmp::min(remaining_len, buf.len());
342            // Copy bytes from the current segment into the buffer.
343            let segment_buf = &segment[..];
344            buf[..len].copy_from_slice(&segment_buf[segment_pos..segment_pos + len]);
345
346            // Advance our pointers.
347            self.overall_ptr += len;
348
349            // Advance to the next segment if we've reached the end of the current.
350            if self.overall_ptr == *accum_length {
351                self.segment_ptr += 1;
352            }
353
354            Ok(len)
355        }
356    }
357
358    impl<const N: usize> io::Seek for SegmentedReader<N> {
359        fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
360            use io::SeekFrom;
361
362            // Get an offset from the start.
363            let maybe_offset = match pos {
364                SeekFrom::Start(n) => Some(usize::cast_from(n)),
365                SeekFrom::End(n) => {
366                    let n = isize::cast_from(n);
367                    self.len().checked_add_signed(n)
368                }
369                SeekFrom::Current(n) => {
370                    let n = isize::cast_from(n);
371                    self.overall_ptr.checked_add_signed(n)
372                }
373            };
374
375            // Check for integer overflow, but we don't check our bounds!
376            //
377            // The contract for io::Seek denotes that seeking beyond the end
378            // of the stream is allowed. If we're beyond the end of the stream
379            // then we won't read back any bytes, but it won't be an error.
380            let offset = maybe_offset.ok_or_else(|| {
381                io::Error::new(
382                    io::ErrorKind::InvalidInput,
383                    "Invalid seek to an overflowing position",
384                )
385            })?;
386
387            // Special case we want to be fast, seeking back to the beginning.
388            if offset == 0 {
389                self.overall_ptr = 0;
390                self.segment_ptr = 0;
391
392                return Ok(u64::cast_from(offset));
393            }
394
395            // Seek through our segments until we get to the correct offset.
396            let result = self
397                .segments
398                .binary_search_by(|(_s, accum_len)| accum_len.cmp(&offset));
399
400            self.segment_ptr = match result {
401                Ok(segment_ptr) => segment_ptr + 1,
402                Err(segment_ptr) => segment_ptr,
403            };
404            self.overall_ptr = offset;
405
406            Ok(u64::cast_from(offset))
407        }
408    }
409}
410
#[cfg(test)]
mod tests {
    use std::io::{Read, Seek, SeekFrom};

    use bytes::{Buf, Bytes};
    use proptest::prelude::*;

    use super::SegmentedBytes;
    use crate::cast::CastFrom;

    #[crate::test]
    fn test_empty() {
        let s = SegmentedBytes::default();

        // We should report as empty.
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);

        // An iterator of segments should return nothing.
        let mut i = s.clone().into_segments();
        assert_eq!(i.next(), None);

        // bytes::Buf shouldn't report anything as remaining.
        assert_eq!(s.remaining(), 0);
        // We should get back an empty chunk if we try to read anything.
        assert!(s.chunk().is_empty());

        // Turn ourselves into a type that impls io::Read.
        let mut reader = s.reader();

        // We shouldn't panic, but we shouldn't get back any bytes. Use a non-empty buffer,
        // since reading into an empty buffer trivially returns 0.
        let mut buf = [0u8; 4];
        let bytes_read = reader.read(&mut buf[..]).unwrap();
        assert_eq!(bytes_read, 0);

        // We should be able to seek past the end without panicking.
        reader.seek(SeekFrom::Current(20)).unwrap();
        let bytes_read = reader.read(&mut buf[..]).unwrap();
        assert_eq!(bytes_read, 0);
        assert_eq!(buf, [0, 0, 0, 0]);
    }

    #[crate::test]
    fn test_bytes_buf() {
        let mut s = SegmentedBytes::from(vec![0, 1, 2, 3, 4, 5, 6, 7]);

        assert_eq!(s.len(), 8);
        assert_eq!(s.len(), s.remaining());
        assert_eq!(s.chunk(), &[0, 1, 2, 3, 4, 5, 6, 7]);

        // Advance far into the buffer.
        s.advance(6);
        assert_eq!(s.len(), 2);
        assert_eq!(s.len(), s.remaining());
        assert_eq!(s.chunk(), &[6, 7]);
    }

    #[crate::test]
    fn test_bytes_buf_multi() {
        let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9, 10, 11]];
        let mut s: SegmentedBytes<2> = segments.into_iter().collect();

        assert_eq!(s.len(), 12);
        assert_eq!(s.len(), s.remaining());

        // Chunk should return the entirety of the first segment.
        assert_eq!(s.chunk(), &[0, 1, 2, 3]);

        // Advance into the middle segment
        s.advance(6);
        assert_eq!(s.len(), 6);
        assert_eq!(s.len(), s.remaining());

        // Chunk should return the rest of the second segment.
        assert_eq!(s.chunk(), &[6, 7]);

        // Read across two segments.
        let x = s.get_u32();
        assert_eq!(x, u32::from_be_bytes([6, 7, 8, 9]));

        // Only two bytes remaining.
        assert_eq!(s.len(), 2);
        assert_eq!(s.len(), s.remaining());

        let mut s = s.chain(&[12, 13, 14, 15][..]);

        // Should have 6 bytes total now.
        assert_eq!(s.remaining(), 6);
        // We'll read out the last two bytes from the last segment.
        assert_eq!(s.chunk(), &[10, 11]);

        // Advance into the chained segment.
        s.advance(3);
        // Read the remaining bytes.
        assert_eq!(s.chunk(), &[13, 14, 15]);
    }

    #[crate::test]
    fn test_io_read() {
        let s = SegmentedBytes::from(vec![0, 1, 2, 3, 4, 5, 6, 7]);
        let mut reader = s.reader();

        assert_eq!(reader.len(), 8);
        assert_eq!(reader.position(), 0);

        // Read a small amount.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [0, 1, 2, 3]);

        // We should still report our original length.
        assert_eq!(reader.len(), 8);
        // But our position has moved.
        assert_eq!(reader.position(), 4);

        // We can seek forwards and read bytes.
        reader.seek(SeekFrom::Current(1)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 3);
        assert_eq!(buf, [5, 6, 7, 3]);

        assert_eq!(reader.len(), 8);
        // We've read to the end!
        assert_eq!(reader.position(), 8);

        // We shouldn't read any bytes
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);
        // Buffer shouldn't change from what it was last.
        assert_eq!(buf, [5, 6, 7, 3]);

        // Seek backwards and re-read bytes.
        reader.seek(SeekFrom::Start(2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [2, 3, 4, 5]);
    }

    #[crate::test]
    fn test_io_read_multi() {
        let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7, 8, 9], vec![10, 11]];
        let s: SegmentedBytes<2> = segments.into_iter().collect();
        let mut reader = s.reader();

        assert_eq!(reader.len(), 12);
        assert_eq!(reader.position(), 0);

        // Read up to the first segment.
        let mut buf = [0; 6];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [0, 1, 2, 3, 0, 0]);

        // Read 5 bytes, which should come from the second segment.
        let bytes_read = reader.read(&mut buf[1..]).unwrap();
        assert_eq!(bytes_read, 5);
        assert_eq!(buf, [0, 4, 5, 6, 7, 8]);

        // Seek backwards to the middle of the first segment.
        reader.seek(SeekFrom::Start(2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [2, 3, 5, 6, 7, 8]);

        // Seek past the end.
        reader.seek(SeekFrom::Start(1000)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);

        assert_eq!(reader.len(), 12);
        assert_eq!(reader.position(), 1000);

        // Seek back to the middle.
        reader.seek(SeekFrom::Start(6)).unwrap();
        // Read the entire buffer.
        let mut buf = Vec::new();
        let bytes_read = reader.read_to_end(&mut buf).unwrap();
        assert_eq!(bytes_read, 6);
        assert_eq!(buf, &[6, 7, 8, 9, 10, 11]);
    }

    #[crate::test]
    fn test_multi() {
        let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7, 8, 9], vec![10, 11]];
        let mut s: SegmentedBytes<2> = segments.into_iter().collect();

        assert_eq!(s.len(), 12);
        assert_eq!(s.remaining(), 12);

        // Read the first chunk.
        assert_eq!(s.chunk(), [0, 1, 2, 3]);

        // Advance to the middle.
        s.advance(6);
        assert_eq!(s.remaining(), 6);

        // Convert to a reader.
        let mut reader = s.reader();
        // We should be at the beginning, and only see the remaining 6 bytes.
        assert_eq!(reader.len(), 6);
        assert_eq!(reader.position(), 0);

        // Read to the end of the second segment.
        let mut buf = [0; 8];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [6, 7, 8, 9, 0, 0, 0, 0]);

        // Read again to get the final segment.
        let bytes_read = reader.read(&mut buf[4..]).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [6, 7, 8, 9, 10, 11, 0, 0]);

        // Seek back to the beginning.
        reader.seek(SeekFrom::Start(0)).unwrap();
        // Read everything.
        reader.read_exact(&mut buf[..6]).unwrap();
        assert_eq!(buf, [6, 7, 8, 9, 10, 11, 0, 0]);
    }

    #[crate::test]
    fn test_single_empty_segment() {
        let s = SegmentedBytes::from(Vec::<u8>::new());

        // Everything should come back empty.
        assert_eq!(s.len(), 0);
        assert_eq!(s.remaining(), 0);
        assert!(s.chunk().is_empty());

        let mut reader = s.reader();

        // Reading shouldn't fail, but it also shouldn't yield any bytes.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);
        assert_eq!(buf, [0, 0, 0, 0]);
    }

    #[crate::test]
    fn test_middle_segment_empty() {
        let segments = vec![vec![1, 2], vec![], vec![3, 4, 5, 6]];
        let mut s: SegmentedBytes = segments.clone().into_iter().collect();

        assert_eq!(s.len(), 6);
        assert_eq!(s.remaining(), 6);

        // Read and advanced past the first chunk.
        let first_chunk = s.chunk();
        assert_eq!(first_chunk, [1, 2]);
        s.advance(first_chunk.len());

        assert_eq!(s.remaining(), 4);

        // We should skip the empty segment and continue to the next.
        let second_chunk = s.chunk();
        assert_eq!(second_chunk, [3, 4, 5, 6]);

        // Recreate SegmentedBytes.
        let s: SegmentedBytes = segments.into_iter().collect();
        let mut reader = s.reader();

        // We should be able to read the first chunk.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [1, 2, 0, 0]);

        // And we should be able to read the second chunk without issue.
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [3, 4, 5, 6]);

        // Seek backwards and read again.
        reader.seek(SeekFrom::Current(-2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [5, 6, 5, 6]);
    }

    #[crate::test]
    fn test_last_segment_empty() {
        let segments = vec![vec![1, 2], vec![3, 4, 5, 6], vec![]];
        let mut s: SegmentedBytes = segments.clone().into_iter().collect();

        assert_eq!(s.len(), 6);
        assert_eq!(s.remaining(), 6);

        // Read and advanced past the first chunk.
        let first_chunk = s.chunk();
        assert_eq!(first_chunk, [1, 2]);
        s.advance(first_chunk.len());

        assert_eq!(s.remaining(), 4);

        // Read and advance past the second chunk.
        let second_chunk = s.chunk();
        assert_eq!(second_chunk, [3, 4, 5, 6]);
        s.advance(second_chunk.len());

        // No bytes should remain.
        assert_eq!(s.remaining(), 0);
        assert!(s.chunk().is_empty());

        // Recreate SegmentedBytes.
        let s: SegmentedBytes = segments.into_iter().collect();
        let mut reader = s.reader();

        // We should be able to read the first chunk.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [1, 2, 0, 0]);

        // And we should be able to read the second chunk without issue.
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [3, 4, 5, 6]);

        // Reading again shouldn't provide any bytes.
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);
        // Buffer shouldn't change.
        assert_eq!(buf, [3, 4, 5, 6]);

        // Seek backwards and read again.
        reader.seek(SeekFrom::Current(-2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [5, 6, 5, 6]);
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_copy_to_bytes() {
        fn test(segments: Vec<Vec<u8>>, num_bytes: usize) {
            let contiguous: Vec<u8> = segments.clone().into_iter().flatten().collect();
            let mut contiguous = Bytes::from(contiguous);
            let mut segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

            // Cap num_bytes to `0..=contiguous.len()`. (Taking `len % num_bytes` instead
            // would panic for `num_bytes == 0` and almost always copy everything.)
            let num_bytes = num_bytes % (contiguous.len() + 1);
            let remaining = contiguous.len() - num_bytes;

            let copied_c = contiguous.copy_to_bytes(num_bytes);
            let copied_s = segmented.copy_to_bytes(num_bytes);

            assert_eq!(copied_c, copied_s);

            let copied_c = contiguous.copy_to_bytes(remaining);
            let copied_s = segmented.copy_to_bytes(remaining);

            assert_eq!(copied_c, copied_s);
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>(), num_bytes in any::<usize>())| {
            test(segments, num_bytes);
        })
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_read_to_end() {
        fn test(segments: Vec<Vec<u8>>) {
            let contiguous: Vec<u8> = segments.clone().into_iter().flatten().collect();
            let contiguous = Bytes::from(contiguous);
            let segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

            let mut reader_c = contiguous.reader();
            let mut reader_s = segmented.reader();

            let mut buf_c = Vec::new();
            reader_c.read_to_end(&mut buf_c).unwrap();

            let mut buf_s = Vec::new();
            reader_s.read_to_end(&mut buf_s).unwrap();

            // Compare the contiguous reader against the segmented one.
            assert_eq!(buf_c, buf_s);
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>())| {
            test(segments);
        })
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_read_and_seek() {
        fn test(segments: Vec<Vec<u8>>, from_start: u64, from_current: i64, from_end: i64) {
            let contiguous: Vec<u8> = segments.clone().into_iter().flatten().collect();
            let total_len = contiguous.len();
            let contiguous = std::io::Cursor::new(&contiguous[..]);
            let segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

            let mut reader_c = contiguous;
            let mut reader_s = segmented.reader();

            let mut buf_c = Vec::new();
            let mut buf_s = Vec::new();

            // Seek from the start.

            let from_start = from_start % (u64::cast_from(total_len).max(1));
            reader_c.seek(SeekFrom::Start(from_start)).unwrap();
            reader_s.seek(SeekFrom::Start(from_start)).unwrap();

            reader_c.read_to_end(&mut buf_c).unwrap();
            reader_s.read_to_end(&mut buf_s).unwrap();

            assert_eq!(&buf_c, &buf_s);
            buf_c.clear();
            buf_s.clear();

            // Seek from the current position.

            let from_current = from_current % i64::try_from(total_len).unwrap().max(1);
            reader_c.seek(SeekFrom::Current(from_current)).unwrap();
            reader_s.seek(SeekFrom::Current(from_current)).unwrap();

            reader_c.read_to_end(&mut buf_c).unwrap();
            reader_s.read_to_end(&mut buf_s).unwrap();

            assert_eq!(&buf_c, &buf_s);
            buf_c.clear();
            buf_s.clear();

            // Seek from the end.

            let from_end = from_end % i64::try_from(total_len).unwrap().max(1);
            reader_c.seek(SeekFrom::End(from_end)).unwrap();
            reader_s.seek(SeekFrom::End(from_end)).unwrap();

            reader_c.read_to_end(&mut buf_c).unwrap();
            reader_s.read_to_end(&mut buf_s).unwrap();

            assert_eq!(&buf_c, &buf_s);
            buf_c.clear();
            buf_s.clear();
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>(), s in any::<u64>(), c in any::<i64>(), e in any::<i64>())| {
            test(segments, s, c, e);
        })
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_non_empty_segments() {
        fn test(segments: Vec<Vec<u8>>) {
            // Vec
            let segment = segments.first().unwrap_or(&Vec::default()).clone();
            let s = SegmentedBytes::from(segment.clone());
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Bytes
            let bytes = Bytes::from(segment.clone());
            let s = SegmentedBytes::from(bytes);
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // SegmentedBytes::push
            let mut s = SegmentedBytes::default();
            s.push(Bytes::from(segment));
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Vec<Vec<u8>>
            let s: SegmentedBytes = segments.clone().into_iter().collect();
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Iterator of Bytes
            let s: SegmentedBytes = segments.clone().into_iter().map(Bytes::from).collect();
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Vec<Bytes>
            let segments: Vec<_> = segments.into_iter().map(Bytes::from).collect();
            let s = SegmentedBytes::from(segments);
            assert!(s.into_segments().all(|segment| !segment.is_empty()));
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>())| {
            test(segments);
        })
    }
}