1use std::fmt::{self, Debug};
17use std::sync::Arc;
18
19use arrow::array::{Array, ArrayRef, AsArray, BinaryArray, Int64Array};
20use arrow::buffer::OffsetBuffer;
21use arrow::datatypes::{DataType, Int64Type, ToByteSlice};
22use base64::Engine;
23use bytes::{BufMut, Bytes};
24use differential_dataflow::difference::Monoid;
25use differential_dataflow::trace::Description;
26use mz_ore::bytes::SegmentedBytes;
27use mz_ore::cast::CastFrom;
28use mz_ore::collections::CollectionExt;
29use mz_ore::soft_panic_or_log;
30use mz_persist_types::arrow::{ArrayBound, ArrayOrd};
31use mz_persist_types::columnar::{
32 ColumnEncoder, Schema, codec_to_schema, data_type, schema_to_codec,
33};
34use mz_persist_types::parquet::EncodingConfig;
35use mz_persist_types::part::Part;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::{Codec, Codec64};
38use mz_proto::{RustType, TryFromProtoError};
39use proptest::arbitrary::Arbitrary;
40use proptest::prelude::*;
41use proptest::strategy::{BoxedStrategy, Just};
42use prost::Message;
43use serde::Serialize;
44use timely::PartialOrder;
45use timely::progress::{Antichain, Timestamp};
46use tracing::error;
47
48use crate::error::Error;
49use crate::generated::persist::proto_batch_part_inline::FormatMetadata as ProtoFormatMetadata;
50use crate::generated::persist::{
51 ProtoBatchFormat, ProtoBatchPartInline, ProtoColumnarRecords, ProtoU64Antichain,
52 ProtoU64Description,
53};
54use crate::indexed::columnar::arrow::realloc_array;
55use crate::indexed::columnar::parquet::{decode_trace_parquet, encode_trace_parquet};
56use crate::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
57use crate::location::Blob;
58use crate::metrics::ColumnarMetrics;
59
/// The columnar layout used when writing a batch part.
///
/// The variants parallel [`BlobTraceUpdates`]: `Row` carries only the
/// codec-encoded columns, `Both` carries codec-encoded plus structured
/// columns, and `Structured` carries only structured columns.
#[derive(Debug, Copy, Clone, PartialEq, Eq, PartialOrd, Ord, Serialize)]
pub enum BatchColumnarFormat {
    /// Codec-encoded columns only; serialized as `"row"`.
    Row,
    /// Codec-encoded plus structured columns. The `usize` is a format
    /// version: versions 0 and 1 serialize as `"both"`, version 2 as
    /// `"both_v2"` (any other version panics in `as_str`). Only versions
    /// >= 2 report `is_structured() == true`.
    Both(usize),
    /// Structured columns only; serialized as `"structured"`.
    Structured,
}
74
75impl BatchColumnarFormat {
76 pub const fn default() -> Self {
78 BatchColumnarFormat::Both(2)
79 }
80
81 pub fn from_str(s: &str) -> Self {
84 match s {
85 "row" => BatchColumnarFormat::Row,
86 "both" => BatchColumnarFormat::Both(0),
87 "both_v2" => BatchColumnarFormat::Both(2),
88 "structured" => BatchColumnarFormat::Structured,
89 x => {
90 let default = BatchColumnarFormat::default();
91 soft_panic_or_log!("Invalid batch columnar type: {x}, falling back to {default}");
92 default
93 }
94 }
95 }
96
97 pub const fn as_str(&self) -> &'static str {
99 match self {
100 BatchColumnarFormat::Row => "row",
101 BatchColumnarFormat::Both(0 | 1) => "both",
102 BatchColumnarFormat::Both(2) => "both_v2",
103 BatchColumnarFormat::Structured => "structured",
104 _ => panic!("unknown batch columnar format"),
105 }
106 }
107
108 pub const fn is_structured(&self) -> bool {
110 match self {
111 BatchColumnarFormat::Row => false,
112 BatchColumnarFormat::Both(0 | 1) => false,
114 BatchColumnarFormat::Both(_) => true,
115 BatchColumnarFormat::Structured => true,
116 }
117 }
118}
119
120impl fmt::Display for BatchColumnarFormat {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 f.write_str(self.as_str())
123 }
124}
125
impl Arbitrary for BatchColumnarFormat {
    type Parameters = ();
    type Strategy = BoxedStrategy<BatchColumnarFormat>;

    fn arbitrary_with(_args: Self::Parameters) -> Self::Strategy {
        // NOTE(review): this strategy only ever generates `Row`, `Both(0)`,
        // and `Both(1)` — `Both(2)` and `Structured` are never produced.
        // Presumably intentional (e.g. exercising legacy formats), but worth
        // confirming against the tests that rely on it.
        proptest::strategy::Union::new(vec![
            Just(BatchColumnarFormat::Row).boxed(),
            Just(BatchColumnarFormat::Both(0)).boxed(),
            Just(BatchColumnarFormat::Both(1)).boxed(),
        ])
        .boxed()
    }
}
139
/// Metadata for a batch of trace updates, including the blob keys where its
/// parts are stored.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TraceBatchMeta {
    /// Blob keys for this batch's parts, in part-index order (`validate_data`
    /// checks each part's `index` against its position here).
    pub keys: Vec<String>,
    /// The format the parts were written with.
    pub format: ProtoBatchFormat,
    /// The `[lower, upper)` time bounds (plus `since`) covered by this batch.
    pub desc: Description<u64>,
    /// The level of this batch — presumably its compaction level; confirm
    /// against the trace maintenance code.
    pub level: u64,
    /// Total encoded size of this batch's parts, in bytes.
    pub size_bytes: u64,
}
164
/// One part of a trace batch, as stored in (or decoded from) a blob.
#[derive(Clone, Debug, PartialEq)]
pub struct BlobTraceBatchPart<T> {
    /// The `[lower, upper)` time bounds (plus `since`); `validate` checks
    /// every update's timestamp against it.
    pub desc: Description<T>,
    /// The position of this part within its batch (checked by
    /// `TraceBatchMeta::validate_data`).
    pub index: u64,
    /// The updates themselves.
    pub updates: BlobTraceUpdates,
}
202
/// The updates of a batch part, in one of several columnar representations.
#[derive(Clone, Debug, PartialEq)]
pub enum BlobTraceUpdates {
    /// Codec-encoded binary key/value columns (plus timestamps and diffs).
    Row(ColumnarRecords),
    /// Codec-encoded columns alongside their structured (Arrow) equivalents.
    Both(ColumnarRecords, ColumnarRecordsStructuredExt),
    /// Structured (Arrow) columns only, with no codec-encoded data.
    Structured {
        // The structured key/value arrays.
        key_values: ColumnarRecordsStructuredExt,
        // One i64 timestamp per update.
        timestamps: Int64Array,
        // One i64 diff per update.
        diffs: Int64Array,
    },
}
227
228impl BlobTraceUpdates {
229 pub fn from_part(part: Part) -> Self {
231 Self::Structured {
232 key_values: ColumnarRecordsStructuredExt {
233 key: part.key,
234 val: part.val,
235 },
236 timestamps: part.time,
237 diffs: part.diff,
238 }
239 }
240
241 pub fn len(&self) -> usize {
243 match self {
244 BlobTraceUpdates::Row(c) => c.len(),
245 BlobTraceUpdates::Both(c, _structured) => c.len(),
246 BlobTraceUpdates::Structured { timestamps, .. } => timestamps.len(),
247 }
248 }
249
250 pub fn timestamps(&self) -> &Int64Array {
252 match self {
253 BlobTraceUpdates::Row(c) => c.timestamps(),
254 BlobTraceUpdates::Both(c, _structured) => c.timestamps(),
255 BlobTraceUpdates::Structured { timestamps, .. } => timestamps,
256 }
257 }
258
259 pub fn diffs(&self) -> &Int64Array {
261 match self {
262 BlobTraceUpdates::Row(c) => c.diffs(),
263 BlobTraceUpdates::Both(c, _structured) => c.diffs(),
264 BlobTraceUpdates::Structured { diffs, .. } => diffs,
265 }
266 }
267
268 pub fn diffs_sum<D: Codec64 + Monoid>(&self) -> D {
270 let mut sum = D::zero();
271 for d in self.diffs().values().iter() {
272 let d = D::decode(d.to_le_bytes());
273 sum.plus_equals(&d);
274 }
275 sum
276 }
277
278 pub fn records(&self) -> Option<&ColumnarRecords> {
280 match self {
281 BlobTraceUpdates::Row(c) => Some(c),
282 BlobTraceUpdates::Both(c, _structured) => Some(c),
283 BlobTraceUpdates::Structured { .. } => None,
284 }
285 }
286
287 pub fn structured(&self) -> Option<&ColumnarRecordsStructuredExt> {
289 match self {
290 BlobTraceUpdates::Row(_) => None,
291 BlobTraceUpdates::Both(_, s) => Some(s),
292 BlobTraceUpdates::Structured { key_values, .. } => Some(key_values),
293 }
294 }
295
296 pub fn goodbytes(&self) -> usize {
298 match self {
299 BlobTraceUpdates::Row(c) => c.goodbytes(),
300 BlobTraceUpdates::Both(c, _) => c.goodbytes(),
304 BlobTraceUpdates::Structured {
305 key_values,
306 timestamps,
307 diffs,
308 } => {
309 key_values.goodbytes()
310 + timestamps.values().to_byte_slice().len()
311 + diffs.values().to_byte_slice().len()
312 }
313 }
314 }
315
316 pub fn get_or_make_codec<K: Codec, V: Codec>(
318 &mut self,
319 key_schema: &K::Schema,
320 val_schema: &V::Schema,
321 ) -> &ColumnarRecords {
322 match self {
323 BlobTraceUpdates::Row(records) => records,
324 BlobTraceUpdates::Both(records, _) => records,
325 BlobTraceUpdates::Structured {
326 key_values,
327 timestamps,
328 diffs,
329 } => {
330 let key = schema_to_codec::<K>(key_schema, &*key_values.key).expect("valid keys");
331 let val = schema_to_codec::<V>(val_schema, &*key_values.val).expect("valid values");
332 let records = ColumnarRecords::new(key, val, timestamps.clone(), diffs.clone());
333
334 *self = BlobTraceUpdates::Both(records, key_values.clone());
335 let BlobTraceUpdates::Both(records, _) = self else {
336 unreachable!("set to BlobTraceUpdates::Both in previous line")
337 };
338 records
339 }
340 }
341 }
342
343 pub fn get_or_make_structured<K: Codec, V: Codec>(
345 &mut self,
346 key_schema: &K::Schema,
347 val_schema: &V::Schema,
348 ) -> &ColumnarRecordsStructuredExt {
349 let structured = match self {
350 BlobTraceUpdates::Row(records) => {
351 let key = codec_to_schema::<K>(key_schema, records.keys()).expect("valid keys");
352 let val = codec_to_schema::<V>(val_schema, records.vals()).expect("valid values");
353
354 *self = BlobTraceUpdates::Both(
355 records.clone(),
356 ColumnarRecordsStructuredExt { key, val },
357 );
358 let BlobTraceUpdates::Both(_, structured) = self else {
359 unreachable!("set to BlobTraceUpdates::Both in previous line")
360 };
361 structured
362 }
363 BlobTraceUpdates::Both(_, structured) => structured,
364 BlobTraceUpdates::Structured { key_values, .. } => key_values,
365 };
366
367 let migrate = |array: &mut ArrayRef, to_type: DataType| {
372 let from_type = array.data_type().clone();
374 if from_type != to_type {
375 if let Some(migration) = backward_compatible(&from_type, &to_type) {
376 *array = migration.migrate(Arc::clone(array));
377 if array.data_type() != &to_type {
378 error!(
379 ?from_type,
380 actual_type=?array.data_type(),
381 ?to_type,
382 "migration failed; returned non-matching data type!"
383 );
384 }
385 } else {
386 error!(
387 ?from_type,
388 ?to_type,
389 "failed to migrate array type; backwards-incompatible schema migration?"
390 );
391 }
392 }
393 };
394 migrate(
395 &mut structured.key,
396 data_type::<K>(key_schema).expect("valid key schema"),
397 );
398 migrate(
399 &mut structured.val,
400 data_type::<V>(val_schema).expect("valid value schema"),
401 );
402
403 structured
404 }
405
406 pub fn as_part(&self) -> Option<Part> {
408 let ext = self.structured()?.clone();
409 Some(Part {
410 key: ext.key,
411 val: ext.val,
412 time: self.timestamps().clone(),
413 diff: self.diffs().clone(),
414 })
415 }
416
417 pub fn into_part<K: Codec, V: Codec>(
419 &mut self,
420 key_schema: &K::Schema,
421 val_schema: &V::Schema,
422 ) -> Part {
423 let ext = self
424 .get_or_make_structured::<K, V>(key_schema, val_schema)
425 .clone();
426 Part {
427 key: ext.key,
428 val: ext.val,
429 time: self.timestamps().clone(),
430 diff: self.diffs().clone(),
431 }
432 }
433
434 pub fn concat<K: Codec, V: Codec>(
439 mut updates: Vec<BlobTraceUpdates>,
440 key_schema: &K::Schema,
441 val_schema: &V::Schema,
442 metrics: &ColumnarMetrics,
443 ) -> anyhow::Result<BlobTraceUpdates> {
444 match updates.len() {
445 0 => {
446 return Ok(BlobTraceUpdates::Structured {
447 key_values: ColumnarRecordsStructuredExt {
448 key: Arc::new(key_schema.encoder().expect("valid schema").finish()),
449 val: Arc::new(val_schema.encoder().expect("valid schema").finish()),
450 },
451 timestamps: Int64Array::from_iter_values(vec![]),
452 diffs: Int64Array::from_iter_values(vec![]),
453 });
454 }
455 1 => return Ok(updates.into_iter().into_element()),
456 _ => {}
457 }
458
459 let mut keys = Vec::with_capacity(updates.len());
461 let mut vals = Vec::with_capacity(updates.len());
462 for updates in &mut updates {
463 let structured = updates.get_or_make_structured::<K, V>(key_schema, val_schema);
464 keys.push(structured.key.as_ref());
465 vals.push(structured.val.as_ref());
466 }
467 let key_values = ColumnarRecordsStructuredExt {
468 key: ::arrow::compute::concat(&keys)?,
469 val: ::arrow::compute::concat(&vals)?,
470 };
471
472 let mut timestamps: Vec<&dyn Array> = Vec::with_capacity(updates.len());
475 let mut diffs: Vec<&dyn Array> = Vec::with_capacity(updates.len());
476
477 for update in &updates {
478 timestamps.push(update.timestamps());
479 diffs.push(update.diffs());
480 }
481 let timestamps = ::arrow::compute::concat(×tamps)?
482 .as_primitive_opt::<Int64Type>()
483 .ok_or_else(|| anyhow::anyhow!("timestamps changed Array type"))?
484 .clone();
485 let diffs = ::arrow::compute::concat(&diffs)?
486 .as_primitive_opt::<Int64Type>()
487 .ok_or_else(|| anyhow::anyhow!("diffs changed Array type"))?
488 .clone();
489
490 let out = Self::Structured {
491 key_values,
492 timestamps,
493 diffs,
494 };
495 metrics
496 .arrow
497 .concat_bytes
498 .inc_by(u64::cast_from(out.goodbytes()));
499 Ok(out)
500 }
501
502 pub fn from_proto(
504 lgbytes: &ColumnarMetrics,
505 proto: ProtoColumnarRecords,
506 ) -> Result<Self, TryFromProtoError> {
507 let binary_array = |data: Bytes, offsets: Vec<i32>| {
508 if offsets.is_empty() && proto.len > 0 {
509 return Ok(None);
510 };
511 match BinaryArray::try_new(
512 OffsetBuffer::new(offsets.into()),
513 ::arrow::buffer::Buffer::from(data),
514 None,
515 ) {
516 Ok(data) => Ok(Some(realloc_array(&data, lgbytes))),
517 Err(e) => Err(TryFromProtoError::InvalidFieldError(format!(
518 "Unable to decode binary array from repeated proto fields: {e:?}"
519 ))),
520 }
521 };
522
523 let codec_key = binary_array(proto.key_data, proto.key_offsets)?;
524 let codec_val = binary_array(proto.val_data, proto.val_offsets)?;
525
526 let timestamps = realloc_array(&proto.timestamps.into(), lgbytes);
527 let diffs = realloc_array(&proto.diffs.into(), lgbytes);
528 let ext =
529 ColumnarRecordsStructuredExt::from_proto(proto.key_structured, proto.val_structured)?;
530
531 let updates = match (codec_key, codec_val, ext) {
532 (Some(codec_key), Some(codec_val), Some(ext)) => BlobTraceUpdates::Both(
533 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
534 ext,
535 ),
536 (Some(codec_key), Some(codec_val), None) => BlobTraceUpdates::Row(
537 ColumnarRecords::new(codec_key, codec_val, timestamps, diffs),
538 ),
539 (None, None, Some(ext)) => BlobTraceUpdates::Structured {
540 key_values: ext,
541 timestamps,
542 diffs,
543 },
544 (k, v, ext) => {
545 return Err(TryFromProtoError::InvalidPersistState(format!(
546 "unexpected mix of key/value columns: k={:?}, v={}, ext={}",
547 k.is_some(),
548 v.is_some(),
549 ext.is_some(),
550 )));
551 }
552 };
553
554 Ok(updates)
555 }
556
557 pub fn into_proto(&self) -> ProtoColumnarRecords {
559 let (key_offsets, key_data, val_offsets, val_data) = match self.records() {
560 None => (vec![], Bytes::new(), vec![], Bytes::new()),
561 Some(records) => (
562 records.keys().offsets().to_vec(),
563 Bytes::copy_from_slice(records.keys().value_data()),
564 records.vals().offsets().to_vec(),
565 Bytes::copy_from_slice(records.vals().value_data()),
566 ),
567 };
568 let (k_struct, v_struct) = match self.structured().map(|x| x.into_proto()) {
569 None => (None, None),
570 Some((k, v)) => (Some(k), Some(v)),
571 };
572
573 ProtoColumnarRecords {
574 len: self.len().into_proto(),
575 key_offsets,
576 key_data,
577 val_offsets,
578 val_data,
579 timestamps: self.timestamps().values().to_vec(),
580 diffs: self.diffs().values().to_vec(),
581 key_structured: k_struct,
582 val_structured: v_struct,
583 }
584 }
585
586 pub fn as_structured<K: Codec, V: Codec>(
589 &self,
590 key_schema: &K::Schema,
591 val_schema: &V::Schema,
592 ) -> Self {
593 let mut this = self.clone();
594 Self::Structured {
595 key_values: this
596 .get_or_make_structured::<K, V>(key_schema, val_schema)
597 .clone(),
598 timestamps: this.timestamps().clone(),
599 diffs: this.diffs().clone(),
600 }
601 }
602}
603
impl TraceBatchMeta {
    /// Checks the internal consistency of this metadata.
    ///
    /// The only invariant checked is that the description is neither empty
    /// nor inverted: `lower` must be strictly less than `upper`.
    pub fn validate(&self) -> Result<(), Error> {
        // upper <= lower means an empty or backwards description.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        Ok(())
    }

    /// Fetches every part of this batch from `blob` and validates it: each
    /// part's desc must equal this batch's desc, each part's `index` must
    /// match its position in `keys`, each part must pass its own `validate`,
    /// and no update may carry a zero diff.
    pub async fn validate_data(
        &self,
        blob: &dyn Blob,
        metrics: &ColumnarMetrics,
    ) -> Result<(), Error> {
        let mut batches = vec![];
        for (idx, key) in self.keys.iter().enumerate() {
            let value = blob
                .get(key)
                .await?
                .ok_or_else(|| Error::from(format!("no blob for trace batch at key: {}", key)))?;
            let batch = BlobTraceBatchPart::decode(&value, metrics)?;
            if batch.desc != self.desc {
                return Err(format!(
                    "invalid trace batch part desc expected {:?} got {:?}",
                    &self.desc, &batch.desc
                )
                .into());
            }

            if batch.index != u64::cast_from(idx) {
                return Err(format!(
                    "invalid index for blob trace batch part at key {} expected {} got {}",
                    key, idx, batch.index
                )
                .into());
            }

            batch.validate()?;
            batches.push(batch);
        }

        for (batch_idx, batch) in batches.iter().enumerate() {
            for (row_idx, diff) in batch.updates.diffs().values().iter().enumerate() {
                // Decoded as u64 only to compare the raw bytes against zero;
                // NOTE(review): assumes any Codec64 diff encodes zero as the
                // u64 zero — confirm against the diff codec.
                let diff: u64 = Codec64::decode(diff.to_le_bytes());

                // Zero diffs are no-op updates and should have been
                // consolidated away before writing.
                if diff == 0 {
                    return Err(format!(
                        "update with 0 diff in batch {batch_idx} at row {row_idx}",
                    )
                    .into());
                }
            }
        }

        Ok(())
    }
}
669
impl<T: Timestamp + Codec64> BlobTraceBatchPart<T> {
    /// Checks the internal consistency of this part.
    ///
    /// The description must be non-empty and non-inverted, every update's
    /// timestamp must be within the description's bounds (the upper bound is
    /// only enforced when the part is uncompacted), and no update may carry a
    /// zero diff.
    pub fn validate(&self) -> Result<(), Error> {
        // upper <= lower means an empty or backwards description.
        if PartialOrder::less_equal(self.desc.upper(), self.desc.lower()) {
            return Err(format!("invalid desc: {:?}", &self.desc).into());
        }

        // since <= lower: the part has not been compacted, so timestamps are
        // still expected to fall strictly within [lower, upper).
        let uncompacted = PartialOrder::less_equal(self.desc.since(), self.desc.lower());

        for time in self.updates.timestamps().values() {
            let ts = T::decode(time.to_le_bytes());
            // Every timestamp must be at or beyond the lower bound.
            if !self.desc.lower().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is less than the batch lower: {:?}",
                    ts, self.desc
                )
                .into());
            }

            // The upper-bound check is skipped for compacted parts —
            // presumably because compaction can advance timestamps to the
            // since frontier, past the original upper; confirm.
            if uncompacted && self.desc.upper().less_equal(&ts) {
                return Err(format!(
                    "timestamp {:?} is greater than or equal to the batch upper: {:?}",
                    ts, self.desc
                )
                .into());
            }
        }

        for (row_idx, diff) in self.updates.diffs().values().iter().enumerate() {
            // Decoded as u64 only to compare the raw bytes against zero.
            let diff: u64 = Codec64::decode(diff.to_le_bytes());

            // Zero diffs are no-op updates and should have been consolidated
            // away before writing.
            if diff == 0 {
                return Err(format!("update with 0 diff at row {row_idx}",).into());
            }
        }

        Ok(())
    }

    /// Encodes this part as Parquet into `buf`.
    ///
    /// # Panics
    ///
    /// Panics if the part fails to encode (e.g. it is invalid).
    pub fn encode<B>(&self, buf: &mut B, metrics: &ColumnarMetrics, cfg: &EncodingConfig)
    where
        B: BufMut + Send,
    {
        encode_trace_parquet(&mut buf.writer(), self, metrics, cfg).expect("batch was invalid");
    }

    /// Decodes a part from the Parquet bytes in `buf`.
    pub fn decode(buf: &SegmentedBytes, metrics: &ColumnarMetrics) -> Result<Self, Error> {
        decode_trace_parquet(buf.clone(), metrics)
    }

    /// The minimum codec-encoded key in this part, or the empty slice when
    /// there is no codec data (or no keys).
    pub fn key_lower(&self) -> &[u8] {
        self.updates
            .records()
            .and_then(|r| r.keys().iter().flatten().min())
            .unwrap_or(&[])
    }

    /// A bound on the minimum structured key in this part, if structured
    /// data is present and non-empty.
    pub fn structured_key_lower(&self) -> Option<ArrayBound> {
        self.updates.structured().and_then(|r| {
            let ord = ArrayOrd::new(&r.key);
            (0..r.key.len())
                .min_by_key(|i| ord.at(*i))
                .map(|i| ArrayBound::new(Arc::clone(&r.key), i))
        })
    }
}
751
impl<T: Timestamp + Codec64> From<ProtoU64Description> for Description<T> {
    fn from(x: ProtoU64Description) -> Self {
        // A missing frontier in the proto defaults to the minimum-timestamp
        // antichain.
        Description::new(
            x.lower
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.upper
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
            x.since
                .map_or_else(|| Antichain::from_elem(T::minimum()), |x| x.into()),
        )
    }
}
764
765impl<T: Timestamp + Codec64> From<ProtoU64Antichain> for Antichain<T> {
766 fn from(x: ProtoU64Antichain) -> Self {
767 Antichain::from(
768 x.elements
769 .into_iter()
770 .map(|x| T::decode(u64::to_le_bytes(x)))
771 .collect::<Vec<_>>(),
772 )
773 }
774}
775
776impl<T: Timestamp + Codec64> From<&Antichain<T>> for ProtoU64Antichain {
777 fn from(x: &Antichain<T>) -> Self {
778 ProtoU64Antichain {
779 elements: x
780 .elements()
781 .iter()
782 .map(|x| u64::from_le_bytes(T::encode(x)))
783 .collect(),
784 }
785 }
786}
787
788impl<T: Timestamp + Codec64> From<&Description<T>> for ProtoU64Description {
789 fn from(x: &Description<T>) -> Self {
790 ProtoU64Description {
791 lower: Some(x.lower().into()),
792 upper: Some(x.upper().into()),
793 since: Some(x.since().into()),
794 }
795 }
796}
797
/// Encodes the inline metadata for a trace batch part into a base64 string.
///
/// The format and format-metadata are derived from the representation of the
/// batch's updates: codec-only data is `ParquetKvtd`, while anything with
/// structured columns is `ParquetStructured` with a `StructuredMigration`
/// version (2 for `Both`, 3 for `Structured`).
pub fn encode_trace_inline_meta<T: Timestamp + Codec64>(batch: &BlobTraceBatchPart<T>) -> String {
    let (format, format_metadata) = match &batch.updates {
        // Codec-encoded columns only.
        BlobTraceUpdates::Row(_) => (ProtoBatchFormat::ParquetKvtd, None),
        // Codec plus structured columns: structured migration version 2.
        BlobTraceUpdates::Both { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(2);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
        // Structured columns only: structured migration version 3.
        BlobTraceUpdates::Structured { .. } => {
            let metadata = ProtoFormatMetadata::StructuredMigration(3);
            (ProtoBatchFormat::ParquetStructured, Some(metadata))
        }
    };

    let inline = ProtoBatchPartInline {
        format: format.into(),
        desc: Some((&batch.desc).into()),
        index: batch.index,
        format_metadata,
    };
    let inline_encoded = inline.encode_to_vec();
    base64::engine::general_purpose::STANDARD.encode(inline_encoded)
}
823
824pub fn decode_trace_inline_meta(
826 inline_base64: Option<&String>,
827) -> Result<(ProtoBatchFormat, ProtoBatchPartInline), Error> {
828 let inline_base64 = inline_base64.ok_or("missing batch metadata")?;
829 let inline_encoded = base64::engine::general_purpose::STANDARD
830 .decode(inline_base64)
831 .map_err(|err| err.to_string())?;
832 let inline = ProtoBatchPartInline::decode(&*inline_encoded).map_err(|err| err.to_string())?;
833 let format = ProtoBatchFormat::try_from(inline.format)
834 .map_err(|_| Error::from(format!("unknown format: {}", inline.format)))?;
835 Ok((format, inline))
836}
837
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use bytes::Bytes;

    use crate::error::Error;
    use crate::indexed::columnar::ColumnarRecordsBuilder;
    use crate::mem::{MemBlob, MemBlobConfig};
    use crate::metrics::ColumnarMetrics;
    use crate::workload::DataGenerator;

    use super::*;

    // Builds a single (key, "") update at time `ts` with diff 1.
    fn update_with_key(ts: u64, key: &'static str) -> ((Vec<u8>, Vec<u8>), u64, i64) {
        ((key.into(), "".into()), ts, 1)
    }

    // A [lower, upper) description with since = 0.
    fn u64_desc(lower: u64, upper: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(0),
        )
    }

    // Minimal metadata with the given bounds and no part keys.
    fn batch_meta(lower: u64, upper: u64) -> TraceBatchMeta {
        TraceBatchMeta {
            keys: vec![],
            format: ProtoBatchFormat::Unknown,
            desc: u64_desc(lower, upper),
            level: 1,
            size_bytes: 0,
        }
    }

    // A [lower, upper) description with an explicit since.
    fn u64_desc_since(lower: u64, upper: u64, since: u64) -> Description<u64> {
        Description::new(
            Antichain::from_elem(lower),
            Antichain::from_elem(upper),
            Antichain::from_elem(since),
        )
    }

    // Packs raw updates into the codec-encoded `BlobTraceUpdates::Row` form.
    fn columnar_records(updates: Vec<((Vec<u8>, Vec<u8>), u64, i64)>) -> BlobTraceUpdates {
        let mut builder = ColumnarRecordsBuilder::default();
        for ((k, v), t, d) in updates {
            assert!(builder.push(((&k, &v), Codec64::encode(&t), Codec64::encode(&d))));
        }
        let updates = builder.finish(&ColumnarMetrics::disconnected());
        BlobTraceUpdates::Row(updates)
    }

    #[mz_ore::test]
    fn trace_batch_validate() {
        // Normal case.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0"), update_with_key(1, "1")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Empty updates are fine.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 2),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Invalid desc: upper < lower.
        let b = BlobTraceBatchPart {
            desc: u64_desc(2, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Empty desc: upper == lower.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 0),
            index: 0,
            updates: columnar_records(vec![]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Timestamp below the lower bound.
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(0, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 0 is less than the batch lower: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Timestamp at/above the upper bound (uncompacted batch).
        let b = BlobTraceBatchPart {
            desc: u64_desc(1, 2),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "timestamp 2 is greater than or equal to the batch upper: Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [2] }, since: Antichain { elements: [0] } }"
            ))
        );

        // Once compacted (since > lower), a timestamp at the upper is allowed.
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(2, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // ...as is one at the since...
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(4, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // ...and even one past the since.
        let b = BlobTraceBatchPart {
            desc: u64_desc_since(1, 2, 4),
            index: 0,
            updates: columnar_records(vec![update_with_key(5, "0")]),
        };
        assert_eq!(b.validate(), Ok(()));

        // Zero diffs are rejected.
        let b = BlobTraceBatchPart {
            desc: u64_desc(0, 1),
            index: 0,
            updates: columnar_records(vec![(("0".into(), "0".into()), 0, 0)]),
        };
        assert_eq!(
            b.validate(),
            Err(Error::from("update with 0 diff at row 0"))
        );
    }

    #[mz_ore::test]
    fn trace_batch_meta_validate() {
        // Normal case.
        let b = batch_meta(0, 1);
        assert_eq!(b.validate(), Ok(()));

        // Empty interval.
        let b = batch_meta(0, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );

        // Backwards interval.
        let b = batch_meta(2, 0);
        assert_eq!(
            b.validate(),
            Err(Error::from(
                "invalid desc: Description { lower: Antichain { elements: [2] }, upper: Antichain { elements: [0] }, since: Antichain { elements: [0] } }"
            )),
        );
    }

    // Encodes `batch` and writes it to `blob` at `key`, returning the encoded
    // size in bytes.
    async fn expect_set_trace_batch<T: Timestamp + Codec64>(
        blob: &dyn Blob,
        key: &str,
        batch: &BlobTraceBatchPart<T>,
    ) -> u64 {
        let mut val = Vec::new();
        let metrics = ColumnarMetrics::disconnected();
        let config = EncodingConfig::default();
        batch.encode(&mut val, &metrics, &config);
        let val = Bytes::from(val);
        let val_len = u64::cast_from(val.len());
        blob.set(key, val).await.expect("failed to set trace batch");
        val_len
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] async fn trace_batch_meta_validate_data() -> Result<(), Error> {
        let metrics = ColumnarMetrics::disconnected();
        let blob = Arc::new(MemBlob::open(MemBlobConfig::default()));
        let format = ProtoBatchFormat::ParquetKvtd;

        let batch_desc = u64_desc_since(0, 3, 0);
        let batch0 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 0,
            updates: columnar_records(vec![
                (("k".as_bytes().to_vec(), "v".as_bytes().to_vec()), 2, 1),
                (("k3".as_bytes().to_vec(), "v3".as_bytes().to_vec()), 2, 1),
            ]),
        };
        let batch1 = BlobTraceBatchPart {
            desc: batch_desc.clone(),
            index: 1,
            updates: columnar_records(vec![
                (("k4".as_bytes().to_vec(), "v4".as_bytes().to_vec()), 2, 1),
                (("k5".as_bytes().to_vec(), "v5".as_bytes().to_vec()), 2, 1),
            ]),
        };

        let batch0_size_bytes = expect_set_trace_batch(blob.as_ref(), "b0", &batch0).await;
        let batch1_size_bytes = expect_set_trace_batch(blob.as_ref(), "b1", &batch1).await;
        let size_bytes = batch0_size_bytes + batch1_size_bytes;
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };

        // Normal case: data validates.
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Ok(())
        );

        // Metadata desc doesn't match the parts' descs.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into()],
            format,
            desc: u64_desc_since(1, 3, 0),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid trace batch part desc expected Description { lower: Antichain { elements: [1] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } } got Description { lower: Antichain { elements: [0] }, upper: Antichain { elements: [3] }, since: Antichain { elements: [0] } }"
            ))
        );
        // A key that has no corresponding blob.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b0".into(), "b1".into(), "b2".into()],
            format,
            desc: batch_desc.clone(),
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from("no blob for trace batch at key: b2"))
        );
        // Keys out of order: part indexes no longer match positions.
        let batch_meta = TraceBatchMeta {
            keys: vec!["b1".into(), "b0".into()],
            format,
            desc: batch_desc,
            level: 0,
            size_bytes,
        };
        assert_eq!(
            batch_meta.validate_data(blob.as_ref(), &metrics).await,
            Err(Error::from(
                "invalid index for blob trace batch part at key b1 expected 0 got 1"
            ))
        );

        Ok(())
    }

    #[mz_ore::test]
    #[cfg_attr(miri, ignore)] fn encoded_batch_sizes() {
        fn sizes(data: DataGenerator) -> usize {
            let metrics = ColumnarMetrics::disconnected();
            let config = EncodingConfig::default();
            let updates: Vec<_> = data.batches().collect();
            let updates = BlobTraceUpdates::Row(ColumnarRecords::concat(&updates, &metrics));
            let trace = BlobTraceBatchPart {
                desc: Description::new(
                    Antichain::from_elem(0u64),
                    Antichain::new(),
                    Antichain::from_elem(0u64),
                ),
                index: 0,
                updates,
            };
            let mut trace_buf = Vec::new();
            trace.encode(&mut trace_buf, &metrics, &config);
            trace_buf.len()
        }

        let record_size_bytes = DataGenerator::default().record_size_bytes;
        // Golden sizes: updating them is fine, but a change means the encoded
        // size of a batch has changed — flag it in review.
        assert_eq!(
            format!(
                "1/1={:?} 25/1={:?} 1000/1={:?} 1000/100={:?}",
                sizes(DataGenerator::new(1, record_size_bytes, 1)),
                sizes(DataGenerator::new(25, record_size_bytes, 25)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000)),
                sizes(DataGenerator::new(1_000, record_size_bytes, 1_000 / 100)),
            ),
            "1/1=903 25/1=2649 1000/1=72881 1000/100=72881"
        );
    }
}