1use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use bytes::{BufMut, Bytes};
23use differential_dataflow::trace::Description;
24use mz_ore::bytes::SegmentedBytes;
25use mz_ore::cast::CastFrom;
26use mz_ore::collections::CollectionExt;
27use mz_ore::soft_panic_or_log;
28use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
29use mz_persist_types::columnar::{
30 ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
31};
32use mz_persist_types::parquet::EncodingConfig;
33use mz_persist_types::part::Part;
34use mz_persist_types::schema::backward_compatible;
35use mz_persist_types::{Codec, Codec64};
36use mz_proto::{RustType, TryFromProtoError};
37use proptest::arbitrary::Arbitrary;
38use proptest::prelude::*;
39use proptest::strategy::{BoxedStrategy, Just};
40use prost::Message;
41use serde::Serialize;
42use timely::PartialOrder;
43use timely::progress::{Antichain, Timestamp};
44use tracing::error;
45
46use crate::error::Error;
47use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
48use crate::generated::persist::{
49 ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
50 ProtoU64Description,
51};
52use crate::indexed::columnar::arrow::realloc_array;
53use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
54use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
55use crate::location::Blob;
56use crate::metrics::ColumnarMetrics;
57
/// Format of the key/value updates stored in a batch part blob.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Updates are stored only as opaque `Codec`-encoded key/value bytes.
    Row,
    /// Updates are stored both `Codec`-encoded and as structured columns; the
    /// `usize` is a version tag for the structured layout (see `as_str`:
    /// 0/1 => "both", 2 => "both_v2").
    Both(usize),
    /// Updates are stored only as structured columns.
    Structured,
}
72
73impl BatchColumnarFormat {
74 pub const fn default() -> Self {
76 BatchColumnarFormat::Both(2)
77 }
78
79 pub fn from_str(s: &str) -> Self {
82 match s {
83 "row" => BatchColumnarFormat::Row,
84 "both" => BatchColumnarFormat::Both(0),
85 "both_v2" => BatchColumnarFormat::Both(2),
86 "structured" => BatchColumnarFormat::Structured,
87 x => {
88 let default = BatchColumnarFormat::default();
89 soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
90 default
91 }
92 }
93 }
94
95 pub const fn as_str(&self) -> &'static str {
97 match self {
98 BatchColumnarFormat::Row => "row",
99 BatchColumnarFormat::Both(0 | 1) => "both",
100 BatchColumnarFormat::Both(2) => "both_v2",
101 _ => panic!("unknown batch columnar format"),
102 }
103 }
104
105 pub const fn is_structured(&self) -> bool {
107 match self {
108 BatchColumnarFormat::Row => false,
109 BatchColumnarFormat::Both(0 | 1) => false,
111 BatchColumnarFormat::Both(_) => true,
112 BatchColumnarFormat::Structured => true,
113 }
114 }
115}
116
117impl fmt::Display for BatchColumnarFormat {
118 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119 f.write_str(self.as_str())
120 }
121}
122
123impl Arbitrary for BatchColumnarFormat {
124 type Parameters = ();
125 type Strategy = BoxedStrategy<BatchColumnarFormat>;
126
127 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
128 proptest::strategy::Union::new(vec![
129 Just(BatchColumnarFormat::Row).boxed(),
130 Just(BatchColumnarFormat::Both(0)).boxed(),
131 Just(BatchColumnarFormat::Both(1)).boxed(),
132 ])
133 .boxed()
134 }
135}
136
/// Metadata for a batch of updates in the trace, including where the batch's
/// data lives in blob storage.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// Blob store keys of the batch's data parts; the part stored at position
    /// `i` must decode to a `BlobTraceBatchPart` with `index == i` (enforced
    /// by `validate_data`).
    pub keys: Vec<String>,
    /// The format the batch data is encoded in.
    pub format: ProtoBatchFormat,
    /// The time interval (lower/upper/since) this batch covers; must match the
    /// desc of every referenced part (enforced by `validate_data`).
    pub desc: Description<u64>,
    /// Level of the batch — presumably a compaction level; TODO(review):
    /// confirm semantics, it is not used within this file's logic.
    pub level: u64,
    /// Size of the encoded batch data, in bytes.
    pub size_bytes: u64,
}
161
/// One part of a batch of updates, stored as a single blob.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// The time interval this part's updates fall in; `validate` checks every
    /// timestamp against `desc.lower()` and (for uncompacted batches)
    /// `desc.upper()`.
    pub desc: Description<T>,
    /// Position of this part within its batch (checked against the order of
    /// `TraceBatchMeta::keys`).
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}
199
/// The set of updates in a trace batch part, in one or both of the supported
/// representations.
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Keys/values stored only as `Codec`-encoded binary columns (plus
    /// timestamp/diff columns).
    Row(ColumnarRecords),
    /// Keys/values stored both `Codec`-encoded and as structured columns; the
    /// two are assumed to describe the same updates.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// Keys/values stored only as structured columns.
    Structured {
        /// The structured key/value arrays.
        key_values: ColumnarRecordsStructuredExt,
        /// Timestamp of each update, interpreted via `Codec64` from the
        /// little-endian bytes of each `i64`.
        timestamps: Int64Array,
        /// Diff of each update, interpreted via `Codec64` from the
        /// little-endian bytes of each `i64`.
        diffs: Int64Array,
    },
}
224
225impl BlobTraceUpdates {
226 pub fn from_part(part: Part) -> Self {
228 Self::Structured {
229 key_values: ColumnarRecordsStructuredExt {
230 key: part.key,
231 val: part.val,
232 },
233 timestamps: part.time,
234 diffs: part.diff,
235 }
236 }
237
238 pub fn len(&self) -> usize {
240 match self {
241 BlobTraceUpdates::Row(c) => c.len(),
242 BlobTraceUpdates::Both(c, _structured) => c.len(),
243 BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
244 }
245 }
246
247 pub fn timestamps(&self) -> &Int64Array {
249 match self {
250 BlobTraceUpdates::Row(c) => c.timestamps(),
251 BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
252 BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
253 }
254 }
255
256 pub fn diffs(&self) -> &Int64Array {
258 match self {
259 BlobTraceUpdates::Row(c) => c.diffs(),
260 BlobTraceUpdates::Both(c, _structured) => c.diffs(),
261 BlobTraceUpdates::Structured { diffs, .. } => diffs,
262 }
263 }
264
265 pub fn records(&self) -> Option<&ColumnarRecords> {
267 match self {
268 BlobTraceUpdates::Row(c) => Some(c),
269 BlobTraceUpdates::Both(c, _structured) => Some(c),
270 BlobTraceUpdates::Structured { .. } => None,
271 }
272 }
273
274 pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
276 match self {
277 BlobTraceUpdates::Row(_) => None,
278 BlobTraceUpdates::Both(_, s) => Some(s),
279 BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
280 }
281 }
282
283 pub fn goodbytes(&self) -> usize {
285 match self {
286 BlobTraceUpdates::Row(c) => c.goodbytes(),
287 BlobTraceUpdates::Both(c, _) => c.goodbytes(),
291 BlobTraceUpdates::Structured {
292 key_values,
293 timestamps,
294 diffs,
295 } => {
296 key_values.goodbytes()
297 + timestamps.values().to_byte_slice().len()
298 + diffs.values().to_byte_slice().len()
299 }
300 }
301 }
302
303 pub fn get_or_make_codec<K: Codec, V: Codec>(
305 &mut self,
306 key_schema: &K::Schema,
307 val_schema: &V::Schema,
308 ) -> &ColumnarRecords {
309 match self {
310 BlobTraceUpdates::Row(records) => records,
311 BlobTraceUpdates::Both(records, _) => records,
312 BlobTraceUpdates::Structured {
313 key_values,
314 timestamps,
315 diffs,
316 } => {
317 let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
318 let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
319 let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());
320
321 *self = BlobTraceUpdates::Both(records, key_values.clone());
322 let BlobTraceUpdates::Both(records, _) = self else {
323 unreachable!("set to BlobTraceUpdates::Both in previous line")
324 };
325 records
326 }
327 }
328 }
329
330 pub fn get_or_make_structured<K: Codec, V: Codec>(
332 &mut self,
333 key_schema: &K::Schema,
334 val_schema: &V::Schema,
335 ) -> &ColumnarRecordsStructuredExt {
336 let structured = match self {
337 BlobTraceUpdates::Row(records) => {
338 let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
339 let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");
340
341 *self = BlobTraceUpdates::Both(
342 records.clone(),
343 ColumnarRecordsStructuredExt { key, val },
344 );
345 let BlobTraceUpdates::Both(_, structured) = self else {
346 unreachable!("set to BlobTraceUpdates::Both in previous line")
347 };
348 structured
349 }
350 BlobTraceUpdates::Both(_, structured) => structured,
351 BlobTraceUpdates::Structured { key_values, .. } => key_values,
352 };
353
354 let migrate = |array: &mut ArrayRef, to_type: DataType| {
359 let from_type = array.data_type().clone();
361 if from_type != to_type {
362 if let Some(migration) = backward_compatible(&from_type, &to_type) {
363 *array = migration.migrate(Arc::clone(array));
364 } else {
365 error!(
366 ?from_type,
367 ?to_type,
368 "failed to migrate array type; backwards-incompatible schema migration?"
369 );
370 }
371 }
372 };
373 migrate(
374 &mut structured.key,
375 data_type::<K>(key_schema).expect("valid key schema"),
376 );
377 migrate(
378 &mut structured.val,
379 data_type::<V>(val_schema).expect("valid value schema"),
380 );
381
382 structured
383 }
384
385 pub fn into_part<K: Codec, V: Codec>(
387 &mut self,
388 key_schema: &K::Schema,
389 val_schema: &V::Schema,
390 ) -> Part {
391 let ext = self
392 .get_or_make_structured::<K, V>(key_schema, val_schema)
393 .clone();
394 Part {
395 key: ext.key,
396 val: ext.val,
397 time: self.timestamps().clone(),
398 diff: self.diffs().clone(),
399 }
400 }
401
402 pub fn concat<K: Codec, V: Codec>(
407 mut updates: Vec<BlobTraceUpdates>,
408 key_schema: &K::Schema,
409 val_schema: &V::Schema,
410 metrics: &ColumnarMetrics,
411 ) -> anyhow::Result<BlobTraceUpdates> {
412 match updates.len() {
413 0 => {
414 return Ok(BlobTraceUpdates::Structured {
415 key_values: ColumnarRecordsStructuredExt {
416 key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
417 val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
418 },
419 timestamps: Int64Array::from_iter_values(vec![]),
420 diffs: Int64Array::from_iter_values(vec![]),
421 });
422 }
423 1 => return Ok(updates.into_iter().into_element()),
424 _ => {}
425 }
426
427 let mut keys = Vec::with_capacity(updates.len());
429 let mut vals = Vec::with_capacity(updates.len());
430 for updates in &mut updates {
431 let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
432 keys.push(structured.key.as_ref());
433 vals.push(structured.val.as_ref());
434 }
435 let key_values = ColumnarRecordsStructuredExt {
436 key: ::arrow::compute::concat(&keys)?,
437 val: ::arrow::compute::concat(&vals)?,
438 };
439
440 let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
443 let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
444
445 for update in &updates {
446 timestamps.push(update.timestamps());
447 diffs.push(update.diffs());
448 }
449 let timestamps = ::arrow::compute::concat(×tamps)?
450 .as_primitive_opt::<Int64Type>()
451 .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
452 .clone();
453 let diffs = ::arrow::compute::concat(&diffs)?
454 .as_primitive_opt::<Int64Type>()
455 .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
456 .clone();
457
458 let out = Self::Structured {
459 key_values,
460 timestamps,
461 diffs,
462 };
463 metrics
464 .arrow
465 .concat_bytes
466 .inc_by(u64::cast_from(out.goodbytes()));
467 Ok(out)
468 }
469
470 pub fn from_proto(
472 lgbytes: &ColumnarMetrics,
473 proto: ProtoColumnarRecords,
474 ) -> Result<Self, TryFromProtoError> {
475 let binary_array = |data: Bytes, offsets: Vec<i32>| {
476 if offsets.is_empty() && proto.len > 0 {
477 return Ok(None);
478 };
479 match BinaryArray::try_new(
480 OffsetBuffer::new(offsets.into()),
481 ::arrow::buffer::Buffer::from_bytes(data.into()),
482 None,
483 ) {
484 Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
485 Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
486 "Unable to decode binary array from repeated proto fields: {e:?}"
487 ))),
488 }
489 };
490
491 let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
492 let codec_val = binary_array(proto.val_data, proto.val_offsets)?;
493
494 let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
495 let diffs = realloc_array(&proto.diffs.into(), lgbytes);
496 let ext =
497 ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;
498
499 let updates = match (codec_key, codec_val, ext) {
500 (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
501 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
502 ext,
503 ),
504 (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
505 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
506 ),
507 (None, None, Some(ext)) => BlobTraceUpdates::Structured {
508 key_values: ext,
509 timestamps,
510 diffs,
511 },
512 (k, v, ext) => {
513 return Err(TryFromProtoError::InvalidPersistState(format!(
514 "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
515 k.is_some(),
516 v.is_some(),
517 ext.is_some(),
518 )));
519 }
520 };
521
522 Ok(updates)
523 }
524
525 pub fn into_proto(&self) -> ProtoColumnarRecords {
527 let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
528 None => (vec![], Bytes::new(), vec![], Bytes::new()),
529 Some(records) => (
530 records.keys().offsets().to_vec(),
531 Bytes::copy_from_slice(records.keys().value_data()),
532 records.vals().offsets().to_vec(),
533 Bytes::copy_from_slice(records.vals().value_data()),
534 ),
535 };
536 let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
537 None => (None, None),
538 Some((k, v)) => (Some(k), Some(v)),
539 };
540
541 ProtoColumnarRecords {
542 len: self.len().into_proto(),
543 key_offsets,
544 key_data,
545 val_offsets,
546 val_data,
547 timestamps: self.timestamps().values().to_vec(),
548 diffs: self.diffs().values().to_vec(),
549 key_structured: k_struct,
550 val_structured: v_struct,
551 }
552 }
553
554 pub fn as_structured<K: Codec, V: Codec>(
557 &self,
558 key_schema: &K::Schema,
559 val_schema: &V::Schema,
560 ) -> Self {
561 let mut this = self.clone();
562 Self::Structured {
563 key_values: this
564 .get_or_make_structured::<K, V>(key_schema, val_schema)
565 .clone(),
566 timestamps: this.timestamps().clone(),
567 diffs: this.diffs().clone(),
568 }
569 }
570}
571
impl TraceBatchMeta {
    /// Asserts this metadata's invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // A description with upper <= lower is invalid (empty or backwards).
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        Ok(())
    }

    /// Fetches every referenced part from `blob` and asserts that the stored
    /// data obeys the required invariants (matching desc, correct index
    /// ordering, per-part validity, and no zero diffs).
    pub async fn validate_data(
        &self,
        blob: &dyn Blob,
        metrics: &ColumnarMetrics,
    ) -> Result<(), Error> {
        let mut batches = vec![];
        for (idx, key) in self.keys.iter().enumerate() {
            // Every listed key must exist in the blob store...
            let value = blob
                .get(key)
                .await?
                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
            // ...and decode to a part whose desc matches this batch's desc...
            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
            if batch.desc != self.desc {
                return Err(format!(
                    "invalid trace batch part desc expected {:?} got {:?}",
                    &self.desc, &batch.desc
                )
                .into());
            }

            // ...and be stored at the position matching its `index`.
            if batch.index != u64::cast_from(idx) {
                return Err(format!(
                    "invalid index for blob trace batch part at key {} expected {} got {}",
                    key, idx, batch.index
                )
                .into());
            }

            batch.validate()?;
            batches.push(batch);
        }

        for (batch_idx, batch) in batches.iter().enumerate() {
            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
                // Decoded via Codec64 as u64 only to detect the all-zeroes
                // encoding; the numeric interpretation is otherwise unused.
                let diff: u64 = Codec64::decode(diff.to_le_bytes());

                // Zero diffs must never be stored.
                if diff == 0 {
                    return Err(format!(
                        "update with 0 diff in batch {batch_idx} at row {row_idx}",
                    )
                    .into());
                }
            }
        }

        Ok(())
    }
}
637
impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
    /// Asserts this part's invariants, returning an error if any are violated:
    /// a well-formed desc, all timestamps within the desc's bounds, and no
    /// zero diffs.
    pub fn validate(&self) -> Result<(), Error> {
        // A description with upper <= lower is invalid (empty or backwards).
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        // since <= lower means the batch has not been compacted.
        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());

        for time in self.updates.timestamps().values() {
            let ts = T::decode(time.to_le_bytes());
            // Every timestamp must be at or beyond the lower bound.
            if !self.desc.lower().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is less than the batch lower: {:?}",
                    ts, self.desc
                )
                .into());
            }

            // The upper bound is only enforced on uncompacted batches —
            // presumably compaction (since > lower) can forward timestamps
            // past upper; TODO(review): confirm against compaction semantics.
            if uncompacted && self.desc.upper().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
                    ts, self.desc
                )
                .into());
            }
        }

        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
            // Decoded via Codec64 as u64 only to detect the all-zeroes
            // encoding; zero diffs must never be stored.
            let diff: u64 = Codec64::decode(diff.to_le_bytes());

            if diff == 0 {
                return Err(format!("update with 0 diff at row {row_idx}",).into());
            }
        }

        Ok(())
    }

    /// Encodes this part into `buf` in the Parquet format.
    ///
    /// # Panics
    ///
    /// Panics (via `expect`) if the batch fails to encode.
    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
    where
        B: BufMut + Send,
    {
        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
    }

    /// Decodes a part from its Parquet-encoded bytes.
    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
        decode_trace_parquet(buf.clone(), metrics)
    }

    /// The smallest codec-encoded key in this part, or the empty slice when
    /// there are no records (or no codec representation at all).
    pub fn key_lower(&self) -> &[u8] {
        self.updates
            .records()
            .and_then(|r| r.keys().iter().flatten().min())
            .unwrap_or(&[])
    }

    /// A bound on the smallest structured key, if a structured representation
    /// is present and non-empty.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        self.updates.structured().and_then(|r| {
            let ord = ArrayOrd::new(&r.key);
            (0..r.key.len())
                .min_by_key(|i| ord.at(*i))
                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
        })
    }
}
719
720impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
721 fn from(x: ProtoU64Description) -> Self {
722 Description::new(
723 x.lower
724 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
725 x.upper
726 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
727 x.since
728 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
729 )
730 }
731}
732
733impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
734 fn from(x: ProtoU64Antichain) -> Self {
735 Antichain::from(
736 x.elements
737 .into_iter()
738 .map(|x| T::decode(u64::to_le_bytes(x)))
739 .collect::<Vec<_>>(),
740 )
741 }
742}
743
744impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
745 fn from(x: &Antichain<T>) -> Self {
746 ProtoU64Antichain {
747 elements: x
748 .elements()
749 .iter()
750 .map(|x| u64::from_le_bytes(T::encode(x)))
751 .collect(),
752 }
753 }
754}
755
756impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
757 fn from(x: &Description<T>) -> Self {
758 ProtoU64Description {
759 lower: Some(x.lower().into()),
760 upper: Some(x.upper().into()),
761 since: Some(x.since().into()),
762 }
763 }
764}
765
766pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
768 let (format, format_metadata) = match &batch.updates {
769 BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
771 BlobTraceUpdates::Both { .. } => {
773 let metadata = ProtoFormatMetadata::StructuredMigration(2);
774 (ProtoBatchFormat::ParquetStructured, Some(metadata))
775 }
776 BlobTraceUpdates::Structured { .. } => {
777 let metadata = ProtoFormatMetadata::StructuredMigration(3);
778 (ProtoBatchFormat::ParquetStructured, Some(metadata))
779 }
780 };
781
782 let inline = ProtoBatchPartInline {
783 format: format.into(),
784 desc: Some((&batch.desc).into()),
785 index: batch.index,
786 format_metadata,
787 };
788 let inline_encoded = inline.encode_to_vec();
789 base64::encode(inline_encoded)
790}
791
792pub fn decode_trace_inline_meta(
794 inline_base64: Option<&String>,
795) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
796 let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
797 let inline_encoded = base64::decode(inline_base64).map_err(|err| err.to_string())?;
798 let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
799 let format = ProtoBatchFormat::try_from(inline.format)
800 .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
801 Ok((format, inline))
802}
803
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;

    // Builds a single ((key, ""), ts, +1) update.
    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    // Description of [lower, upper) with since = 0.
    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    // TraceBatchMeta with no keys and the given bounds.
    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    // Description of [lower, upper) with an explicit since.
    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    // Packs raw updates into a BlobTraceUpdates::Row via ColumnarRecordsBuilder.
    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        let updates = builder.finish(&ColumnarMetrics::disconnected());
        BlobTraceUpdates::Row(updates)
    }

    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case: in-bounds updates validate cleanly.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Empty updates are fine.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid desc: lower > upper.
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Invalid desc: empty interval (lower == upper).
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Timestamp below the lower bound is rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Timestamp at/above the upper bound is rejected for an uncompacted
        // batch (since <= lower).
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // With since > lower (compacted), the upper-bound check is skipped:
        // ts == upper is accepted...
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // ...as is ts == since...
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // ...and even ts > since.
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Zero diffs are rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff at row 0"))
        );
    }

    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // Normal case.
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));

        // Empty interval (lower == upper) is invalid.
        let b = batch_meta(0, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );

        // Backwards interval (lower > upper) is invalid.
        let b = batch_meta(2, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

    // Encodes `batch` and writes it to `blob` under `key`, returning the
    // encoded size in bytes.
    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &dyn Blob,
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        let metrics = ColumnarMetrics::disconnected();
        let config = EncodingConfig::default();
        batch.encode(&mut val, &metrics, &config);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;

        // Store two parts (index 0 and 1) sharing the same desc.
        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };

        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };

        // Normal case: matching desc, keys in index order.
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );

        // Meta desc does not match the stored parts' desc.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
            ))
        );
        // A referenced key with no stored blob.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );
        // Keys listed out of index order.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );

        Ok(())
    }

    // Golden test pinning the exact encoded sizes of generated batches;
    // intentionally brittle so size regressions are noticed.
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn encoded_batch_sizes() {
        fn sizes(data: DataGenerator) -> usize {
            let metrics = ColumnarMetrics::disconnected();
            let config = EncodingConfig::default();
            let updates: Vec<_> = data.batches().collect();
            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates,
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf, &metrics, &config);
            trace_buf.len()
        }

        let record_size_bytes = DataGenerator::default().record_size_bytes;
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=867 25/1=2613 1000/1=72845 1000/100=72845"
        );
    }
}