use std::fmt::{self, Debug};
use std::sync::Arc;

use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
use arrow::buffer::OffsetBuffer;
use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
use base64::Engine;
use bytes::{BufMut, Bytes};
use differential_dataflow::trace::Description;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::soft_panic_or_log;
use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
use mz_persist_types::columnar::{
    ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
};
use mz_persist_types::parquet::EncodingConfig;
use mz_persist_types::part::Part;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{RustType, TryFromProtoError};
use proptest::arbitrary::Arbitrary;
use proptest::prelude::*;
use proptest::strategy::{BoxedStrategy, Just};
use prost::Message;
use serde::Serialize;
use timely::PartialOrder;
use timely::progress::{Antichain, Timestamp};
use tracing::error;

use crate::error::Error;
use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
use crate::generated::persist::{
    ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
    ProtoU64Description,
};
use crate::indexed::columnar::arrow::realloc_array;
use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use crate::location::Blob;
use crate::metrics::ColumnarMetrics;

/// The on-blob format of the columnar data in a batch.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Keys and values are encoded into bytes via [`Codec`] and stored in
    /// opaque binary columns.
    Row,
    /// Keys and values are encoded both via [`Codec`] and as structured Arrow
    /// columns; the wrapped version number tracks the stage of the migration.
    Both(usize),
    /// Keys and values are stored only as structured Arrow columns.
    Structured,
}

impl BatchColumnarFormat {
    /// Returns the newest format that we support writing.
    pub const fn default() -> Self {
        BatchColumnarFormat::Both(2)
    }

    /// Parses a format from its string representation, falling back to the
    /// default (with a soft panic or log) on unrecognized input.
    pub fn from_str(s: &str) -> Self {
        match s {
            "row" => BatchColumnarFormat::Row,
            "both" => BatchColumnarFormat::Both(0),
            "both_v2" => BatchColumnarFormat::Both(2),
            "structured" => BatchColumnarFormat::Structured,
            x => {
                let default = BatchColumnarFormat::default();
                soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
                default
            }
        }
    }

    /// Returns the stable string representation of this format.
    pub const fn as_str(&self) -> &'static str {
        match self {
            BatchColumnarFormat::Row => "row",
            BatchColumnarFormat::Both(0 | 1) => "both",
            BatchColumnarFormat::Both(2) => "both_v2",
            _ => panic!("unknown batch columnar format"),
        }
    }

    /// Returns whether this format writes structured Arrow columns.
    pub const fn is_structured(&self) -> bool {
        match self {
            BatchColumnarFormat::Row => false,
            BatchColumnarFormat::Both(0 | 1) => false,
            // Version 2 of the migration (and anything newer) writes
            // structured data.
            BatchColumnarFormat::Both(_) => true,
            BatchColumnarFormat::Structured => true,
        }
    }
}

impl fmt::Display for BatchColumnarFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

impl Arbitrary for BatchColumnarFormat {
    type Parameters = ();
    type Strategy = BoxedStrategy<BatchColumnarFormat>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        proptest::strategy::Union::new(vec![
            Just(BatchColumnarFormat::Row).boxed(),
            Just(BatchColumnarFormat::Both(0)).boxed(),
            Just(BatchColumnarFormat::Both(1)).boxed(),
        ])
        .boxed()
    }
}

/// The metadata necessary to reconstruct a batch of updates in a trace.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// The keys to retrieve the batch's data from the blob store.
    pub keys: Vec<String>,
    /// The format of the stored batch data.
    pub format: ProtoBatchFormat,
    /// The half-open time interval `[lower, upper)` this batch contains data
    /// for.
    pub desc: Description<u64>,
    /// The compaction level of the batch.
    pub level: u64,
    /// Size of the encoded batch, in bytes.
    pub size_bytes: u64,
}

/// The structure serialized and stored as a value in the blob store for the
/// data of a trace batch part.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// Which updates are included in this batch part.
    pub desc: Description<T>,
    /// Index of this part within the batch.
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}

/// The updates of a trace batch part, in one or both of the supported
/// encodings.
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Legacy format: keys and values are `Codec`-encoded binary columns.
    Row(ColumnarRecords),
    /// Migration format: both the `Codec`-encoded binary columns and their
    /// structured Arrow equivalents.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// New format: structured Arrow columns only.
    Structured {
        /// Structured representations of the key and value columns.
        key_values: ColumnarRecordsStructuredExt,
        /// The timestamp column.
        timestamps: Int64Array,
        /// The diff column.
        diffs: Int64Array,
    },
}

impl BlobTraceUpdates {
    /// Builds a `BlobTraceUpdates::Structured` from a [`Part`].
    pub fn from_part(part: Part) -> Self {
        Self::Structured {
            key_values: ColumnarRecordsStructuredExt {
                key: part.key,
                val: part.val,
            },
            timestamps: part.time,
            diffs: part.diff,
        }
    }

    /// The number of updates.
    pub fn len(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.len(),
            BlobTraceUpdates::Both(c, _structured) => c.len(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
        }
    }

    /// The updates' timestamps as an [`Int64Array`].
    pub fn timestamps(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.timestamps(),
            BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
            BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
        }
    }

    /// The updates' diffs as an [`Int64Array`].
    pub fn diffs(&self) -> &Int64Array {
        match self {
            BlobTraceUpdates::Row(c) => c.diffs(),
            BlobTraceUpdates::Both(c, _structured) => c.diffs(),
            BlobTraceUpdates::Structured { diffs, .. } => diffs,
        }
    }

    /// The `Codec`-encoded updates, if we have them.
    pub fn records(&self) -> Option<&ColumnarRecords> {
        match self {
            BlobTraceUpdates::Row(c) => Some(c),
            BlobTraceUpdates::Both(c, _structured) => Some(c),
            BlobTraceUpdates::Structured { .. } => None,
        }
    }

    /// The structured updates, if we have them.
    pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
        match self {
            BlobTraceUpdates::Row(_) => None,
            BlobTraceUpdates::Both(_, s) => Some(s),
            BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
        }
    }

    /// The number of logical bytes of data in this batch.
    pub fn goodbytes(&self) -> usize {
        match self {
            BlobTraceUpdates::Row(c) => c.goodbytes(),
            // For `Both`, only the `Codec`-encoded data is counted.
            BlobTraceUpdates::Both(c, _) => c.goodbytes(),
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                key_values.goodbytes()
                    + timestamps.values().to_byte_slice().len()
                    + diffs.values().to_byte_slice().len()
            }
        }
    }

    /// Return the [`ColumnarRecords`] of the data, `Codec`-encoding the
    /// structured columns if necessary.
    pub fn get_or_make_codec<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecords {
        match self {
            BlobTraceUpdates::Row(records) => records,
            BlobTraceUpdates::Both(records, _) => records,
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());

                *self = BlobTraceUpdates::Both(records, key_values.clone());
                let BlobTraceUpdates::Both(records, _) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                records
            }
        }
    }

    /// Return the structured representation of the data, converting from the
    /// `Codec`-encoded columns if necessary.
    pub fn get_or_make_structured<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecordsStructuredExt {
        let structured = match self {
            BlobTraceUpdates::Row(records) => {
                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");

                *self = BlobTraceUpdates::Both(
                    records.clone(),
                    ColumnarRecordsStructuredExt { key, val },
                );
                let BlobTraceUpdates::Both(_, structured) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                structured
            }
            BlobTraceUpdates::Both(_, structured) => structured,
            BlobTraceUpdates::Structured { key_values, .. } => key_values,
        };

        // If the Arrow type of the structured data doesn't match what the
        // current schema expects, attempt to migrate the arrays in place.
        let migrate = |array: &mut ArrayRef, to_type: DataType| {
            let from_type = array.data_type().clone();
            if from_type != to_type {
                if let Some(migration) = backward_compatible(&from_type, &to_type) {
                    *array = migration.migrate(Arc::clone(array));
                } else {
                    error!(
                        ?from_type,
                        ?to_type,
                        "failed to migrate array type; backwards-incompatible schema migration?"
                    );
                }
            }
        };
        migrate(
            &mut structured.key,
            data_type::<K>(key_schema).expect("valid key schema"),
        );
        migrate(
            &mut structured.val,
            data_type::<V>(val_schema).expect("valid value schema"),
        );

        structured
    }

    /// Convert these updates into a [`Part`], generating the structured data
    /// if necessary.
    pub fn into_part<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Part {
        let ext = self
            .get_or_make_structured::<K, V>(key_schema, val_schema)
            .clone();
        Part {
            key: ext.key,
            val: ext.val,
            time: self.timestamps().clone(),
            diff: self.diffs().clone(),
        }
    }

    /// Concatenate the given updates together, column-wise, returning the
    /// result in the structured format (converting if necessary).
    pub fn concat<K: Codec, V: Codec>(
        mut updates: Vec<BlobTraceUpdates>,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
        metrics: &ColumnarMetrics,
    ) -> anyhow::Result<BlobTraceUpdates> {
        match updates.len() {
            0 => {
                return Ok(BlobTraceUpdates::Structured {
                    key_values: ColumnarRecordsStructuredExt {
                        key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
                        val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
                    },
                    timestamps: Int64Array::from_iter_values(vec![]),
                    diffs: Int64Array::from_iter_values(vec![]),
                });
            }
            1 => return Ok(updates.into_iter().into_element()),
            _ => {}
        }

        // Convert all the updates to the structured format, then concatenate
        // the key and value columns.
        let mut keys = Vec::with_capacity(updates.len());
        let mut vals = Vec::with_capacity(updates.len());
        for updates in &mut updates {
            let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
            keys.push(structured.key.as_ref());
            vals.push(structured.val.as_ref());
        }
        let key_values = ColumnarRecordsStructuredExt {
            key: ::arrow::compute::concat(&keys)?,
            val: ::arrow::compute::concat(&vals)?,
        };

        // Concatenate the timestamp and diff columns, which must keep their
        // primitive Int64 type.
        let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
        let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());

        for update in &updates {
            timestamps.push(update.timestamps());
            diffs.push(update.diffs());
        }
        let timestamps = ::arrow::compute::concat(&timestamps)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
            .clone();
        let diffs = ::arrow::compute::concat(&diffs)?
            .as_primitive_opt::<Int64Type>()
            .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
            .clone();

        let out = Self::Structured {
            key_values,
            timestamps,
            diffs,
        };
        metrics
            .arrow
            .concat_bytes
            .inc_by(u64::cast_from(out.goodbytes()));
        Ok(out)
    }

    /// Decodes a [`BlobTraceUpdates`] from a [`ProtoColumnarRecords`].
    pub fn from_proto(
        lgbytes: &ColumnarMetrics,
        proto: ProtoColumnarRecords,
    ) -> Result<Self, TryFromProtoError> {
        let binary_array = |data: Bytes, offsets: Vec<i32>| {
            if offsets.is_empty() && proto.len > 0 {
                return Ok(None);
            };
            match BinaryArray::try_new(
                OffsetBuffer::new(offsets.into()),
                ::arrow::buffer::Buffer::from_bytes(data.into()),
                None,
            ) {
                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
                    "Unable to decode binary array from repeated proto fields: {e:?}"
                ))),
            }
        };

        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;

        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
        let ext =
            ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;

        let updates = match (codec_key, codec_val, ext) {
            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ext,
            ),
            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
            ),
            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (k, v, ext) => {
                return Err(TryFromProtoError::InvalidPersistState(format!(
                    "unexpected mix of key/value columns: k={}, v={}, ext={}",
                    k.is_some(),
                    v.is_some(),
                    ext.is_some(),
                )));
            }
        };

        Ok(updates)
    }

    /// Encodes these updates as a [`ProtoColumnarRecords`].
    pub fn into_proto(&self) -> ProtoColumnarRecords {
        let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
            None => (vec![], Bytes::new(), vec![], Bytes::new()),
            Some(records) => (
                records.keys().offsets().to_vec(),
                Bytes::copy_from_slice(records.keys().value_data()),
                records.vals().offsets().to_vec(),
                Bytes::copy_from_slice(records.vals().value_data()),
            ),
        };
        let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
            None => (None, None),
            Some((k, v)) => (Some(k), Some(v)),
        };

        ProtoColumnarRecords {
            len: self.len().into_proto(),
            key_offsets,
            key_data,
            val_offsets,
            val_data,
            timestamps: self.timestamps().values().to_vec(),
            diffs: self.diffs().values().to_vec(),
            key_structured: k_struct,
            val_structured: v_struct,
        }
    }

    /// Return a [`BlobTraceUpdates::Structured`] version of `self`, converting
    /// from the `Codec`-encoded columns if necessary.
    pub fn as_structured<K: Codec, V: Codec>(
        &self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> Self {
        let mut this = self.clone();
        Self::Structured {
            key_values: this
                .get_or_make_structured::<K, V>(key_schema, val_schema)
                .clone(),
            timestamps: this.timestamps().clone(),
            diffs: this.diffs().clone(),
        }
    }
}

impl TraceBatchMeta {
    /// Asserts Self's documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // The lower must be strictly less than the upper: an empty or
        // backwards time interval is invalid.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        Ok(())
    }

    /// Assert that all of the [`BlobTraceBatchPart`]s referenced by this
    /// metadata are consistent with it and valid.
    pub async fn validate_data(
        &self,
        blob: &dyn Blob,
        metrics: &ColumnarMetrics,
    ) -> Result<(), Error> {
        let mut batches = vec![];
        for (idx, key) in self.keys.iter().enumerate() {
            let value = blob
                .get(key)
                .await?
                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
            if batch.desc != self.desc {
                return Err(format!(
                    "invalid trace batch part desc expected {:?} got {:?}",
                    &self.desc, &batch.desc
                )
                .into());
            }

            if batch.index != u64::cast_from(idx) {
                return Err(format!(
                    "invalid index for blob trace batch part at key {} expected {} got {}",
                    key, idx, batch.index
                )
                .into());
            }

            batch.validate()?;
            batches.push(batch);
        }

        for (batch_idx, batch) in batches.iter().enumerate() {
            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
                // Decode the diff as raw little-endian bytes; all-zero bytes
                // are an invalid diff regardless of the concrete diff type.
                let diff: u64 = Codec64::decode(diff.to_le_bytes());

                if diff == 0 {
                    return Err(format!(
                        "update with 0 diff in batch {batch_idx} at row {row_idx}"
                    )
                    .into());
                }
            }
        }

        Ok(())
    }
}

impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
    /// Asserts Self's documented invariants, returning an error if any are
    /// violated.
    pub fn validate(&self) -> Result<(), Error> {
        // The lower must be strictly less than the upper: an empty or
        // backwards time interval is invalid.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());

        for time in self.updates.timestamps().values() {
            let ts = T::decode(time.to_le_bytes());
            if !self.desc.lower().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is less than the batch lower: {:?}",
                    ts, self.desc
                )
                .into());
            }

            // Only check the upper bound for uncompacted batches: compaction
            // is allowed to forward timestamps up to the since, which may be
            // at or past the upper.
            if uncompacted && self.desc.upper().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
                    ts, self.desc
                )
                .into());
            }
        }

        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
            let diff: u64 = Codec64::decode(diff.to_le_bytes());

            if diff == 0 {
                return Err(format!("update with 0 diff at row {row_idx}").into());
            }
        }

        Ok(())
    }

    /// Encodes this batch part into the Parquet format, panicking if the
    /// batch is invalid.
    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
    where
        B: BufMut + Send,
    {
        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
    }

    /// Decodes a batch part from its Parquet encoding.
    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
        decode_trace_parquet(buf.clone(), metrics)
    }

    /// Scans the part and returns a lower bound on the contained keys.
    pub fn key_lower(&self) -> &[u8] {
        self.updates
            .records()
            .and_then(|r| r.keys().iter().flatten().min())
            .unwrap_or(&[])
    }

    /// Scans the part and returns a lower bound on the contained keys, in the
    /// structured representation.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        self.updates.structured().and_then(|r| {
            let ord = ArrayOrd::new(&r.key);
            (0..r.key.len())
                .min_by_key(|i| ord.at(*i))
                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
        })
    }
}

impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
    fn from(x: ProtoU64Description) -> Self {
        Description::new(
            x.lower
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.upper
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.since
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
        )
    }
}

impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
    fn from(x: ProtoU64Antichain) -> Self {
        Antichain::from(
            x.elements
                .into_iter()
                .map(|x| T::decode(u64::to_le_bytes(x)))
                .collect::<Vec<_>>(),
        )
    }
}

impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
    fn from(x: &Antichain<T>) -> Self {
        ProtoU64Antichain {
            elements: x
                .elements()
                .iter()
                .map(|x| u64::from_le_bytes(T::encode(x)))
                .collect(),
        }
    }
}

impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
    fn from(x: &Description<T>) -> Self {
        ProtoU64Description {
            lower: Some(x.lower().into()),
            upper: Some(x.upper().into()),
            since: Some(x.since().into()),
        }
    }
}

/// Encodes the inline metadata for a trace batch into a base64 string.
pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
    let (format, format_metadata) = match &batch.updates {
        // For the legacy Row format, we only write Codec data.
        BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
        // For the newer formats, we track which version of the structured
        // migration wrote the data.
        BlobTraceUpdates::Both { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(2);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
        BlobTraceUpdates::Structured { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(3);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
    };

    let inline = ProtoBatchPartInline {
        format: format.into(),
        desc: Some((&batch.desc).into()),
        index: batch.index,
        format_metadata,
    };
    let inline_encoded = inline.encode_to_vec();
    base64::engine::general_purpose::STANDARD.encode(inline_encoded)
}

/// Decodes the inline metadata for a trace batch from a base64 string.
pub fn decode_trace_inline_meta(
    inline_base64: Option<&String>,
) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
    let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
    let inline_encoded = base64::engine::general_purpose::STANDARD
        .decode(inline_base64)
        .map_err(|err| err.to_string())?;
    let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
    let format = ProtoBatchFormat::try_from(inline.format)
        .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
    Ok((format, inline))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;

    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        let updates = builder.finish(&ColumnarMetrics::disconnected());
        BlobTraceUpdates::Row(updates)
    }

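    // A minimal sketch, not part of the original test suite: exercise the
    // basic `BlobTraceUpdates` accessors on row-format data built with the
    // `columnar_records` helper above.
    #[mz_ore::test]
    fn blob_trace_updates_accessors() {
        let updates = columnar_records(vec![update_with_key(0, "a"), update_with_key(1, "b")]);
        assert_eq!(updates.len(), 2);
        assert_eq!(updates.timestamps().values().to_vec(), vec![0i64, 1]);
        assert_eq!(updates.diffs().values().to_vec(), vec![1i64, 1]);
        // Row-format updates carry Codec data but no structured columns.
        assert!(updates.records().is_some());
        assert!(updates.structured().is_none());
    }
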
    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Empty batches are valid.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid desc: lower > upper.
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Empty desc: lower == upper.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update before the batch's lower.
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Update at or past the batch's upper.
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // For a compacted batch (since > lower), updates may land at or past
        // the upper.
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Updates with a zero diff are rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff at row 0"))
        );
    }

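    // A hedged sketch, not from the original suite: the format names with
    // stable spellings should survive a round-trip through `as_str` and
    // `from_str`. (`Both(1)` is excluded since it shares the "both" name
    // with `Both(0)`.)
    #[mz_ore::test]
    fn batch_columnar_format_str_round_trip() {
        for format in [
            BatchColumnarFormat::Row,
            BatchColumnarFormat::Both(0),
            BatchColumnarFormat::Both(2),
        ] {
            assert_eq!(BatchColumnarFormat::from_str(format.as_str()), format);
        }
    }
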
    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // Normal case.
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));

        // Empty interval.
        let b = batch_meta(0, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );

        // Backwards interval.
        let b = batch_meta(2, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

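    // A minimal sketch, not from the original suite: row-format updates
    // should survive a round-trip through `ProtoColumnarRecords`.
    #[mz_ore::test]
    fn blob_trace_updates_proto_round_trip() {
        let metrics = ColumnarMetrics::disconnected();
        let updates = columnar_records(vec![update_with_key(0, "a"), update_with_key(1, "b")]);
        let proto = updates.into_proto();
        let decoded = BlobTraceUpdates::from_proto(&metrics, proto).expect("valid proto");
        assert_eq!(updates.len(), decoded.len());
        assert_eq!(updates.timestamps(), decoded.timestamps());
        assert_eq!(updates.diffs(), decoded.diffs());
        assert_eq!(
            updates.records().expect("row format").keys(),
            decoded.records().expect("row format").keys(),
        );
    }
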
    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &dyn Blob,
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        let metrics = ColumnarMetrics::disconnected();
        let config = EncodingConfig::default();
        batch.encode(&mut val, &metrics, &config);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

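    // A hedged sketch, not from the original suite: inline batch metadata
    // should round-trip through its base64-encoded proto representation.
    #[mz_ore::test]
    fn trace_inline_meta_round_trip() {
        let batch = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 3,
            updates: columnar_records(vec![update_with_key(0, "k")]),
        };
        let encoded = encode_trace_inline_meta(&batch);
        let (format, inline) = decode_trace_inline_meta(Some(&encoded)).expect("valid meta");
        // Row-format updates are recorded as ParquetKvtd.
        assert_eq!(format, ProtoBatchFormat::ParquetKvtd);
        assert_eq!(inline.index, 3);
    }
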
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;

        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };

        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };

        // Normal case.
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );

        // Metadata whose desc is inconsistent with the batch parts.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Metadata that references a missing blob key.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );

        // Batch parts listed in the wrong order.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );

        Ok(())
    }

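    // A minimal sketch, not from the original suite: `key_lower` scans the
    // Codec-encoded keys for their minimum, and `structured_key_lower` is
    // `None` when no structured columns are present.
    #[mz_ore::test]
    fn trace_batch_key_lower() {
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "b"), update_with_key(1, "a")]),
        };
        assert_eq!(b.key_lower(), "a".as_bytes());
        assert!(b.structured_key_lower().is_none());

        // An empty batch has the trivial lower bound.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert!(b.key_lower().is_empty());
    }
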
    #[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn encoded_batch_sizes() {
        fn sizes(data: DataGenerator) -> usize {
            let metrics = ColumnarMetrics::disconnected();
            let config = EncodingConfig::default();
            let updates: Vec<_> = data.batches().collect();
            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates,
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf, &metrics, &config);
            trace_buf.len()
        }

        let record_size_bytes = DataGenerator::default().record_size_bytes;
        // Intentionally a golden test of the encoded sizes, so that any
        // change to the on-disk encoding is visible in review.
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=867 25/1=2613 1000/1=72845 1000/100=72845"
        );
    }
}