use crate::{null_sentinel, RowConverter, Rows, SortField};
use arrow_array::{Array, GenericListArray, OffsetSizeTrait};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, SortOptions};
use std::ops::Range;
pub fn compute_lengths<O: OffsetSizeTrait>(
lengths: &mut [usize],
rows: &Rows,
array: &GenericListArray<O>,
) {
let offsets = array.value_offsets().windows(2);
lengths
.iter_mut()
.zip(offsets)
.enumerate()
.for_each(|(idx, (length, offsets))| {
let start = offsets[0].as_usize();
let end = offsets[1].as_usize();
let range = array.is_valid(idx).then_some(start..end);
*length += encoded_len(rows, range);
});
}
fn encoded_len(rows: &Rows, range: Option<Range<usize>>) -> usize {
match range {
None => 1,
Some(range) => {
1 + range
.map(|i| super::variable::padded_length(Some(rows.row(i).as_ref().len())))
.sum::<usize>()
}
}
}
pub fn encode<O: OffsetSizeTrait>(
data: &mut [u8],
offsets: &mut [usize],
rows: &Rows,
opts: SortOptions,
array: &GenericListArray<O>,
) {
offsets
.iter_mut()
.skip(1)
.zip(array.value_offsets().windows(2))
.enumerate()
.for_each(|(idx, (offset, offsets))| {
let start = offsets[0].as_usize();
let end = offsets[1].as_usize();
let range = array.is_valid(idx).then_some(start..end);
let out = &mut data[*offset..];
*offset += encode_one(out, rows, range, opts)
});
}
#[inline]
fn encode_one(
out: &mut [u8],
rows: &Rows,
range: Option<Range<usize>>,
opts: SortOptions,
) -> usize {
match range {
None => super::variable::encode_null(out, opts),
Some(range) if range.start == range.end => super::variable::encode_empty(out, opts),
Some(range) => {
let mut offset = 0;
for i in range {
let row = rows.row(i);
offset += super::variable::encode_one(&mut out[offset..], Some(row.data), opts);
}
offset += super::variable::encode_empty(&mut out[offset..], opts);
offset
}
}
}
pub unsafe fn decode<O: OffsetSizeTrait>(
converter: &RowConverter,
rows: &mut [&[u8]],
field: &SortField,
validate_utf8: bool,
) -> Result<GenericListArray<O>, ArrowError> {
let opts = field.options;
let mut values_bytes = 0;
let mut offset = 0;
let mut offsets = Vec::with_capacity(rows.len() + 1);
offsets.push(O::usize_as(0));
for row in rows.iter_mut() {
let mut row_offset = 0;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes += x.len();
});
if decoded <= 1 {
offsets.push(O::usize_as(offset));
break;
}
row_offset += decoded;
offset += 1;
}
}
O::from_usize(offset).expect("overflow");
let mut null_count = 0;
let nulls = MutableBuffer::collect_bool(rows.len(), |x| {
let valid = rows[x][0] != null_sentinel(opts);
null_count += !valid as usize;
valid
});
let mut values_offsets = Vec::with_capacity(offset);
let mut values_bytes = Vec::with_capacity(values_bytes);
for row in rows.iter_mut() {
let mut row_offset = 0;
loop {
let decoded = super::variable::decode_blocks(&row[row_offset..], opts, |x| {
values_bytes.extend_from_slice(x)
});
row_offset += decoded;
if decoded <= 1 {
break;
}
values_offsets.push(values_bytes.len());
}
*row = &row[row_offset..];
}
if opts.descending {
values_bytes.iter_mut().for_each(|o| *o = !*o);
}
let mut last_value_offset = 0;
let mut child_rows: Vec<_> = values_offsets
.into_iter()
.map(|offset| {
let v = &values_bytes[last_value_offset..offset];
last_value_offset = offset;
v
})
.collect();
let child = converter.convert_raw(&mut child_rows, validate_utf8)?;
assert_eq!(child.len(), 1);
let child_data = child[0].to_data();
let builder = ArrayDataBuilder::new(field.data_type.clone())
.len(rows.len())
.null_count(null_count)
.null_bit_buffer(Some(nulls.into()))
.add_buffer(Buffer::from_vec(offsets))
.add_child_data(child_data);
Ok(GenericListArray::from(unsafe { builder.build_unchecked() }))
}