1use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use base64::Engine;
23use bytes::{BufMut, Bytes};
24use differential_dataflow::difference::Monoid;
25use differential_dataflow::trace::Description;
26use mz_ore::bytes::SegmentedBytes;
27use mz_ore::cast::CastFrom;
28use mz_ore::collections::CollectionExt;
29use mz_ore::soft_panic_or_log;
30use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
31use mz_persist_types::columnar::{
32 ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
33};
34use mz_persist_types::parquet::EncodingConfig;
35use mz_persist_types::part::Part;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::{Codec, Codec64};
38use mz_proto::{RustType, TryFromProtoError};
39use proptest::arbitrary::Arbitrary;
40use proptest::prelude::*;
41use proptest::strategy::{BoxedStrategy, Just};
42use prost::Message;
43use serde::Serialize;
44use timely::PartialOrder;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::error::Error;
49use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
50use crate::generated::persist::{
51 ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
52 ProtoU64Description,
53};
54use crate::indexed::columnar::arrow::realloc_array;
55use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
56use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
57use crate::location::Blob;
58use crate::metrics::ColumnarMetrics;
59
/// Selects which columnar layout is used for a batch.
///
/// The string name of each variant is defined by `BatchColumnarFormat::from_str`.
///
/// NOTE: `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// is part of this type's behavior.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Parsed from `"row"`. Reported as not structured by `is_structured`.
    Row,
    /// Carries a `usize` version number. `from_str` maps `"both"` to version `0`
    /// and `"both_v2"` to version `2`. Versions `0` and `1` are reported as not
    /// structured by `is_structured`; every other version is.
    Both(usize),
    /// Parsed from `"structured"`. Reported as structured by `is_structured`.
    Structured,
}
74
75impl BatchColumnarFormat {
76 pub const fn default() -> Self {
78 BatchColumnarFormat::Both(2)
79 }
80
81 pub fn from_str(s: &str) -> Self {
84 match s {
85 "row" => BatchColumnarFormat::Row,
86 "both" => BatchColumnarFormat::Both(0),
87 "both_v2" => BatchColumnarFormat::Both(2),
88 "structured" => BatchColumnarFormat::Structured,
89 x => {
90 let default = BatchColumnarFormat::default();
91 soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
92 default
93 }
94 }
95 }
96
97 pub const fn as_str(&self) -> &'static str {
99 match self {
100 BatchColumnarFormat::Row => "row",
101 BatchColumnarFormat::Both(0 | 1) => "both",
102 BatchColumnarFormat::Both(2) => "both_v2",
103 _ => panic!("unknown batch columnar format"),
104 }
105 }
106
107 pub const fn is_structured(&self) -> bool {
109 match self {
110 BatchColumnarFormat::Row => false,
111 BatchColumnarFormat::Both(0 | 1) => false,
113 BatchColumnarFormat::Both(_) => true,
114 BatchColumnarFormat::Structured => true,
115 }
116 }
117}
118
119impl fmt::Display for BatchColumnarFormat {
120 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
121 f.write_str(self.as_str())
122 }
123}
124
125impl Arbitrary for BatchColumnarFormat {
126 type Parameters = ();
127 type Strategy = BoxedStrategy<BatchColumnarFormat>;
128
129 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
130 proptest::strategy::Union::new(vec![
131 Just(BatchColumnarFormat::Row).boxed(),
132 Just(BatchColumnarFormat::Both(0)).boxed(),
133 Just(BatchColumnarFormat::Both(1)).boxed(),
134 ])
135 .boxed()
136 }
137}
138
/// Metadata describing a trace batch that is stored as one or more blob parts.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// Blob keys of the parts of this batch. `validate_data` requires the part
    /// stored at position `i` of this list to have `index == i`.
    pub keys: Vec<String>,
    /// The format recorded for this batch.
    pub format: ProtoBatchFormat,
    /// The description (lower, upper, since) of this batch. `validate_data`
    /// requires every part to carry an equal description.
    pub desc: Description<u64>,
    /// NOTE(review): meaning not visible in this file — presumably a compaction
    /// level; confirm against callers.
    pub level: u64,
    /// NOTE(review): presumably the total encoded size of all parts in bytes (the
    /// test in this file sums the encoded part sizes) — confirm.
    pub size_bytes: u64,
}
163
/// One part of a trace batch, in the form that is encoded to / decoded from a blob.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// The description (lower, upper, since) that the updates in this part are
    /// validated against.
    pub desc: Description<T>,
    /// The position of this part within its batch.
    pub index: u64,
    /// The updates contained in this part.
    pub updates: BlobTraceUpdates,
}
201
/// The updates of a trace batch part, in one of three representations.
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Only [`ColumnarRecords`]; `structured()` returns `None` for this variant.
    Row(ColumnarRecords),
    /// Both [`ColumnarRecords`] and the structured key/value columns.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// Only structured key/value columns; `records()` returns `None` for this
    /// variant, so timestamps and diffs are stored directly.
    Structured {
        /// The structured key and value columns.
        key_values: ColumnarRecordsStructuredExt,
        /// One timestamp per update, stored as `i64`.
        timestamps: Int64Array,
        /// One diff per update, stored as `i64`.
        diffs: Int64Array,
    },
}
226
227impl BlobTraceUpdates {
228 pub fn from_part(part: Part) -> Self {
230 Self::Structured {
231 key_values: ColumnarRecordsStructuredExt {
232 key: part.key,
233 val: part.val,
234 },
235 timestamps: part.time,
236 diffs: part.diff,
237 }
238 }
239
240 pub fn len(&self) -> usize {
242 match self {
243 BlobTraceUpdates::Row(c) => c.len(),
244 BlobTraceUpdates::Both(c, _structured) => c.len(),
245 BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
246 }
247 }
248
249 pub fn timestamps(&self) -> &Int64Array {
251 match self {
252 BlobTraceUpdates::Row(c) => c.timestamps(),
253 BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
254 BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
255 }
256 }
257
258 pub fn diffs(&self) -> &Int64Array {
260 match self {
261 BlobTraceUpdates::Row(c) => c.diffs(),
262 BlobTraceUpdates::Both(c, _structured) => c.diffs(),
263 BlobTraceUpdates::Structured { diffs, .. } => diffs,
264 }
265 }
266
267 pub fn diffs_sum<D: Codec64 + Monoid>(&self) -> D {
269 let mut sum = D::zero();
270 for d in self.diffs().values().iter() {
271 let d = D::decode(d.to_le_bytes());
272 sum.plus_equals(&d);
273 }
274 sum
275 }
276
277 pub fn records(&self) -> Option<&ColumnarRecords> {
279 match self {
280 BlobTraceUpdates::Row(c) => Some(c),
281 BlobTraceUpdates::Both(c, _structured) => Some(c),
282 BlobTraceUpdates::Structured { .. } => None,
283 }
284 }
285
286 pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
288 match self {
289 BlobTraceUpdates::Row(_) => None,
290 BlobTraceUpdates::Both(_, s) => Some(s),
291 BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
292 }
293 }
294
295 pub fn goodbytes(&self) -> usize {
297 match self {
298 BlobTraceUpdates::Row(c) => c.goodbytes(),
299 BlobTraceUpdates::Both(c, _) => c.goodbytes(),
303 BlobTraceUpdates::Structured {
304 key_values,
305 timestamps,
306 diffs,
307 } => {
308 key_values.goodbytes()
309 + timestamps.values().to_byte_slice().len()
310 + diffs.values().to_byte_slice().len()
311 }
312 }
313 }
314
    /// Returns the [`ColumnarRecords`] for these updates, building them first if
    /// only structured data is present.
    ///
    /// `Row` and `Both` already hold records, which are returned as-is. For
    /// `Structured`, the key and value columns are converted with
    /// `schema_to_codec` and `self` is replaced by a `Both` that holds the new
    /// records alongside the existing structured columns.
    ///
    /// # Panics
    ///
    /// Panics if `schema_to_codec` fails for the key or the value column.
    pub fn get_or_make_codec<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecords {
        match self {
            BlobTraceUpdates::Row(records) => records,
            BlobTraceUpdates::Both(records, _) => records,
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());

                // Upgrade in place so later calls find the records already built.
                *self = BlobTraceUpdates::Both(records, key_values.clone());
                // Re-borrow from the freshly assigned value to return a reference into `self`.
                let BlobTraceUpdates::Both(records, _) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                records
            }
        }
    }
341
    /// Returns the structured key/value columns for these updates, building them
    /// first if only records are present, and migrating them to the data types of
    /// the provided schemas where needed.
    ///
    /// For `Row`, the keys and values are converted with `codec_to_schema` and
    /// `self` is replaced by a `Both`. In every case, each column whose data type
    /// differs from the schema's data type is replaced in place by the result of
    /// the `backward_compatible` migration, if one exists. A missing migration, or
    /// a migration that yields a non-matching type, is only logged via `error!`;
    /// the column is then returned with whatever type it has at that point.
    ///
    /// # Panics
    ///
    /// Panics if `codec_to_schema` fails (for `Row`), or if `data_type` fails for
    /// either schema.
    pub fn get_or_make_structured<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecordsStructuredExt {
        let structured = match self {
            BlobTraceUpdates::Row(records) => {
                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");

                // Upgrade in place so later calls find the structured columns already built.
                *self = BlobTraceUpdates::Both(
                    records.clone(),
                    ColumnarRecordsStructuredExt { key, val },
                );
                // Re-borrow from the freshly assigned value to get a reference into `self`.
                let BlobTraceUpdates::Both(_, structured) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                structured
            }
            BlobTraceUpdates::Both(_, structured) => structured,
            BlobTraceUpdates::Structured { key_values, .. } => key_values,
        };

        // Replaces `array` with its migrated form when its type differs from `to_type`.
        let migrate = |array: &mut ArrayRef, to_type: DataType| {
            let from_type = array.data_type().clone();
            if from_type != to_type {
                if let Some(migration) = backward_compatible(&from_type, &to_type) {
                    *array = migration.migrate(Arc::clone(array));
                    // Sanity check on the migration result; logged, not fatal.
                    if array.data_type() != &to_type {
                        error!(
                            ?from_type,
                            actual_type=?array.data_type(),
                            ?to_type,
                            "migration failed; returned non-matching data type!"
                        );
                    }
                } else {
                    // No known migration: the array is left untouched.
                    error!(
                        ?from_type,
                        ?to_type,
                        "failed to migrate array type; backwards-incompatible schema migration?"
                    );
                }
            }
        };
        migrate(
            &mut structured.key,
            data_type::<K>(key_schema).expect("valid key schema"),
        );
        migrate(
            &mut structured.val,
            data_type::<V>(val_schema).expect("valid value schema"),
        );

        structured
    }
404
405 pub fn as_part(&self) -> Option<Part> {
407 let ext = self.structured()?.clone();
408 Some(Part {
409 key: ext.key,
410 val: ext.val,
411 time: self.timestamps().clone(),
412 diff: self.diffs().clone(),
413 })
414 }
415
416 pub fn into_part<K: Codec, V: Codec>(
418 &mut self,
419 key_schema: &K::Schema,
420 val_schema: &V::Schema,
421 ) -> Part {
422 let ext = self
423 .get_or_make_structured::<K, V>(key_schema, val_schema)
424 .clone();
425 Part {
426 key: ext.key,
427 val: ext.val,
428 time: self.timestamps().clone(),
429 diff: self.diffs().clone(),
430 }
431 }
432
433 pub fn concat<K: Codec, V: Codec>(
438 mut updates: Vec<BlobTraceUpdates>,
439 key_schema: &K::Schema,
440 val_schema: &V::Schema,
441 metrics: &ColumnarMetrics,
442 ) -> anyhow::Result<BlobTraceUpdates> {
443 match updates.len() {
444 0 => {
445 return Ok(BlobTraceUpdates::Structured {
446 key_values: ColumnarRecordsStructuredExt {
447 key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
448 val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
449 },
450 timestamps: Int64Array::from_iter_values(vec![]),
451 diffs: Int64Array::from_iter_values(vec![]),
452 });
453 }
454 1 => return Ok(updates.into_iter().into_element()),
455 _ => {}
456 }
457
458 let mut keys = Vec::with_capacity(updates.len());
460 let mut vals = Vec::with_capacity(updates.len());
461 for updates in &mut updates {
462 let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
463 keys.push(structured.key.as_ref());
464 vals.push(structured.val.as_ref());
465 }
466 let key_values = ColumnarRecordsStructuredExt {
467 key: ::arrow::compute::concat(&keys)?,
468 val: ::arrow::compute::concat(&vals)?,
469 };
470
471 let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
474 let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
475
476 for update in &updates {
477 timestamps.push(update.timestamps());
478 diffs.push(update.diffs());
479 }
480 let timestamps = ::arrow::compute::concat(×tamps)?
481 .as_primitive_opt::<Int64Type>()
482 .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
483 .clone();
484 let diffs = ::arrow::compute::concat(&diffs)?
485 .as_primitive_opt::<Int64Type>()
486 .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
487 .clone();
488
489 let out = Self::Structured {
490 key_values,
491 timestamps,
492 diffs,
493 };
494 metrics
495 .arrow
496 .concat_bytes
497 .inc_by(u64::cast_from(out.goodbytes()));
498 Ok(out)
499 }
500
    /// Decodes updates from a [`ProtoColumnarRecords`].
    ///
    /// Which variant is produced depends on which columns are present:
    /// key and value byte columns plus structured columns give `Both`, byte
    /// columns alone give `Row`, and structured columns alone give `Structured`.
    ///
    /// `lgbytes` is the metrics handle that is passed to `realloc_array` for every
    /// decoded array.
    ///
    /// # Errors
    ///
    /// Returns `InvalidFieldError` if a binary array cannot be built from the
    /// offsets and data, propagates errors from
    /// `ColumnarRecordsStructuredExt::from_proto`, and returns
    /// `InvalidPersistState` for any other combination of present columns.
    pub fn from_proto(
        lgbytes: &ColumnarMetrics,
        proto: ProtoColumnarRecords,
    ) -> Result<Self, TryFromProtoError> {
        // Builds a binary column from its proto fields. A column with no offsets is
        // treated as absent (`None`) only when the proto reports a non-zero length.
        let binary_array = |data: Bytes, offsets: Vec<i32>| {
            if offsets.is_empty() && proto.len > 0 {
                return Ok(None);
            };
            match BinaryArray::try_new(
                OffsetBuffer::new(offsets.into()),
                ::arrow::buffer::Buffer::from(data),
                None,
            ) {
                Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
                Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
                    "Unable to decode binary array from repeated proto fields: {e:?}"
                ))),
            }
        };

        let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
        let codec_val = binary_array(proto.val_data, proto.val_offsets)?;

        let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
        let diffs = realloc_array(&proto.diffs.into(), lgbytes);
        let ext =
            ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;

        // Pick the representation from the set of columns that were present.
        let updates = match (codec_key, codec_val, ext) {
            (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
                ext,
            ),
            (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
                ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
            ),
            (None, None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            // Anything else (e.g. a key column without a value column) is invalid.
            (k, v, ext) => {
                return Err(TryFromProtoError::InvalidPersistState(format!(
                    "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
                    k.is_some(),
                    v.is_some(),
                    ext.is_some(),
                )));
            }
        };

        Ok(updates)
    }
555
556 pub fn into_proto(&self) -> ProtoColumnarRecords {
558 let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
559 None => (vec![], Bytes::new(), vec![], Bytes::new()),
560 Some(records) => (
561 records.keys().offsets().to_vec(),
562 Bytes::copy_from_slice(records.keys().value_data()),
563 records.vals().offsets().to_vec(),
564 Bytes::copy_from_slice(records.vals().value_data()),
565 ),
566 };
567 let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
568 None => (None, None),
569 Some((k, v)) => (Some(k), Some(v)),
570 };
571
572 ProtoColumnarRecords {
573 len: self.len().into_proto(),
574 key_offsets,
575 key_data,
576 val_offsets,
577 val_data,
578 timestamps: self.timestamps().values().to_vec(),
579 diffs: self.diffs().values().to_vec(),
580 key_structured: k_struct,
581 val_structured: v_struct,
582 }
583 }
584
585 pub fn as_structured<K: Codec, V: Codec>(
588 &self,
589 key_schema: &K::Schema,
590 val_schema: &V::Schema,
591 ) -> Self {
592 let mut this = self.clone();
593 Self::Structured {
594 key_values: this
595 .get_or_make_structured::<K, V>(key_schema, val_schema)
596 .clone(),
597 timestamps: this.timestamps().clone(),
598 diffs: this.diffs().clone(),
599 }
600 }
601}
602
603impl TraceBatchMeta {
604 pub fn validate(&self) -> Result<(), Error> {
607 if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
611 return Err(format!("invalid desc: {:?}", &self.desc).into());
612 }
613
614 Ok(())
615 }
616
617 pub async fn validate_data(
619 &self,
620 blob: &dyn Blob,
621 metrics: &ColumnarMetrics,
622 ) -> Result<(), Error> {
623 let mut batches = vec![];
624 for (idx, key) in self.keys.iter().enumerate() {
625 let value = blob
626 .get(key)
627 .await?
628 .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
629 let batch = BlobTraceBatchPart::decode(&value, metrics)?;
630 if batch.desc != self.desc {
631 return Err(format!(
632 "invalid trace batch part desc expected {:?} got {:?}",
633 &self.desc, &batch.desc
634 )
635 .into());
636 }
637
638 if batch.index != u64::cast_from(idx) {
639 return Err(format!(
640 "invalid index for blob trace batch part at key {} expected {} got {}",
641 key, idx, batch.index
642 )
643 .into());
644 }
645
646 batch.validate()?;
647 batches.push(batch);
648 }
649
650 for (batch_idx, batch) in batches.iter().enumerate() {
651 for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
652 let diff: u64 = Codec64::decode(diff.to_le_bytes());
654
655 if diff == 0 {
657 return Err(format!(
658 "update with 0 diff in batch {batch_idx} at row {row_idx}",
659 )
660 .into());
661 }
662 }
663 }
664
665 Ok(())
666 }
667}
668
669impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
670 pub fn validate(&self) -> Result<(), Error> {
673 if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
677 return Err(format!("invalid desc: {:?}", &self.desc).into());
678 }
679
680 let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
681
682 for time in self.updates.timestamps().values() {
683 let ts = T::decode(time.to_le_bytes());
684 if !self.desc.lower().less_equal(&ts) {
686 return Err(format!(
687 "timestamp {:?} is less than the batch lower: {:?}",
688 ts, self.desc
689 )
690 .into());
691 }
692
693 if uncompacted && self.desc.upper().less_equal(&ts) {
698 return Err(format!(
699 "timestamp {:?} is greater than or equal to the batch upper: {:?}",
700 ts, self.desc
701 )
702 .into());
703 }
704 }
705
706 for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
707 let diff: u64 = Codec64::decode(diff.to_le_bytes());
709
710 if diff == 0 {
712 return Err(format!("update with 0 diff at row {row_idx}",).into());
713 }
714 }
715
716 Ok(())
717 }
718
719 pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
721 where
722 B: BufMut + Send,
723 {
724 encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
725 }
726
727 pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
729 decode_trace_parquet(buf.clone(), metrics)
730 }
731
732 pub fn key_lower(&self) -> &[u8] {
734 self.updates
735 .records()
736 .and_then(|r| r.keys().iter().flatten().min())
737 .unwrap_or(&[])
738 }
739
740 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
742 self.updates.structured().and_then(|r| {
743 let ord = ArrayOrd::new(&r.key);
744 (0..r.key.len())
745 .min_by_key(|i| ord.at(*i))
746 .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
747 })
748 }
749}
750
751impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
752 fn from(x: ProtoU64Description) -> Self {
753 Description::new(
754 x.lower
755 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
756 x.upper
757 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
758 x.since
759 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
760 )
761 }
762}
763
764impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
765 fn from(x: ProtoU64Antichain) -> Self {
766 Antichain::from(
767 x.elements
768 .into_iter()
769 .map(|x| T::decode(u64::to_le_bytes(x)))
770 .collect::<Vec<_>>(),
771 )
772 }
773}
774
775impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
776 fn from(x: &Antichain<T>) -> Self {
777 ProtoU64Antichain {
778 elements: x
779 .elements()
780 .iter()
781 .map(|x| u64::from_le_bytes(T::encode(x)))
782 .collect(),
783 }
784 }
785}
786
787impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
788 fn from(x: &Description<T>) -> Self {
789 ProtoU64Description {
790 lower: Some(x.lower().into()),
791 upper: Some(x.upper().into()),
792 since: Some(x.since().into()),
793 }
794 }
795}
796
797pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
799 let (format, format_metadata) = match &batch.updates {
800 BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
802 BlobTraceUpdates::Both { .. } => {
804 let metadata = ProtoFormatMetadata::StructuredMigration(2);
805 (ProtoBatchFormat::ParquetStructured, Some(metadata))
806 }
807 BlobTraceUpdates::Structured { .. } => {
808 let metadata = ProtoFormatMetadata::StructuredMigration(3);
809 (ProtoBatchFormat::ParquetStructured, Some(metadata))
810 }
811 };
812
813 let inline = ProtoBatchPartInline {
814 format: format.into(),
815 desc: Some((&batch.desc).into()),
816 index: batch.index,
817 format_metadata,
818 };
819 let inline_encoded = inline.encode_to_vec();
820 base64::engine::general_purpose::STANDARD.encode(inline_encoded)
821}
822
823pub fn decode_trace_inline_meta(
825 inline_base64: Option<&String>,
826) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
827 let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
828 let inline_encoded = base64::engine::general_purpose::STANDARD
829 .decode(inline_base64)
830 .map_err(|err| err.to_string())?;
831 let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
832 let format = ProtoBatchFormat::try_from(inline.format)
833 .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
834 Ok((format, inline))
835}
836
837#[cfg(test)]
838mod tests {
839 use std::sync::Arc;
840
841 use bytes::Bytes;
842
843 use crate::error::Error;
844 use crate::indexed::columnar::ColumnarRecordsBuilder;
845 use crate::mem::{MemBlob, MemBlobConfig};
846 use crate::metrics::ColumnarMetrics;
847 use crate::workload::DataGenerator;
848
849 use super::*;
850
851 fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
852 ((key.into(), "".into()), ts, 1)
853 }
854
855 fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
856 Description::new(
857 Antichain::from_elem(lower),
858 Antichain::from_elem(upper),
859 Antichain::from_elem(0),
860 )
861 }
862
863 fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
864 TraceBatchMeta {
865 keys: vec![],
866 format: ProtoBatchFormat::Unknown,
867 desc: u64_desc(lower, upper),
868 level: 1,
869 size_bytes: 0,
870 }
871 }
872
873 fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
874 Description::new(
875 Antichain::from_elem(lower),
876 Antichain::from_elem(upper),
877 Antichain::from_elem(since),
878 )
879 }
880
881 fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
882 let mut builder = ColumnarRecordsBuilder::default();
883 for ((k, v), t, d) in updates {
884 assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
885 }
886 let updates = builder.finish(&ColumnarMetrics::disconnected());
887 BlobTraceUpdates::Row(updates)
888 }
889
890 #[mz_ore::test]
891 fn trace_batch_validate() {
892 let b = BlobTraceBatchPart {
894 desc: u64_desc(0, 2),
895 index: 0,
896 updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
897 };
898 assert_eq!(b.validate(), Ok(()));
899
900 let b = BlobTraceBatchPart {
902 desc: u64_desc(0, 2),
903 index: 0,
904 updates: columnar_records(vec![]),
905 };
906 assert_eq!(b.validate(), Ok(()));
907
908 let b = BlobTraceBatchPart {
910 desc: u64_desc(2, 0),
911 index: 0,
912 updates: columnar_records(vec![]),
913 };
914 assert_eq!(
915 b.validate(),
916 Err(Error::from(
917 "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
918 ))
919 );
920
921 let b = BlobTraceBatchPart {
923 desc: u64_desc(0, 0),
924 index: 0,
925 updates: columnar_records(vec![]),
926 };
927 assert_eq!(
928 b.validate(),
929 Err(Error::from(
930 "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
931 ))
932 );
933
934 let b = BlobTraceBatchPart {
936 desc: u64_desc(1, 2),
937 index: 0,
938 updates: columnar_records(vec![update_with_key(0, "0")]),
939 };
940 assert_eq!(
941 b.validate(),
942 Err(Error::from(
943 "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
944 ))
945 );
946
947 let b = BlobTraceBatchPart {
949 desc: u64_desc(1, 2),
950 index: 0,
951 updates: columnar_records(vec![update_with_key(2, "0")]),
952 };
953 assert_eq!(
954 b.validate(),
955 Err(Error::from(
956 "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
957 ))
958 );
959
960 let b = BlobTraceBatchPart {
962 desc: u64_desc_since(1, 2, 4),
963 index: 0,
964 updates: columnar_records(vec![update_with_key(2, "0")]),
965 };
966 assert_eq!(b.validate(), Ok(()));
967
968 let b = BlobTraceBatchPart {
970 desc: u64_desc_since(1, 2, 4),
971 index: 0,
972 updates: columnar_records(vec![update_with_key(4, "0")]),
973 };
974 assert_eq!(b.validate(), Ok(()));
975
976 let b = BlobTraceBatchPart {
978 desc: u64_desc_since(1, 2, 4),
979 index: 0,
980 updates: columnar_records(vec![update_with_key(5, "0")]),
981 };
982 assert_eq!(b.validate(), Ok(()));
983
984 let b = BlobTraceBatchPart {
986 desc: u64_desc(0, 1),
987 index: 0,
988 updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
989 };
990 assert_eq!(
991 b.validate(),
992 Err(Error::from("update with 0 diff at row 0"))
993 );
994 }
995
996 #[mz_ore::test]
997 fn trace_batch_meta_validate() {
998 let b = batch_meta(0, 1);
1000 assert_eq!(b.validate(), Ok(()));
1001
1002 let b = batch_meta(0, 0);
1004 assert_eq!(
1005 b.validate(),
1006 Err(Error::from(
1007 "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
1008 )),
1009 );
1010
1011 let b = batch_meta(2, 0);
1013 assert_eq!(
1014 b.validate(),
1015 Err(Error::from(
1016 "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
1017 )),
1018 );
1019 }
1020
1021 async fn expect_set_trace_batch<T: Timestamp + Codec64>(
1022 blob: &dyn Blob,
1023 key: &str,
1024 batch: &BlobTraceBatchPart<T>,
1025 ) -> u64 {
1026 let mut val = Vec::new();
1027 let metrics = ColumnarMetrics::disconnected();
1028 let config = EncodingConfig::default();
1029 batch.encode(&mut val, &metrics, &config);
1030 let val = Bytes::from(val);
1031 let val_len = u64::cast_from(val.len());
1032 blob.set(key, val).await.expect("failed to set trace batch");
1033 val_len
1034 }
1035
1036 #[mz_ore::test(tokio::test)]
1037 #[cfg_attr(miri, ignore)] async fn trace_batch_meta_validate_data() -> Result<(), Error> {
1039 let metrics = ColumnarMetrics::disconnected();
1040 let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1041 let format = ProtoBatchFormat::ParquetKvtd;
1042
1043 let batch_desc = u64_desc_since(0, 3, 0);
1044 let batch0 = BlobTraceBatchPart {
1045 desc: batch_desc.clone(),
1046 index: 0,
1047 updates: columnar_records(vec![
1048 (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
1049 (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
1050 ]),
1051 };
1052 let batch1 = BlobTraceBatchPart {
1053 desc: batch_desc.clone(),
1054 index: 1,
1055 updates: columnar_records(vec![
1056 (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
1057 (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
1058 ]),
1059 };
1060
1061 let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
1062 let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
1063 let size_bytes = batch0_size_bytes + batch1_size_bytes;
1064 let batch_meta = TraceBatchMeta {
1065 keys: vec!["b0".into(), "b1".into()],
1066 format,
1067 desc: batch_desc.clone(),
1068 level: 0,
1069 size_bytes,
1070 };
1071
1072 assert_eq!(
1074 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1075 Ok(())
1076 );
1077
1078 let batch_meta = TraceBatchMeta {
1080 keys: vec!["b0".into(), "b1".into()],
1081 format,
1082 desc: u64_desc_since(1, 3, 0),
1083 level: 0,
1084 size_bytes,
1085 };
1086 assert_eq!(
1087 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1088 Err(Error::from(
1089 "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
1090 ))
1091 );
1092 let batch_meta = TraceBatchMeta {
1094 keys: vec!["b0".into(), "b1".into(), "b2".into()],
1095 format,
1096 desc: batch_desc.clone(),
1097 level: 0,
1098 size_bytes,
1099 };
1100 assert_eq!(
1101 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1102 Err(Error::from("no blob for trace batch at key: b2"))
1103 );
1104 let batch_meta = TraceBatchMeta {
1106 keys: vec!["b1".into(), "b0".into()],
1107 format,
1108 desc: batch_desc,
1109 level: 0,
1110 size_bytes,
1111 };
1112 assert_eq!(
1113 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1114 Err(Error::from(
1115 "invalid index for blob trace batch part at key b1 expected 0 got 1"
1116 ))
1117 );
1118
1119 Ok(())
1120 }
1121
1122 #[mz_ore::test]
1123 #[cfg_attr(miri, ignore)] fn encoded_batch_sizes() {
1125 fn sizes(data: DataGenerator) -> usize {
1126 let metrics = ColumnarMetrics::disconnected();
1127 let config = EncodingConfig::default();
1128 let updates: Vec<_> = data.batches().collect();
1129 let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
1130 let trace = BlobTraceBatchPart {
1131 desc: Description::new(
1132 Antichain::from_elem(0u64),
1133 Antichain::new(),
1134 Antichain::from_elem(0u64),
1135 ),
1136 index: 0,
1137 updates,
1138 };
1139 let mut trace_buf = Vec::new();
1140 trace.encode(&mut trace_buf, &metrics, &config);
1141 trace_buf.len()
1142 }
1143
1144 let record_size_bytes = DataGenerator::default().record_size_bytes;
1145 assert_eq!(
1148 format!(
1149 "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
1150 sizes(DataGenerator::new(1, record_size_bytes, 1)),
1151 sizes(DataGenerator::new(25, record_size_bytes, 25)),
1152 sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
1153 sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
1154 ),
1155 "1/1=903 25/1=2649 1000/1=72881 1000/100=72881"
1156 );
1157 }
1158}