1use std::mem;
13use std::sync::Arc;
14
15use arrow::array::{Array, ArrayRef, AsArray, Int64Array};
16use arrow::datatypes::ToByteSlice;
17use mz_ore::result::ResultExt;
18
19use crate::Codec64;
20use crate::arrow::{ArrayIdx, ArrayOrd};
21use crate::columnar::{ColumnDecoder, ColumnEncoder, Schema};
22
23#[derive(Debug, Clone)]
25pub struct Part {
26 pub key: Arc<dyn Array>,
28 pub val: Arc<dyn Array>,
30 pub time: Int64Array,
32 pub diff: Int64Array,
34}
35
36impl Part {
37 pub fn len(&self) -> usize {
39 self.key.len()
40 }
41
42 pub fn goodbytes(&self) -> usize {
44 ArrayOrd::new(&self.key).goodbytes()
45 + ArrayOrd::new(&self.val).goodbytes()
46 + self.time.values().to_byte_slice().len()
47 + self.diff.values().to_byte_slice().len()
48 }
49
50 fn combine(
51 parts: &[Part],
52 mut combine_fn: impl FnMut(&[&dyn Array]) -> anyhow::Result<ArrayRef>,
53 ) -> anyhow::Result<Self> {
54 let mut field_array = Vec::with_capacity(parts.len());
55 let mut combine = |get: fn(&Part) -> &dyn Array| {
56 field_array.extend(parts.iter().map(get));
57 let res = combine_fn(&field_array);
58 field_array.clear();
59 res
60 };
61
62 Ok(Self {
63 key: combine(|p| &p.key)?,
64 val: combine(|p| &p.val)?,
65 time: combine(|p| &p.time)?.as_primitive().clone(),
66 diff: combine(|p| &p.diff)?.as_primitive().clone(),
67 })
68 }
69
70 pub fn concat(parts: &[Part]) -> anyhow::Result<Option<Self>> {
72 match parts.len() {
73 0 => return Ok(None),
74 1 => return Ok(Some(parts[0].clone())),
75 _ => {}
76 }
77 let combined = Part::combine(parts, |cols| ::arrow::compute::concat(cols).err_into())?;
78 Ok(Some(combined))
79 }
80
81 pub fn interleave(parts: &[Part], indices: &[(usize, usize)]) -> anyhow::Result<Self> {
83 Part::combine(parts, |cols| {
84 ::arrow::compute::interleave(cols, indices).err_into()
85 })
86 }
87
88 pub fn decode_iter<
90 'a,
91 K: Default + Clone + 'static,
92 V: Default + Clone + 'static,
93 T: Codec64,
94 D: Codec64,
95 >(
96 &'a self,
97 key_schema: &'a impl Schema<K>,
98 val_schema: &'a impl Schema<V>,
99 ) -> anyhow::Result<impl Iterator<Item = ((K, V), T, D)> + 'a> {
100 let key_decoder = key_schema.decoder_any(&*self.key)?;
101 let val_decoder = val_schema.decoder_any(&*self.val)?;
102 let mut key = K::default();
103 let mut val = V::default();
104 let iter = (0..self.len()).map(move |i| {
105 key_decoder.decode(i, &mut key);
106 val_decoder.decode(i, &mut val);
107 let time = T::decode(self.time.value(i).to_le_bytes());
108 let diff = D::decode(self.diff.value(i).to_le_bytes());
109 ((key.clone(), val.clone()), time, diff)
110 });
111 Ok(iter)
112 }
113
114 pub fn as_ord(&self) -> PartOrd {
116 PartOrd {
117 key: ArrayOrd::new(&*self.key),
118 val: ArrayOrd::new(&*self.val),
119 time: self.time.clone(),
120 diff: self.diff.clone(),
121 }
122 }
123}
124
125#[derive(Debug, Clone)]
127pub struct PartOrd {
128 key: ArrayOrd,
129 val: ArrayOrd,
130 time: Int64Array,
131 diff: Int64Array,
132}
133
134impl PartOrd {
135 pub fn iter(&self) -> impl Iterator<Item = (ArrayIdx<'_>, ArrayIdx<'_>, [u8; 8], [u8; 8])> {
137 (0..self.time.len()).map(move |i| {
138 let key = self.key.at(i);
139 let val = self.val.at(i);
140 let time = self.time.value(i).to_le_bytes();
141 let diff = self.diff.value(i).to_le_bytes();
142 (key, val, time, diff)
143 })
144 }
145}
146
147impl PartialEq for Part {
148 fn eq(&self, other: &Self) -> bool {
149 let Part {
150 key,
151 val,
152 time,
153 diff,
154 } = self;
155 let Part {
156 key: other_key,
157 val: other_val,
158 time: other_time,
159 diff: other_diff,
160 } = other;
161 key == other_key && val == other_val && time == other_time && diff == other_diff
162 }
163}
164
165#[derive(Debug)]
167pub struct PartBuilder<K, KS: Schema<K>, V, VS: Schema<V>> {
168 key: KS::Encoder,
169 val: VS::Encoder,
170 time: Codec64Mut,
171 diff: Codec64Mut,
172}
173
174impl<K, KS: Schema<K>, V, VS: Schema<V>> PartBuilder<K, KS, V, VS> {
175 pub fn new(key_schema: &KS, val_schema: &VS) -> Self {
177 let key = key_schema.encoder().unwrap();
178 let val = val_schema.encoder().unwrap();
179 let time = Codec64Mut(Vec::new());
180 let diff = Codec64Mut(Vec::new());
181
182 PartBuilder {
183 key,
184 val,
185 time,
186 diff,
187 }
188 }
189
190 pub fn goodbytes(&self) -> usize {
192 self.key.goodbytes() + self.val.goodbytes() + self.time.goodbytes() + self.diff.goodbytes()
193 }
194
195 pub fn push<T: Codec64, D: Codec64>(&mut self, key: &K, val: &V, t: T, d: D) {
197 self.key.append(key);
198 self.val.append(val);
199 self.time.push(&t);
200 self.diff.push(&d);
201 }
202
203 pub fn finish(self) -> Part {
205 let PartBuilder {
206 key,
207 val,
208 time,
209 diff,
210 } = self;
211
212 let key_col = key.finish();
213 let val_col = val.finish();
214 let time = Int64Array::from(time.0);
215 let diff = Int64Array::from(diff.0);
216
217 Part {
218 key: Arc::new(key_col),
219 val: Arc::new(val_col),
220 time,
221 diff,
222 }
223 }
224
225 pub fn finish_and_replace(&mut self, key_schema: &KS, val_schema: &VS) -> Part {
227 let builder = mem::replace(self, PartBuilder::new(key_schema, val_schema));
228 builder.finish()
229 }
230}
231
232#[derive(Debug)]
234pub struct Codec64Mut(Vec<i64>);
235
236impl Codec64Mut {
237 pub fn with_capacity(capacity: usize) -> Self {
239 Codec64Mut(Vec::with_capacity(capacity))
240 }
241
242 pub fn goodbytes(&self) -> usize {
244 self.0.len() * size_of::<i64>()
245 }
246
247 pub fn len(&self) -> usize {
249 self.0.len()
250 }
251
252 pub fn push(&mut self, val: &impl Codec64) {
254 self.push_raw(val.encode());
255 }
256
257 pub fn push_raw(&mut self, val: [u8; 8]) {
259 self.0.push(i64::from_le_bytes(val));
260 }
261
262 pub fn finish(self) -> Int64Array {
264 self.0.into()
265 }
266}
267
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec_impls::UnitSchema;

    /// Compile-time check that the public types are `Send + Sync`.
    ///
    /// Nothing in the visible source calls this function (hence the
    /// `allow(unused)`); it only has to type-check.
    #[allow(unused)]
    fn sync_send() {
        // Instantiating the helper only compiles when `T: Send + Sync`.
        fn require_send_sync<T: Send + Sync>() {}

        require_send_sync::<Part>();
        require_send_sync::<PartBuilder<(), UnitSchema, (), UnitSchema>>();
    }
}