1use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use base64::Engine;
23use bytes::{BufMut, Bytes};
24use differential_dataflow::difference::Semigroup;
25use differential_dataflow::trace::Description;
26use mz_ore::bytes::SegmentedBytes;
27use mz_ore::cast::CastFrom;
28use mz_ore::collections::CollectionExt;
29use mz_ore::soft_panic_or_log;
30use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
31use mz_persist_types::columnar::{
32 ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
33};
34use mz_persist_types::parquet::EncodingConfig;
35use mz_persist_types::part::Part;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::{Codec, Codec64};
38use mz_proto::{RustType, TryFromProtoError};
39use proptest::arbitrary::Arbitrary;
40use proptest::prelude::*;
41use proptest::strategy::{BoxedStrategy, Just};
42use prost::Message;
43use serde::Serialize;
44use timely::PartialOrder;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::error::Error;
49use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
50use crate::generated::persist::{
51 ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
52 ProtoU64Description,
53};
54use crate::indexed::columnar::arrow::realloc_array;
55use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
56use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
57use crate::location::Blob;
58use crate::metrics::ColumnarMetrics;
59
/// Which column representations are written for the updates in a batch part.
///
/// NOTE: the derived `PartialOrd`/`Ord` follow variant declaration order, so the
/// variants must not be reordered.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Only the codec-encoded ("row") columns; not considered structured.
    Row,
    /// Codec-encoded columns together with structured columns. The `usize` is a
    /// format version: `0` and `1` are named "both" and are not considered
    /// structured, while `2` is named "both_v2" and is considered structured.
    Both(usize),
    /// Only the structured columns.
    Structured,
}
74
75impl BatchColumnarFormat {
76 pub const fn default() -> Self {
78 BatchColumnarFormat::Both(2)
79 }
80
81 pub fn from_str(s: &str) -> Self {
84 match s {
85 "row" => BatchColumnarFormat::Row,
86 "both" => BatchColumnarFormat::Both(0),
87 "both_v2" => BatchColumnarFormat::Both(2),
88 "structured" => BatchColumnarFormat::Structured,
89 x => {
90 let default = BatchColumnarFormat::default();
91 soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
92 default
93 }
94 }
95 }
96
97 pub const fn as_str(&self) -> &'static str {
99 match self {
100 BatchColumnarFormat::Row => "row",
101 BatchColumnarFormat::Both(0 | 1) => "both",
102 BatchColumnarFormat::Both(2) => "both_v2",
103 _ => panic!("unknown batch columnar format"),
104 }
105 }
106
107 pub const fn is_structured(&self) -> bool {
109 match self {
110 BatchColumnarFormat::Row => false,
111 BatchColumnarFormat::Both(0 | 1) => false,
113 BatchColumnarFormat::Both(_) => true,
114 BatchColumnarFormat::Structured => true,
115 }
116 }
117}
118
119impl fmt::Display for BatchColumnarFormat {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.write_str(self.as_str())
122 }
123}
124
125impl Arbitrary for BatchColumnarFormat {
126 type Parameters = ();
127 type Strategy = BoxedStrategy<BatchColumnarFormat>;
128
129 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
130 proptest::strategy::Union::new(vec![
131 Just(BatchColumnarFormat::Row).boxed(),
132 Just(BatchColumnarFormat::Both(0)).boxed(),
133 Just(BatchColumnarFormat::Both(1)).boxed(),
134 ])
135 .boxed()
136 }
137}
138
/// Metadata describing a trace batch that is stored as one or more blob parts.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// Blob keys of the batch's parts. `validate_data` expects the part stored at
    /// `keys[i]` to carry `index == i`.
    pub keys: Vec<String>,
    /// The encoding format of the batch (see [`ProtoBatchFormat`]).
    pub format: ProtoBatchFormat,
    /// The batch's bounds; every part must carry an identical description.
    pub desc: Description<u64>,
    /// NOTE(review): presumably the batch's level in a compaction hierarchy — confirm.
    pub level: u64,
    /// NOTE(review): presumably the total encoded size of all parts (the tests
    /// sum each part's encoded length here) — confirm.
    pub size_bytes: u64,
}
163
/// A single part of a trace batch, as stored in blob storage.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// The bounds of the batch this part belongs to. `validate` requires every
    /// timestamp to be at or beyond `lower`, and (only while `since <= lower`)
    /// strictly before `upper`.
    pub desc: Description<T>,
    /// Position of this part within its batch.
    pub index: u64,
    /// The updates themselves, in one of the supported column representations.
    pub updates: BlobTraceUpdates,
}
201
/// The updates of a batch part, in codec-encoded form, structured form, or both.
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Codec-encoded key/value columns plus timestamps and diffs.
    Row(ColumnarRecords),
    /// Codec-encoded records plus structured key/value columns. The timestamps
    /// and diffs are read from the codec records.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// Only structured columns.
    Structured {
        /// Structured key and value columns.
        key_values: ColumnarRecordsStructuredExt,
        /// One timestamp per update, stored as its `Codec64` bytes reinterpreted as `i64`.
        timestamps: Int64Array,
        /// One diff per update, stored as its `Codec64` bytes reinterpreted as `i64`.
        diffs: Int64Array,
    },
}
226
227impl BlobTraceUpdates {
228 pub fn from_part(part: Part) -> Self {
230 Self::Structured {
231 key_values: ColumnarRecordsStructuredExt {
232 key: part.key,
233 val: part.val,
234 },
235 timestamps: part.time,
236 diffs: part.diff,
237 }
238 }
239
240 pub fn len(&self) -> usize {
242 match self {
243 BlobTraceUpdates::Row(c) => c.len(),
244 BlobTraceUpdates::Both(c, _structured) => c.len(),
245 BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
246 }
247 }
248
249 pub fn timestamps(&self) -> &Int64Array {
251 match self {
252 BlobTraceUpdates::Row(c) => c.timestamps(),
253 BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
254 BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
255 }
256 }
257
258 pub fn diffs(&self) -> &Int64Array {
260 match self {
261 BlobTraceUpdates::Row(c) => c.diffs(),
262 BlobTraceUpdates::Both(c, _structured) => c.diffs(),
263 BlobTraceUpdates::Structured { diffs, .. } => diffs,
264 }
265 }
266
267 pub fn diffs_sum<D: Codec64 + Semigroup>(&self) -> Option<D> {
269 let mut sum = None;
270 for d in self.diffs().values().iter() {
271 let d = D::decode(d.to_le_bytes());
272 match &mut sum {
273 None => sum = Some(d),
274 Some(x) => x.plus_equals(&d),
275 }
276 }
277
278 sum
279 }
280
281 pub fn records(&self) -> Option<&ColumnarRecords> {
283 match self {
284 BlobTraceUpdates::Row(c) => Some(c),
285 BlobTraceUpdates::Both(c, _structured) => Some(c),
286 BlobTraceUpdates::Structured { .. } => None,
287 }
288 }
289
290 pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
292 match self {
293 BlobTraceUpdates::Row(_) => None,
294 BlobTraceUpdates::Both(_, s) => Some(s),
295 BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
296 }
297 }
298
299 pub fn goodbytes(&self) -> usize {
301 match self {
302 BlobTraceUpdates::Row(c) => c.goodbytes(),
303 BlobTraceUpdates::Both(c, _) => c.goodbytes(),
307 BlobTraceUpdates::Structured {
308 key_values,
309 timestamps,
310 diffs,
311 } => {
312 key_values.goodbytes()
313 + timestamps.values().to_byte_slice().len()
314 + diffs.values().to_byte_slice().len()
315 }
316 }
317 }
318
319 pub fn get_or_make_codec<K: Codec, V: Codec>(
321 &mut self,
322 key_schema: &K::Schema,
323 val_schema: &V::Schema,
324 ) -> &ColumnarRecords {
325 match self {
326 BlobTraceUpdates::Row(records) => records,
327 BlobTraceUpdates::Both(records, _) => records,
328 BlobTraceUpdates::Structured {
329 key_values,
330 timestamps,
331 diffs,
332 } => {
333 let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
334 let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
335 let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());
336
337 *self = BlobTraceUpdates::Both(records, key_values.clone());
338 let BlobTraceUpdates::Both(records, _) = self else {
339 unreachable!("set to BlobTraceUpdates::Both in previous line")
340 };
341 records
342 }
343 }
344 }
345
346 pub fn get_or_make_structured<K: Codec, V: Codec>(
348 &mut self,
349 key_schema: &K::Schema,
350 val_schema: &V::Schema,
351 ) -> &ColumnarRecordsStructuredExt {
352 let structured = match self {
353 BlobTraceUpdates::Row(records) => {
354 let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
355 let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");
356
357 *self = BlobTraceUpdates::Both(
358 records.clone(),
359 ColumnarRecordsStructuredExt { key, val },
360 );
361 let BlobTraceUpdates::Both(_, structured) = self else {
362 unreachable!("set to BlobTraceUpdates::Both in previous line")
363 };
364 structured
365 }
366 BlobTraceUpdates::Both(_, structured) => structured,
367 BlobTraceUpdates::Structured { key_values, .. } => key_values,
368 };
369
370 let migrate = |array: &mut ArrayRef, to_type: DataType| {
375 let from_type = array.data_type().clone();
377 if from_type != to_type {
378 if let Some(migration) = backward_compatible(&from_type, &to_type) {
379 *array = migration.migrate(Arc::clone(array));
380 } else {
381 error!(
382 ?from_type,
383 ?to_type,
384 "failed to migrate array type; backwards-incompatible schema migration?"
385 );
386 }
387 }
388 };
389 migrate(
390 &mut structured.key,
391 data_type::<K>(key_schema).expect("valid key schema"),
392 );
393 migrate(
394 &mut structured.val,
395 data_type::<V>(val_schema).expect("valid value schema"),
396 );
397
398 structured
399 }
400
401 pub fn as_part(&self) -> Option<Part> {
403 let ext = self.structured()?.clone();
404 Some(Part {
405 key: ext.key,
406 val: ext.val,
407 time: self.timestamps().clone(),
408 diff: self.diffs().clone(),
409 })
410 }
411
412 pub fn into_part<K: Codec, V: Codec>(
414 &mut self,
415 key_schema: &K::Schema,
416 val_schema: &V::Schema,
417 ) -> Part {
418 let ext = self
419 .get_or_make_structured::<K, V>(key_schema, val_schema)
420 .clone();
421 Part {
422 key: ext.key,
423 val: ext.val,
424 time: self.timestamps().clone(),
425 diff: self.diffs().clone(),
426 }
427 }
428
429 pub fn concat<K: Codec, V: Codec>(
434 mut updates: Vec<BlobTraceUpdates>,
435 key_schema: &K::Schema,
436 val_schema: &V::Schema,
437 metrics: &ColumnarMetrics,
438 ) -> anyhow::Result<BlobTraceUpdates> {
439 match updates.len() {
440 0 => {
441 return Ok(BlobTraceUpdates::Structured {
442 key_values: ColumnarRecordsStructuredExt {
443 key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
444 val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
445 },
446 timestamps: Int64Array::from_iter_values(vec![]),
447 diffs: Int64Array::from_iter_values(vec![]),
448 });
449 }
450 1 => return Ok(updates.into_iter().into_element()),
451 _ => {}
452 }
453
454 let mut keys = Vec::with_capacity(updates.len());
456 let mut vals = Vec::with_capacity(updates.len());
457 for updates in &mut updates {
458 let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
459 keys.push(structured.key.as_ref());
460 vals.push(structured.val.as_ref());
461 }
462 let key_values = ColumnarRecordsStructuredExt {
463 key: ::arrow::compute::concat(&keys)?,
464 val: ::arrow::compute::concat(&vals)?,
465 };
466
467 let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
470 let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
471
472 for update in &updates {
473 timestamps.push(update.timestamps());
474 diffs.push(update.diffs());
475 }
476 let timestamps = ::arrow::compute::concat(×tamps)?
477 .as_primitive_opt::<Int64Type>()
478 .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
479 .clone();
480 let diffs = ::arrow::compute::concat(&diffs)?
481 .as_primitive_opt::<Int64Type>()
482 .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
483 .clone();
484
485 let out = Self::Structured {
486 key_values,
487 timestamps,
488 diffs,
489 };
490 metrics
491 .arrow
492 .concat_bytes
493 .inc_by(u64::cast_from(out.goodbytes()));
494 Ok(out)
495 }
496
497 pub fn from_proto(
499 lgbytes: &ColumnarMetrics,
500 proto: ProtoColumnarRecords,
501 ) -> Result<Self, TryFromProtoError> {
502 let binary_array = |data: Bytes, offsets: Vec<i32>| {
503 if offsets.is_empty() && proto.len > 0 {
504 return Ok(None);
505 };
506 match BinaryArray::try_new(
507 OffsetBuffer::new(offsets.into()),
508 ::arrow::buffer::Buffer::from(data),
509 None,
510 ) {
511 Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
512 Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
513 "Unable to decode binary array from repeated proto fields: {e:?}"
514 ))),
515 }
516 };
517
518 let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
519 let codec_val = binary_array(proto.val_data, proto.val_offsets)?;
520
521 let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
522 let diffs = realloc_array(&proto.diffs.into(), lgbytes);
523 let ext =
524 ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;
525
526 let updates = match (codec_key, codec_val, ext) {
527 (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
528 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
529 ext,
530 ),
531 (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
532 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
533 ),
534 (None, None, Some(ext)) => BlobTraceUpdates::Structured {
535 key_values: ext,
536 timestamps,
537 diffs,
538 },
539 (k, v, ext) => {
540 return Err(TryFromProtoError::InvalidPersistState(format!(
541 "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
542 k.is_some(),
543 v.is_some(),
544 ext.is_some(),
545 )));
546 }
547 };
548
549 Ok(updates)
550 }
551
552 pub fn into_proto(&self) -> ProtoColumnarRecords {
554 let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
555 None => (vec![], Bytes::new(), vec![], Bytes::new()),
556 Some(records) => (
557 records.keys().offsets().to_vec(),
558 Bytes::copy_from_slice(records.keys().value_data()),
559 records.vals().offsets().to_vec(),
560 Bytes::copy_from_slice(records.vals().value_data()),
561 ),
562 };
563 let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
564 None => (None, None),
565 Some((k, v)) => (Some(k), Some(v)),
566 };
567
568 ProtoColumnarRecords {
569 len: self.len().into_proto(),
570 key_offsets,
571 key_data,
572 val_offsets,
573 val_data,
574 timestamps: self.timestamps().values().to_vec(),
575 diffs: self.diffs().values().to_vec(),
576 key_structured: k_struct,
577 val_structured: v_struct,
578 }
579 }
580
581 pub fn as_structured<K: Codec, V: Codec>(
584 &self,
585 key_schema: &K::Schema,
586 val_schema: &V::Schema,
587 ) -> Self {
588 let mut this = self.clone();
589 Self::Structured {
590 key_values: this
591 .get_or_make_structured::<K, V>(key_schema, val_schema)
592 .clone(),
593 timestamps: this.timestamps().clone(),
594 diffs: this.diffs().clone(),
595 }
596 }
597}
598
599impl TraceBatchMeta {
600 pub fn validate(&self) -> Result<(), Error> {
603 if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
607 return Err(format!("invalid desc: {:?}", &self.desc).into());
608 }
609
610 Ok(())
611 }
612
613 pub async fn validate_data(
615 &self,
616 blob: &dyn Blob,
617 metrics: &ColumnarMetrics,
618 ) -> Result<(), Error> {
619 let mut batches = vec![];
620 for (idx, key) in self.keys.iter().enumerate() {
621 let value = blob
622 .get(key)
623 .await?
624 .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
625 let batch = BlobTraceBatchPart::decode(&value, metrics)?;
626 if batch.desc != self.desc {
627 return Err(format!(
628 "invalid trace batch part desc expected {:?} got {:?}",
629 &self.desc, &batch.desc
630 )
631 .into());
632 }
633
634 if batch.index != u64::cast_from(idx) {
635 return Err(format!(
636 "invalid index for blob trace batch part at key {} expected {} got {}",
637 key, idx, batch.index
638 )
639 .into());
640 }
641
642 batch.validate()?;
643 batches.push(batch);
644 }
645
646 for (batch_idx, batch) in batches.iter().enumerate() {
647 for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
648 let diff: u64 = Codec64::decode(diff.to_le_bytes());
650
651 if diff == 0 {
653 return Err(format!(
654 "update with 0 diff in batch {batch_idx} at row {row_idx}",
655 )
656 .into());
657 }
658 }
659 }
660
661 Ok(())
662 }
663}
664
665impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
666 pub fn validate(&self) -> Result<(), Error> {
669 if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
673 return Err(format!("invalid desc: {:?}", &self.desc).into());
674 }
675
676 let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
677
678 for time in self.updates.timestamps().values() {
679 let ts = T::decode(time.to_le_bytes());
680 if !self.desc.lower().less_equal(&ts) {
682 return Err(format!(
683 "timestamp {:?} is less than the batch lower: {:?}",
684 ts, self.desc
685 )
686 .into());
687 }
688
689 if uncompacted && self.desc.upper().less_equal(&ts) {
694 return Err(format!(
695 "timestamp {:?} is greater than or equal to the batch upper: {:?}",
696 ts, self.desc
697 )
698 .into());
699 }
700 }
701
702 for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
703 let diff: u64 = Codec64::decode(diff.to_le_bytes());
705
706 if diff == 0 {
708 return Err(format!("update with 0 diff at row {row_idx}",).into());
709 }
710 }
711
712 Ok(())
713 }
714
715 pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
717 where
718 B: BufMut + Send,
719 {
720 encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
721 }
722
723 pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
725 decode_trace_parquet(buf.clone(), metrics)
726 }
727
728 pub fn key_lower(&self) -> &[u8] {
730 self.updates
731 .records()
732 .and_then(|r| r.keys().iter().flatten().min())
733 .unwrap_or(&[])
734 }
735
736 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
738 self.updates.structured().and_then(|r| {
739 let ord = ArrayOrd::new(&r.key);
740 (0..r.key.len())
741 .min_by_key(|i| ord.at(*i))
742 .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
743 })
744 }
745}
746
747impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
748 fn from(x: ProtoU64Description) -> Self {
749 Description::new(
750 x.lower
751 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
752 x.upper
753 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
754 x.since
755 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
756 )
757 }
758}
759
760impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
761 fn from(x: ProtoU64Antichain) -> Self {
762 Antichain::from(
763 x.elements
764 .into_iter()
765 .map(|x| T::decode(u64::to_le_bytes(x)))
766 .collect::<Vec<_>>(),
767 )
768 }
769}
770
771impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
772 fn from(x: &Antichain<T>) -> Self {
773 ProtoU64Antichain {
774 elements: x
775 .elements()
776 .iter()
777 .map(|x| u64::from_le_bytes(T::encode(x)))
778 .collect(),
779 }
780 }
781}
782
783impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
784 fn from(x: &Description<T>) -> Self {
785 ProtoU64Description {
786 lower: Some(x.lower().into()),
787 upper: Some(x.upper().into()),
788 since: Some(x.since().into()),
789 }
790 }
791}
792
793pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
795 let (format, format_metadata) = match &batch.updates {
796 BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
798 BlobTraceUpdates::Both { .. } => {
800 let metadata = ProtoFormatMetadata::StructuredMigration(2);
801 (ProtoBatchFormat::ParquetStructured, Some(metadata))
802 }
803 BlobTraceUpdates::Structured { .. } => {
804 let metadata = ProtoFormatMetadata::StructuredMigration(3);
805 (ProtoBatchFormat::ParquetStructured, Some(metadata))
806 }
807 };
808
809 let inline = ProtoBatchPartInline {
810 format: format.into(),
811 desc: Some((&batch.desc).into()),
812 index: batch.index,
813 format_metadata,
814 };
815 let inline_encoded = inline.encode_to_vec();
816 base64::engine::general_purpose::STANDARD.encode(inline_encoded)
817}
818
819pub fn decode_trace_inline_meta(
821 inline_base64: Option<&String>,
822) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
823 let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
824 let inline_encoded = base64::engine::general_purpose::STANDARD
825 .decode(inline_base64)
826 .map_err(|err| err.to_string())?;
827 let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
828 let format = ProtoBatchFormat::try_from(inline.format)
829 .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
830 Ok((format, inline))
831}
832
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;

    /// Builds the update `((key, ""), ts, 1)`: an empty value with a diff of 1.
    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    /// A description with the given lower/upper and `since` fixed at 0.
    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    /// Batch metadata with no keys, `Unknown` format, level 1 and size 0.
    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    /// A description with explicit lower, upper and since.
    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    /// Encodes `updates` as row-only (`BlobTraceUpdates::Row`) columnar records,
    /// asserting that every push into the builder succeeds.
    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        let updates = builder.finish(&ColumnarMetrics::disconnected());
        BlobTraceUpdates::Row(updates)
    }

    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case: every timestamp lies in [lower, upper).
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // An empty part with a valid description.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));

        // upper < lower is rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // upper == lower (an empty description) is rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // A timestamp below lower is rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // A timestamp at upper is rejected while since <= lower.
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // With since (4) beyond lower (1), timestamps at or past upper are accepted:
        // one at upper ...
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // ... one at since ...
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // ... and one beyond since.
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // An update with a zero diff is rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff at row 0"))
        );
    }

    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // A non-empty description is valid.
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));

        // upper == lower is rejected.
        let b = batch_meta(0, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );

        // upper < lower is rejected.
        let b = batch_meta(2, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

    /// Encodes `batch` with the default config, stores it at `key`, and returns
    /// the encoded length in bytes. Panics if the blob write fails.
    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &dyn Blob,
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        let metrics = ColumnarMetrics::disconnected();
        let config = EncodingConfig::default();
        batch.encode(&mut val, &metrics, &config);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

    // NOTE(review): ignored under miri, presumably because the async runtime it
    // needs is unsupported there — confirm.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;

        // Two parts that share one description, stored at keys "b0" and "b1".
        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };

        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };

        // Matching metadata validates.
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );

        // A description that differs from the stored parts' is rejected.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
            ))
        );
        // A key with no stored blob is rejected.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );
        // Keys listed out of index order are rejected.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );

        Ok(())
    }

    // Pins the encoded sizes of generated row-only parts, so that changes to the
    // encoding show up as a test diff.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn encoded_batch_sizes() {
        /// Encodes all of `data`'s batches as a single row-only part and returns
        /// the encoded length in bytes.
        fn sizes(data: DataGenerator) -> usize {
            let metrics = ColumnarMetrics::disconnected();
            let config = EncodingConfig::default();
            let updates: Vec<_> = data.batches().collect();
            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates,
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf, &metrics, &config);
            trace_buf.len()
        }

        let record_size_bytes = DataGenerator::default().record_size_bytes;
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=867 25/1=2613 1000/1=72845 1000/100=72845"
        );
    }
}