use std::fmt::{self, Debug};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
use bytes::{BufMut, Bytes};
use differential_dataflow::trace::Description;
use itertools::Either;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_persist_types::columnar::{codec_to_schema, data_type, schema_to_codec};
use mz_persist_types::parquet::EncodingConfig;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{RustType, TryFromProtoError};
use proptest::arbitrary::Arbitrary;
use proptest::prelude::*;
use proptest::strategy::{BoxedStrategy, Just};
use prost::Message;
use serde::Serialize;
use timely::progress::{Antichain, Timestamp};
use timely::PartialOrder;
use tracing::error;

use crate::error::Error;
use crate::gen::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
use crate::gen::persist::{
    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
    ProtoU64Description,
};
use crate::indexed::columnar::arrow::realloc_array;
use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::location::Blob;
use crate::metrics::ColumnarMetrics;
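/// Which representation(s) of key/value data are written in a batch part:
/// opaque `Codec`-encoded binary columns, structured Arrow columns, or both.
///
/// A small sketch of the string round-trip (doctest ignored; it assumes this
/// module's paths are in scope):
///
/// ```ignore
/// let fmt = BatchColumnarFormat::from_str("both_v2");
/// assert_eq!(fmt, BatchColumnarFormat::Both(2));
/// assert_eq!(fmt.as_str(), "both_v2");
/// assert!(fmt.is_structured());
/// ```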
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Keys and values are stored only as codec-encoded binary columns.
    Row,
    /// Keys and values are stored as both codec-encoded binary columns and
    /// structured Arrow columns; the `usize` versions the structured encoding.
    Both(usize),
    /// Keys and values are stored only as structured Arrow columns.
    Structured,
}
impl BatchColumnarFormat {
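    /// The default format for newly written batches.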
pub const fn default() -> Self {
BatchColumnarFormat::Both(2)
}
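    /// Parses a format from its string name, falling back to the default (and
    /// logging a soft panic) on unknown input.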
pub fn from_str(s: &str) -> Self {
match s {
"row" => BatchColumnarFormat::Row,
"both" => BatchColumnarFormat::Both(0),
"both_v2" => BatchColumnarFormat::Both(2),
"structured" => BatchColumnarFormat::Structured,
x => {
let default = BatchColumnarFormat::default();
soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
default
}
}
}
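    /// Returns the string name of this format, the inverse of [`Self::from_str`].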
    pub const fn as_str(&self) -> &'static str {
        match self {
            BatchColumnarFormat::Row => "row",
            BatchColumnarFormat::Both(0 | 1) => "both",
            BatchColumnarFormat::Both(2) => "both_v2",
            BatchColumnarFormat::Structured => "structured",
            _ => panic!("unknown batch columnar format"),
        }
    }
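    /// Returns whether this format writes structured Arrow columns.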
pub const fn is_structured(&self) -> bool {
match self {
BatchColumnarFormat::Row => false,
BatchColumnarFormat::Both(0 | 1) => false,
BatchColumnarFormat::Both(_) => true,
BatchColumnarFormat::Structured => true,
}
}
}
impl fmt::Display for BatchColumnarFormat {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(self.as_str())
}
}
impl Arbitrary for BatchColumnarFormat {
type Parameters = ();
type Strategy = BoxedStrategy<BatchColumnarFormat>;
fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
proptest::strategy::Union::new(vec![
Just(BatchColumnarFormat::Row).boxed(),
Just(BatchColumnarFormat::Both(0)).boxed(),
Just(BatchColumnarFormat::Both(1)).boxed(),
])
.boxed()
}
}
/// The metadata necessary to reconstruct a trace batch from the blob store.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's parts from the blob store.
    pub keys: Vec<String>,
    /// The format of the stored parts.
    pub format: ProtoBatchFormat,
    /// The description (lower, upper, and since frontiers) of the updates in
    /// this batch.
    pub desc: Description<u64>,
    /// The compaction level of the batch.
    pub level: u64,
    /// The size of the encoded batch, in bytes.
    pub size_bytes: u64,
}
/// The structure stored as a single blob value for one part of a trace batch.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// The description of the batch this part belongs to. Every part of a
    /// batch carries the batch's description, not a narrower one.
    pub desc: Description<T>,
    /// The index of this part within the batch's parts.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}
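/// The set of updates stored in a batch part, in one or both of two
/// representations: opaque codec-encoded rows and/or structured Arrow arrays.
/// Timestamps and diffs are always stored as `i64` columns.
///
/// A minimal construction sketch, mirroring the test helper at the bottom of
/// this file (the disconnected metrics are for illustration only):
///
/// ```ignore
/// let mut builder = ColumnarRecordsBuilder::default();
/// let (k, v): (Vec<u8>, Vec<u8>) = (b"k".to_vec(), b"v".to_vec());
/// assert!(builder.push(((&k, &v), Codec64::encode(&1u64), Codec64::encode(&1i64))));
/// let updates = BlobTraceUpdates::Row(builder.finish(&ColumnarMetrics::disconnected()));
/// assert_eq!(updates.len(), 1);
/// ```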
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Updates stored as codec-encoded rows only.
    Row(ColumnarRecords),
    /// Updates stored as both codec-encoded rows and structured columns.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// Updates stored as structured columns only.
    Structured {
        /// The structured key and value columns.
        key_values: ColumnarRecordsStructuredExt,
        /// The timestamp column.
        timestamps: Int64Array,
        /// The diff column.
        diffs: Int64Array,
    },
}
impl BlobTraceUpdates {
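    /// The number of updates.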
pub fn len(&self) -> usize {
match self {
BlobTraceUpdates::Row(c) => c.len(),
BlobTraceUpdates::Both(c, _structured) => c.len(),
BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
}
}
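    /// The updates' timestamps as an `i64` array.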
pub fn timestamps(&self) -> &Int64Array {
match self {
BlobTraceUpdates::Row(c) => c.timestamps(),
BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
}
}
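    /// The updates' diffs as an `i64` array.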
pub fn diffs(&self) -> &Int64Array {
match self {
BlobTraceUpdates::Row(c) => c.diffs(),
BlobTraceUpdates::Both(c, _structured) => c.diffs(),
BlobTraceUpdates::Structured { diffs, .. } => diffs,
}
}
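    /// Returns the codec-encoded representation, if present.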
pub fn records(&self) -> Option<&ColumnarRecords> {
match self {
BlobTraceUpdates::Row(c) => Some(c),
BlobTraceUpdates::Both(c, _structured) => Some(c),
BlobTraceUpdates::Structured { .. } => None,
}
}
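    /// Returns the structured representation, if present.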
pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
match self {
BlobTraceUpdates::Row(_) => None,
BlobTraceUpdates::Both(_, s) => Some(s),
BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
}
}
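    /// The approximate number of logical bytes in these updates, not counting
    /// buffer overhead.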
pub fn goodbytes(&self) -> usize {
match self {
BlobTraceUpdates::Row(c) => c.goodbytes(),
BlobTraceUpdates::Both(c, _) => c.goodbytes(),
BlobTraceUpdates::Structured {
key_values,
timestamps,
diffs,
} => {
key_values.goodbytes()
+ timestamps.values().to_byte_slice().len()
+ diffs.values().to_byte_slice().len()
}
}
}
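    /// Returns the codec-encoded representation of these updates, re-encoding
    /// it from the structured representation (and caching the result as
    /// [`BlobTraceUpdates::Both`]) if necessary.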
pub fn get_or_make_codec<K: Codec, V: Codec>(
&mut self,
key_schema: &K::Schema,
val_schema: &V::Schema,
) -> &ColumnarRecords {
match self {
BlobTraceUpdates::Row(records) => records,
BlobTraceUpdates::Both(records, _) => records,
BlobTraceUpdates::Structured {
key_values,
timestamps,
diffs,
} => {
let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());
*self = BlobTraceUpdates::Both(records, key_values.clone());
let BlobTraceUpdates::Both(records, _) = self else {
unreachable!("set to BlobTraceUpdates::Both in previous line")
};
records
}
}
}
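    /// Returns the structured representation of these updates, converting from
    /// the codec-encoded representation (and caching the result as
    /// [`BlobTraceUpdates::Both`]) if necessary, then migrating the arrays to
    /// the schemas' current data types when a backward-compatible migration
    /// exists.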
pub fn get_or_make_structured<K: Codec, V: Codec>(
&mut self,
key_schema: &K::Schema,
val_schema: &V::Schema,
) -> &ColumnarRecordsStructuredExt {
let structured = match self {
BlobTraceUpdates::Row(records) => {
let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");
*self = BlobTraceUpdates::Both(
records.clone(),
ColumnarRecordsStructuredExt { key, val },
);
let BlobTraceUpdates::Both(_, structured) = self else {
unreachable!("set to BlobTraceUpdates::Both in previous line")
};
structured
}
BlobTraceUpdates::Both(_, structured) => structured,
BlobTraceUpdates::Structured { key_values, .. } => key_values,
};
let migrate = |array: &mut ArrayRef, to_type: DataType| {
let from_type = array.data_type().clone();
if from_type != to_type {
if let Some(migration) = backward_compatible(&from_type, &to_type) {
*array = migration.migrate(Arc::clone(array));
} else {
error!(
?from_type,
?to_type,
"failed to migrate array type; backwards-incompatible schema migration?"
);
}
}
};
migrate(
&mut structured.key,
data_type::<K>(key_schema).expect("valid key schema"),
);
migrate(
&mut structured.val,
data_type::<V>(val_schema).expect("valid value schema"),
);
structured
}
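    /// Concatenates a list of updates into one, converting every part to the
    /// structured representation (and, when `ensure_codec` is true, to the
    /// codec-encoded representation as well).
    ///
    /// A usage sketch; `a`, `b`, the schemas, and `metrics` are hypothetical
    /// bindings with the types this signature requires:
    ///
    /// ```ignore
    /// let merged = BlobTraceUpdates::concat::<K, V>(
    ///     vec![a, b],
    ///     &key_schema,
    ///     &val_schema,
    ///     &metrics,
    ///     /* ensure_codec */ false,
    /// )?;
    /// ```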
pub fn concat<K: Codec, V: Codec>(
mut updates: Vec<BlobTraceUpdates>,
key_schema: &K::Schema,
val_schema: &V::Schema,
metrics: &ColumnarMetrics,
ensure_codec: bool,
) -> anyhow::Result<BlobTraceUpdates> {
match updates.len() {
0 => return Ok(BlobTraceUpdates::Row(ColumnarRecords::default())),
1 => return Ok(updates.into_iter().into_element()),
_ => {}
}
let mut keys = Vec::with_capacity(updates.len());
let mut vals = Vec::with_capacity(updates.len());
for updates in &mut updates {
let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
keys.push(structured.key.as_ref());
vals.push(structured.val.as_ref());
}
let key_values = ColumnarRecordsStructuredExt {
key: ::arrow::compute::concat(&keys)?,
val: ::arrow::compute::concat(&vals)?,
};
let records = if ensure_codec {
let columnar: Vec<_> = updates
.iter_mut()
.map(|u| u.get_or_make_codec::<K, V>(key_schema, val_schema).clone())
.collect();
Either::Left(ColumnarRecords::concat(&columnar, metrics))
} else {
let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
for update in &updates {
timestamps.push(update.timestamps());
diffs.push(update.diffs());
}
            let timestamps = ::arrow::compute::concat(&timestamps)?
.as_primitive_opt::<Int64Type>()
.ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
.clone();
let diffs = ::arrow::compute::concat(&diffs)?
.as_primitive_opt::<Int64Type>()
.ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
.clone();
Either::Right((timestamps, diffs))
};
let out = match records {
Either::Left(codec) => Self::Both(codec, key_values),
Either::Right((timestamps, diffs)) => Self::Structured {
key_values,
timestamps,
diffs,
},
};
metrics
.arrow
.concat_bytes
.inc_by(u64::cast_from(out.goodbytes()));
Ok(out)
}
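    /// Decodes updates from the proto representation, re-allocating the
    /// decoded arrays (see `realloc_array`) through the given metrics.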
pub fn from_proto(
lgbytes: &ColumnarMetrics,
proto: ProtoColumnarRecords,
) -> Result<Self, TryFromProtoError> {
        let binary_array = |data: Bytes, offsets: Vec<i32>| {
            // Empty offsets alongside a nonzero len means the codec columns
            // were elided entirely (a structured-only part), not that the
            // arrays are empty.
            if offsets.is_empty() && proto.len > 0 {
                return Ok(None);
            }
match BinaryArray::try_new(
OffsetBuffer::new(offsets.into()),
::arrow::buffer::Buffer::from_bytes(data.into()),
None,
) {
Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
"Unable to decode binary array from repeated proto fields: {e:?}"
))),
}
};
let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
let codec_val = binary_array(proto.val_data, proto.val_offsets)?;
let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
let diffs = realloc_array(&proto.diffs.into(), lgbytes);
let ext =
ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;
let updates = match (codec_key, codec_val, ext) {
(Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
ext,
),
(Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
),
(None, None, Some(ext)) => BlobTraceUpdates::Structured {
key_values: ext,
timestamps,
diffs,
},
(k, v, ext) => {
return Err(TryFromProtoError::InvalidPersistState(format!(
"unexpected mix of key/value columns: k={:?}, v={}, ext={}",
k.is_some(),
v.is_some(),
ext.is_some(),
)))
}
};
Ok(updates)
}
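    /// Encodes these updates into the proto representation.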
pub fn into_proto(&self) -> ProtoColumnarRecords {
let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
None => (vec![], Bytes::new(), vec![], Bytes::new()),
Some(records) => (
records.keys().offsets().to_vec(),
Bytes::copy_from_slice(records.keys().value_data()),
records.vals().offsets().to_vec(),
Bytes::copy_from_slice(records.vals().value_data()),
),
};
let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
None => (None, None),
Some((k, v)) => (Some(k), Some(v)),
};
ProtoColumnarRecords {
len: self.len().into_proto(),
key_offsets,
key_data,
val_offsets,
val_data,
timestamps: self.timestamps().values().to_vec(),
diffs: self.diffs().values().to_vec(),
key_structured: k_struct,
val_structured: v_struct,
}
}
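    /// Returns a copy of these updates converted to the given format,
    /// generating any missing representation from the schemas.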
pub fn as_format<K: Codec, V: Codec>(
&self,
format: BatchColumnarFormat,
key_schema: &K::Schema,
val_schema: &V::Schema,
) -> Self {
match format {
BatchColumnarFormat::Row => {
let mut this = self.clone();
Self::Row(
this.get_or_make_codec::<K, V>(key_schema, val_schema)
.clone(),
)
}
BatchColumnarFormat::Both(_) => {
let mut this = self.clone();
Self::Both(
this.get_or_make_codec::<K, V>(key_schema, val_schema)
.clone(),
this.get_or_make_structured::<K, V>(key_schema, val_schema)
.clone(),
)
}
BatchColumnarFormat::Structured => {
let mut this = self.clone();
Self::Structured {
key_values: this
.get_or_make_structured::<K, V>(key_schema, val_schema)
.clone(),
timestamps: this.timestamps().clone(),
diffs: this.diffs().clone(),
}
}
}
}
}
impl TraceBatchMeta {
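    /// Asserts that this metadata's description is well-formed (its lower is
    /// strictly before its upper), returning an error if not.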
pub fn validate(&self) -> Result<(), Error> {
if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
return Err(format!("invalid desc: {:?}", &self.desc).into());
}
Ok(())
}
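    /// Fetches each part from the blob store and asserts that it matches this
    /// metadata (same description, consecutive indices), passes its own
    /// validation, and contains no zero diffs.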
pub async fn validate_data(
&self,
blob: &dyn Blob,
metrics: &ColumnarMetrics,
) -> Result<(), Error> {
let mut batches = vec![];
for (idx, key) in self.keys.iter().enumerate() {
let value = blob
.get(key)
.await?
.ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
let batch = BlobTraceBatchPart::decode(&value, metrics)?;
if batch.desc != self.desc {
return Err(format!(
"invalid trace batch part desc expected {:?} got {:?}",
&self.desc, &batch.desc
)
.into());
}
if batch.index != u64::cast_from(idx) {
return Err(format!(
"invalid index for blob trace batch part at key {} expected {} got {}",
key, idx, batch.index
)
.into());
}
batch.validate()?;
batches.push(batch);
}
for (batch_idx, batch) in batches.iter().enumerate() {
for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
let diff: u64 = Codec64::decode(diff.to_le_bytes());
if diff == 0 {
return Err(format!(
"update with 0 diff in batch {batch_idx} at row {row_idx}",
)
.into());
}
}
}
Ok(())
}
}
impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
pub fn validate(&self) -> Result<(), Error> {
if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
return Err(format!("invalid desc: {:?}", &self.desc).into());
}
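        // A batch is "uncompacted" when its since has not advanced past its
        // lower; only then must every timestamp be strictly less than the
        // upper.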
let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
for time in self.updates.timestamps().values() {
let ts = T::decode(time.to_le_bytes());
if !self.desc.lower().less_equal(&ts) {
return Err(format!(
"timestamp {:?} is less than the batch lower: {:?}",
ts, self.desc
)
.into());
}
if uncompacted && self.desc.upper().less_equal(&ts) {
return Err(format!(
"timestamp {:?} is greater than or equal to the batch upper: {:?}",
ts, self.desc
)
.into());
}
}
for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
let diff: u64 = Codec64::decode(diff.to_le_bytes());
if diff == 0 {
return Err(format!("update with 0 diff at row {row_idx}",).into());
}
}
Ok(())
}
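    /// Encodes this batch part as Parquet into `buf`.
    ///
    /// A round-trip sketch based on the tests below; the `SegmentedBytes`
    /// conversion from the encoded buffer is assumed:
    ///
    /// ```ignore
    /// let metrics = ColumnarMetrics::disconnected();
    /// let mut buf = Vec::new();
    /// batch.encode(&mut buf, &metrics, &EncodingConfig::default());
    /// let decoded =
    ///     BlobTraceBatchPart::<u64>::decode(&SegmentedBytes::from(buf), &metrics)?;
    /// assert_eq!(decoded, batch);
    /// ```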
pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
where
B: BufMut + Send,
{
encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
}
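    /// Decodes a batch part from the Parquet format written by [`Self::encode`].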
pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
decode_trace_parquet(buf.clone(), metrics)
}
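    /// Returns the lowest codec-encoded key in this part, or the empty slice
    /// if the codec representation is absent.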
pub fn key_lower(&self) -> &[u8] {
self.updates
.records()
.and_then(|r| r.keys().iter().flatten().min())
.unwrap_or(&[])
}
}
impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
fn from(x: ProtoU64Description) -> Self {
Description::new(
x.lower
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
x.upper
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
x.since
.map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
)
}
}
impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
fn from(x: ProtoU64Antichain) -> Self {
Antichain::from(
x.elements
.into_iter()
.map(|x| T::decode(u64::to_le_bytes(x)))
.collect::<Vec<_>>(),
)
}
}
impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
fn from(x: &Antichain<T>) -> Self {
ProtoU64Antichain {
elements: x
.elements()
.iter()
.map(|x| u64::from_le_bytes(T::encode(x)))
.collect(),
}
}
}
impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
fn from(x: &Description<T>) -> Self {
ProtoU64Description {
lower: Some(x.lower().into()),
upper: Some(x.upper().into()),
since: Some(x.since().into()),
}
}
}
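/// Encodes the inline metadata (format, description, and index) for a trace
/// batch part into a base64 string.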
pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
let (format, format_metadata) = match &batch.updates {
BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
BlobTraceUpdates::Both { .. } => {
let metadata = ProtoFormatMetadata::StructuredMigration(2);
(ProtoBatchFormat::ParquetStructured, Some(metadata))
}
BlobTraceUpdates::Structured { .. } => {
let metadata = ProtoFormatMetadata::StructuredMigration(3);
(ProtoBatchFormat::ParquetStructured, Some(metadata))
}
};
let inline = ProtoBatchPartInline {
format: format.into(),
desc: Some((&batch.desc).into()),
index: batch.index,
format_metadata,
};
let inline_encoded = inline.encode_to_vec();
base64::encode(inline_encoded)
}
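/// Decodes the inline metadata for a trace batch part from a base64 string.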
pub fn decode_trace_inline_meta(
inline_base64: Option<&String>,
) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
let inline_encoded = base64::decode(inline_base64).map_err(|err| err.to_string())?;
let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
let format = ProtoBatchFormat::try_from(inline.format)
.map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
Ok((format, inline))
}
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;
fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
((key.into(), "".into()), ts, 1)
}
fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
Description::new(
Antichain::from_elem(lower),
Antichain::from_elem(upper),
Antichain::from_elem(0),
)
}
fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
TraceBatchMeta {
keys: vec![],
format: ProtoBatchFormat::Unknown,
desc: u64_desc(lower, upper),
level: 1,
size_bytes: 0,
}
}
fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
Description::new(
Antichain::from_elem(lower),
Antichain::from_elem(upper),
Antichain::from_elem(since),
)
}
fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
let mut builder = ColumnarRecordsBuilder::default();
for ((k, v), t, d) in updates {
assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
}
let updates = builder.finish(&ColumnarMetrics::disconnected());
BlobTraceUpdates::Row(updates)
}
#[mz_ore::test]
fn trace_batch_validate() {
let b = BlobTraceBatchPart {
desc: u64_desc(0, 2),
index: 0,
updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc(0, 2),
index: 0,
updates: columnar_records(vec![]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc(2, 0),
index: 0,
updates: columnar_records(vec![]),
};
assert_eq!(
b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
))
);
let b = BlobTraceBatchPart {
desc: u64_desc(0, 0),
index: 0,
updates: columnar_records(vec![]),
};
assert_eq!(
b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
))
);
let b = BlobTraceBatchPart {
desc: u64_desc(1, 2),
index: 0,
updates: columnar_records(vec![update_with_key(0, "0")]),
};
assert_eq!(b.validate(), Err(Error::from("timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }")));
let b = BlobTraceBatchPart {
desc: u64_desc(1, 2),
index: 0,
updates: columnar_records(vec![update_with_key(2, "0")]),
};
assert_eq!(b.validate(), Err(Error::from("timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }")));
let b = BlobTraceBatchPart {
desc: u64_desc_since(1, 2, 4),
index: 0,
updates: columnar_records(vec![update_with_key(2, "0")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc_since(1, 2, 4),
index: 0,
updates: columnar_records(vec![update_with_key(4, "0")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc_since(1, 2, 4),
index: 0,
updates: columnar_records(vec![update_with_key(5, "0")]),
};
assert_eq!(b.validate(), Ok(()));
let b = BlobTraceBatchPart {
desc: u64_desc(0, 1),
index: 0,
updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
};
assert_eq!(
b.validate(),
Err(Error::from("update with 0 diff at row 0"))
);
}
#[mz_ore::test]
fn trace_batch_meta_validate() {
let b = batch_meta(0, 1);
assert_eq!(b.validate(), Ok(()));
let b = batch_meta(0, 0);
assert_eq!(b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
)),
);
let b = batch_meta(2, 0);
assert_eq!(b.validate(),
Err(Error::from(
"invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
)),
);
}
async fn expect_set_trace_batch<T: Timestamp + Codec64>(
blob: &dyn Blob,
key: &str,
batch: &BlobTraceBatchPart<T>,
) -> u64 {
let mut val = Vec::new();
let metrics = ColumnarMetrics::disconnected();
let config = EncodingConfig::default();
batch.encode(&mut val, &metrics, &config);
let val = Bytes::from(val);
let val_len = u64::cast_from(val.len());
blob.set(key, val).await.expect("failed to set trace batch");
val_len
}
#[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
let metrics = ColumnarMetrics::disconnected();
let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
let format = ProtoBatchFormat::ParquetKvtd;
let batch_desc = u64_desc_since(0, 3, 0);
let batch0 = BlobTraceBatchPart {
desc: batch_desc.clone(),
index: 0,
updates: columnar_records(vec![
(("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
(("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
]),
};
let batch1 = BlobTraceBatchPart {
desc: batch_desc.clone(),
index: 1,
updates: columnar_records(vec![
(("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
(("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
]),
};
let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
let size_bytes = batch0_size_bytes + batch1_size_bytes;
let batch_meta = TraceBatchMeta {
keys: vec!["b0".into(), "b1".into()],
format,
desc: batch_desc.clone(),
level: 0,
size_bytes,
};
assert_eq!(
batch_meta.validate_data(blob.as_ref(), &metrics).await,
Ok(())
);
let batch_meta = TraceBatchMeta {
keys: vec!["b0".into(), "b1".into()],
format,
desc: u64_desc_since(1, 3, 0),
level: 0,
size_bytes,
};
assert_eq!(batch_meta.validate_data(blob.as_ref(), &metrics).await, Err(Error::from("invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }")));
let batch_meta = TraceBatchMeta {
keys: vec!["b0".into(), "b1".into(), "b2".into()],
format,
desc: batch_desc.clone(),
level: 0,
size_bytes,
};
assert_eq!(
batch_meta.validate_data(blob.as_ref(), &metrics).await,
Err(Error::from("no blob for trace batch at key: b2"))
);
let batch_meta = TraceBatchMeta {
keys: vec!["b1".into(), "b0".into()],
format,
desc: batch_desc,
level: 0,
size_bytes,
};
assert_eq!(
batch_meta.validate_data(blob.as_ref(), &metrics).await,
Err(Error::from(
"invalid index for blob trace batch part at key b1 expected 0 got 1"
))
);
Ok(())
}
#[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn encoded_batch_sizes() {
fn sizes(data: DataGenerator) -> usize {
let metrics = ColumnarMetrics::disconnected();
let config = EncodingConfig::default();
let updates: Vec<_> = data.batches().collect();
let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
let trace = BlobTraceBatchPart {
desc: Description::new(
Antichain::from_elem(0u64),
Antichain::new(),
Antichain::from_elem(0u64),
),
index: 0,
updates,
};
let mut trace_buf = Vec::new();
trace.encode(&mut trace_buf, &metrics, &config);
trace_buf.len()
}
let record_size_bytes = DataGenerator::default().record_size_bytes;
assert_eq!(
format!(
"1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
sizes(DataGenerator::new(1, record_size_bytes, 1)),
sizes(DataGenerator::new(25, record_size_bytes, 25)),
sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
),
"1/1=867 25/1=2613 1000/1=72845 1000/100=72845"
);
}
}