1use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use base64::Engine;
23use bytes::{BufMut, Bytes};
24use differential_dataflow::difference::Monoid;
25use differential_dataflow::trace::Description;
26use mz_ore::bytes::SegmentedBytes;
27use mz_ore::cast::CastFrom;
28use mz_ore::collections::CollectionExt;
29use mz_ore::soft_panic_or_log;
30use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
31use mz_persist_types::columnar::{
32 ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
33};
34use mz_persist_types::parquet::EncodingConfig;
35use mz_persist_types::part::Part;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::{Codec, Codec64};
38use mz_proto::{RustType, TryFromProtoError};
39use proptest::arbitrary::Arbitrary;
40use proptest::prelude::*;
41use proptest::strategy::{BoxedStrategy, Just};
42use prost::Message;
43use serde::Serialize;
44use timely::PartialOrder;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::error::Error;
49use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
50use crate::generated::persist::{
51 ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
52 ProtoU64Description,
53};
54use crate::indexed::columnar::arrow::realloc_array;
55use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
56use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
57use crate::location::Blob;
58use crate::metrics::ColumnarMetrics;
59
/// Selects the columnar layout used for a batch.
///
/// Each variant has a textual name, produced by `as_str` and parsed by
/// `from_str`; the names are listed on the variants below.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Named `"row"`. Reported as not structured by `is_structured`.
    Row,
    /// Carries a version number. Versions `0` and `1` are named `"both"` and
    /// are reported as not structured; version `2` is named `"both_v2"` and is
    /// reported as structured. `as_str` panics for any other version.
    ///
    /// NOTE(review): presumably this writes both the codec-encoded and the
    /// structured columns — confirm against the encoder.
    Both(usize),
    /// Named `"structured"`. Reported as structured by `is_structured`.
    Structured,
}
74
75impl BatchColumnarFormat {
76 pub const fn default() -> Self {
78 BatchColumnarFormat::Both(2)
79 }
80
81 pub fn from_str(s: &str) -> Self {
84 match s {
85 "row" => BatchColumnarFormat::Row,
86 "both" => BatchColumnarFormat::Both(0),
87 "both_v2" => BatchColumnarFormat::Both(2),
88 "structured" => BatchColumnarFormat::Structured,
89 x => {
90 let default = BatchColumnarFormat::default();
91 soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
92 default
93 }
94 }
95 }
96
97 pub const fn as_str(&self) -> &'static str {
99 match self {
100 BatchColumnarFormat::Row => "row",
101 BatchColumnarFormat::Both(0 | 1) => "both",
102 BatchColumnarFormat::Both(2) => "both_v2",
103 BatchColumnarFormat::Structured => "structured",
104 BatchColumnarFormat::Both(_) => panic!("unknown batch columnar format"),
105 }
106 }
107
108 pub const fn is_structured(&self) -> bool {
110 match self {
111 BatchColumnarFormat::Row => false,
112 BatchColumnarFormat::Both(0 | 1) => false,
114 BatchColumnarFormat::Both(_) => true,
115 BatchColumnarFormat::Structured => true,
116 }
117 }
118}
119
120impl fmt::Display for BatchColumnarFormat {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 f.write_str(self.as_str())
123 }
124}
125
126impl Arbitrary for BatchColumnarFormat {
127 type Parameters = ();
128 type Strategy = BoxedStrategy<BatchColumnarFormat>;
129
130 fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
131 proptest::strategy::Union::new(vec![
132 Just(BatchColumnarFormat::Row).boxed(),
133 Just(BatchColumnarFormat::Both(0)).boxed(),
134 Just(BatchColumnarFormat::Both(1)).boxed(),
135 Just(BatchColumnarFormat::Both(2)).boxed(),
136 Just(BatchColumnarFormat::Structured).boxed(),
137 ])
138 .boxed()
139 }
140}
141
/// Metadata describing a trace batch whose data is stored as one or more
/// blob parts.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// Blob keys of the batch's parts. `validate_data` fetches each key and
    /// expects the part at position `i` to carry `index == i`.
    pub keys: Vec<String>,
    /// The `ProtoBatchFormat` recorded for this batch.
    pub format: ProtoBatchFormat,
    /// The description (lower, upper, since) of the batch. `validate_data`
    /// requires every part to carry an identical description.
    pub desc: Description<u64>,
    /// NOTE(review): presumably the compaction level of the batch — confirm
    /// with the code that populates it.
    pub level: u64,
    /// Size of the batch in bytes. NOTE(review): the tests in this file set it
    /// to the sum of the encoded part sizes — confirm that is the contract.
    pub size_bytes: u64,
}
166
/// One part of a trace batch, in the form that is encoded to and decoded from
/// a blob (see `encode` / `decode`).
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// The description of the batch this part belongs to. `validate` checks
    /// the timestamps of `updates` against its bounds.
    pub desc: Description<T>,
    /// Position of this part within its batch; `TraceBatchMeta::validate_data`
    /// checks it against the position of the part's key in `keys`.
    pub index: u64,
    /// The updates contained in this part.
    pub updates: BlobTraceUpdates,
}
204
/// The updates of a batch part, held in one of three representations.
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Only the codec-encoded records (keys, values, timestamps, diffs).
    Row(ColumnarRecords),
    /// The codec-encoded records together with structured key/value columns.
    /// Timestamps and diffs are read from the `ColumnarRecords`.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// Only structured data; no codec-encoded keys or values are held.
    Structured {
        /// Structured key and value columns.
        key_values: ColumnarRecordsStructuredExt,
        /// Timestamps as raw `i64` values (decoded elsewhere via `Codec64`).
        timestamps: Int64Array,
        /// Diffs as raw `i64` values (decoded elsewhere via `Codec64`).
        diffs: Int64Array,
    },
}
229
230impl BlobTraceUpdates {
231 pub fn from_part(part: Part) -> Self {
233 Self::Structured {
234 key_values: ColumnarRecordsStructuredExt {
235 key: part.key,
236 val: part.val,
237 },
238 timestamps: part.time,
239 diffs: part.diff,
240 }
241 }
242
243 pub fn len(&self) -> usize {
245 match self {
246 BlobTraceUpdates::Row(c) => c.len(),
247 BlobTraceUpdates::Both(c, _structured) => c.len(),
248 BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
249 }
250 }
251
252 pub fn timestamps(&self) -> &Int64Array {
254 match self {
255 BlobTraceUpdates::Row(c) => c.timestamps(),
256 BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
257 BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
258 }
259 }
260
261 pub fn diffs(&self) -> &Int64Array {
263 match self {
264 BlobTraceUpdates::Row(c) => c.diffs(),
265 BlobTraceUpdates::Both(c, _structured) => c.diffs(),
266 BlobTraceUpdates::Structured { diffs, .. } => diffs,
267 }
268 }
269
270 pub fn diffs_sum<D: Codec64 + Monoid>(&self) -> D {
272 let mut sum = D::zero();
273 for d in self.diffs().values().iter() {
274 let d = D::decode(d.to_le_bytes());
275 sum.plus_equals(&d);
276 }
277 sum
278 }
279
280 pub fn records(&self) -> Option<&ColumnarRecords> {
282 match self {
283 BlobTraceUpdates::Row(c) => Some(c),
284 BlobTraceUpdates::Both(c, _structured) => Some(c),
285 BlobTraceUpdates::Structured { .. } => None,
286 }
287 }
288
289 pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
291 match self {
292 BlobTraceUpdates::Row(_) => None,
293 BlobTraceUpdates::Both(_, s) => Some(s),
294 BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
295 }
296 }
297
298 pub fn goodbytes(&self) -> usize {
300 match self {
301 BlobTraceUpdates::Row(c) => c.goodbytes(),
302 BlobTraceUpdates::Both(c, _) => c.goodbytes(),
306 BlobTraceUpdates::Structured {
307 key_values,
308 timestamps,
309 diffs,
310 } => {
311 key_values.goodbytes()
312 + timestamps.values().to_byte_slice().len()
313 + diffs.values().to_byte_slice().len()
314 }
315 }
316 }
317
    /// Returns the codec-encoded records, computing them first if this value
    /// only holds structured data.
    ///
    /// `Row` and `Both` return their existing records unchanged. `Structured`
    /// encodes its key and value columns with `schema_to_codec`, replaces
    /// `self` with `Both` (keeping the structured columns), and returns the
    /// new records.
    ///
    /// # Panics
    ///
    /// Panics if `schema_to_codec` fails for the key or value column.
    pub fn get_or_make_codec<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecords {
        match self {
            BlobTraceUpdates::Row(records) => records,
            BlobTraceUpdates::Both(records, _) => records,
            BlobTraceUpdates::Structured {
                key_values,
                timestamps,
                diffs,
            } => {
                let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
                let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
                let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());

                // Upgrade in place so the encoded records are kept for later calls.
                *self = BlobTraceUpdates::Both(records, key_values.clone());
                // Re-borrow from `self` so the returned reference points at the stored value.
                let BlobTraceUpdates::Both(records, _) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                records
            }
        }
    }
344
    /// Returns the structured key/value columns, computing them first if this
    /// value only holds codec-encoded records.
    ///
    /// `Row` decodes its keys and values with `codec_to_schema` and replaces
    /// `self` with `Both`. In every case the key and value arrays are then
    /// migrated in place to the data types of the provided schemas when they
    /// differ. A missing backward-compatible migration, or a migration that
    /// yields the wrong type, is logged as an error and the array is returned
    /// as it stands.
    ///
    /// # Panics
    ///
    /// Panics if `codec_to_schema` fails, or if `data_type` fails for either
    /// schema.
    pub fn get_or_make_structured<K: Codec, V: Codec>(
        &mut self,
        key_schema: &K::Schema,
        val_schema: &V::Schema,
    ) -> &ColumnarRecordsStructuredExt {
        let structured = match self {
            BlobTraceUpdates::Row(records) => {
                let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
                let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");

                // Upgrade in place so the decoded columns are kept for later calls.
                *self = BlobTraceUpdates::Both(
                    records.clone(),
                    ColumnarRecordsStructuredExt { key, val },
                );
                // Re-borrow from `self` so we hand out the stored value.
                let BlobTraceUpdates::Both(_, structured) = self else {
                    unreachable!("set to BlobTraceUpdates::Both in previous line")
                };
                structured
            }
            BlobTraceUpdates::Both(_, structured) => structured,
            BlobTraceUpdates::Structured { key_values, .. } => key_values,
        };

        // Rewrites `array` to `to_type` when its current type differs and a
        // backward-compatible migration exists; otherwise only logs.
        let migrate = |array: &mut ArrayRef, to_type: DataType| {
            let from_type = array.data_type().clone();
            if from_type != to_type {
                if let Some(migration) = backward_compatible(&from_type, &to_type) {
                    *array = migration.migrate(Arc::clone(array));
                    // Sanity-check the migration result; a mismatch is logged, not fatal.
                    if array.data_type() != &to_type {
                        error!(
                            ?from_type,
                            actual_type=?array.data_type(),
                            ?to_type,
                            "migration failed; returned non-matching data type!"
                        );
                    }
                } else {
                    error!(
                        ?from_type,
                        ?to_type,
                        "failed to migrate array type; backwards-incompatible schema migration?"
                    );
                }
            }
        };
        migrate(
            &mut structured.key,
            data_type::<K>(key_schema).expect("valid key schema"),
        );
        migrate(
            &mut structured.val,
            data_type::<V>(val_schema).expect("valid value schema"),
        );

        structured
    }
407
408 pub fn as_part(&self) -> Option<Part> {
410 let ext = self.structured()?.clone();
411 Some(Part {
412 key: ext.key,
413 val: ext.val,
414 time: self.timestamps().clone(),
415 diff: self.diffs().clone(),
416 })
417 }
418
419 pub fn into_part<K: Codec, V: Codec>(
421 &mut self,
422 key_schema: &K::Schema,
423 val_schema: &V::Schema,
424 ) -> Part {
425 let ext = self
426 .get_or_make_structured::<K, V>(key_schema, val_schema)
427 .clone();
428 Part {
429 key: ext.key,
430 val: ext.val,
431 time: self.timestamps().clone(),
432 diff: self.diffs().clone(),
433 }
434 }
435
436 pub fn concat<K: Codec, V: Codec>(
441 mut updates: Vec<BlobTraceUpdates>,
442 key_schema: &K::Schema,
443 val_schema: &V::Schema,
444 metrics: &ColumnarMetrics,
445 ) -> anyhow::Result<BlobTraceUpdates> {
446 match updates.len() {
447 0 => {
448 return Ok(BlobTraceUpdates::Structured {
449 key_values: ColumnarRecordsStructuredExt {
450 key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
451 val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
452 },
453 timestamps: Int64Array::from_iter_values(vec![]),
454 diffs: Int64Array::from_iter_values(vec![]),
455 });
456 }
457 1 => return Ok(updates.into_iter().into_element()),
458 _ => {}
459 }
460
461 let mut keys = Vec::with_capacity(updates.len());
463 let mut vals = Vec::with_capacity(updates.len());
464 for updates in &mut updates {
465 let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
466 keys.push(structured.key.as_ref());
467 vals.push(structured.val.as_ref());
468 }
469 let key_values = ColumnarRecordsStructuredExt {
470 key: ::arrow::compute::concat(&keys)?,
471 val: ::arrow::compute::concat(&vals)?,
472 };
473
474 let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
477 let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
478
479 for update in &updates {
480 timestamps.push(update.timestamps());
481 diffs.push(update.diffs());
482 }
483 let timestamps = ::arrow::compute::concat(×tamps)?
484 .as_primitive_opt::<Int64Type>()
485 .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
486 .clone();
487 let diffs = ::arrow::compute::concat(&diffs)?
488 .as_primitive_opt::<Int64Type>()
489 .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
490 .clone();
491
492 let out = Self::Structured {
493 key_values,
494 timestamps,
495 diffs,
496 };
497 metrics
498 .arrow
499 .concat_bytes
500 .inc_by(u64::cast_from(out.goodbytes()));
501 Ok(out)
502 }
503
504 pub fn from_proto(
506 lgbytes: &ColumnarMetrics,
507 proto: ProtoColumnarRecords,
508 ) -> Result<Self, TryFromProtoError> {
509 let binary_array = |data: Bytes, offsets: Vec<i32>| {
510 if offsets.is_empty() && proto.len > 0 {
511 return Ok(None);
512 };
513 match BinaryArray::try_new(
514 OffsetBuffer::new(offsets.into()),
515 ::arrow::buffer::Buffer::from(data),
516 None,
517 ) {
518 Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
519 Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
520 "Unable to decode binary array from repeated proto fields: {e:?}"
521 ))),
522 }
523 };
524
525 let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
526 let codec_val = binary_array(proto.val_data, proto.val_offsets)?;
527
528 let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
529 let diffs = realloc_array(&proto.diffs.into(), lgbytes);
530 let ext =
531 ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;
532
533 let updates = match (codec_key, codec_val, ext) {
534 (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
535 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
536 ext,
537 ),
538 (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
539 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
540 ),
541 (None, None, Some(ext)) => BlobTraceUpdates::Structured {
542 key_values: ext,
543 timestamps,
544 diffs,
545 },
546 (k, v, ext) => {
547 return Err(TryFromProtoError::InvalidPersistState(format!(
548 "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
549 k.is_some(),
550 v.is_some(),
551 ext.is_some(),
552 )));
553 }
554 };
555
556 Ok(updates)
557 }
558
559 pub fn into_proto(&self) -> ProtoColumnarRecords {
561 let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
562 None => (vec![], Bytes::new(), vec![], Bytes::new()),
563 Some(records) => (
564 records.keys().offsets().to_vec(),
565 Bytes::copy_from_slice(records.keys().value_data()),
566 records.vals().offsets().to_vec(),
567 Bytes::copy_from_slice(records.vals().value_data()),
568 ),
569 };
570 let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
571 None => (None, None),
572 Some((k, v)) => (Some(k), Some(v)),
573 };
574
575 ProtoColumnarRecords {
576 len: self.len().into_proto(),
577 key_offsets,
578 key_data,
579 val_offsets,
580 val_data,
581 timestamps: self.timestamps().values().to_vec(),
582 diffs: self.diffs().values().to_vec(),
583 key_structured: k_struct,
584 val_structured: v_struct,
585 }
586 }
587
588 pub fn as_structured<K: Codec, V: Codec>(
591 &self,
592 key_schema: &K::Schema,
593 val_schema: &V::Schema,
594 ) -> Self {
595 let mut this = self.clone();
596 Self::Structured {
597 key_values: this
598 .get_or_make_structured::<K, V>(key_schema, val_schema)
599 .clone(),
600 timestamps: this.timestamps().clone(),
601 diffs: this.diffs().clone(),
602 }
603 }
604}
605
606impl TraceBatchMeta {
607 pub fn validate(&self) -> Result<(), Error> {
610 if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
614 return Err(format!("invalid desc: {:?}", &self.desc).into());
615 }
616
617 Ok(())
618 }
619
620 pub async fn validate_data(
622 &self,
623 blob: &dyn Blob,
624 metrics: &ColumnarMetrics,
625 ) -> Result<(), Error> {
626 let mut batches = vec![];
627 for (idx, key) in self.keys.iter().enumerate() {
628 let value = blob
629 .get(key)
630 .await?
631 .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
632 let batch = BlobTraceBatchPart::decode(&value, metrics)?;
633 if batch.desc != self.desc {
634 return Err(format!(
635 "invalid trace batch part desc expected {:?} got {:?}",
636 &self.desc, &batch.desc
637 )
638 .into());
639 }
640
641 if batch.index != u64::cast_from(idx) {
642 return Err(format!(
643 "invalid index for blob trace batch part at key {} expected {} got {}",
644 key, idx, batch.index
645 )
646 .into());
647 }
648
649 batch.validate()?;
650 batches.push(batch);
651 }
652
653 for (batch_idx, batch) in batches.iter().enumerate() {
654 for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
655 let diff: u64 = Codec64::decode(diff.to_le_bytes());
657
658 if diff == 0 {
660 return Err(format!(
661 "update with 0 diff in batch {batch_idx} at row {row_idx}",
662 )
663 .into());
664 }
665 }
666 }
667
668 Ok(())
669 }
670}
671
672impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
673 pub fn validate(&self) -> Result<(), Error> {
676 if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
680 return Err(format!("invalid desc: {:?}", &self.desc).into());
681 }
682
683 let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());
684
685 for time in self.updates.timestamps().values() {
686 let ts = T::decode(time.to_le_bytes());
687 if !self.desc.lower().less_equal(&ts) {
689 return Err(format!(
690 "timestamp {:?} is less than the batch lower: {:?}",
691 ts, self.desc
692 )
693 .into());
694 }
695
696 if uncompacted && self.desc.upper().less_equal(&ts) {
701 return Err(format!(
702 "timestamp {:?} is greater than or equal to the batch upper: {:?}",
703 ts, self.desc
704 )
705 .into());
706 }
707 }
708
709 for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
710 let diff: u64 = Codec64::decode(diff.to_le_bytes());
712
713 if diff == 0 {
715 return Err(format!("update with 0 diff at row {row_idx}",).into());
716 }
717 }
718
719 Ok(())
720 }
721
722 pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
724 where
725 B: BufMut + Send,
726 {
727 encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
728 }
729
730 pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
732 decode_trace_parquet(buf.clone(), metrics)
733 }
734
735 pub fn key_lower(&self) -> &[u8] {
737 self.updates
738 .records()
739 .and_then(|r| r.keys().iter().flatten().min())
740 .unwrap_or(&[])
741 }
742
743 pub fn structured_key_lower(&self) -> Option<ArrayBound> {
745 self.updates.structured().and_then(|r| {
746 let ord = ArrayOrd::new(&r.key);
747 (0..r.key.len())
748 .min_by_key(|i| ord.at(*i))
749 .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
750 })
751 }
752}
753
754impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
755 fn from(x: ProtoU64Description) -> Self {
756 Description::new(
757 x.lower
758 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
759 x.upper
760 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
761 x.since
762 .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
763 )
764 }
765}
766
767impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
768 fn from(x: ProtoU64Antichain) -> Self {
769 Antichain::from(
770 x.elements
771 .into_iter()
772 .map(|x| T::decode(u64::to_le_bytes(x)))
773 .collect::<Vec<_>>(),
774 )
775 }
776}
777
778impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
779 fn from(x: &Antichain<T>) -> Self {
780 ProtoU64Antichain {
781 elements: x
782 .elements()
783 .iter()
784 .map(|x| u64::from_le_bytes(T::encode(x)))
785 .collect(),
786 }
787 }
788}
789
790impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
791 fn from(x: &Description<T>) -> Self {
792 ProtoU64Description {
793 lower: Some(x.lower().into()),
794 upper: Some(x.upper().into()),
795 since: Some(x.since().into()),
796 }
797 }
798}
799
800pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
802 let (format, format_metadata) = match &batch.updates {
803 BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
805 BlobTraceUpdates::Both { .. } => {
807 let metadata = ProtoFormatMetadata::StructuredMigration(2);
808 (ProtoBatchFormat::ParquetStructured, Some(metadata))
809 }
810 BlobTraceUpdates::Structured { .. } => {
811 let metadata = ProtoFormatMetadata::StructuredMigration(3);
812 (ProtoBatchFormat::ParquetStructured, Some(metadata))
813 }
814 };
815
816 let inline = ProtoBatchPartInline {
817 format: format.into(),
818 desc: Some((&batch.desc).into()),
819 index: batch.index,
820 format_metadata,
821 };
822 let inline_encoded = inline.encode_to_vec();
823 base64::engine::general_purpose::STANDARD.encode(inline_encoded)
824}
825
826pub fn decode_trace_inline_meta(
828 inline_base64: Option<&String>,
829) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
830 let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
831 let inline_encoded = base64::engine::general_purpose::STANDARD
832 .decode(inline_base64)
833 .map_err(|err| err.to_string())?;
834 let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
835 let format = ProtoBatchFormat::try_from(inline.format)
836 .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
837 Ok((format, inline))
838}
839
840#[cfg(test)]
841mod tests {
842 use std::sync::Arc;
843
844 use bytes::Bytes;
845
846 use crate::error::Error;
847 use crate::indexed::columnar::ColumnarRecordsBuilder;
848 use crate::mem::{MemBlob, MemBlobConfig};
849 use crate::metrics::ColumnarMetrics;
850 use crate::workload::DataGenerator;
851
852 use super::*;
853
854 fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
855 ((key.into(), "".into()), ts, 1)
856 }
857
858 fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
859 Description::new(
860 Antichain::from_elem(lower),
861 Antichain::from_elem(upper),
862 Antichain::from_elem(0),
863 )
864 }
865
866 fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
867 TraceBatchMeta {
868 keys: vec![],
869 format: ProtoBatchFormat::Unknown,
870 desc: u64_desc(lower, upper),
871 level: 1,
872 size_bytes: 0,
873 }
874 }
875
876 fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
877 Description::new(
878 Antichain::from_elem(lower),
879 Antichain::from_elem(upper),
880 Antichain::from_elem(since),
881 )
882 }
883
884 fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
885 let mut builder = ColumnarRecordsBuilder::default();
886 for ((k, v), t, d) in updates {
887 assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
888 }
889 let updates = builder.finish(&ColumnarMetrics::disconnected());
890 BlobTraceUpdates::Row(updates)
891 }
892
893 #[mz_ore::test]
894 fn trace_batch_validate() {
895 let b = BlobTraceBatchPart {
897 desc: u64_desc(0, 2),
898 index: 0,
899 updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
900 };
901 assert_eq!(b.validate(), Ok(()));
902
903 let b = BlobTraceBatchPart {
905 desc: u64_desc(0, 2),
906 index: 0,
907 updates: columnar_records(vec![]),
908 };
909 assert_eq!(b.validate(), Ok(()));
910
911 let b = BlobTraceBatchPart {
913 desc: u64_desc(2, 0),
914 index: 0,
915 updates: columnar_records(vec![]),
916 };
917 assert_eq!(
918 b.validate(),
919 Err(Error::from(
920 "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
921 ))
922 );
923
924 let b = BlobTraceBatchPart {
926 desc: u64_desc(0, 0),
927 index: 0,
928 updates: columnar_records(vec![]),
929 };
930 assert_eq!(
931 b.validate(),
932 Err(Error::from(
933 "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
934 ))
935 );
936
937 let b = BlobTraceBatchPart {
939 desc: u64_desc(1, 2),
940 index: 0,
941 updates: columnar_records(vec![update_with_key(0, "0")]),
942 };
943 assert_eq!(
944 b.validate(),
945 Err(Error::from(
946 "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
947 ))
948 );
949
950 let b = BlobTraceBatchPart {
952 desc: u64_desc(1, 2),
953 index: 0,
954 updates: columnar_records(vec![update_with_key(2, "0")]),
955 };
956 assert_eq!(
957 b.validate(),
958 Err(Error::from(
959 "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
960 ))
961 );
962
963 let b = BlobTraceBatchPart {
965 desc: u64_desc_since(1, 2, 4),
966 index: 0,
967 updates: columnar_records(vec![update_with_key(2, "0")]),
968 };
969 assert_eq!(b.validate(), Ok(()));
970
971 let b = BlobTraceBatchPart {
973 desc: u64_desc_since(1, 2, 4),
974 index: 0,
975 updates: columnar_records(vec![update_with_key(4, "0")]),
976 };
977 assert_eq!(b.validate(), Ok(()));
978
979 let b = BlobTraceBatchPart {
981 desc: u64_desc_since(1, 2, 4),
982 index: 0,
983 updates: columnar_records(vec![update_with_key(5, "0")]),
984 };
985 assert_eq!(b.validate(), Ok(()));
986
987 let b = BlobTraceBatchPart {
989 desc: u64_desc(0, 1),
990 index: 0,
991 updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
992 };
993 assert_eq!(
994 b.validate(),
995 Err(Error::from("update with 0 diff at row 0"))
996 );
997 }
998
999 #[mz_ore::test]
1000 fn trace_batch_meta_validate() {
1001 let b = batch_meta(0, 1);
1003 assert_eq!(b.validate(), Ok(()));
1004
1005 let b = batch_meta(0, 0);
1007 assert_eq!(
1008 b.validate(),
1009 Err(Error::from(
1010 "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
1011 )),
1012 );
1013
1014 let b = batch_meta(2, 0);
1016 assert_eq!(
1017 b.validate(),
1018 Err(Error::from(
1019 "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
1020 )),
1021 );
1022 }
1023
1024 async fn expect_set_trace_batch<T: Timestamp + Codec64>(
1025 blob: &dyn Blob,
1026 key: &str,
1027 batch: &BlobTraceBatchPart<T>,
1028 ) -> u64 {
1029 let mut val = Vec::new();
1030 let metrics = ColumnarMetrics::disconnected();
1031 let config = EncodingConfig::default();
1032 batch.encode(&mut val, &metrics, &config);
1033 let val = Bytes::from(val);
1034 let val_len = u64::cast_from(val.len());
1035 blob.set(key, val).await.expect("failed to set trace batch");
1036 val_len
1037 }
1038
1039 #[mz_ore::test(tokio::test)]
1040 #[cfg_attr(miri, ignore)] async fn trace_batch_meta_validate_data() -> Result<(), Error> {
1042 let metrics = ColumnarMetrics::disconnected();
1043 let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
1044 let format = ProtoBatchFormat::ParquetKvtd;
1045
1046 let batch_desc = u64_desc_since(0, 3, 0);
1047 let batch0 = BlobTraceBatchPart {
1048 desc: batch_desc.clone(),
1049 index: 0,
1050 updates: columnar_records(vec![
1051 (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
1052 (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
1053 ]),
1054 };
1055 let batch1 = BlobTraceBatchPart {
1056 desc: batch_desc.clone(),
1057 index: 1,
1058 updates: columnar_records(vec![
1059 (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
1060 (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
1061 ]),
1062 };
1063
1064 let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
1065 let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
1066 let size_bytes = batch0_size_bytes + batch1_size_bytes;
1067 let batch_meta = TraceBatchMeta {
1068 keys: vec!["b0".into(), "b1".into()],
1069 format,
1070 desc: batch_desc.clone(),
1071 level: 0,
1072 size_bytes,
1073 };
1074
1075 assert_eq!(
1077 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1078 Ok(())
1079 );
1080
1081 let batch_meta = TraceBatchMeta {
1083 keys: vec!["b0".into(), "b1".into()],
1084 format,
1085 desc: u64_desc_since(1, 3, 0),
1086 level: 0,
1087 size_bytes,
1088 };
1089 assert_eq!(
1090 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1091 Err(Error::from(
1092 "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
1093 ))
1094 );
1095 let batch_meta = TraceBatchMeta {
1097 keys: vec!["b0".into(), "b1".into(), "b2".into()],
1098 format,
1099 desc: batch_desc.clone(),
1100 level: 0,
1101 size_bytes,
1102 };
1103 assert_eq!(
1104 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1105 Err(Error::from("no blob for trace batch at key: b2"))
1106 );
1107 let batch_meta = TraceBatchMeta {
1109 keys: vec!["b1".into(), "b0".into()],
1110 format,
1111 desc: batch_desc,
1112 level: 0,
1113 size_bytes,
1114 };
1115 assert_eq!(
1116 batch_meta.validate_data(blob.as_ref(), &metrics).await,
1117 Err(Error::from(
1118 "invalid index for blob trace batch part at key b1 expected 0 got 1"
1119 ))
1120 );
1121
1122 Ok(())
1123 }
1124
1125 #[mz_ore::test]
1126 #[cfg_attr(miri, ignore)] fn encoded_batch_sizes() {
1128 fn sizes(data: DataGenerator) -> usize {
1129 let metrics = ColumnarMetrics::disconnected();
1130 let config = EncodingConfig::default();
1131 let updates: Vec<_> = data.batches().collect();
1132 let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
1133 let trace = BlobTraceBatchPart {
1134 desc: Description::new(
1135 Antichain::from_elem(0u64),
1136 Antichain::new(),
1137 Antichain::from_elem(0u64),
1138 ),
1139 index: 0,
1140 updates,
1141 };
1142 let mut trace_buf = Vec::new();
1143 trace.encode(&mut trace_buf, &metrics, &config);
1144 trace_buf.len()
1145 }
1146
1147 let record_size_bytes = DataGenerator::default().record_size_bytes;
1148 assert_eq!(
1151 format!(
1152 "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
1153 sizes(DataGenerator::new(1, record_size_bytes, 1)),
1154 sizes(DataGenerator::new(25, record_size_bytes, 25)),
1155 sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
1156 sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
1157 ),
1158 "1/1=903 25/1=2649 1000/1=72881 1000/100=72881"
1159 );
1160 }
1161}