1use crate::interleave::interleave;
19use ahash::RandomState;
20use arrow_array::builder::BooleanBufferBuilder;
21use arrow_array::cast::AsArray;
22use arrow_array::types::{
23 ArrowDictionaryKeyType, BinaryType, ByteArrayType, LargeBinaryType, LargeUtf8Type, Utf8Type,
24};
25use arrow_array::{Array, ArrayRef, DictionaryArray, GenericByteArray};
26use arrow_buffer::{ArrowNativeType, BooleanBuffer, ScalarBuffer};
27use arrow_schema::{ArrowError, DataType};
28
29struct Interner<'a, V> {
34 state: RandomState,
35 buckets: Vec<Option<(&'a [u8], V)>>,
36 shift: u32,
37}
38
39impl<'a, V> Interner<'a, V> {
40 fn new(capacity: usize) -> Self {
45 let shift = (capacity as u64 + 128).leading_zeros();
47 let num_buckets = (u64::MAX >> shift) as usize;
48 let buckets = (0..num_buckets.saturating_add(1)).map(|_| None).collect();
49 Self {
50 state: RandomState::with_seeds(0, 0, 0, 0),
52 buckets,
53 shift,
54 }
55 }
56
57 fn intern<F: FnOnce() -> Result<V, E>, E>(&mut self, new: &'a [u8], f: F) -> Result<&V, E> {
58 let hash = self.state.hash_one(new);
59 let bucket_idx = hash >> self.shift;
60 Ok(match &mut self.buckets[bucket_idx as usize] {
61 Some((current, v)) => {
62 if *current != new {
63 *v = f()?;
64 *current = new;
65 }
66 v
67 }
68 slot => &slot.insert((new, f()?)).1,
69 })
70 }
71}
72
/// The result of merging the values of several dictionaries, as produced by
/// [`merge_dictionary_values`]
pub struct MergedDictionaries<K: ArrowDictionaryKeyType> {
    /// One mapping per input dictionary, in input order:
    /// `key_mappings[array_idx][old_key]` is the key of that value within `values`
    ///
    /// Each mapping has one entry per value of the corresponding input dictionary;
    /// entries for values that were not retained are left as zero
    pub key_mappings: Vec<Vec<K::Native>>,
    /// The merged values array that the new keys index into
    pub values: ArrayRef,
}
79
80fn bytes_ptr_eq<T: ByteArrayType>(a: &dyn Array, b: &dyn Array) -> bool {
84 match (a.as_bytes_opt::<T>(), b.as_bytes_opt::<T>()) {
85 (Some(a), Some(b)) => {
86 let values_eq = a.values().ptr_eq(b.values()) && a.offsets().ptr_eq(b.offsets());
87 match (a.nulls(), b.nulls()) {
88 (Some(a), Some(b)) => values_eq && a.inner().ptr_eq(b.inner()),
89 (None, None) => values_eq,
90 _ => false,
91 }
92 }
93 _ => false,
94 }
95}
96
/// A type-erased function that compares two arrays for pointer equality
type PtrEq = dyn Fn(&dyn Array, &dyn Array) -> bool;
99
100pub fn should_merge_dictionary_values<K: ArrowDictionaryKeyType>(
106 dictionaries: &[&DictionaryArray<K>],
107 len: usize,
108) -> bool {
109 use DataType::*;
110 let first_values = dictionaries[0].values().as_ref();
111 let ptr_eq: Box<PtrEq> = match first_values.data_type() {
112 Utf8 => Box::new(bytes_ptr_eq::<Utf8Type>),
113 LargeUtf8 => Box::new(bytes_ptr_eq::<LargeUtf8Type>),
114 Binary => Box::new(bytes_ptr_eq::<BinaryType>),
115 LargeBinary => Box::new(bytes_ptr_eq::<LargeBinaryType>),
116 _ => return false,
117 };
118
119 let mut single_dictionary = true;
120 let mut total_values = first_values.len();
121 for dict in dictionaries.iter().skip(1) {
122 let values = dict.values().as_ref();
123 total_values += values.len();
124 if single_dictionary {
125 single_dictionary = ptr_eq(first_values, values)
126 }
127 }
128
129 let overflow = K::Native::from_usize(total_values).is_none();
130 let values_exceed_length = total_values >= len;
131
132 !single_dictionary && (overflow || values_exceed_length)
133}
134
135pub fn merge_dictionary_values<K: ArrowDictionaryKeyType>(
144 dictionaries: &[&DictionaryArray<K>],
145 masks: Option<&[BooleanBuffer]>,
146) -> Result<MergedDictionaries<K>, ArrowError> {
147 let mut num_values = 0;
148
149 let mut values_arrays = Vec::with_capacity(dictionaries.len());
150 let mut value_slices = Vec::with_capacity(dictionaries.len());
151
152 for (idx, dictionary) in dictionaries.iter().enumerate() {
153 let mask = masks.and_then(|m| m.get(idx));
154 let key_mask = match (dictionary.logical_nulls(), mask) {
155 (Some(n), None) => Some(n.into_inner()),
156 (None, Some(n)) => Some(n.clone()),
157 (Some(n), Some(m)) => Some(n.inner() & m),
158 (None, None) => None,
159 };
160 let keys = dictionary.keys().values();
161 let values = dictionary.values().as_ref();
162 let values_mask = compute_values_mask(keys, key_mask.as_ref(), values.len());
163
164 let masked_values = get_masked_values(values, &values_mask);
165 num_values += masked_values.len();
166 value_slices.push(masked_values);
167 values_arrays.push(values)
168 }
169
170 let mut interner = Interner::new(num_values);
172 let mut indices = Vec::with_capacity(num_values);
174
175 let key_mappings = dictionaries
177 .iter()
178 .enumerate()
179 .zip(value_slices)
180 .map(|((dictionary_idx, dictionary), values)| {
181 let zero = K::Native::from_usize(0).unwrap();
182 let mut mapping = vec![zero; dictionary.values().len()];
183
184 for (value_idx, value) in values {
185 mapping[value_idx] =
186 *interner.intern(value, || match K::Native::from_usize(indices.len()) {
187 Some(idx) => {
188 indices.push((dictionary_idx, value_idx));
189 Ok(idx)
190 }
191 None => Err(ArrowError::DictionaryKeyOverflowError),
192 })?;
193 }
194 Ok(mapping)
195 })
196 .collect::<Result<Vec<_>, ArrowError>>()?;
197
198 Ok(MergedDictionaries {
199 key_mappings,
200 values: interleave(&values_arrays, &indices)?,
201 })
202}
203
204fn compute_values_mask<K: ArrowNativeType>(
207 keys: &ScalarBuffer<K>,
208 mask: Option<&BooleanBuffer>,
209 max_key: usize,
210) -> BooleanBuffer {
211 let mut builder = BooleanBufferBuilder::new(max_key);
212 builder.advance(max_key);
213
214 match mask {
215 Some(n) => n
216 .set_indices()
217 .for_each(|idx| builder.set_bit(keys[idx].as_usize(), true)),
218 None => keys
219 .iter()
220 .for_each(|k| builder.set_bit(k.as_usize(), true)),
221 }
222 builder.finish()
223}
224
225fn get_masked_values<'a>(array: &'a dyn Array, mask: &BooleanBuffer) -> Vec<(usize, &'a [u8])> {
227 match array.data_type() {
228 DataType::Utf8 => masked_bytes(array.as_string::<i32>(), mask),
229 DataType::LargeUtf8 => masked_bytes(array.as_string::<i64>(), mask),
230 DataType::Binary => masked_bytes(array.as_binary::<i32>(), mask),
231 DataType::LargeBinary => masked_bytes(array.as_binary::<i64>(), mask),
232 _ => unimplemented!(),
233 }
234}
235
236fn masked_bytes<'a, T: ByteArrayType>(
240 array: &'a GenericByteArray<T>,
241 mask: &BooleanBuffer,
242) -> Vec<(usize, &'a [u8])> {
243 let mut out = Vec::with_capacity(mask.count_set_bits());
244 for idx in mask.set_indices() {
245 out.push((idx, array.value(idx).as_ref()))
246 }
247 out
248}
249
#[cfg(test)]
mod tests {
    use crate::dictionary::merge_dictionary_values;
    use arrow_array::cast::as_string_array;
    use arrow_array::types::Int32Type;
    use arrow_array::{DictionaryArray, Int32Array, StringArray};
    use arrow_buffer::{BooleanBuffer, Buffer, NullBuffer, OffsetBuffer};
    use std::sync::Arc;

    /// Merging string dictionaries: in full, after slicing, and with explicit masks
    #[test]
    fn test_merge_strings() {
        // Case 1: both dictionaries are used in full.
        // Merged values appear in first-seen order across the inputs.
        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "a", "b", "d", "c", "e"]);
        let b = DictionaryArray::<Int32Type>::from_iter(["c", "f", "c", "d", "a", "d"]);
        let merged = merge_dictionary_values(&[&a, &b], None).unwrap();

        let values = as_string_array(merged.values.as_ref());
        let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
        assert_eq!(&actual, &["a", "b", "d", "c", "e", "f"]);

        assert_eq!(merged.key_mappings.len(), 2);
        assert_eq!(&merged.key_mappings[0], &[0, 1, 2, 3, 4]);
        assert_eq!(&merged.key_mappings[1], &[3, 5, 2, 0]);

        // Case 2: the slice of `a` covers rows "b", "a", "b", "d", so its values
        // "c" and "e" are no longer referenced. Their mapping entries keep the
        // placeholder 0 and "e" is absent from the merged values.
        let a_slice = a.slice(1, 4);
        let merged = merge_dictionary_values(&[&a_slice, &b], None).unwrap();

        let values = as_string_array(merged.values.as_ref());
        let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
        assert_eq!(&actual, &["a", "b", "d", "c", "f"]);

        assert_eq!(merged.key_mappings.len(), 2);
        assert_eq!(&merged.key_mappings[0], &[0, 1, 2, 0, 0]);
        assert_eq!(&merged.key_mappings[1], &[3, 4, 2, 0]);

        // Case 3: the mask selects only rows "b", "b", "d" of `a`
        let a_mask = BooleanBuffer::from_iter([false, true, false, true, true, false, false]);
        // ...while every row of `b` is selected
        let b_mask = BooleanBuffer::new_set(b.len());
        let merged = merge_dictionary_values(&[&a, &b], Some(&[a_mask, b_mask])).unwrap();

        let values = as_string_array(merged.values.as_ref());
        let actual: Vec<_> = values.iter().map(Option::unwrap).collect();
        assert_eq!(&actual, &["b", "d", "c", "f", "a"]);

        assert_eq!(merged.key_mappings.len(), 2);
        assert_eq!(&merged.key_mappings[0], &[0, 0, 1, 0, 0]);
        assert_eq!(&merged.key_mappings[1], &[2, 3, 1, 4]);
    }

    /// Merging dictionaries that contain null keys and null values
    #[test]
    fn test_merge_nulls() {
        // Five values of five bytes each: "hello", "world", "bingo", "hello", "world",
        // with the value at index 1 marked null
        let buffer = Buffer::from(b"helloworldbingohelloworld");
        let offsets = OffsetBuffer::from_lengths([5, 5, 5, 5, 5]);
        let nulls = NullBuffer::from(vec![true, false, true, true, true]);
        let values = StringArray::new(offsets, buffer, Some(nulls));

        // The key 8 is out of range for the five values, but sits in a null slot
        let key_values = vec![1, 2, 3, 1, 8, 2, 3];
        let key_nulls = NullBuffer::from(vec![true, true, false, true, false, true, true]);
        let keys = Int32Array::new(key_values.into(), Some(key_nulls));
        let a = DictionaryArray::new(keys, Arc::new(values));
        // A dictionary consisting solely of null keys over an empty values array
        let b = DictionaryArray::new(Int32Array::new_null(10), Arc::new(StringArray::new_null(0)));

        // Only the values at indices 2 ("bingo") and 3 ("hello") are expected to
        // survive; all other mapping entries keep the placeholder 0
        let merged = merge_dictionary_values(&[&a, &b], None).unwrap();
        let expected = StringArray::from(vec!["bingo", "hello"]);
        assert_eq!(merged.values.as_ref(), &expected);
        assert_eq!(merged.key_mappings.len(), 2);
        assert_eq!(&merged.key_mappings[0], &[0, 0, 0, 1, 0]);
        assert_eq!(&merged.key_mappings[1], &[]);
    }

    /// A dictionary with fewer keys than values only retains the referenced value
    #[test]
    fn test_merge_keys_smaller() {
        let values = StringArray::from_iter_values(["a", "b"]);
        let keys = Int32Array::from_iter_values([1]);
        let a = DictionaryArray::new(keys, Arc::new(values));

        let merged = merge_dictionary_values(&[&a], None).unwrap();
        let expected = StringArray::from(vec!["b"]);
        assert_eq!(merged.values.as_ref(), &expected);
    }
}