1use std::mem;
13use std::sync::Arc;
14
15use arrow::array::{Array, ArrayRef, AsArray, Int64Array};
16use arrow::datatypes::ToByteSlice;
17use mz_ore::result::ResultExt;
18
19use crate::Codec64;
20use crate::arrow::ArrayOrd;
21use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
22
/// A columnar batch of `((key, val), time, diff)` updates.
///
/// Row `i` of the part is made of row `i` of each of the four columns.
#[derive(Debug, Clone)]
pub struct Part {
    /// The key column; decoded with a key [`Schema`] (see `Part::decode_iter`).
    pub key: Arc<dyn Array>,
    /// The value column; decoded with a value [`Schema`] (see `Part::decode_iter`).
    pub val: Arc<dyn Array>,
    /// The time column. Each entry is the `i64` whose little-endian bytes are the
    /// [`Codec64`] encoding of the timestamp.
    pub time: Int64Array,
    /// The diff column. Each entry is the `i64` whose little-endian bytes are the
    /// [`Codec64`] encoding of the diff.
    pub diff: Int64Array,
}
35
36impl Part {
37 pub fn len(&self) -> usize {
39 self.key.len()
40 }
41
42 pub fn goodbytes(&self) -> usize {
44 ArrayOrd::new(&self.key).goodbytes()
45 + ArrayOrd::new(&self.val).goodbytes()
46 + self.time.values().to_byte_slice().len()
47 + self.diff.values().to_byte_slice().len()
48 }
49
50 fn combine(
51 parts: &[Part],
52 mut combine_fn: impl FnMut(&[&dyn Array]) -> anyhow::Result<ArrayRef>,
53 ) -> anyhow::Result<Self> {
54 let mut field_array = Vec::with_capacity(parts.len());
55 let mut combine = |get: fn(&Part) -> &dyn Array| {
56 field_array.extend(parts.iter().map(get));
57 let res = combine_fn(&field_array);
58 field_array.clear();
59 res
60 };
61
62 Ok(Self {
63 key: combine(|p| &p.key)?,
64 val: combine(|p| &p.val)?,
65 time: combine(|p| &p.time)?.as_primitive().clone(),
66 diff: combine(|p| &p.diff)?.as_primitive().clone(),
67 })
68 }
69
70 pub fn concat(parts: &[Part]) -> anyhow::Result<Option<Self>> {
72 match parts.len() {
73 0 => return Ok(None),
74 1 => return Ok(Some(parts[0].clone())),
75 _ => {}
76 }
77 let combined = Part::combine(parts, |cols| ::arrow::compute::concat(cols).err_into())?;
78 Ok(Some(combined))
79 }
80
81 pub fn interleave(parts: &[Part], indices: &[(usize, usize)]) -> anyhow::Result<Self> {
83 Part::combine(parts, |cols| {
84 ::arrow::compute::interleave(cols, indices).err_into()
85 })
86 }
87
88 pub fn decode_iter<
90 'a,
91 K: Default + Clone + 'static,
92 V: Default + Clone + 'static,
93 T: Codec64,
94 D: Codec64,
95 >(
96 &'a self,
97 key_schema: &'a impl Schema<K>,
98 val_schema: &'a impl Schema<V>,
99 ) -> anyhow::Result<impl Iterator<Item = ((K, V), T, D)> + 'a> {
100 let key_decoder = key_schema.decoder_any(&*self.key)?;
101 let val_decoder = val_schema.decoder_any(&*self.val)?;
102 let mut key = K::default();
103 let mut val = V::default();
104 let iter = (0..self.len()).map(move |i| {
105 key_decoder.decode(i, &mut key);
106 val_decoder.decode(i, &mut val);
107 let time = T::decode(self.time.value(i).to_le_bytes());
108 let diff = D::decode(self.diff.value(i).to_le_bytes());
109 ((key.clone(), val.clone()), time, diff)
110 });
111 Ok(iter)
112 }
113}
114
115impl PartialEq for Part {
116 fn eq(&self, other: &Self) -> bool {
117 let Part {
118 key,
119 val,
120 time,
121 diff,
122 } = self;
123 let Part {
124 key: other_key,
125 val: other_val,
126 time: other_time,
127 diff: other_diff,
128 } = other;
129 key == other_key && val == other_val && time == other_time && diff == other_diff
130 }
131}
132
/// An incremental, row-at-a-time builder for a [`Part`].
///
/// Keys and values are appended through the column encoders of the key and
/// value schemas. Times and diffs are buffered as [`Codec64`] encodings.
#[derive(Debug)]
pub struct PartBuilder<K, KS: Schema<K>, V, VS: Schema<V>> {
    /// Encoder for the key column, obtained from the key schema.
    key: KS::Encoder,
    /// Encoder for the value column, obtained from the value schema.
    val: VS::Encoder,
    /// Buffered time column.
    time: Codec64Mut,
    /// Buffered diff column.
    diff: Codec64Mut,
}
141
142impl<K, KS: Schema<K>, V, VS: Schema<V>> PartBuilder<K, KS, V, VS> {
143 pub fn new(key_schema: &KS, val_schema: &VS) -> Self {
145 let key = key_schema.encoder().unwrap();
146 let val = val_schema.encoder().unwrap();
147 let time = Codec64Mut(Vec::new());
148 let diff = Codec64Mut(Vec::new());
149
150 PartBuilder {
151 key,
152 val,
153 time,
154 diff,
155 }
156 }
157
158 pub fn goodbytes(&self) -> usize {
160 self.key.goodbytes() + self.val.goodbytes() + self.time.goodbytes() + self.diff.goodbytes()
161 }
162
163 pub fn push<T: Codec64, D: Codec64>(&mut self, key: &K, val: &V, t: T, d: D) {
165 self.key.append(key);
166 self.val.append(val);
167 self.time.push(&t);
168 self.diff.push(&d);
169 }
170
171 pub fn finish(self) -> Part {
173 let PartBuilder {
174 key,
175 val,
176 time,
177 diff,
178 } = self;
179
180 let key_col = key.finish();
181 let val_col = val.finish();
182 let time = Int64Array::from(time.0);
183 let diff = Int64Array::from(diff.0);
184
185 Part {
186 key: Arc::new(key_col),
187 val: Arc::new(val_col),
188 time,
189 diff,
190 }
191 }
192
193 pub fn finish_and_replace(&mut self, key_schema: &KS, val_schema: &VS) -> Part {
195 let builder = mem::replace(self, PartBuilder::new(key_schema, val_schema));
196 builder.finish()
197 }
198}
199
/// A growable buffer of `Codec64`-encoded values.
///
/// Each entry is the `i64` whose little-endian bytes are the 8-byte encoding
/// of the pushed value. `Default` yields an empty buffer.
#[derive(Debug, Default)]
pub struct Codec64Mut(Vec<i64>);
203
204impl Codec64Mut {
205 pub fn with_capacity(capacity: usize) -> Self {
207 Codec64Mut(Vec::with_capacity(capacity))
208 }
209
210 pub fn goodbytes(&self) -> usize {
212 self.0.len() * size_of::<i64>()
213 }
214
215 pub fn len(&self) -> usize {
217 self.0.len()
218 }
219
220 pub fn push(&mut self, val: &impl Codec64) {
222 self.push_raw(val.encode());
223 }
224
225 pub fn push_raw(&mut self, val: [u8; 8]) {
227 self.0.push(i64::from_le_bytes(val));
228 }
229
230 pub fn finish(self) -> Int64Array {
232 self.0.into()
233 }
234}
235
#[cfg(test)]
mod tests {
    use std::marker::PhantomData;

    use super::*;
    use crate::codec_impls::UnitSchema;

    // Type-level check that the part types are `Send + Sync`. This function
    // carries no test attribute, so it is presumably never called (hence
    // `allow(unused)`). Its value is that it must type-check: if either type
    // stops being `Send + Sync`, the crate's tests fail to compile.
    #[allow(unused)]
    fn sync_send() {
        fn requires_send_sync<T: Send + Sync>(_marker: PhantomData<T>) -> bool {
            true
        }

        let part_ok = requires_send_sync::<Part>(PhantomData);
        let builder_ok =
            requires_send_sync::<PartBuilder<(), UnitSchema, (), UnitSchema>>(PhantomData);
        assert!(part_ok);
        assert!(builder_ok);
    }
}