mz_persist_types/part.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! A columnar representation of one blob's worth of data
use std::sync::Arc;
use arrow::array::Array;
use arrow::buffer::ScalarBuffer;
use crate::columnar::{ColumnEncoder, Schema2};
use crate::Codec64;
/// A structured columnar representation of one blob's worth of data.
pub struct Part2 {
/// The 'k' values from a Part, generally `SourceData`.
pub key: Arc<dyn Array>,
/// The 'v' values from a Part, generally `()`.
pub val: Arc<dyn Array>,
/// The `ts` values from a Part.
pub time: ScalarBuffer<i64>,
/// The `diff` values from a Part.
pub diff: ScalarBuffer<i64>,
}
/// A builder for [`Part2`].
pub struct PartBuilder2<K, KS: Schema2<K>, V, VS: Schema2<V>> {
key: KS::Encoder,
val: VS::Encoder,
time: Codec64Mut,
diff: Codec64Mut,
}
impl<K, KS: Schema2<K>, V, VS: Schema2<V>> PartBuilder2<K, KS, V, VS> {
/// Returns a new [`PartBuilder2`].
pub fn new(key_schema: &KS, val_schema: &VS) -> Self {
let key = key_schema.encoder().unwrap();
let val = val_schema.encoder().unwrap();
let time = Codec64Mut(Vec::new());
let diff = Codec64Mut(Vec::new());
PartBuilder2 {
key,
val,
time,
diff,
}
}
/// Estimate the size of the part this builder will build.
pub fn goodbytes(&self) -> usize {
self.key.goodbytes() + self.val.goodbytes() + self.time.goodbytes() + self.diff.goodbytes()
}
/// Push a new row onto this [`PartBuilder2`].
pub fn push<T: Codec64, D: Codec64>(&mut self, key: &K, val: &V, t: T, d: D) {
self.key.append(key);
self.val.append(val);
self.time.push(t);
self.diff.push(d);
}
/// Finishes the builder returning a [`Part2`].
pub fn finish(self) -> Part2 {
let PartBuilder2 {
key,
val,
time,
diff,
} = self;
let key_col = key.finish();
let val_col = val.finish();
let time = ScalarBuffer::from(time.0);
let diff = ScalarBuffer::from(diff.0);
Part2 {
key: Arc::new(key_col),
val: Arc::new(val_col),
time,
diff,
}
}
}
/// Mutable access to a column of a [`Codec64`] implementor.
#[derive(Debug)]
pub struct Codec64Mut(Vec<i64>);
impl Codec64Mut {
/// Returns the overall size of the stored data in bytes.
pub fn goodbytes(&self) -> usize {
self.0.len() * size_of::<i64>()
}
/// Returns the length of the column.
pub fn len(&self) -> usize {
self.0.len()
}
/// Pushes the given value into this column.
pub fn push<X: Codec64>(&mut self, val: X) {
self.0.push(i64::from_le_bytes(Codec64::encode(&val)));
}
}
#[cfg(test)]
mod tests {
use std::marker::PhantomData;
use super::*;
use crate::codec_impls::UnitSchema;
// Make sure that the API structs are Sync + Send, so that they can be used in async tasks.
// NOTE: This is a compile-time only test. If it compiles, we're good.
#[allow(unused)]
fn sync_send() {
fn is_send_sync<T: Send + Sync>(_: PhantomData<T>) -> bool {
true
}
assert!(is_send_sync::<Part2>(PhantomData));
assert!(is_send_sync::<PartBuilder2<(), UnitSchema, (), UnitSchema>>(PhantomData));
}
}