use crate::builder::*;
use crate::StructArray;
use arrow_buffer::NullBufferBuilder;
use arrow_schema::{DataType, Fields, IntervalUnit, SchemaBuilder, TimeUnit};
use std::sync::Arc;
pub struct StructBuilder {
fields: Fields,
field_builders: Vec<Box<dyn ArrayBuilder>>,
null_buffer_builder: NullBufferBuilder,
}
impl std::fmt::Debug for StructBuilder {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StructBuilder")
.field("fields", &self.fields)
.field("bitmap_builder", &self.null_buffer_builder)
.field("len", &self.len())
.finish()
}
}
impl ArrayBuilder for StructBuilder {
fn len(&self) -> usize {
self.null_buffer_builder.len()
}
fn finish(&mut self) -> ArrayRef {
Arc::new(self.finish())
}
fn finish_cloned(&self) -> ArrayRef {
Arc::new(self.finish_cloned())
}
fn as_any(&self) -> &dyn Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn Any {
self
}
fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
self
}
}
pub fn make_builder(datatype: &DataType, capacity: usize) -> Box<dyn ArrayBuilder> {
use crate::builder::*;
match datatype {
DataType::Null => Box::new(NullBuilder::with_capacity(capacity)),
DataType::Boolean => Box::new(BooleanBuilder::with_capacity(capacity)),
DataType::Int8 => Box::new(Int8Builder::with_capacity(capacity)),
DataType::Int16 => Box::new(Int16Builder::with_capacity(capacity)),
DataType::Int32 => Box::new(Int32Builder::with_capacity(capacity)),
DataType::Int64 => Box::new(Int64Builder::with_capacity(capacity)),
DataType::UInt8 => Box::new(UInt8Builder::with_capacity(capacity)),
DataType::UInt16 => Box::new(UInt16Builder::with_capacity(capacity)),
DataType::UInt32 => Box::new(UInt32Builder::with_capacity(capacity)),
DataType::UInt64 => Box::new(UInt64Builder::with_capacity(capacity)),
DataType::Float16 => Box::new(Float16Builder::with_capacity(capacity)),
DataType::Float32 => Box::new(Float32Builder::with_capacity(capacity)),
DataType::Float64 => Box::new(Float64Builder::with_capacity(capacity)),
DataType::Binary => Box::new(BinaryBuilder::with_capacity(capacity, 1024)),
DataType::LargeBinary => Box::new(LargeBinaryBuilder::with_capacity(capacity, 1024)),
DataType::FixedSizeBinary(len) => {
Box::new(FixedSizeBinaryBuilder::with_capacity(capacity, *len))
}
DataType::Decimal128(p, s) => Box::new(
Decimal128Builder::with_capacity(capacity).with_data_type(DataType::Decimal128(*p, *s)),
),
DataType::Decimal256(p, s) => Box::new(
Decimal256Builder::with_capacity(capacity).with_data_type(DataType::Decimal256(*p, *s)),
),
DataType::Utf8 => Box::new(StringBuilder::with_capacity(capacity, 1024)),
DataType::LargeUtf8 => Box::new(LargeStringBuilder::with_capacity(capacity, 1024)),
DataType::Date32 => Box::new(Date32Builder::with_capacity(capacity)),
DataType::Date64 => Box::new(Date64Builder::with_capacity(capacity)),
DataType::Time32(TimeUnit::Second) => {
Box::new(Time32SecondBuilder::with_capacity(capacity))
}
DataType::Time32(TimeUnit::Millisecond) => {
Box::new(Time32MillisecondBuilder::with_capacity(capacity))
}
DataType::Time64(TimeUnit::Microsecond) => {
Box::new(Time64MicrosecondBuilder::with_capacity(capacity))
}
DataType::Time64(TimeUnit::Nanosecond) => {
Box::new(Time64NanosecondBuilder::with_capacity(capacity))
}
DataType::Timestamp(TimeUnit::Second, tz) => Box::new(
TimestampSecondBuilder::with_capacity(capacity)
.with_data_type(DataType::Timestamp(TimeUnit::Second, tz.clone())),
),
DataType::Timestamp(TimeUnit::Millisecond, tz) => Box::new(
TimestampMillisecondBuilder::with_capacity(capacity)
.with_data_type(DataType::Timestamp(TimeUnit::Millisecond, tz.clone())),
),
DataType::Timestamp(TimeUnit::Microsecond, tz) => Box::new(
TimestampMicrosecondBuilder::with_capacity(capacity)
.with_data_type(DataType::Timestamp(TimeUnit::Microsecond, tz.clone())),
),
DataType::Timestamp(TimeUnit::Nanosecond, tz) => Box::new(
TimestampNanosecondBuilder::with_capacity(capacity)
.with_data_type(DataType::Timestamp(TimeUnit::Nanosecond, tz.clone())),
),
DataType::Interval(IntervalUnit::YearMonth) => {
Box::new(IntervalYearMonthBuilder::with_capacity(capacity))
}
DataType::Interval(IntervalUnit::DayTime) => {
Box::new(IntervalDayTimeBuilder::with_capacity(capacity))
}
DataType::Interval(IntervalUnit::MonthDayNano) => {
Box::new(IntervalMonthDayNanoBuilder::with_capacity(capacity))
}
DataType::Duration(TimeUnit::Second) => {
Box::new(DurationSecondBuilder::with_capacity(capacity))
}
DataType::Duration(TimeUnit::Millisecond) => {
Box::new(DurationMillisecondBuilder::with_capacity(capacity))
}
DataType::Duration(TimeUnit::Microsecond) => {
Box::new(DurationMicrosecondBuilder::with_capacity(capacity))
}
DataType::Duration(TimeUnit::Nanosecond) => {
Box::new(DurationNanosecondBuilder::with_capacity(capacity))
}
DataType::List(field) => {
let builder = make_builder(field.data_type(), capacity);
Box::new(ListBuilder::with_capacity(builder, capacity).with_field(field.clone()))
}
DataType::LargeList(field) => {
let builder = make_builder(field.data_type(), capacity);
Box::new(LargeListBuilder::with_capacity(builder, capacity).with_field(field.clone()))
}
DataType::Map(field, _) => match field.data_type() {
DataType::Struct(fields) => {
let map_field_names = MapFieldNames {
key: fields[0].name().clone(),
value: fields[1].name().clone(),
entry: field.name().clone(),
};
let key_builder = make_builder(fields[0].data_type(), capacity);
let value_builder = make_builder(fields[1].data_type(), capacity);
Box::new(
MapBuilder::with_capacity(
Some(map_field_names),
key_builder,
value_builder,
capacity,
)
.with_values_field(fields[1].clone()),
)
}
t => panic!("The field of Map data type {t:?} should has a child Struct field"),
},
DataType::Struct(fields) => Box::new(StructBuilder::from_fields(fields.clone(), capacity)),
t => panic!("Data type {t:?} is not currently supported"),
}
}
impl StructBuilder {
pub fn new(fields: impl Into<Fields>, field_builders: Vec<Box<dyn ArrayBuilder>>) -> Self {
Self {
field_builders,
fields: fields.into(),
null_buffer_builder: NullBufferBuilder::new(0),
}
}
pub fn from_fields(fields: impl Into<Fields>, capacity: usize) -> Self {
let fields = fields.into();
let mut builders = Vec::with_capacity(fields.len());
for field in &fields {
builders.push(make_builder(field.data_type(), capacity));
}
Self::new(fields, builders)
}
pub fn field_builder<T: ArrayBuilder>(&mut self, i: usize) -> Option<&mut T> {
self.field_builders[i].as_any_mut().downcast_mut::<T>()
}
pub fn num_fields(&self) -> usize {
self.field_builders.len()
}
#[inline]
pub fn append(&mut self, is_valid: bool) {
self.null_buffer_builder.append(is_valid);
}
#[inline]
pub fn append_null(&mut self) {
self.append(false)
}
pub fn finish(&mut self) -> StructArray {
self.validate_content();
if self.fields.is_empty() {
return StructArray::new_empty_fields(self.len(), self.null_buffer_builder.finish());
}
let arrays = self.field_builders.iter_mut().map(|f| f.finish()).collect();
let nulls = self.null_buffer_builder.finish();
StructArray::new(self.fields.clone(), arrays, nulls)
}
pub fn finish_cloned(&self) -> StructArray {
self.validate_content();
if self.fields.is_empty() {
return StructArray::new_empty_fields(
self.len(),
self.null_buffer_builder.finish_cloned(),
);
}
let arrays = self
.field_builders
.iter()
.map(|f| f.finish_cloned())
.collect();
let nulls = self.null_buffer_builder.finish_cloned();
StructArray::new(self.fields.clone(), arrays, nulls)
}
fn validate_content(&self) {
if self.fields.len() != self.field_builders.len() {
panic!("Number of fields is not equal to the number of field_builders.");
}
self.field_builders.iter().enumerate().for_each(|(idx, x)| {
if x.len() != self.len() {
let builder = SchemaBuilder::from(&self.fields);
let schema = builder.finish();
panic!("{}", format!(
"StructBuilder ({:?}) and field_builder with index {} ({:?}) are of unequal lengths: ({} != {}).",
schema,
idx,
self.fields[idx].data_type(),
self.len(),
x.len()
));
}
});
}
}
#[cfg(test)]
mod tests {
use super::*;
use arrow_buffer::Buffer;
use arrow_data::ArrayData;
use arrow_schema::Field;
use crate::array::Array;
#[test]
fn test_struct_array_builder() {
let string_builder = StringBuilder::new();
let int_builder = Int32Builder::new();
let fields = vec![
Field::new("f1", DataType::Utf8, true),
Field::new("f2", DataType::Int32, true),
];
let field_builders = vec![
Box::new(string_builder) as Box<dyn ArrayBuilder>,
Box::new(int_builder) as Box<dyn ArrayBuilder>,
];
let mut builder = StructBuilder::new(fields, field_builders);
assert_eq!(2, builder.num_fields());
let string_builder = builder
.field_builder::<StringBuilder>(0)
.expect("builder at field 0 should be string builder");
string_builder.append_value("joe");
string_builder.append_null();
string_builder.append_null();
string_builder.append_value("mark");
let int_builder = builder
.field_builder::<Int32Builder>(1)
.expect("builder at field 1 should be int builder");
int_builder.append_value(1);
int_builder.append_value(2);
int_builder.append_null();
int_builder.append_value(4);
builder.append(true);
builder.append(true);
builder.append_null();
builder.append(true);
let struct_data = builder.finish().into_data();
assert_eq!(4, struct_data.len());
assert_eq!(1, struct_data.null_count());
assert_eq!(&[11_u8], struct_data.nulls().unwrap().validity());
let expected_string_data = ArrayData::builder(DataType::Utf8)
.len(4)
.null_bit_buffer(Some(Buffer::from(&[9_u8])))
.add_buffer(Buffer::from_slice_ref([0, 3, 3, 3, 7]))
.add_buffer(Buffer::from_slice_ref(b"joemark"))
.build()
.unwrap();
let expected_int_data = ArrayData::builder(DataType::Int32)
.len(4)
.null_bit_buffer(Some(Buffer::from_slice_ref([11_u8])))
.add_buffer(Buffer::from_slice_ref([1, 2, 0, 4]))
.build()
.unwrap();
assert_eq!(expected_string_data, struct_data.child_data()[0]);
assert_eq!(expected_int_data, struct_data.child_data()[1]);
}
#[test]
fn test_struct_array_builder_finish() {
let int_builder = Int32Builder::new();
let bool_builder = BooleanBuilder::new();
let fields = vec![
Field::new("f1", DataType::Int32, false),
Field::new("f2", DataType::Boolean, false),
];
let field_builders = vec![
Box::new(int_builder) as Box<dyn ArrayBuilder>,
Box::new(bool_builder) as Box<dyn ArrayBuilder>,
];
let mut builder = StructBuilder::new(fields, field_builders);
builder
.field_builder::<Int32Builder>(0)
.unwrap()
.append_slice(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_slice(&[
false, true, false, true, false, true, false, true, false, true,
]);
for _ in 0..10 {
builder.append(true);
}
assert_eq!(10, builder.len());
let arr = builder.finish();
assert_eq!(10, arr.len());
assert_eq!(0, builder.len());
builder
.field_builder::<Int32Builder>(0)
.unwrap()
.append_slice(&[1, 3, 5, 7, 9]);
builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_slice(&[false, true, false, true, false]);
for _ in 0..5 {
builder.append(true);
}
assert_eq!(5, builder.len());
let arr = builder.finish();
assert_eq!(5, arr.len());
assert_eq!(0, builder.len());
}
#[test]
fn test_struct_array_builder_finish_cloned() {
let int_builder = Int32Builder::new();
let bool_builder = BooleanBuilder::new();
let fields = vec![
Field::new("f1", DataType::Int32, false),
Field::new("f2", DataType::Boolean, false),
];
let field_builders = vec![
Box::new(int_builder) as Box<dyn ArrayBuilder>,
Box::new(bool_builder) as Box<dyn ArrayBuilder>,
];
let mut builder = StructBuilder::new(fields, field_builders);
builder
.field_builder::<Int32Builder>(0)
.unwrap()
.append_slice(&[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);
builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_slice(&[
false, true, false, true, false, true, false, true, false, true,
]);
for _ in 0..10 {
builder.append(true);
}
assert_eq!(10, builder.len());
let mut arr = builder.finish_cloned();
assert_eq!(10, arr.len());
assert_eq!(10, builder.len());
builder
.field_builder::<Int32Builder>(0)
.unwrap()
.append_slice(&[1, 3, 5, 7, 9]);
builder
.field_builder::<BooleanBuilder>(1)
.unwrap()
.append_slice(&[false, true, false, true, false]);
for _ in 0..5 {
builder.append(true);
}
assert_eq!(15, builder.len());
arr = builder.finish();
assert_eq!(15, arr.len());
assert_eq!(0, builder.len());
}
#[test]
fn test_struct_array_builder_from_schema() {
let mut fields = vec![
Field::new("f1", DataType::Float32, false),
Field::new("f2", DataType::Utf8, false),
];
let sub_fields = vec![
Field::new("g1", DataType::Int32, false),
Field::new("g2", DataType::Boolean, false),
];
let struct_type = DataType::Struct(sub_fields.into());
fields.push(Field::new("f3", struct_type, false));
let mut builder = StructBuilder::from_fields(fields, 5);
assert_eq!(3, builder.num_fields());
assert!(builder.field_builder::<Float32Builder>(0).is_some());
assert!(builder.field_builder::<StringBuilder>(1).is_some());
assert!(builder.field_builder::<StructBuilder>(2).is_some());
}
#[test]
fn test_datatype_properties() {
let fields = Fields::from(vec![
Field::new("f1", DataType::Decimal128(1, 2), false),
Field::new(
"f2",
DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
false,
),
]);
let mut builder = StructBuilder::from_fields(fields.clone(), 1);
builder
.field_builder::<Decimal128Builder>(0)
.unwrap()
.append_value(1);
builder
.field_builder::<TimestampMillisecondBuilder>(1)
.unwrap()
.append_value(1);
builder.append(true);
let array = builder.finish();
assert_eq!(array.data_type(), &DataType::Struct(fields.clone()));
assert_eq!(array.column(0).data_type(), fields[0].data_type());
assert_eq!(array.column(1).data_type(), fields[1].data_type());
}
#[test]
#[should_panic(expected = "Data type Dictionary(Int32, Utf8) is not currently supported")]
fn test_struct_array_builder_from_schema_unsupported_type() {
let fields = vec![
Field::new("f1", DataType::Int16, false),
Field::new(
"f2",
DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8)),
false,
),
];
let _ = StructBuilder::from_fields(fields, 5);
}
#[test]
fn test_struct_array_builder_field_builder_type_mismatch() {
let int_builder = Int32Builder::with_capacity(10);
let fields = vec![Field::new("f1", DataType::Int32, false)];
let field_builders = vec![Box::new(int_builder) as Box<dyn ArrayBuilder>];
let mut builder = StructBuilder::new(fields, field_builders);
assert!(builder.field_builder::<BinaryBuilder>(0).is_none());
}
#[test]
#[should_panic(
expected = "StructBuilder (Schema { fields: [Field { name: \"f1\", data_type: Int32, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }, Field { name: \"f2\", data_type: Boolean, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }], metadata: {} }) and field_builder with index 1 (Boolean) are of unequal lengths: (2 != 1)."
)]
fn test_struct_array_builder_unequal_field_builders_lengths() {
let mut int_builder = Int32Builder::with_capacity(10);
let mut bool_builder = BooleanBuilder::new();
int_builder.append_value(1);
int_builder.append_value(2);
bool_builder.append_value(true);
let fields = vec![
Field::new("f1", DataType::Int32, false),
Field::new("f2", DataType::Boolean, false),
];
let field_builders = vec![
Box::new(int_builder) as Box<dyn ArrayBuilder>,
Box::new(bool_builder) as Box<dyn ArrayBuilder>,
];
let mut builder = StructBuilder::new(fields, field_builders);
builder.append(true);
builder.append(true);
builder.finish();
}
#[test]
#[should_panic(expected = "Number of fields is not equal to the number of field_builders.")]
fn test_struct_array_builder_unequal_field_field_builders() {
let int_builder = Int32Builder::with_capacity(10);
let fields = vec![
Field::new("f1", DataType::Int32, false),
Field::new("f2", DataType::Boolean, false),
];
let field_builders = vec![Box::new(int_builder) as Box<dyn ArrayBuilder>];
let mut builder = StructBuilder::new(fields, field_builders);
builder.finish();
}
#[test]
#[should_panic(
expected = "Incorrect datatype for StructArray field \\\"timestamp\\\", expected Timestamp(Nanosecond, Some(\\\"UTC\\\")) got Timestamp(Nanosecond, None)"
)]
fn test_struct_array_mismatch_builder() {
let fields = vec![Field::new(
"timestamp",
DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".to_owned().into())),
false,
)];
let field_builders: Vec<Box<dyn ArrayBuilder>> =
vec![Box::new(TimestampNanosecondBuilder::new())];
let mut sa = StructBuilder::new(fields, field_builders);
sa.finish();
}
#[test]
fn test_empty() {
let mut builder = StructBuilder::new(Fields::empty(), vec![]);
builder.append(true);
builder.append(false);
let a1 = builder.finish_cloned();
let a2 = builder.finish();
assert_eq!(a1, a2);
assert_eq!(a1.len(), 2);
assert_eq!(a1.null_count(), 1);
assert!(a1.is_valid(0));
assert!(a1.is_null(1));
}
}