mz_ore/
bytes.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License in the LICENSE file at the
6// root of this repository, or online at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! One bytes type to rule them all!
//!
//! TODO(parkertimmerman): Ideally we wouldn't implement this "bytes type" ourselves
//! and would use something else, e.g. `SegmentedBuf` from the `bytes-utils` crate. Currently
//! neither that type nor anything else implements `std::io::Read` and `std::io::Seek`, which
//! we need. We have an open issue with the `bytes-utils` crate, <https://github.com/vorner/bytes-utils/issues/16>,
//! to add these trait impls.
//!
24
25#[cfg(feature = "parquet")]
26use std::io::Seek;
27
28use bytes::{Buf, BufMut, Bytes, BytesMut};
29use internal::SegmentedReader;
30#[cfg(feature = "parquet")]
31use parquet::errors::ParquetError;
32use serde::{Deserialize, Serialize};
33use smallvec::SmallVec;
34
35#[cfg(feature = "parquet")]
36use crate::cast::CastFrom;
37
38/// A cheaply clonable collection of possibly non-contiguous bytes.
39///
40/// `Vec<u8>` or `Bytes` are contiguous chunks of memory, which are fast (e.g. better cache
41/// locality) and easier to work with (e.g. can take a slice), but can cause problems (e.g.
42/// memory fragmentation) if you try to allocate a single very large chunk. Depending on the
43/// application, you probably don't need a contiguous chunk of memory, just a way to store and
44/// iterate over a collection of bytes.
45///
46/// Note: [`SegmentedBytes`] is generic over a `const N: usize`. Internally we use a
47/// [`smallvec::SmallVec`] to store our [`Bytes`] segments, and `N` is how many `Bytes` we'll
48/// store inline before spilling to the heap. We default `N = 1`, so in the case of a single
49/// `Bytes` segment, we avoid one layer of indirection.
50#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
51pub struct SegmentedBytes<const N: usize = 1> {
52    /// Collection of non-contiguous segments, each segment is guaranteed to be non-empty.
53    segments: SmallVec<[(Bytes, Padding); N]>,
54    /// Pre-computed length of all the segments.
55    len: usize,
56}
57
/// Extra per-segment slot kept next to every segment, so that turning a collection of
/// segments into a [`SegmentedReader`] does not require re-allocating the collection.
type Padding = usize;

/// The value every [`Padding`] slot holds until a reader repurposes it.
const PADDING_DEFAULT: Padding = 0;
64
65impl Default for SegmentedBytes {
66    fn default() -> Self {
67        SegmentedBytes {
68            segments: SmallVec::new(),
69            len: 0,
70        }
71    }
72}
73
74impl SegmentedBytes {
75    /// Creates a new empty [`SegmentedBytes`], reserving space inline for `N` **segments**.
76    ///
77    /// Note: If you don't know how many segments you have, you should use [`SegmentedBytes::default`].
78    pub fn new<const N: usize>() -> SegmentedBytes<N> {
79        SegmentedBytes {
80            segments: SmallVec::new(),
81            len: 0,
82        }
83    }
84}
85
86impl<const N: usize> SegmentedBytes<N> {
87    /// Creates a new empty [`SegmentedBytes`] with space for `capacity` **segments**.
88    pub fn with_capacity(capacity: usize) -> SegmentedBytes<N> {
89        SegmentedBytes {
90            segments: SmallVec::with_capacity(capacity),
91            len: 0,
92        }
93    }
94
95    /// Returns the number of bytes contained in this [`SegmentedBytes`].
96    pub fn len(&self) -> usize {
97        self.len
98    }
99
100    /// Returns if this [`SegmentedBytes`] is empty.
101    pub fn is_empty(&self) -> bool {
102        self.len() == 0
103    }
104
105    /// Consumes `self` returning an [`Iterator`] over all of the non-contiguous segments
106    /// that make up this buffer.
107    pub fn into_segments(self) -> impl Iterator<Item = Bytes> {
108        self.segments.into_iter().map(|(bytes, _len)| bytes)
109    }
110
111    /// Copies all of the bytes from `self` returning one contiguous blob.
112    pub fn into_contiguous(mut self) -> Vec<u8> {
113        self.copy_to_bytes(self.remaining()).into()
114    }
115
116    /// Extends the buffer by one more segment of [`Bytes`].
117    ///
118    /// If the provided [`Bytes`] is empty, we skip appending it.
119    #[inline]
120    pub fn push<B: Into<Bytes>>(&mut self, b: B) {
121        let b: Bytes = b.into();
122        if !b.is_empty() {
123            self.len += b.len();
124            self.segments.push((b, PADDING_DEFAULT));
125        }
126    }
127
128    /// Consumes `self` returning a type that implements [`io::Read`] and [`io::Seek`].
129    ///
130    /// Note: [`Clone`]-ing a [`SegmentedBytes`] is cheap, so if you need to retain the original
131    /// [`SegmentedBytes`] you should clone it.
132    ///
133    /// [`io::Read`]: std::io::Read
134    /// [`io::Seek`]: std::io::Seek
135    pub fn reader(self) -> SegmentedReader<N> {
136        SegmentedReader::new(self)
137    }
138}
139
140impl<const N: usize> Buf for SegmentedBytes<N> {
141    fn remaining(&self) -> usize {
142        self.len()
143    }
144
145    fn chunk(&self) -> &[u8] {
146        // Return the first non-empty segment.
147        self.segments
148            .iter()
149            .filter(|(c, _len)| !c.is_empty())
150            .map(|(c, _len)| Buf::chunk(c))
151            .next()
152            .unwrap_or_default()
153    }
154
155    fn advance(&mut self, mut cnt: usize) {
156        assert!(cnt <= self.len, "Advance past the end of buffer");
157        self.len -= cnt;
158
159        while cnt > 0 {
160            if let Some((seg, _len)) = self.segments.first_mut() {
161                if seg.remaining() > cnt {
162                    seg.advance(cnt);
163                    // We advanced `cnt` bytes, so no more need to advance.
164                    cnt = 0;
165                } else {
166                    // Remove the whole first buffer.
167                    cnt = cnt.saturating_sub(seg.remaining());
168                    self.segments.remove(0);
169                }
170            }
171        }
172    }
173
174    fn copy_to_bytes(&mut self, len: usize) -> Bytes {
175        // If possible, use the zero-copy implementation on individual segments.
176        if let Some((seg, _len)) = self.segments.first_mut() {
177            if len <= seg.len() {
178                self.len -= len;
179                return seg.copy_to_bytes(len);
180            }
181        }
182        // Otherwise, fall back to a generic implementation.
183        assert!(
184            len <= self.len(),
185            "tried to copy {len} bytes with {} remaining",
186            self.len()
187        );
188        let mut out = BytesMut::with_capacity(len);
189        out.put(self.take(len));
190        out.freeze()
191    }
192}
193
#[cfg(feature = "parquet")]
impl parquet::file::reader::Length for SegmentedBytes {
    /// Total number of bytes across all segments, widened to the `u64` parquet expects.
    fn len(&self) -> u64 {
        let total: usize = self.len;
        u64::cast_from(total)
    }
}
200
#[cfg(feature = "parquet")]
impl parquet::file::reader::ChunkReader for SegmentedBytes {
    type T = internal::SegmentedReader;

    /// Returns a reader over a (cheap) clone of `self`, positioned `start` bytes in.
    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        let mut reader = SegmentedReader::new(self.clone());
        reader.seek(std::io::SeekFrom::Start(start))?;
        Ok(reader)
    }

    /// Returns the `length` bytes that begin `start` bytes into `self`, or an EOF error if
    /// either the start offset or the requested range runs past the end of the data.
    fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result<Bytes> {
        let start = usize::cast_from(start);
        let mut view = self.clone();

        if start > view.remaining() {
            return Err(ParquetError::EOF(format!(
                "seeking {start} bytes ahead, but only {} remaining",
                view.remaining()
            )));
        }
        view.advance(start);

        if length > view.remaining() {
            return Err(ParquetError::EOF(format!(
                "copying {length} bytes, but only {} remaining",
                view.remaining()
            )));
        }
        Ok(view.copy_to_bytes(length))
    }
}
231
232impl From<Bytes> for SegmentedBytes {
233    fn from(value: Bytes) -> Self {
234        let mut s = SegmentedBytes::default();
235        s.push(value);
236        s
237    }
238}
239
240impl From<Vec<u8>> for SegmentedBytes {
241    fn from(value: Vec<u8>) -> Self {
242        let b = Bytes::from(value);
243        SegmentedBytes::from(b)
244    }
245}
246
247impl From<Vec<Bytes>> for SegmentedBytes {
248    fn from(value: Vec<Bytes>) -> Self {
249        let mut s = SegmentedBytes::with_capacity(value.len());
250        for segment in value {
251            s.push(segment);
252        }
253        s
254    }
255}
256
257impl<const N: usize> FromIterator<Bytes> for SegmentedBytes<N> {
258    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
259        let mut s = SegmentedBytes::new();
260        for segment in iter {
261            s.push(segment);
262        }
263        s
264    }
265}
266
267impl<const N: usize> FromIterator<Vec<u8>> for SegmentedBytes<N> {
268    fn from_iter<T: IntoIterator<Item = Vec<u8>>>(iter: T) -> Self {
269        iter.into_iter().map(Bytes::from).collect()
270    }
271}
272
273mod internal {
274    use std::io;
275
276    use smallvec::SmallVec;
277
278    use crate::bytes::{Bytes, SegmentedBytes};
279    use crate::cast::CastFrom;
280
281    /// Provides efficient reading and seeking across a collection of segmented bytes.
282    #[derive(Debug)]
283    pub struct SegmentedReader<const N: usize = 1> {
284        segments: SmallVec<[(Bytes, usize); N]>,
285        /// Total length of all segments.
286        len: usize,
287        // Overall byte position we're currently pointing at.
288        overall_ptr: usize,
289        /// Current segement that we'd read from.
290        segment_ptr: usize,
291    }
292
293    impl<const N: usize> SegmentedReader<N> {
294        pub fn new(mut bytes: SegmentedBytes<N>) -> Self {
295            // Re-adjust our accumlated lengths.
296            //
297            // Note: `SegmentedBytes` could track the accumulated lengths, but
298            // it's complicated by the impl of `Buf::advance`.
299            let mut accum_length = 0;
300            for (segment, len) in &mut bytes.segments {
301                accum_length += segment.len();
302                *len = accum_length;
303            }
304
305            SegmentedReader {
306                segments: bytes.segments,
307                len: bytes.len,
308                overall_ptr: 0,
309                segment_ptr: 0,
310            }
311        }
312
313        /// The total number of bytes this [`SegmentedReader`] is mapped over.
314        pub fn len(&self) -> usize {
315            self.len
316        }
317
318        /// The current position of the internal cursor for this [`SegmentedReader`].
319        ///
320        /// Note: It's possible for the current position to be greater than the length,
321        /// as [`std::io::Seek`] allows you to seek past the end of the stream.
322        pub fn position(&self) -> usize {
323            self.overall_ptr
324        }
325    }
326
327    impl<const N: usize> io::Read for SegmentedReader<N> {
328        fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
329            // We've seeked past the end, return nothing.
330            if self.overall_ptr >= self.len {
331                return Ok(0);
332            }
333
334            let (segment, accum_length) = &self.segments[self.segment_ptr];
335
336            // How many bytes we have left in this segment.
337            let remaining_len = accum_length.checked_sub(self.overall_ptr).unwrap();
338            // Position within the segment to begin reading.
339            let segment_pos = segment.len().checked_sub(remaining_len).unwrap();
340
341            // How many bytes we'll read.
342            let len = core::cmp::min(remaining_len, buf.len());
343            // Copy bytes from the current segment into the buffer.
344            let segment_buf = &segment[..];
345            buf[..len].copy_from_slice(&segment_buf[segment_pos..segment_pos + len]);
346
347            // Advance our pointers.
348            self.overall_ptr += len;
349
350            // Advance to the next segment if we've reached the end of the current.
351            if self.overall_ptr == *accum_length {
352                self.segment_ptr += 1;
353            }
354
355            Ok(len)
356        }
357    }
358
359    impl<const N: usize> io::Seek for SegmentedReader<N> {
360        fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
361            use io::SeekFrom;
362
363            // Get an offset from the start.
364            let maybe_offset = match pos {
365                SeekFrom::Start(n) => Some(usize::cast_from(n)),
366                SeekFrom::End(n) => {
367                    let n = isize::cast_from(n);
368                    self.len().checked_add_signed(n)
369                }
370                SeekFrom::Current(n) => {
371                    let n = isize::cast_from(n);
372                    self.overall_ptr.checked_add_signed(n)
373                }
374            };
375
376            // Check for integer overflow, but we don't check our bounds!
377            //
378            // The contract for io::Seek denotes that seeking beyond the end
379            // of the stream is allowed. If we're beyond the end of the stream
380            // then we won't read back any bytes, but it won't be an error.
381            let offset = maybe_offset.ok_or_else(|| {
382                io::Error::new(
383                    io::ErrorKind::InvalidInput,
384                    "Invalid seek to an overflowing position",
385                )
386            })?;
387
388            // Special case we want to be fast, seeking back to the beginning.
389            if offset == 0 {
390                self.overall_ptr = 0;
391                self.segment_ptr = 0;
392
393                return Ok(u64::cast_from(offset));
394            }
395
396            // Seek through our segments until we get to the correct offset.
397            let result = self
398                .segments
399                .binary_search_by(|(_s, accum_len)| accum_len.cmp(&offset));
400
401            self.segment_ptr = match result {
402                Ok(segment_ptr) => segment_ptr + 1,
403                Err(segment_ptr) => segment_ptr,
404            };
405            self.overall_ptr = offset;
406
407            Ok(u64::cast_from(offset))
408        }
409    }
410}
411
#[cfg(test)]
mod tests {
    use std::io::{Read, Seek, SeekFrom};

    use bytes::{Buf, Bytes};
    use proptest::prelude::*;

    use super::SegmentedBytes;
    use crate::cast::CastFrom;

    #[crate::test]
    fn test_empty() {
        let s = SegmentedBytes::default();

        // We should report as empty.
        assert!(s.is_empty());
        assert_eq!(s.len(), 0);

        // An iterator of segments should return nothing.
        let mut i = s.clone().into_segments();
        assert_eq!(i.next(), None);

        // bytes::Buf shouldn't report anything as remaining.
        assert_eq!(s.remaining(), 0);
        // We should get back an empty chunk if we try to read anything.
        assert!(s.chunk().is_empty());

        // Turn ourselves into a type that impls io::Read.
        let mut reader = s.reader();

        // We shouldn't panic, but we shouldn't get back any bytes. Use a non-empty buffer,
        // since reading into an empty one trivially returns 0.
        let mut buf = [0u8; 4];
        let bytes_read = reader.read(&mut buf[..]).unwrap();
        assert_eq!(bytes_read, 0);
        assert_eq!(buf, [0, 0, 0, 0]);

        // We should be able to seek past the end without panicking.
        reader.seek(SeekFrom::Current(20)).unwrap();
        let bytes_read = reader.read(&mut buf[..]).unwrap();
        assert_eq!(bytes_read, 0);
        assert_eq!(buf, [0, 0, 0, 0]);
    }

    #[crate::test]
    fn test_bytes_buf() {
        let mut s = SegmentedBytes::from(vec![0, 1, 2, 3, 4, 5, 6, 7]);

        assert_eq!(s.len(), 8);
        assert_eq!(s.len(), s.remaining());
        assert_eq!(s.chunk(), &[0, 1, 2, 3, 4, 5, 6, 7]);

        // Advance far into the buffer.
        s.advance(6);
        assert_eq!(s.len(), 2);
        assert_eq!(s.len(), s.remaining());
        assert_eq!(s.chunk(), &[6, 7]);
    }

    #[crate::test]
    fn test_bytes_buf_multi() {
        let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9, 10, 11]];
        let mut s: SegmentedBytes<2> = segments.into_iter().collect();

        assert_eq!(s.len(), 12);
        assert_eq!(s.len(), s.remaining());

        // Chunk should return the entirety of the first segment.
        assert_eq!(s.chunk(), &[0, 1, 2, 3]);

        // Advance into the middle segment
        s.advance(6);
        assert_eq!(s.len(), 6);
        assert_eq!(s.len(), s.remaining());

        // Chunk should return the rest of the second segment.
        assert_eq!(s.chunk(), &[6, 7]);

        // Read across two segments.
        let x = s.get_u32();
        assert_eq!(x, u32::from_be_bytes([6, 7, 8, 9]));

        // Only two bytes remaining.
        assert_eq!(s.len(), 2);
        assert_eq!(s.len(), s.remaining());

        let mut s = s.chain(&[12, 13, 14, 15][..]);

        // Should have 6 bytes total now.
        assert_eq!(s.remaining(), 6);
        // We'll read out the last two bytes from the last segment.
        assert_eq!(s.chunk(), &[10, 11]);

        // Advance into the chained segment.
        s.advance(3);
        // Read the remaining bytes.
        assert_eq!(s.chunk(), &[13, 14, 15]);
    }

    #[crate::test]
    fn test_io_read() {
        let s = SegmentedBytes::from(vec![0, 1, 2, 3, 4, 5, 6, 7]);
        let mut reader = s.reader();

        assert_eq!(reader.len(), 8);
        assert_eq!(reader.position(), 0);

        // Read a small amount.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [0, 1, 2, 3]);

        // We should still report our original length.
        assert_eq!(reader.len(), 8);
        // But our position has moved.
        assert_eq!(reader.position(), 4);

        // We can seek forwards and read bytes.
        reader.seek(SeekFrom::Current(1)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 3);
        assert_eq!(buf, [5, 6, 7, 3]);

        assert_eq!(reader.len(), 8);
        // We've read to the end!
        assert_eq!(reader.position(), 8);

        // We shouldn't read any bytes
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);
        // Buffer shouldn't change from what it was last.
        assert_eq!(buf, [5, 6, 7, 3]);

        // Seek backwards and re-read bytes.
        reader.seek(SeekFrom::Start(2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [2, 3, 4, 5]);
    }

    #[crate::test]
    fn test_io_read_multi() {
        let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7, 8, 9], vec![10, 11]];
        let s: SegmentedBytes<2> = segments.into_iter().collect();
        let mut reader = s.reader();

        assert_eq!(reader.len(), 12);
        assert_eq!(reader.position(), 0);

        // Read up to the first segment.
        let mut buf = [0; 6];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [0, 1, 2, 3, 0, 0]);

        // Read 5 bytes, which should come from the second segment.
        let bytes_read = reader.read(&mut buf[1..]).unwrap();
        assert_eq!(bytes_read, 5);
        assert_eq!(buf, [0, 4, 5, 6, 7, 8]);

        // Seek backwards to the middle of the first segment.
        reader.seek(SeekFrom::Start(2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [2, 3, 5, 6, 7, 8]);

        // Seek past the end.
        reader.seek(SeekFrom::Start(1000)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);

        assert_eq!(reader.len(), 12);
        assert_eq!(reader.position(), 1000);

        // Seek back to the middle.
        reader.seek(SeekFrom::Start(6)).unwrap();
        // Read the entire buffer.
        let mut buf = Vec::new();
        let bytes_read = reader.read_to_end(&mut buf).unwrap();
        assert_eq!(bytes_read, 6);
        assert_eq!(buf, &[6, 7, 8, 9, 10, 11]);
    }

    #[crate::test]
    fn test_multi() {
        let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7, 8, 9], vec![10, 11]];
        let mut s: SegmentedBytes<2> = segments.into_iter().collect();

        assert_eq!(s.len(), 12);
        assert_eq!(s.remaining(), 12);

        // Read the first chunk.
        assert_eq!(s.chunk(), [0, 1, 2, 3]);

        // Advance to the middle.
        s.advance(6);
        assert_eq!(s.remaining(), 6);

        // Convert to a reader.
        let mut reader = s.reader();
        // We should be at the beginning, and only see the remaining 6 bytes.
        assert_eq!(reader.len(), 6);
        assert_eq!(reader.position(), 0);

        // Read to the end of the second segment.
        let mut buf = [0; 8];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [6, 7, 8, 9, 0, 0, 0, 0]);

        // Read again to get the final segment.
        let bytes_read = reader.read(&mut buf[4..]).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [6, 7, 8, 9, 10, 11, 0, 0]);

        // Seek back to the beginning.
        reader.seek(SeekFrom::Start(0)).unwrap();
        // Read everything.
        reader.read_exact(&mut buf[..6]).unwrap();
        assert_eq!(buf, [6, 7, 8, 9, 10, 11, 0, 0]);
    }

    #[crate::test]
    fn test_single_empty_segment() {
        let s = SegmentedBytes::from(Vec::<u8>::new());

        // Everything should come back empty.
        assert_eq!(s.len(), 0);
        assert_eq!(s.remaining(), 0);
        assert!(s.chunk().is_empty());

        let mut reader = s.reader();

        // Reading shouldn't fail, but it also shouldn't yield any bytes.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);
        assert_eq!(buf, [0, 0, 0, 0]);
    }

    #[crate::test]
    fn test_middle_segment_empty() {
        let segments = vec![vec![1, 2], vec![], vec![3, 4, 5, 6]];
        let mut s: SegmentedBytes = segments.clone().into_iter().collect();

        assert_eq!(s.len(), 6);
        assert_eq!(s.remaining(), 6);

        // Read and advance past the first chunk.
        let first_chunk = s.chunk();
        assert_eq!(first_chunk, [1, 2]);
        s.advance(first_chunk.len());

        assert_eq!(s.remaining(), 4);

        // We should skip the empty segment and continue to the next.
        let second_chunk = s.chunk();
        assert_eq!(second_chunk, [3, 4, 5, 6]);

        // Recreate SegmentedBytes.
        let s: SegmentedBytes = segments.into_iter().collect();
        let mut reader = s.reader();

        // We should be able to read the first chunk.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [1, 2, 0, 0]);

        // And we should be able to read the second chunk without issue.
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [3, 4, 5, 6]);

        // Seek backwards and read again.
        reader.seek(SeekFrom::Current(-2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [5, 6, 5, 6]);
    }

    #[crate::test]
    fn test_last_segment_empty() {
        let segments = vec![vec![1, 2], vec![3, 4, 5, 6], vec![]];
        let mut s: SegmentedBytes = segments.clone().into_iter().collect();

        assert_eq!(s.len(), 6);
        assert_eq!(s.remaining(), 6);

        // Read and advance past the first chunk.
        let first_chunk = s.chunk();
        assert_eq!(first_chunk, [1, 2]);
        s.advance(first_chunk.len());

        assert_eq!(s.remaining(), 4);

        // Read and advance past the second chunk.
        let second_chunk = s.chunk();
        assert_eq!(second_chunk, [3, 4, 5, 6]);
        s.advance(second_chunk.len());

        // No bytes should remain.
        assert_eq!(s.remaining(), 0);
        assert!(s.chunk().is_empty());

        // Recreate SegmentedBytes.
        let s: SegmentedBytes = segments.into_iter().collect();
        let mut reader = s.reader();

        // We should be able to read the first chunk.
        let mut buf = [0; 4];
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [1, 2, 0, 0]);

        // And we should be able to read the second chunk without issue.
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 4);
        assert_eq!(buf, [3, 4, 5, 6]);

        // Reading again shouldn't provide any bytes.
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 0);
        // Buffer shouldn't change.
        assert_eq!(buf, [3, 4, 5, 6]);

        // Seek backwards and read again.
        reader.seek(SeekFrom::Current(-2)).unwrap();
        let bytes_read = reader.read(&mut buf).unwrap();
        assert_eq!(bytes_read, 2);
        assert_eq!(buf, [5, 6, 5, 6]);
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_copy_to_bytes() {
        fn test(segments: Vec<Vec<u8>>, num_bytes: usize) {
            let contiguous: Vec<u8> = segments.clone().into_iter().flatten().collect();
            let mut contiguous = Bytes::from(contiguous);
            let mut segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

            // Pick an amount to copy in `0..=contiguous.len()`. Reducing modulo
            // `len + 1` never divides by zero, unlike `len % num_bytes`.
            let num_bytes = num_bytes % (contiguous.len() + 1);
            let remaining = contiguous.len() - num_bytes;

            let copied_c = contiguous.copy_to_bytes(num_bytes);
            let copied_s = segmented.copy_to_bytes(num_bytes);

            assert_eq!(copied_c, copied_s);
            assert_eq!(contiguous.remaining(), segmented.remaining());

            let copied_c = contiguous.copy_to_bytes(remaining);
            let copied_s = segmented.copy_to_bytes(remaining);

            assert_eq!(copied_c, copied_s);
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>(), num_bytes in any::<usize>())| {
            test(segments, num_bytes);
        })
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_read_to_end() {
        fn test(segments: Vec<Vec<u8>>) {
            let contiguous: Vec<u8> = segments.clone().into_iter().flatten().collect();
            let contiguous = Bytes::from(contiguous);
            let segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

            let mut reader_c = contiguous.reader();
            let mut reader_s = segmented.reader();

            let mut buf_c = Vec::new();
            reader_c.read_to_end(&mut buf_c).unwrap();

            let mut buf_s = Vec::new();
            reader_s.read_to_end(&mut buf_s).unwrap();

            assert_eq!(buf_c, buf_s);
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>())| {
            test(segments);
        })
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_read_and_seek() {
        fn test(segments: Vec<Vec<u8>>, from_start: u64, from_current: i64, from_end: i64) {
            let contiguous: Vec<u8> = segments.clone().into_iter().flatten().collect();
            let total_len = contiguous.len();
            let contiguous = std::io::Cursor::new(&contiguous[..]);
            let segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

            let mut reader_c = contiguous;
            let mut reader_s = segmented.reader();

            let mut buf_c = Vec::new();
            let mut buf_s = Vec::new();

            // Seek from the start.

            let from_start = from_start % (u64::cast_from(total_len).max(1));
            reader_c.seek(SeekFrom::Start(from_start)).unwrap();
            reader_s.seek(SeekFrom::Start(from_start)).unwrap();

            reader_c.read_to_end(&mut buf_c).unwrap();
            reader_s.read_to_end(&mut buf_s).unwrap();

            assert_eq!(&buf_c, &buf_s);
            buf_c.clear();
            buf_s.clear();

            // Seek from the current position.

            let from_current = from_current % i64::try_from(total_len).unwrap().max(1);
            reader_c.seek(SeekFrom::Current(from_current)).unwrap();
            reader_s.seek(SeekFrom::Current(from_current)).unwrap();

            reader_c.read_to_end(&mut buf_c).unwrap();
            reader_s.read_to_end(&mut buf_s).unwrap();

            assert_eq!(&buf_c, &buf_s);
            buf_c.clear();
            buf_s.clear();

            // Seek from the end.

            let from_end = from_end % i64::try_from(total_len).unwrap().max(1);
            reader_c.seek(SeekFrom::End(from_end)).unwrap();
            reader_s.seek(SeekFrom::End(from_end)).unwrap();

            reader_c.read_to_end(&mut buf_c).unwrap();
            reader_s.read_to_end(&mut buf_s).unwrap();

            assert_eq!(&buf_c, &buf_s);
            buf_c.clear();
            buf_s.clear();
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>(), s in any::<u64>(), c in any::<i64>(), e in any::<i64>())| {
            test(segments, s, c, e);
        })
    }

    #[crate::test]
    #[cfg_attr(miri, ignore)] // slow
    fn proptest_non_empty_segments() {
        fn test(segments: Vec<Vec<u8>>) {
            // Vec
            let segment = segments.first().cloned().unwrap_or_default();
            let s = SegmentedBytes::from(segment.clone());
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Bytes
            let bytes = Bytes::from(segment.clone());
            let s = SegmentedBytes::from(bytes);
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // SegmentedBytes::push
            let mut s = SegmentedBytes::default();
            s.push(Bytes::from(segment));
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Vec<Vec<u8>>
            let s: SegmentedBytes = segments.clone().into_iter().collect();
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Vec<Bytes>
            let s: SegmentedBytes = segments.clone().into_iter().map(Bytes::from).collect();
            assert!(s.into_segments().all(|segment| !segment.is_empty()));

            // Vec<Bytes>
            let segments: Vec<_> = segments.into_iter().map(Bytes::from).collect();
            let s = SegmentedBytes::from(segments);
            assert!(s.into_segments().all(|segment| !segment.is_empty()));
        }

        proptest!(|(segments in any::<Vec<Vec<u8>>>())| {
            test(segments);
        })
    }
}