use crate::builder::ArrayBuilder;
use crate::types::{BinaryViewType, ByteViewType, StringViewType};
use crate::{ArrayRef, GenericByteViewArray};
use arrow_buffer::{Buffer, BufferBuilder, NullBufferBuilder, ScalarBuffer};
use arrow_data::ByteView;
use std::any::Any;
use std::marker::PhantomData;
use std::sync::Arc;
/// Block size, in bytes, that a new builder starts with (8 KiB).
///
/// `with_capacity` stores this in `block_size`, which `append_value` uses as
/// the minimum capacity when it allocates a new data block.
const DEFAULT_BLOCK_SIZE: u32 = 8 * 1024;
/// Builder that accumulates byte values into a [`GenericByteViewArray`].
///
/// Every appended slot produces one `u128` view. Values of at most 12 bytes
/// are stored inside the view itself; longer values are copied into data
/// blocks and the view records the block index and offset of the copy.
pub struct GenericByteViewBuilder<T: ByteViewType + ?Sized> {
/// One `u128` view per appended slot; null slots are stored as `0`.
views_builder: BufferBuilder<u128>,
/// Validity of each appended slot.
null_buffer_builder: NullBufferBuilder,
/// Data blocks that have been sealed; views refer to them by index.
completed: Vec<Buffer>,
/// Data block currently being filled. It is moved into `completed` when a
/// value no longer fits in its capacity, or when the builder is finished.
in_progress: Vec<u8>,
/// Minimum capacity, in bytes, requested for each newly allocated data block.
block_size: u32,
/// Ties the builder to the view type `T` without storing a value of it.
phantom: PhantomData<T>,
}
impl<T: ByteViewType + ?Sized> GenericByteViewBuilder<T> {
pub fn new() -> Self {
Self::with_capacity(1024)
}
pub fn with_capacity(capacity: usize) -> Self {
Self {
views_builder: BufferBuilder::new(capacity),
null_buffer_builder: NullBufferBuilder::new(capacity),
completed: vec![],
in_progress: vec![],
block_size: DEFAULT_BLOCK_SIZE,
phantom: Default::default(),
}
}
pub fn with_block_size(self, block_size: u32) -> Self {
Self { block_size, ..self }
}
#[inline]
pub fn append_value(&mut self, value: impl AsRef<T::Native>) {
let v: &[u8] = value.as_ref().as_ref();
let length: u32 = v.len().try_into().unwrap();
if length <= 12 {
let mut view_buffer = [0; 16];
view_buffer[0..4].copy_from_slice(&length.to_le_bytes());
view_buffer[4..4 + v.len()].copy_from_slice(v);
self.views_builder.append(u128::from_le_bytes(view_buffer));
self.null_buffer_builder.append_non_null();
return;
}
let required_cap = self.in_progress.len() + v.len();
if self.in_progress.capacity() < required_cap {
let in_progress = Vec::with_capacity(v.len().max(self.block_size as usize));
let flushed = std::mem::replace(&mut self.in_progress, in_progress);
if !flushed.is_empty() {
assert!(self.completed.len() < u32::MAX as usize);
self.completed.push(flushed.into());
}
};
let offset = self.in_progress.len() as u32;
self.in_progress.extend_from_slice(v);
let view = ByteView {
length,
prefix: u32::from_le_bytes(v[0..4].try_into().unwrap()),
buffer_index: self.completed.len() as u32,
offset,
};
self.views_builder.append(view.into());
self.null_buffer_builder.append_non_null();
}
#[inline]
pub fn append_option(&mut self, value: Option<impl AsRef<T::Native>>) {
match value {
None => self.append_null(),
Some(v) => self.append_value(v),
};
}
#[inline]
pub fn append_null(&mut self) {
self.null_buffer_builder.append_null();
self.views_builder.append(0);
}
pub fn finish(&mut self) -> GenericByteViewArray<T> {
let mut completed = std::mem::take(&mut self.completed);
if !self.in_progress.is_empty() {
completed.push(std::mem::take(&mut self.in_progress).into());
}
let len = self.views_builder.len();
let views = ScalarBuffer::new(self.views_builder.finish(), 0, len);
let nulls = self.null_buffer_builder.finish();
unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
}
pub fn finish_cloned(&self) -> GenericByteViewArray<T> {
let mut completed = self.completed.clone();
if !self.in_progress.is_empty() {
completed.push(Buffer::from_slice_ref(&self.in_progress));
}
let len = self.views_builder.len();
let views = Buffer::from_slice_ref(self.views_builder.as_slice());
let views = ScalarBuffer::new(views, 0, len);
let nulls = self.null_buffer_builder.finish_cloned();
unsafe { GenericByteViewArray::new_unchecked(views, completed, nulls) }
}
}
/// The default builder is the one returned by `GenericByteViewBuilder::new`.
impl<T: ByteViewType + ?Sized> Default for GenericByteViewBuilder<T> {
/// Delegates to `Self::new`.
fn default() -> Self {
Self::new()
}
}
/// Debug output is `<T::PREFIX>ViewBuilder` followed by the views builder,
/// the in-progress block, the completed blocks and the null buffer builder.
impl<T: ByteViewType + ?Sized> std::fmt::Debug for GenericByteViewBuilder<T> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // The type name depends on `T::PREFIX`, so it is written by hand and
        // the struct formatter is given an empty name.
        f.write_fmt(format_args!("{}ViewBuilder", T::PREFIX))?;

        let mut fields = f.debug_struct("");
        fields.field("views_builder", &self.views_builder);
        fields.field("in_progress", &self.in_progress);
        fields.field("completed", &self.completed);
        fields.field("null_buffer_builder", &self.null_buffer_builder);
        fields.finish()
    }
}
/// `ArrayBuilder` implementation that forwards to the inherent methods and
/// wraps their results in an `ArrayRef`.
impl<T: ByteViewType + ?Sized> ArrayBuilder for GenericByteViewBuilder<T> {
/// Number of slots appended so far, as tracked by the null buffer builder
/// (every append path records exactly one validity entry).
fn len(&self) -> usize {
self.null_buffer_builder.len()
}
/// Builds the array via the inherent `finish` and returns it behind an `Arc`.
fn finish(&mut self) -> ArrayRef {
// Inherent methods take precedence over trait methods in method-call
// resolution, so this calls `GenericByteViewBuilder::finish`, not itself.
Arc::new(self.finish())
}
/// Builds the array via the inherent `finish_cloned` and returns it behind
/// an `Arc`.
fn finish_cloned(&self) -> ArrayRef {
// Resolves to the inherent `finish_cloned`, for the same reason as above.
Arc::new(self.finish_cloned())
}
/// Returns the builder as `&dyn Any`.
fn as_any(&self) -> &dyn Any {
self
}
/// Returns the builder as `&mut dyn Any`.
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
/// Converts the boxed builder into `Box<dyn Any>`.
fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}
/// Appends every item of an iterator of optional values, in order; `None`
/// items become null slots.
impl<T: ByteViewType + ?Sized, V: AsRef<T::Native>> Extend<Option<V>>
    for GenericByteViewBuilder<T>
{
    #[inline]
    fn extend<I: IntoIterator<Item = Option<V>>>(&mut self, iter: I) {
        iter.into_iter().for_each(|item| self.append_option(item));
    }
}
/// [`GenericByteViewBuilder`] specialised for [`StringViewType`]; its `finish`
/// returns a `GenericByteViewArray<StringViewType>`.
pub type StringViewBuilder = GenericByteViewBuilder<StringViewType>;
/// [`GenericByteViewBuilder`] specialised for [`BinaryViewType`]; its `finish`
/// returns a `GenericByteViewArray<BinaryViewType>`.
pub type BinaryViewBuilder = GenericByteViewBuilder<BinaryViewType>;