#[cfg(feature = "parquet")]
use std::io::{Read, Seek};
use bytes::{Buf, Bytes};
use internal::SegmentedReader;
use smallvec::SmallVec;
#[cfg(feature = "parquet")]
use crate::cast::CastFrom;
use crate::lgbytes::LgBytes;
/// A sequence of byte segments that is read as one logical buffer.
///
/// The const parameter `N` sizes the inline array backing the `SmallVec` that
/// holds the segments.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct SegmentedBytes<const N: usize = 1> {
/// The segments in order, each paired with a [`Padding`] slot.
segments: SmallVec<[(MaybeLgBytes, Padding); N]>,
/// Total number of bytes held across all segments; grown by `push` and
/// shrunk by `advance`.
len: usize,
}
/// Bookkeeping slot stored next to every segment.
///
/// `SegmentedBytes::push` always writes [`PADDING_DEFAULT`] here; the
/// `SegmentedReader` constructor overwrites it with accumulated lengths.
type Padding = usize;
/// Value placed in a segment's [`Padding`] slot when the segment is pushed.
const PADDING_DEFAULT: usize = 0;
/// A chunk of bytes backed by either a [`Bytes`] or an [`LgBytes`].
#[derive(Clone, Debug)]
pub enum MaybeLgBytes {
/// Bytes held in a [`Bytes`].
Bytes(Bytes),
/// Bytes held in an [`LgBytes`].
LgBytes(LgBytes),
}
/// Two values are equal when their byte contents are equal, regardless of
/// which variant holds them.
impl PartialEq for MaybeLgBytes {
    fn eq(&self, other: &Self) -> bool {
        let lhs: &[u8] = self.as_ref();
        let rhs: &[u8] = other.as_ref();
        lhs == rhs
    }
}

impl Eq for MaybeLgBytes {}
impl AsRef<[u8]> for MaybeLgBytes {
fn as_ref(&self) -> &[u8] {
match self {
MaybeLgBytes::Bytes(x) => x.as_ref(),
MaybeLgBytes::LgBytes(x) => x.as_ref(),
}
}
}
impl From<Bytes> for MaybeLgBytes {
#[inline]
fn from(value: Bytes) -> Self {
MaybeLgBytes::Bytes(value)
}
}
impl MaybeLgBytes {
pub fn len(&self) -> usize {
match self {
MaybeLgBytes::Bytes(x) => x.len(),
MaybeLgBytes::LgBytes(x) => x.len(),
}
}
pub fn is_empty(&self) -> bool {
match self {
MaybeLgBytes::Bytes(x) => x.is_empty(),
MaybeLgBytes::LgBytes(x) => x.is_empty(),
}
}
}
/// Forwards every [`Buf`] operation to the wrapped value.
impl Buf for MaybeLgBytes {
    fn remaining(&self) -> usize {
        match self {
            Self::Bytes(bytes) => bytes.remaining(),
            Self::LgBytes(lg_bytes) => lg_bytes.remaining(),
        }
    }

    fn chunk(&self) -> &[u8] {
        match self {
            Self::Bytes(bytes) => bytes.chunk(),
            Self::LgBytes(lg_bytes) => lg_bytes.chunk(),
        }
    }

    fn advance(&mut self, cnt: usize) {
        match self {
            Self::Bytes(bytes) => bytes.advance(cnt),
            Self::LgBytes(lg_bytes) => lg_bytes.advance(cnt),
        }
    }
}
/// The default value holds no segments and has length zero.
impl Default for SegmentedBytes {
    fn default() -> Self {
        let segments = SmallVec::new();
        Self { segments, len: 0 }
    }
}
impl SegmentedBytes {
    /// Creates an empty `SegmentedBytes<N>`.
    ///
    /// `N` is a parameter of this function rather than of the `impl`, so the
    /// inline capacity is picked by the caller (or inferred from context).
    pub fn new<const N: usize>() -> SegmentedBytes<N> {
        let segments = SmallVec::new();
        SegmentedBytes { segments, len: 0 }
    }
}
impl<const N: usize> SegmentedBytes<N> {
    /// Creates an empty value whose segment list is created with room for
    /// `capacity` segments.
    pub fn with_capacity(capacity: usize) -> SegmentedBytes<N> {
        let segments = SmallVec::with_capacity(capacity);
        SegmentedBytes { segments, len: 0 }
    }

    /// Returns the total number of bytes across all segments.
    pub fn len(&self) -> usize {
        self.len
    }

    /// Returns `true` when no bytes are held.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Consumes `self`, yielding the segments in order without their padding
    /// slots.
    pub fn into_segments(self) -> impl Iterator<Item = MaybeLgBytes> {
        self.segments.into_iter().map(|(segment, _padding)| segment)
    }

    /// Consumes `self` and returns all remaining bytes as one `Vec<u8>`.
    pub fn into_contiguous(mut self) -> Vec<u8> {
        let total = self.remaining();
        Vec::from(self.copy_to_bytes(total))
    }

    /// Appends a segment; empty segments are dropped so that every stored
    /// segment holds at least one byte.
    #[inline]
    pub fn push<B: Into<MaybeLgBytes>>(&mut self, b: B) {
        let segment: MaybeLgBytes = b.into();
        if segment.is_empty() {
            return;
        }
        self.len += segment.len();
        self.segments.push((segment, PADDING_DEFAULT));
    }

    /// Consumes `self` and wraps it in a [`SegmentedReader`].
    pub fn reader(self) -> SegmentedReader<N> {
        SegmentedReader::new(self)
    }
}
impl<const N: usize> Buf for SegmentedBytes<N> {
    /// Returns the number of bytes left across all segments.
    fn remaining(&self) -> usize {
        self.len()
    }

    /// Returns the bytes of the first non-empty segment, or an empty slice
    /// when there is none.
    fn chunk(&self) -> &[u8] {
        self.segments
            .iter()
            .find(|(segment, _padding)| !segment.is_empty())
            .map(|(segment, _padding)| Buf::chunk(segment))
            .unwrap_or_default()
    }

    /// Consumes `cnt` bytes from the front of the buffer.
    ///
    /// Segments that are fully consumed are removed; a partially consumed
    /// segment is advanced in place.
    ///
    /// # Panics
    ///
    /// Panics if `cnt` is larger than the number of remaining bytes.
    fn advance(&mut self, mut cnt: usize) {
        assert!(cnt <= self.len, "Advance past the end of buffer");
        self.len -= cnt;

        // Walk the segments once, counting the leading ones that are fully
        // consumed, and remove them with a single `drain` below. Removing the
        // front element one at a time would shift the tail on every removal.
        let mut exhausted = 0;
        for (segment, _padding) in self.segments.iter_mut() {
            if cnt == 0 {
                break;
            }
            let available = segment.remaining();
            if available > cnt {
                // The request ends inside this segment.
                segment.advance(cnt);
                cnt = 0;
            } else {
                // The whole segment is consumed.
                cnt -= available;
                exhausted += 1;
            }
        }
        // The loop always terminates, even if `len` were ever out of sync with
        // the segments; flag that broken invariant in debug builds.
        debug_assert_eq!(cnt, 0, "tracked length exceeds the bytes held in segments");
        self.segments.drain(..exhausted);
    }
}
/// Reports the total byte length to parquet as a `u64`.
#[cfg(feature = "parquet")]
impl parquet::file::reader::Length for SegmentedBytes {
fn len(&self) -> u64 {
// Reads the `len` field directly and widens it to `u64`.
u64::cast_from(self.len)
}
}
/// Lets parquet read from a `SegmentedBytes` through a [`SegmentedReader`].
#[cfg(feature = "parquet")]
impl parquet::file::reader::ChunkReader for SegmentedBytes {
    type T = internal::SegmentedReader;

    /// Returns a reader over a clone of `self`, positioned at byte `start`.
    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        let mut positioned = self.clone().reader();
        positioned.seek(std::io::SeekFrom::Start(start))?;
        Ok(positioned)
    }

    /// Copies exactly `length` bytes starting at byte `start` into a new
    /// [`Bytes`]; fails if fewer bytes are available.
    fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result<Bytes> {
        let mut source = self.clone().reader();
        source.seek(std::io::SeekFrom::Start(start))?;

        let mut scratch = vec![0u8; length];
        source.read_exact(&mut scratch)?;
        Ok(Bytes::from(scratch))
    }
}
impl From<Bytes> for SegmentedBytes {
fn from(value: Bytes) -> Self {
let mut s = SegmentedBytes::default();
s.push(value);
s
}
}
impl From<MaybeLgBytes> for SegmentedBytes {
fn from(value: MaybeLgBytes) -> Self {
let mut s = SegmentedBytes::default();
s.push(value);
s
}
}
/// Converts the vector into a [`Bytes`] and stores it as a single segment.
impl From<Vec<u8>> for SegmentedBytes {
    fn from(value: Vec<u8>) -> Self {
        Self::from(Bytes::from(value))
    }
}
/// Stores every non-empty element of `value` as a segment, in order.
impl From<Vec<MaybeLgBytes>> for SegmentedBytes {
    fn from(value: Vec<MaybeLgBytes>) -> Self {
        let mut out = SegmentedBytes::with_capacity(value.len());
        value.into_iter().for_each(|segment| out.push(segment));
        out
    }
}
/// Collects [`Bytes`] items into segments, skipping empty ones.
impl<const N: usize> FromIterator<Bytes> for SegmentedBytes<N> {
    fn from_iter<T: IntoIterator<Item = Bytes>>(iter: T) -> Self {
        let mut collected: Self = SegmentedBytes::new();
        iter.into_iter()
            .for_each(|segment| collected.push(segment));
        collected
    }
}
impl<const N: usize> FromIterator<Vec<u8>> for SegmentedBytes<N> {
fn from_iter<T: IntoIterator<Item = Vec<u8>>>(iter: T) -> Self {
iter.into_iter().map(Bytes::from).collect()
}
}
mod internal {
use std::io;
use smallvec::SmallVec;
use crate::bytes::{MaybeLgBytes, SegmentedBytes};
use crate::cast::CastFrom;
/// A reader over the segments of a `SegmentedBytes` that implements
/// `io::Read` and `io::Seek`.
#[derive(Debug)]
pub struct SegmentedReader<const N: usize = 1> {
/// Each segment paired with the accumulated length of all segments up to
/// and including it (filled in by `new`).
segments: SmallVec<[(MaybeLgBytes, usize); N]>,
/// Total number of bytes, copied from the source `SegmentedBytes`.
len: usize,
/// Current read position, as an offset from the start of all bytes.
overall_ptr: usize,
/// Index into `segments` of the segment the next read starts in.
segment_ptr: usize,
}
impl<const N: usize> SegmentedReader<N> {
pub fn new(mut bytes: SegmentedBytes<N>) -> Self {
let mut accum_length = 0;
for (segment, len) in &mut bytes.segments {
accum_length += segment.len();
*len = accum_length;
}
SegmentedReader {
segments: bytes.segments,
len: bytes.len,
overall_ptr: 0,
segment_ptr: 0,
}
}
pub fn len(&self) -> usize {
self.len
}
pub fn position(&self) -> usize {
self.overall_ptr
}
}
impl<const N: usize> io::Read for SegmentedReader<N> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
if self.overall_ptr >= self.len {
return Ok(0);
}
let (segment, accum_length) = &self.segments[self.segment_ptr];
let remaining_len = accum_length.checked_sub(self.overall_ptr).unwrap();
let segment_pos = segment.len().checked_sub(remaining_len).unwrap();
let len = core::cmp::min(remaining_len, buf.len());
let segment_buf = match segment {
MaybeLgBytes::Bytes(x) => x.as_ref(),
MaybeLgBytes::LgBytes(x) => x.as_ref(),
};
buf[..len].copy_from_slice(&segment_buf[segment_pos..segment_pos + len]);
self.overall_ptr += len;
if self.overall_ptr == *accum_length {
self.segment_ptr += 1;
}
Ok(len)
}
}
impl<const N: usize> io::Seek for SegmentedReader<N> {
    /// Moves the read position and returns the new offset from the start.
    ///
    /// Seeking past the end is allowed. A seek whose target does not fit in a
    /// `usize` (including one before the start) fails with `InvalidInput`.
    fn seek(&mut self, pos: io::SeekFrom) -> io::Result<u64> {
        let target = match pos {
            io::SeekFrom::Start(n) => Some(usize::cast_from(n)),
            io::SeekFrom::End(delta) => self.len().checked_add_signed(isize::cast_from(delta)),
            io::SeekFrom::Current(delta) => self
                .overall_ptr
                .checked_add_signed(isize::cast_from(delta)),
        };
        let Some(target) = target else {
            return Err(io::Error::new(
                io::ErrorKind::InvalidInput,
                "Invalid seek to an overflowing position",
            ));
        };

        self.segment_ptr = if target == 0 {
            0
        } else {
            // Segments store their accumulated end offsets, so search those.
            match self
                .segments
                .binary_search_by_key(&target, |(_segment, end_offset)| *end_offset)
            {
                // `target` is exactly the end of segment `idx`, so the next
                // byte lives in the following segment.
                Ok(idx) => idx + 1,
                // `target` falls inside segment `idx` (or past the last one).
                Err(idx) => idx,
            }
        };
        self.overall_ptr = target;
        Ok(u64::cast_from(target))
    }
}
}
#[cfg(test)]
mod tests {
use std::io::{Read, Seek, SeekFrom};
use bytes::{Buf, Bytes};
use proptest::prelude::*;
use super::SegmentedBytes;
use crate::bytes::MaybeLgBytes;
use crate::cast::CastFrom;
// An empty `SegmentedBytes` reports zero length, yields no segments, and its
// reader returns zero bytes both before and after seeking past the end.
#[crate::test]
fn test_empty() {
let s = SegmentedBytes::default();
assert!(s.is_empty());
assert_eq!(s.len(), 0);
let mut i = s.clone().into_segments();
assert_eq!(i.next(), None);
assert_eq!(s.remaining(), 0);
assert!(s.chunk().is_empty());
let mut reader = s.reader();
// `buf` is empty, so the destination slice has length zero.
let mut buf = Vec::new();
let bytes_read = reader.read(&mut buf[..]).unwrap();
assert_eq!(bytes_read, 0);
// Seeking beyond the end succeeds; the following read returns zero bytes.
reader.seek(SeekFrom::Current(20)).unwrap();
let bytes_read = reader.read(&mut buf[..]).unwrap();
assert_eq!(bytes_read, 0);
}
// Exercises the `Buf` implementation on a single-segment buffer.
#[crate::test]
fn test_bytes_buf() {
    let payload: Vec<u8> = (0..8).collect();
    let mut s = SegmentedBytes::from(payload);

    assert_eq!(s.len(), 8);
    assert_eq!(s.remaining(), s.len());
    assert_eq!(s.chunk(), &[0, 1, 2, 3, 4, 5, 6, 7]);

    // Consuming six bytes leaves only the two-byte tail visible.
    s.advance(6);
    assert_eq!(s.len(), 2);
    assert_eq!(s.remaining(), s.len());
    assert_eq!(s.chunk(), &[6, 7]);
}
// Exercises the `Buf` implementation across several segments, including
// advancing and reading values that span a segment boundary.
#[crate::test]
fn test_bytes_buf_multi() {
let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7], vec![8, 9, 10, 11]];
let mut s: SegmentedBytes<2> = segments.into_iter().collect();
assert_eq!(s.len(), 12);
assert_eq!(s.len(), s.remaining());
assert_eq!(s.chunk(), &[0, 1, 2, 3]);
// Advancing by 6 drops the first segment and half of the second.
s.advance(6);
assert_eq!(s.len(), 6);
assert_eq!(s.len(), s.remaining());
assert_eq!(s.chunk(), &[6, 7]);
// The four bytes of this `u32` come from two different segments.
let x = s.get_u32();
assert_eq!(x, u32::from_be_bytes([6, 7, 8, 9]));
assert_eq!(s.len(), 2);
assert_eq!(s.len(), s.remaining());
// Chain a plain slice after the segmented buffer and advance into it.
let mut s = s.chain(&[12, 13, 14, 15][..]);
assert_eq!(s.remaining(), 6);
assert_eq!(s.chunk(), &[10, 11]);
s.advance(3);
assert_eq!(s.chunk(), &[13, 14, 15]);
}
// Exercises `Read` and `Seek` on a reader over a single segment.
#[crate::test]
fn test_io_read() {
let s = SegmentedBytes::from(vec![0, 1, 2, 3, 4, 5, 6, 7]);
let mut reader = s.reader();
assert_eq!(reader.len(), 8);
assert_eq!(reader.position(), 0);
let mut buf = [0; 4];
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, [0, 1, 2, 3]);
assert_eq!(reader.len(), 8);
assert_eq!(reader.position(), 4);
// Skip one byte; only three remain, so the last slot of `buf` keeps its old value.
reader.seek(SeekFrom::Current(1)).unwrap();
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 3);
assert_eq!(buf, [5, 6, 7, 3]);
assert_eq!(reader.len(), 8);
assert_eq!(reader.position(), 8);
// Reading at the end returns zero bytes and leaves `buf` untouched.
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 0);
assert_eq!(buf, [5, 6, 7, 3]);
// Seeking back to an absolute offset makes the data readable again.
reader.seek(SeekFrom::Start(2)).unwrap();
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, [2, 3, 4, 5]);
}
// Exercises `Read` and `Seek` on a reader over several segments.
#[crate::test]
fn test_io_read_multi() {
let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7, 8, 9], vec![10, 11]];
let s: SegmentedBytes<2> = segments.into_iter().collect();
let mut reader = s.reader();
assert_eq!(reader.len(), 12);
assert_eq!(reader.position(), 0);
let mut buf = [0; 6];
// A single read stops at the end of the first segment even though `buf` has room for more.
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, [0, 1, 2, 3, 0, 0]);
// The next read is limited by the five-byte destination slice.
let bytes_read = reader.read(&mut buf[1..]).unwrap();
assert_eq!(bytes_read, 5);
assert_eq!(buf, [0, 4, 5, 6, 7, 8]);
// Seeking into the middle of the first segment reads only up to that segment's end.
reader.seek(SeekFrom::Start(2)).unwrap();
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, [2, 3, 5, 6, 7, 8]);
// Seeking far past the end is allowed; reads return zero and the position is kept.
reader.seek(SeekFrom::Start(1000)).unwrap();
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 0);
assert_eq!(reader.len(), 12);
assert_eq!(reader.position(), 1000);
// `read_to_end` gathers the rest of the data across the remaining segments.
reader.seek(SeekFrom::Start(6)).unwrap();
let mut buf = Vec::new();
let bytes_read = reader.read_to_end(&mut buf).unwrap();
assert_eq!(bytes_read, 6);
assert_eq!(buf, &[6, 7, 8, 9, 10, 11]);
}
// Combines `Buf::advance` with a reader: bytes consumed before creating the
// reader are not visible through it.
#[crate::test]
fn test_multi() {
let segments = vec![vec![0, 1, 2, 3], vec![4, 5, 6, 7, 8, 9], vec![10, 11]];
let mut s: SegmentedBytes<2> = segments.into_iter().collect();
assert_eq!(s.len(), 12);
assert_eq!(s.remaining(), 12);
assert_eq!(s.chunk(), [0, 1, 2, 3]);
s.advance(6);
assert_eq!(s.remaining(), 6);
// The reader starts at position 0 of the six bytes that are left.
let mut reader = s.reader();
assert_eq!(reader.len(), 6);
assert_eq!(reader.position(), 0);
let mut buf = [0; 8];
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, [6, 7, 8, 9, 0, 0, 0, 0]);
let bytes_read = reader.read(&mut buf[4..]).unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, [6, 7, 8, 9, 10, 11, 0, 0]);
// After rewinding, `read_exact` fills the slice across the segment boundary.
reader.seek(SeekFrom::Start(0)).unwrap();
reader.read_exact(&mut buf[..6]).unwrap();
assert_eq!(buf, [6, 7, 8, 9, 10, 11, 0, 0]);
}
// Building from an empty vector yields an empty buffer whose reader returns
// no bytes.
#[crate::test]
fn test_single_empty_segment() {
    let empty = SegmentedBytes::from(Vec::<u8>::new());
    assert_eq!(empty.len(), 0);
    assert_eq!(empty.remaining(), 0);
    assert!(empty.chunk().is_empty());

    let mut scratch = [0u8; 4];
    let mut reader = empty.reader();
    let bytes_read = reader.read(&mut scratch).unwrap();
    assert_eq!(bytes_read, 0);
    // The destination is left untouched.
    assert_eq!(scratch, [0, 0, 0, 0]);
}
// An empty segment in the middle of the input is invisible to both the `Buf`
// and the reader views.
#[crate::test]
fn test_middle_segment_empty() {
let segments = vec![vec![1, 2], vec![], vec![3, 4, 5, 6]];
let mut s: SegmentedBytes = segments.clone().into_iter().collect();
assert_eq!(s.len(), 6);
assert_eq!(s.remaining(), 6);
let first_chunk = s.chunk();
assert_eq!(first_chunk, [1, 2]);
s.advance(first_chunk.len());
assert_eq!(s.remaining(), 4);
// The chunk after the first is the third input vector, not the empty one.
let second_chunk = s.chunk();
assert_eq!(second_chunk, [3, 4, 5, 6]);
// Repeat the check through the reader on a fresh copy.
let s: SegmentedBytes = segments.into_iter().collect();
let mut reader = s.reader();
let mut buf = [0; 4];
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, [1, 2, 0, 0]);
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, [3, 4, 5, 6]);
// Step back two bytes from the end and re-read the tail.
reader.seek(SeekFrom::Current(-2)).unwrap();
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, [5, 6, 5, 6]);
}
// A trailing empty segment in the input is invisible to both the `Buf` and
// the reader views.
#[crate::test]
fn test_last_segment_empty() {
let segments = vec![vec![1, 2], vec![3, 4, 5, 6], vec![]];
let mut s: SegmentedBytes = segments.clone().into_iter().collect();
assert_eq!(s.len(), 6);
assert_eq!(s.remaining(), 6);
let first_chunk = s.chunk();
assert_eq!(first_chunk, [1, 2]);
s.advance(first_chunk.len());
assert_eq!(s.remaining(), 4);
let second_chunk = s.chunk();
assert_eq!(second_chunk, [3, 4, 5, 6]);
s.advance(second_chunk.len());
// Everything has been consumed; there is no further chunk.
assert_eq!(s.remaining(), 0);
assert!(s.chunk().is_empty());
// Repeat the check through the reader on a fresh copy.
let s: SegmentedBytes = segments.into_iter().collect();
let mut reader = s.reader();
let mut buf = [0; 4];
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, [1, 2, 0, 0]);
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 4);
assert_eq!(buf, [3, 4, 5, 6]);
// At the end: zero bytes are read and `buf` is unchanged.
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 0);
assert_eq!(buf, [3, 4, 5, 6]);
// Step back two bytes from the end and re-read the tail.
reader.seek(SeekFrom::Current(-2)).unwrap();
let bytes_read = reader.read(&mut buf).unwrap();
assert_eq!(bytes_read, 2);
assert_eq!(buf, [5, 6, 5, 6]);
}
// Property: `copy_to_bytes` on a `SegmentedBytes` returns the same bytes as
// `copy_to_bytes` on a contiguous `Bytes` with identical content.
#[crate::test]
#[cfg_attr(miri, ignore)] // NOTE(review): presumably skipped because it is slow under Miri.
fn proptest_copy_to_bytes() {
    fn test(segments: Vec<Vec<u8>>, num_bytes: usize) {
        let mut contiguous = Bytes::from(segments.concat());
        let mut segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

        // Map the random count onto `0..=len`. Reducing modulo `len + 1`
        // cannot divide by zero and spreads the count over every valid
        // length, including zero and the full buffer.
        let num_bytes = num_bytes % (contiguous.len() + 1);
        let copied_c = contiguous.copy_to_bytes(num_bytes);
        let copied_s = segmented.copy_to_bytes(num_bytes);

        assert_eq!(copied_c, copied_s);
    }

    proptest!(|(segments in any::<Vec<Vec<u8>>>(), num_bytes in any::<usize>())| {
        test(segments, num_bytes);
    })
}
// Property: reading a `SegmentedReader` to the end yields the same bytes as
// reading the equivalent contiguous `Bytes` to the end.
#[crate::test]
#[cfg_attr(miri, ignore)] // NOTE(review): presumably skipped because it is slow under Miri.
fn proptest_read_to_end() {
    fn test(segments: Vec<Vec<u8>>) {
        let contiguous = Bytes::from(segments.concat());
        let segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();

        let mut reader_c = contiguous.reader();
        let mut reader_s = segmented.reader();

        let mut buf_c = Vec::new();
        reader_c.read_to_end(&mut buf_c).unwrap();

        let mut buf_s = Vec::new();
        reader_s.read_to_end(&mut buf_s).unwrap();

        // Compare the contiguous result against the segmented one; comparing
        // a buffer with itself would make the test vacuous.
        assert_eq!(buf_c, buf_s);
    }

    proptest!(|(segments in any::<Vec<Vec<u8>>>())| {
        test(segments);
    })
}
// Property: after seeking from the start, the current position, or the end,
// a `SegmentedReader` reads the same bytes as a `std::io::Cursor` over the
// equivalent contiguous data.
#[crate::test]
#[cfg_attr(miri, ignore)] fn proptest_read_and_seek() {
fn test(segments: Vec<Vec<u8>>, from_start: u64, from_current: i64, from_end: i64) {
let contiguous: Vec<u8> = segments.clone().into_iter().flatten().collect();
let total_len = contiguous.len();
let contiguous = std::io::Cursor::new(&contiguous[..]);
let segmented: SegmentedBytes = segments.into_iter().map(Bytes::from).collect();
let mut reader_c = contiguous;
let mut reader_s = segmented.reader();
let mut buf_c = Vec::new();
let mut buf_s = Vec::new();
// Reduce the random offset modulo the total length; `max(1)` avoids a zero divisor.
let from_start = from_start % (u64::cast_from(total_len).max(1));
reader_c.seek(SeekFrom::Start(from_start)).unwrap();
reader_s.seek(SeekFrom::Start(from_start)).unwrap();
reader_c.read_to_end(&mut buf_c).unwrap();
reader_s.read_to_end(&mut buf_s).unwrap();
assert_eq!(&buf_c, &buf_s);
buf_c.clear();
buf_s.clear();
// Relative seek from wherever the previous read left both readers.
let from_current = from_current % i64::try_from(total_len).unwrap().max(1);
reader_c.seek(SeekFrom::Current(from_current)).unwrap();
reader_s.seek(SeekFrom::Current(from_current)).unwrap();
reader_c.read_to_end(&mut buf_c).unwrap();
reader_s.read_to_end(&mut buf_s).unwrap();
assert_eq!(&buf_c, &buf_s);
buf_c.clear();
buf_s.clear();
// Seek relative to the end of the data.
let from_end = from_end % i64::try_from(total_len).unwrap().max(1);
reader_c.seek(SeekFrom::End(from_end)).unwrap();
reader_s.seek(SeekFrom::End(from_end)).unwrap();
reader_c.read_to_end(&mut buf_c).unwrap();
reader_s.read_to_end(&mut buf_s).unwrap();
assert_eq!(&buf_c, &buf_s);
buf_c.clear();
buf_s.clear();
}
proptest!(|(segments in any::<Vec<Vec<u8>>>(), s in any::<u64>(), c in any::<i64>(), e in any::<i64>())| {
test(segments, s, c, e);
})
}
// Property: whichever way a `SegmentedBytes` is built, it never stores an
// empty segment.
#[crate::test]
#[cfg_attr(miri, ignore)] fn proptest_non_empty_segments() {
fn test(segments: Vec<Vec<u8>>) {
// Use the first generated vector (or an empty one) for the single-segment constructors.
let segment = segments.first().unwrap_or(&Vec::default()).clone();
// From<Vec<u8>>
let s = SegmentedBytes::from(segment.clone());
assert!(s.into_segments().all(|segment| !segment.is_empty()));
// From<Bytes>
let bytes = Bytes::from(segment.clone());
let s = SegmentedBytes::from(bytes);
assert!(s.into_segments().all(|segment| !segment.is_empty()));
// From<MaybeLgBytes>
let bytes = MaybeLgBytes::Bytes(Bytes::from(segment.clone()));
let s = SegmentedBytes::from(bytes);
assert!(s.into_segments().all(|segment| !segment.is_empty()));
// push
let mut s = SegmentedBytes::default();
s.push(Bytes::from(segment));
assert!(s.into_segments().all(|segment| !segment.is_empty()));
// FromIterator<Vec<u8>>
let s: SegmentedBytes = segments.clone().into_iter().collect();
assert!(s.into_segments().all(|segment| !segment.is_empty()));
// FromIterator<Bytes>
let s: SegmentedBytes = segments.clone().into_iter().map(Bytes::from).collect();
assert!(s.into_segments().all(|segment| !segment.is_empty()));
// From<Vec<MaybeLgBytes>>
let segments: Vec<_> = segments
.into_iter()
.map(|s| MaybeLgBytes::Bytes(Bytes::from(s)))
.collect();
let s = SegmentedBytes::from(segments);
assert!(s.into_segments().all(|segment| !segment.is_empty()));
}
proptest!(|(segments in any::<Vec<Vec<u8>>>())| {
test(segments);
})
}
}