1use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
19use arrow::compute::FilterBuilder;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use itertools::EitherOrBoth;
24use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::{soft_assert_or_log, soft_panic_no_log, soft_panic_or_log};
28use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
29use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
30use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist::metrics::ColumnarMetrics;
33use mz_persist_types::arrow::ArrayOrd;
34use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
35use mz_persist_types::part::Codec64Mut;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::stats::PartStats;
38use mz_persist_types::{Codec, Codec64};
39use mz_proto::RustType;
40use serde::{Deserialize, Serialize};
41use timely::PartialOrder;
42use timely::progress::frontier::AntichainRef;
43use timely::progress::{Antichain, Timestamp};
44use tracing::{Instrument, debug, debug_span, trace_span};
45
46use crate::ShardId;
47use crate::cfg::PersistConfig;
48use crate::error::InvalidUsage;
49use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
50use crate::internal::machine::retry_external;
51use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
52use crate::internal::paths::BlobKey;
53use crate::internal::state::{
54 BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
55};
56use crate::read::LeasedReaderId;
57use crate::schema::{PartMigration, SchemaCache};
58
/// Multiplier applied to a part's `encoded_size_bytes` when computing the
/// cost of a fetch-semaphore permit, approximating the in-memory footprint
/// (encoded plus decoded forms) rather than just the encoded bytes.
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);
69
/// Scales the total number of fetch-semaphore permits, expressed as a
/// multiplier of the process's memory limit.
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);
79
/// Which representation to decode a part's key/value data from; parsed into
/// a [PartDecodeFormat] via [BatchFetcherConfig::part_decode_format].
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow' (Materialize).",
);
87
/// Gates the fetch-skipping optimization in
/// [LeasedBatchPart::maybe_optimize]; on by default.
pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);
93
/// When true, [EncodedPart::new] soft-asserts that a truncated part's inline
/// bounds are consistent with the registered batch bounds. Off by default.
pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_read",
    false,
    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
    for the batch containing that part",
);
100
/// Configuration consulted while fetching and decoding a batch part.
#[derive(Debug, Clone)]
pub(crate) struct FetchConfig {
    /// Whether to soft-assert bounds consistency when reading a part that
    /// needs truncation; see [`VALIDATE_PART_BOUNDS_ON_READ`].
    pub(crate) validate_bounds_on_read: bool,
}
105
106impl FetchConfig {
107 pub fn from_persist_config(cfg: &PersistConfig) -> Self {
108 Self {
109 validate_bounds_on_read: VALIDATE_PART_BOUNDS_ON_READ.get(cfg),
110 }
111 }
112}
113
/// Configuration for a [BatchFetcher].
#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
    /// Live handle to [`PART_DECODE_FORMAT`], re-read on each use via
    /// [BatchFetcherConfig::part_decode_format].
    pub(crate) part_decode_format: ConfigValHandle<String>,
    /// Config consulted while fetching/decoding parts.
    pub(crate) fetch_config: FetchConfig,
}
119
120impl BatchFetcherConfig {
121 pub fn new(value: &PersistConfig) -> Self {
122 Self {
123 part_decode_format: PART_DECODE_FORMAT.handle(value),
124 fetch_config: FetchConfig::from_persist_config(value),
125 }
126 }
127
128 pub fn part_decode_format(&self) -> PartDecodeFormat {
129 PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
130 }
131}
132
/// Fetches and decodes batch parts for a single shard, using the given read
/// schemas (migrating parts to them via the schema cache as needed).
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Codec64 + Send + Sync,
{
    pub(crate) cfg: BatchFetcherConfig,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) shard_id: ShardId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    // When true, fetches are recorded under the `unindexed` read metrics
    // rather than `batch_fetcher`; see `fetch_leased_part`.
    pub(crate) is_transient: bool,

    // Makes the struct generic over K/V/T/D without owning any of them.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
156
impl<K, V, T, D> BatchFetcher<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Fetches the blob (or inline bytes) for an exchanged part, returning a
    /// [FetchedBlob] that can later be parsed into a [FetchedPart].
    ///
    /// Returns `Err(InvalidUsage)` if the part belongs to a different shard
    /// than this fetcher, and `Ok(Err(key))` if the part's blob is missing.
    /// Panics if the part can't be decoded with the read schemas.
    pub async fn fetch_leased_part(
        &mut self,
        part: ExchangeableBatchPart<T>,
    ) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
        let ExchangeableBatchPart {
            shard_id,
            encoded_size_bytes: _,
            desc,
            filter,
            filter_pushdown_audit,
            part,
        } = part;
        let part: BatchPart<T> = part.decode_to().expect("valid part");
        // Guard against a part exchanged from a handle on another shard.
        if shard_id != self.shard_id {
            return Err(InvalidUsage::BatchNotFromThisShard {
                batch_shard: shard_id,
                handle_shard: self.shard_id.clone(),
            });
        }

        // Resolve how to migrate this part's data to the read schemas.
        let migration =
            PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
                .await
                .unwrap_or_else(|read_schemas| {
                    panic!(
                        "could not decode part {:?} with schema: {:?}",
                        part.schema_id(),
                        read_schemas
                    )
                });

        let (buf, fetch_permit) = match &part {
            BatchPart::Hollow(x) => {
                // Limit outstanding fetched-but-unparsed bytes; the permit
                // is held alongside the blob (and later the parsed part).
                let fetch_permit = self
                    .metrics
                    .semaphore
                    .acquire_fetch_permits(x.encoded_size_bytes)
                    .await;
                let read_metrics = if self.is_transient {
                    &self.metrics.read.unindexed
                } else {
                    &self.metrics.read.batch_fetcher
                };
                let buf = fetch_batch_part_blob(
                    &shard_id,
                    self.blob.as_ref(),
                    &self.metrics,
                    &self.shard_metrics,
                    read_metrics,
                    x,
                )
                .await;
                let buf = match buf {
                    Ok(buf) => buf,
                    // Missing blob: surface the key to the caller.
                    Err(key) => return Ok(Err(key)),
                };
                let buf = FetchedBlobBuf::Hollow {
                    buf,
                    part: x.clone(),
                };
                (buf, Some(Arc::new(fetch_permit)))
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => {
                // Inline parts carry their updates in state; no blob fetch
                // (and hence no permit) is needed.
                let buf = FetchedBlobBuf::Inline {
                    desc: desc.clone(),
                    updates: updates.clone(),
                    ts_rewrite: ts_rewrite.clone(),
                };
                (buf, None)
            }
        };
        let fetched_blob = FetchedBlob {
            metrics: Arc::clone(&self.metrics),
            read_metrics: self.metrics.read.batch_fetcher.clone(),
            buf,
            registered_desc: desc.clone(),
            migration,
            filter: filter.clone(),
            filter_pushdown_audit,
            structured_part_audit: self.cfg.part_decode_format(),
            fetch_permit,
            _phantom: PhantomData,
            fetch_config: self.cfg.fetch_config.clone(),
        };
        Ok(Ok(fetched_blob))
    }
}
259
/// How to filter and/or rewrite update timestamps while reading a part,
/// depending on the kind of read; see [FetchBatchFilter::filter_ts].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum FetchBatchFilter<T> {
    /// Snapshot read: keep updates at or before `as_of`, advancing their
    /// timestamps to `as_of`.
    Snapshot {
        as_of: Antichain<T>,
    },
    /// Listen read: keep updates strictly after `as_of` and at or after
    /// `lower`.
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    /// Compaction read: keep all updates, advancing timestamps to `since`.
    Compaction {
        since: Antichain<T>,
    },
}
273
274impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
275 pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
276 match self {
277 FetchBatchFilter::Snapshot { as_of } => {
278 if as_of.less_than(t) {
280 return false;
281 }
282 t.advance_by(as_of.borrow());
283 true
284 }
285 FetchBatchFilter::Listen { as_of, lower } => {
286 if !as_of.less_than(t) {
288 return false;
289 }
290
291 if !lower.less_equal(t) {
299 return false;
300 }
301 true
302 }
303 FetchBatchFilter::Compaction { since } => {
304 t.advance_by(since.borrow());
305 true
306 }
307 }
308 }
309}
310
/// Fetches and decodes the data for a leased part into a [FetchedPart],
/// migrating it to `read_schemas` (using and populating `schema_cache`).
///
/// Panics if the part's blob is missing — presumably the lease held by
/// `part` should have prevented it from being deleted (TODO confirm) — or
/// if the part can't be decoded with the given schemas.
pub(crate) async fn fetch_leased_part<K, V, T, D>(
    cfg: &PersistConfig,
    part: &LeasedBatchPart<T>,
    blob: &dyn Blob,
    metrics: Arc<Metrics>,
    read_metrics: &ReadMetrics,
    shard_metrics: &ShardMetrics,
    reader_id: &LeasedReaderId,
    read_schemas: Schemas<K, V>,
    schema_cache: &mut SchemaCache<K, V, T, D>,
) -> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    let fetch_config = FetchConfig::from_persist_config(cfg);
    let encoded_part = EncodedPart::fetch(
        &fetch_config,
        &part.shard_id,
        blob,
        &metrics,
        shard_metrics,
        read_metrics,
        &part.desc,
        &part.part,
    )
    .await
    .unwrap_or_else(|blob_key| {
        panic!("{} could not fetch batch part: {}", reader_id, blob_key)
    });
    let part_cfg = BatchFetcherConfig::new(cfg);
    let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
        .await
        .unwrap_or_else(|read_schemas| {
            panic!(
                "could not decode part {:?} with schema: {:?}",
                part.part.schema_id(),
                read_schemas
            )
        });
    FetchedPart::new(
        metrics,
        encoded_part,
        migration,
        part.filter.clone(),
        part.filter_pushdown_audit,
        part_cfg.part_decode_format(),
        part.part.stats(),
    )
}
375
/// Fetches the raw bytes for a hollow part from blob storage, retrying
/// external errors. Returns the blob key as an `Err` if the blob is
/// missing, so the caller can decide how to report it.
pub(crate) async fn fetch_batch_part_blob<T>(
    shard_id: &ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
    shard_metrics: &ShardMetrics,
    read_metrics: &ReadMetrics,
    part: &HollowBatchPart<T>,
) -> Result<SegmentedBytes, BlobKey> {
    let now = Instant::now();
    let get_span = debug_span!("fetch_batch::get");
    let blob_key = part.key.complete(shard_id);
    let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
        shard_metrics.blob_gets.inc();
        blob.get(&blob_key).await
    })
    .instrument(get_span.clone())
    .await
    // A `None` from blob.get means the key doesn't exist.
    .ok_or(blob_key)?;

    drop(get_span);

    // Record fetch metrics only on success; `seconds` includes retries.
    read_metrics.part_count.inc();
    read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
    read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());

    Ok(value)
}
403
/// Decodes the raw fetched bytes of a batch part into an [EncodedPart].
///
/// Panics if the bytes don't decode as a [BlobTraceBatchPart]: data that
/// persist wrote failing to decode is treated as an internal invariant
/// violation rather than a recoverable error.
pub(crate) fn decode_batch_part_blob<T>(
    cfg: &FetchConfig,
    metrics: &Metrics,
    read_metrics: &ReadMetrics,
    registered_desc: Description<T>,
    part: &HollowBatchPart<T>,
    buf: &SegmentedBytes,
) -> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    trace_span!("fetch_batch::decode").in_scope(|| {
        let parsed = metrics
            .codecs
            .batch
            .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
            .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
            .expect("internal error: invalid encoded state");
        read_metrics
            .part_goodbytes
            .inc_by(u64::cast_from(parsed.updates.goodbytes()));
        EncodedPart::from_hollow(cfg, read_metrics.clone(), registered_desc, part, parsed)
    })
}
432
433pub(crate) async fn fetch_batch_part<T>(
434 cfg: &FetchConfig,
435 shard_id: &ShardId,
436 blob: &dyn Blob,
437 metrics: &Metrics,
438 shard_metrics: &ShardMetrics,
439 read_metrics: &ReadMetrics,
440 registered_desc: &Description<T>,
441 part: &HollowBatchPart<T>,
442) -> Result<EncodedPart<T>, BlobKey>
443where
444 T: Timestamp + Lattice + Codec64,
445{
446 let buf =
447 fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
448 let part = decode_batch_part_blob(
449 cfg,
450 metrics,
451 read_metrics,
452 registered_desc.clone(),
453 part,
454 &buf,
455 );
456 Ok(part)
457}
458
/// A cheaply-cloneable handle tracking a hold on a [SeqNo]. `count` reports
/// the number of live clones; presumably callers use it to decide when the
/// seqno hold can be released — confirm against callers.
#[derive(Clone, Debug)]
pub struct Lease(Arc<SeqNo>);
467
468impl Lease {
469 pub fn new(seqno: SeqNo) -> Self {
471 Self(Arc::new(seqno))
472 }
473
474 pub fn seqno(&self) -> SeqNo {
476 *self.0
477 }
478
479 pub fn count(&self) -> usize {
481 Arc::strong_count(&self.0)
482 }
483}
484
/// A part of a batch, together with the lease and read context needed to
/// fetch and decode it.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_id: ShardId,
    /// Timestamp filter/rewrite to apply when the part is decoded.
    pub(crate) filter: FetchBatchFilter<T>,
    /// The description this part was registered under.
    pub(crate) desc: Description<T>,
    pub(crate) part: BatchPart<T>,
    /// The lease backing this part; see [Lease].
    pub(crate) lease: Lease,
    /// When true, the fetched part reports back the stats used for filter
    /// pushdown; see `request_filter_pushdown_audit`.
    pub(crate) filter_pushdown_audit: bool,
}
522
impl<T> LeasedBatchPart<T>
where
    T: Timestamp + Codec64,
{
    /// Converts this part into a serializable [ExchangeableBatchPart] (for
    /// handing to another worker) plus the [Lease] that was backing it. The
    /// caller is responsible for holding the lease as long as the
    /// exchangeable part may still be fetched.
    pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
        let lease = self.lease.clone();
        let part = ExchangeableBatchPart {
            shard_id: self.shard_id,
            encoded_size_bytes: self.part.encoded_size_bytes(),
            desc: self.desc.clone(),
            filter: self.filter.clone(),
            part: LazyProto::from(&self.part.into_proto()),
            filter_pushdown_audit: self.filter_pushdown_audit,
        };
        (part, lease)
    }

    /// The encoded size of this part, in bytes.
    pub fn encoded_size_bytes(&self) -> usize {
        self.part.encoded_size_bytes()
    }

    /// Requests that the part's stats be returned alongside the fetched
    /// data, so a filter pushdown decision can be audited against what the
    /// part actually contains.
    pub fn request_filter_pushdown_audit(&mut self) {
        self.filter_pushdown_audit = true;
    }

    /// The decoded stats for this part, if present.
    pub fn stats(&self) -> Option<PartStats> {
        self.part.stats().map(|x| x.decode())
    }

    /// If eligible, replaces this hollow part with a tiny faked inline part
    /// so the blob fetch can be skipped entirely.
    ///
    /// Eligible parts are snapshot reads with a single-element as_of at or
    /// past both the part's upper and since, and with a known diffs_sum: a
    /// single update at the as_of with the summed diff then stands in for
    /// the whole part. `key`/`val` must be single-row arrays holding the
    /// stand-in key/value — presumably the collapsed value all updates map
    /// to at the as_of; confirm against callers.
    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
        assert_eq!(key.len(), 1, "expect a single-row key array");
        assert_eq!(val.len(), 1, "expect a single-row val array");
        // Only snapshot reads qualify; listens and compactions need the
        // real per-update data.
        let as_of = match &self.filter {
            FetchBatchFilter::Snapshot { as_of } => as_of,
            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
        };
        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
            return;
        }
        let (diffs_sum, _stats) = match &self.part {
            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
            // Inline parts are already free to "fetch"; nothing to save.
            BatchPart::Inline { .. } => return,
        };
        debug!(
            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
            diffs_sum.map(i64::decode),
            as_of.elements(),
            self.desc.lower().elements(),
            self.desc.upper().elements()
        );
        // Only single-element as_of frontiers are handled.
        let as_of = match &as_of.elements() {
            &[as_of] => as_of,
            _ => return,
        };
        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
        if !eligible {
            return;
        }
        let Some(diffs_sum) = diffs_sum else {
            return;
        };

        debug!(
            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
            i64::decode(diffs_sum),
            as_of,
            self.part.encoded_size_bytes(),
        );
        self.metrics.pushdown.parts_faked_count.inc();
        self.metrics
            .pushdown
            .parts_faked_bytes
            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
        // Build single-row timestamp/diff columns: the as_of time paired
        // with the part's summed diff.
        let timestamps = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push(as_of);
            col.finish()
        };
        let diffs = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push_raw(diffs_sum);
            col.finish()
        };
        let updates = BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt { key, val },
            timestamps,
            diffs,
        };
        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
            desc: Some(self.desc.into_proto()),
            index: 0,
            updates: Some(updates.into_proto()),
        });
        // Swap the hollow part for the faked inline one in place.
        self.part = BatchPart::Inline {
            updates: faked_data,
            ts_rewrite: None,
            schema_id: None,
            deprecated_schema_id: None,
        };
    }
}
644
645impl<T> Drop for LeasedBatchPart<T> {
646 fn drop(&mut self) {
648 self.metrics.lease.dropped_part.inc()
649 }
650}
651
/// A fetched-but-not-yet-decoded batch part: the raw bytes (or inline
/// updates) plus everything needed to parse them into a [FetchedPart].
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    read_metrics: ReadMetrics,
    buf: FetchedBlobBuf<T>,
    /// The description the part was registered under.
    registered_desc: Description<T>,
    migration: PartMigration<K, V>,
    filter: FetchBatchFilter<T>,
    filter_pushdown_audit: bool,
    /// Decode format to use when parsing; see [PART_DECODE_FORMAT].
    structured_part_audit: PartDecodeFormat,
    // Fetch-semaphore permit held for hollow parts; shared with any
    // ShardSourcePart parsed from this blob.
    fetch_permit: Option<Arc<MetricsPermits>>,
    fetch_config: FetchConfig,
    _phantom: PhantomData<fn() -> D>,
}
670
/// The raw contents backing a [FetchedBlob].
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    /// Bytes fetched from blob storage, plus the hollow part they belong to.
    Hollow {
        buf: SegmentedBytes,
        part: HollowBatchPart<T>,
    },
    /// Updates that were stored inline in state; no blob fetch occurred.
    Inline {
        desc: Description<T>,
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
    },
}
683
684impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
685 fn clone(&self) -> Self {
686 Self {
687 metrics: Arc::clone(&self.metrics),
688 read_metrics: self.read_metrics.clone(),
689 buf: self.buf.clone(),
690 registered_desc: self.registered_desc.clone(),
691 migration: self.migration.clone(),
692 filter: self.filter.clone(),
693 filter_pushdown_audit: self.filter_pushdown_audit.clone(),
694 fetch_permit: self.fetch_permit.clone(),
695 structured_part_audit: self.structured_part_audit.clone(),
696 fetch_config: self.fetch_config.clone(),
697 _phantom: self._phantom.clone(),
698 }
699 }
700}
701
/// A [FetchedPart] paired with the fetch-semaphore permit acquired for it;
/// the permit is held until this value (and any clones of the permit) are
/// dropped.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The decoded part.
    pub part: FetchedPart<K, V, T, D>,
    fetch_permit: Option<Arc<MetricsPermits>>,
}
709
710impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
711where
712 K: Codec + Debug,
713 <K as Codec>::Storage: Debug,
714 V: Codec + Debug,
715 <V as Codec>::Storage: Debug,
716{
717 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
718 let ShardSourcePart { part, fetch_permit } = self;
719 f.debug_struct("ShardSourcePart")
720 .field("part", part)
721 .field("fetch_permit", fetch_permit)
722 .finish()
723 }
724}
725
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
    /// Parses this blob's bytes into a [FetchedPart], using the config the
    /// blob was fetched with.
    pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
        self.parse_internal(&self.fetch_config)
    }

    /// Parses this blob's bytes into a [FetchedPart] with an explicit
    /// [FetchConfig].
    pub(crate) fn parse_internal(&self, cfg: &FetchConfig) -> ShardSourcePart<K, V, T, D> {
        let (part, stats) = match &self.buf {
            FetchedBlobBuf::Hollow { buf, part } => {
                let parsed = decode_batch_part_blob(
                    cfg,
                    &self.metrics,
                    &self.read_metrics,
                    self.registered_desc.clone(),
                    part,
                    buf,
                );
                (parsed, part.stats.as_ref())
            }
            FetchedBlobBuf::Inline {
                desc,
                updates,
                ts_rewrite,
            } => {
                let parsed = EncodedPart::from_inline(
                    cfg,
                    &self.metrics,
                    self.read_metrics.clone(),
                    desc.clone(),
                    updates,
                    ts_rewrite.as_ref(),
                );
                // Inline parts don't carry stats.
                (parsed, None)
            }
        };
        let part = FetchedPart::new(
            Arc::clone(&self.metrics),
            part,
            self.migration.clone(),
            self.filter.clone(),
            self.filter_pushdown_audit,
            self.structured_part_audit,
            stats,
        );
        // The permit stays alive alongside the parsed part.
        ShardSourcePart {
            part,
            fetch_permit: self.fetch_permit.clone(),
        }
    }

    /// The decoded stats for the contained part, if known.
    pub fn stats(&self) -> Option<PartStats> {
        match &self.buf {
            FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
            FetchedBlobBuf::Inline { .. } => None,
        }
    }
}
785
/// A decoded batch part, iterable as consolidated `((K, V), T, D)` updates.
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    /// Timestamp filter/rewrite applied to each update as it's read.
    ts_filter: FetchBatchFilter<T>,
    /// The key/value data: legacy codec records, structured decoders, or
    /// both (in which case decoding cross-validates the two).
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    timestamps: Int64Array,
    diffs: Int64Array,
    migration: PartMigration<K, V>,
    /// Set iff an audit was requested; holds the stats that informed the
    /// filter pushdown decision.
    filter_pushdown_audit: Option<LazyPartStats>,
    /// An update read but not yet returned, held back for consolidation in
    /// `next_with_storage`.
    peek_stash: Option<((K, V), T, D)>,
    /// Index of the next update to read.
    part_cursor: usize,
    // Reusable decode scratch threaded through `decode_codec`.
    key_storage: Option<K::Storage>,
    val_storage: Option<V::Storage>,

    _phantom: PhantomData<fn() -> D>,
}
814
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
    /// Builds a [FetchedPart] from a decoded [EncodedPart], choosing the
    /// codec and/or structured representation per `part_decode_format` and
    /// migrating structured columns to the read schema as needed.
    pub(crate) fn new(
        metrics: Arc<Metrics>,
        part: EncodedPart<T>,
        migration: PartMigration<K, V>,
        ts_filter: FetchBatchFilter<T>,
        filter_pushdown_audit: bool,
        part_decode_format: PartDecodeFormat,
        stats: Option<&LazyPartStats>,
    ) -> Self {
        let part_len = u64::cast_from(part.part.updates.len());
        // Record migration counts/lengths per migration kind and decode
        // format.
        match &migration {
            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
            PartMigration::Schemaless { .. } => {
                metrics.schema.migration_count_codec.inc();
                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
            }
            PartMigration::Either { .. } => {
                metrics.schema.migration_count_either.inc();
                match part_decode_format {
                    PartDecodeFormat::Row {
                        validate_structured: false,
                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
                    PartDecodeFormat::Row {
                        validate_structured: true,
                    } => {
                        metrics.schema.migration_len_either_codec.inc_by(part_len);
                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
                    }
                    PartDecodeFormat::Arrow => {
                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
                    }
                }
            }
        }

        // Only retain the stats when an audit was actually requested.
        let filter_pushdown_audit = if filter_pushdown_audit {
            stats.cloned()
        } else {
            None
        };

        // Migrates structured columns to the read schema (when needed) and
        // builds decoders for them; None means this part can't (or
        // shouldn't) be decoded structurally.
        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
                                   structured_only: bool| {
            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

            let structured = match &migration {
                PartMigration::SameSchema { .. } => structured,
                PartMigration::Schemaless { read } if structured_only => {
                    // No writer schema is available: attempt a best-effort
                    // migration directly from the arrays' data types.
                    let start = Instant::now();
                    let read_key = data_type::<K>(&*read.key).ok()?;
                    let read_val = data_type::<V>(&*read.val).ok()?;
                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
                PartMigration::Schemaless { .. } => return None,
                PartMigration::Either {
                    write: _,
                    read: _,
                    key_migration,
                    val_migration,
                } => {
                    let start = Instant::now();
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
            };

            let read_schema = migration.codec_read();
            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

            // Track how many key bytes projection trimmed away.
            match &key {
                Ok(key_decoder) => {
                    let key_size_after = key_decoder.goodbytes();
                    let key_diff = key_size_before.saturating_sub(key_size_after);
                    metrics
                        .pushdown
                        .parts_projection_trimmed_bytes
                        .inc_by(u64::cast_from(key_diff));
                }
                Err(e) => {
                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                }
            }

            Some((key.ok()?, val.ok()?))
        };

        // Apply truncation / ts rewrites up front, then pick the
        // representation(s) the iterator will read from.
        let updates = part.normalize(&metrics.columnar);
        let timestamps = updates.timestamps().clone();
        let diffs = updates.diffs().clone();
        let part = match updates {
            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                downcast_structured(key_values, true).expect("valid schemas for structured data"),
            ),
            // When both representations exist, the decode format decides
            // which to use (falling back to codec if structured fails).
            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
                PartDecodeFormat::Row {
                    validate_structured: false,
                } => EitherOrBoth::Left(records),
                PartDecodeFormat::Row {
                    validate_structured: true,
                } => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Both(records, decoders),
                    None => EitherOrBoth::Left(records),
                },
                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Right(decoders),
                    None => EitherOrBoth::Left(records),
                },
            },
        };

        FetchedPart {
            metrics,
            ts_filter,
            part,
            peek_stash: None,
            timestamps,
            diffs,
            migration,
            filter_pushdown_audit,
            part_cursor: 0,
            key_storage: None,
            val_storage: None,
            _phantom: PhantomData,
        }
    }

    /// If a filter pushdown audit was requested for this part, returns the
    /// stats that were used to make the pushdown decision.
    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
        self.filter_pushdown_audit.clone()
    }
}
971
/// A fetched and decoded batch part, paired with the description it was
/// registered under.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    metrics: ReadMetrics,
    /// The description the part was registered under, which may be narrower
    /// than the description stored inline in the part itself.
    registered_desc: Description<T>,
    part: BlobTraceBatchPart<T>,
    /// True when the inline bounds differ from the registered bounds, so
    /// `normalize` must filter out out-of-bounds updates.
    needs_truncation: bool,
    /// If set, timestamps are advanced by this frontier in `normalize`.
    ts_rewrite: Option<Antichain<T>>,
}
982
impl<K, V, T, D> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Returns the next update in the part, reusing the allocations in
    /// `key`/`val` for decoding when provided.
    ///
    /// Applies the timestamp filter/rewrite, drops zero diffs, and merges
    /// consecutive updates with equal key, value, and timestamp before
    /// returning them.
    pub fn next_with_storage(
        &mut self,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> Option<((K, V), T, D)> {
        // Start from any update stashed by a previous call.
        let mut consolidated = self.peek_stash.take();
        loop {
            let next = if self.part_cursor < self.timestamps.len() {
                let next_idx = self.part_cursor;
                self.part_cursor += 1;
                // Timestamps/diffs are Codec64-encoded i64s.
                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
                if !self.ts_filter.filter_ts(&mut t) {
                    continue;
                }
                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
                if d.is_zero() {
                    continue;
                }
                // Only decode the key/value after the cheap ts/diff checks.
                let kv = self.decode_kv(next_idx, key, val);
                (kv, t, d)
            } else {
                break;
            };

            if let Some((kv, t, d)) = &mut consolidated {
                let (kv_next, t_next, d_next) = &next;
                if kv == kv_next && t == t_next {
                    // Same key/value/time: merge the diffs in place.
                    d.plus_equals(d_next);
                    if d.is_zero() {
                        consolidated = None;
                    }
                } else {
                    // A different update: stash it for the next call and
                    // emit what we've consolidated so far.
                    self.peek_stash = Some(next);
                    break;
                }
            } else {
                consolidated = Some(next);
            }
        }

        let (kv, t, d) = consolidated?;

        Some((kv, t, d))
    }

    /// Decodes the key/value at `index` from the codec and/or structured
    /// representation, cross-checking when both are present.
    fn decode_kv(&mut self, index: usize, key: &mut Option<K>, val: &mut Option<V>) -> (K, V) {
        let decoded = self
            .part
            .as_ref()
            .map_left(|codec| {
                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
                let (k, v) = Self::decode_codec(
                    &*self.metrics,
                    self.migration.codec_read(),
                    ck,
                    cv,
                    key,
                    val,
                    &mut self.key_storage,
                    &mut self.val_storage,
                );
                (k.expect("valid legacy key"), v.expect("valid legacy value"))
            })
            .map_right(|(structured_key, structured_val)| {
                self.decode_structured(index, structured_key, structured_val, key, val)
            });

        match decoded {
            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
                // Both representations decoded: cross-check them, reporting
                // (but not hard-failing on) mismatches.
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .key()
                    .report_valid(|| k_s == k);
                if !is_valid {
                    soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
                }
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .val()
                    .report_valid(|| v_s == v);
                if !is_valid {
                    soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
                }

                (k, v)
            }
            EitherOrBoth::Left(kv) => kv,
            EitherOrBoth::Right(kv) => kv,
        }
    }

    /// Decodes a key/value pair from Codec-encoded bytes, reusing the
    /// allocations in `key`/`val` and the decode scratch in `*_storage`
    /// when provided.
    fn decode_codec(
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
        key_buf: &[u8],
        val_buf: &[u8],
        key: &mut Option<K>,
        val: &mut Option<V>,
        key_storage: &mut Option<K::Storage>,
        val_storage: &mut Option<V::Storage>,
    ) -> (Result<K, String>, Result<V, String>) {
        let k = metrics.codecs.key.decode(|| match key.take() {
            Some(mut key) => {
                match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
                    Ok(()) => Ok(key),
                    Err(err) => Err(err),
                }
            }
            None => K::decode(key_buf, &read_schemas.key),
        });
        let v = metrics.codecs.val.decode(|| match val.take() {
            Some(mut val) => {
                match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
                    Ok(()) => Ok(val),
                    Err(err) => Err(err),
                }
            }
            None => V::decode(val_buf, &read_schemas.val),
        });
        (k, v)
    }

    /// Decodes the key/value at `idx` from the structured (arrow) decoders,
    /// reusing the allocations in `key`/`val` when provided.
    fn decode_structured(
        &self,
        idx: usize,
        keys: &<K::Schema as Schema<K>>::Decoder,
        vals: &<V::Schema as Schema<V>>::Decoder,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (K, V) {
        let mut key = key.take().unwrap_or_default();
        keys.decode(idx, &mut key);

        let mut val = val.take().unwrap_or_default();
        vals.decode(idx, &mut val);

        (key, val)
    }
}
1143
1144impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
1145where
1146 K: Debug + Codec,
1147 V: Debug + Codec,
1148 T: Timestamp + Lattice + Codec64,
1149 D: Monoid + Codec64 + Send + Sync,
1150{
1151 type Item = ((K, V), T, D);
1152
1153 fn next(&mut self) -> Option<Self::Item> {
1154 self.next_with_storage(&mut None, &mut None)
1155 }
1156
1157 fn size_hint(&self) -> (usize, Option<usize>) {
1158 let max_len = self.timestamps.len();
1160 (0, Some(max_len))
1161 }
1162}
1163
1164impl<T> EncodedPart<T>
1165where
1166 T: Timestamp + Lattice + Codec64,
1167{
1168 pub async fn fetch(
1169 cfg: &FetchConfig,
1170 shard_id: &ShardId,
1171 blob: &dyn Blob,
1172 metrics: &Metrics,
1173 shard_metrics: &ShardMetrics,
1174 read_metrics: &ReadMetrics,
1175 registered_desc: &Description<T>,
1176 part: &BatchPart<T>,
1177 ) -> Result<Self, BlobKey> {
1178 match part {
1179 BatchPart::Hollow(x) => {
1180 fetch_batch_part(
1181 cfg,
1182 shard_id,
1183 blob,
1184 metrics,
1185 shard_metrics,
1186 read_metrics,
1187 registered_desc,
1188 x,
1189 )
1190 .await
1191 }
1192 BatchPart::Inline {
1193 updates,
1194 ts_rewrite,
1195 ..
1196 } => Ok(EncodedPart::from_inline(
1197 cfg,
1198 metrics,
1199 read_metrics.clone(),
1200 registered_desc.clone(),
1201 updates,
1202 ts_rewrite.as_ref(),
1203 )),
1204 }
1205 }
1206
1207 pub(crate) fn from_inline(
1208 cfg: &FetchConfig,
1209 metrics: &Metrics,
1210 read_metrics: ReadMetrics,
1211 desc: Description<T>,
1212 x: &LazyInlineBatchPart,
1213 ts_rewrite: Option<&Antichain<T>>,
1214 ) -> Self {
1215 let parsed = x.decode(&metrics.columnar).expect("valid inline part");
1216 Self::new(cfg, read_metrics, desc, "inline", ts_rewrite, parsed)
1217 }
1218
1219 pub(crate) fn from_hollow(
1220 cfg: &FetchConfig,
1221 metrics: ReadMetrics,
1222 registered_desc: Description<T>,
1223 part: &HollowBatchPart<T>,
1224 parsed: BlobTraceBatchPart<T>,
1225 ) -> Self {
1226 Self::new(
1227 cfg,
1228 metrics,
1229 registered_desc,
1230 &part.key.0,
1231 part.ts_rewrite.as_ref(),
1232 parsed,
1233 )
1234 }
1235
    /// Builds an [EncodedPart], validating that the part's inline
    /// description is consistent with the description it was registered
    /// under.
    ///
    /// When the inline bounds differ from the registered bounds, the part
    /// must be truncated at read time (see `normalize`) and its inline
    /// since must be the minimum; otherwise the bounds must match exactly
    /// and the inline since must not be past the registered one.
    pub(crate) fn new(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        printable_name: &str,
        ts_rewrite: Option<&Antichain<T>>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        let inline_desc = &parsed.desc;
        let needs_truncation = inline_desc.lower() != registered_desc.lower()
            || inline_desc.upper() != registered_desc.upper();
        if needs_truncation {
            if cfg.validate_bounds_on_read {
                soft_assert_or_log!(
                    PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
                    "key={} inline={:?} registered={:?}",
                    printable_name,
                    inline_desc,
                    registered_desc
                );

                // NOTE(review): the upper check is skipped when a ts_rewrite
                // is present — presumably a rewrite may move the registered
                // upper past the inline one; confirm the invariant.
                if ts_rewrite.is_none() {
                    soft_assert_or_log!(
                        PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
                        "key={} inline={:?} registered={:?}",
                        printable_name,
                        inline_desc,
                        registered_desc
                    );
                }
            }
            // Truncatable parts must have been written at the minimum since.
            assert_eq!(
                inline_desc.since(),
                &Antichain::from_elem(T::minimum()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        } else {
            assert!(
                PartialOrder::less_equal(inline_desc.since(), registered_desc.since()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.lower(),
                registered_desc.lower(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.upper(),
                registered_desc.upper(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        }

        EncodedPart {
            metrics,
            registered_desc,
            part: parsed,
            needs_truncation,
            ts_rewrite: ts_rewrite.cloned(),
        }
    }
1331
1332 pub(crate) fn maybe_unconsolidated(&self) -> bool {
1333 self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
1336 }
1337
    /// Returns the updates exactly as stored in the parsed part, with no
    /// truncation or timestamp rewriting applied (see `normalize` for that).
    pub(crate) fn updates(&self) -> &BlobTraceUpdates {
        &self.part.updates
    }
1341
    /// Returns the part's updates with its pending transformations applied:
    /// the timestamp rewrite (if any) and truncation down to the registered
    /// description's bounds (if needed).
    ///
    /// Cheap when no transformation is required (a clone of the array
    /// handles); otherwise the affected columns are rewritten/filtered and
    /// reallocated via `metrics`.
    pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
        let updates = self.part.updates.clone();
        // Fast path: nothing to rewrite or truncate.
        if !self.needs_truncation && self.ts_rewrite.is_none() {
            return updates;
        }

        // Pull the individual columns apart so each can be transformed.
        let mut codec = updates
            .records()
            .map(|r| (r.keys().clone(), r.vals().clone()));
        let mut structured = updates.structured().cloned();
        let mut timestamps = updates.timestamps().clone();
        let mut diffs = updates.diffs().clone();

        // Advance every timestamp by the rewrite frontier, if one is set.
        if let Some(rewrite) = self.ts_rewrite.as_ref() {
            timestamps = arrow::compute::unary(&timestamps, |i: i64| {
                let mut t = T::decode(i.to_le_bytes());
                t.advance_by(rewrite.borrow());
                i64::from_le_bytes(T::encode(&t))
            });
        }

        // Filter out updates whose times fall outside the registered
        // description's [lower, upper) range.
        let reallocated = if self.needs_truncation {
            let filter = BooleanArray::from_unary(&timestamps, |i| {
                let t = T::decode(i.to_le_bytes());
                let truncate_t = {
                    !self.registered_desc.lower().less_equal(&t)
                        || self.registered_desc.upper().less_equal(&t)
                };
                !truncate_t
            });
            if filter.false_count() == 0 {
                // Every row survives the filter; leave the arrays untouched.
                false
            } else {
                let filter = FilterBuilder::new(&filter).optimize().build();
                let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
                if let Some((keys, vals)) = codec {
                    codec = Some((
                        realloc_array(do_filter(&keys).as_binary(), metrics),
                        realloc_array(do_filter(&vals).as_binary(), metrics),
                    ));
                }
                if let Some(ext) = structured {
                    structured = Some(ColumnarRecordsStructuredExt {
                        key: realloc_any(do_filter(&*ext.key), metrics),
                        val: realloc_any(do_filter(&*ext.val), metrics),
                    });
                }
                timestamps = realloc_array(do_filter(&timestamps).as_primitive(), metrics);
                diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
                true
            }
        } else {
            false
        };

        // If the timestamps were rewritten but nothing was filtered above,
        // the rewritten array hasn't been reallocated yet; do so now.
        if self.ts_rewrite.is_some() && !reallocated {
            timestamps = realloc_array(&timestamps, metrics);
        }

        if self.ts_rewrite.is_some() {
            self.metrics
                .ts_rewrite
                .inc_by(u64::cast_from(timestamps.len()));
        }

        // Reassemble the (possibly transformed) columns into the same
        // `BlobTraceUpdates` variant the part started with.
        match (codec, structured) {
            (Some((key, value)), None) => {
                BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
            }
            (Some((key, value)), Some(ext)) => {
                BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
            }
            (None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            // A part always carries at least one of the two encodings.
            (None, None) => unreachable!(),
        }
    }
1424}
1425
/// A serializable description of a batch part that can be exchanged between
/// workers/processes, carrying the proto-encoded part alongside the metadata
/// needed to fetch it.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExchangeableBatchPart<T> {
    // The shard the part belongs to.
    shard_id: ShardId,
    // Size of the encoded part, in bytes (exposed via `encoded_size_bytes`).
    encoded_size_bytes: usize,
    // Description of the batch this part is from.
    desc: Description<T>,
    // Filter to apply to the part's updates when fetched.
    filter: FetchBatchFilter<T>,
    // The proto-encoded `HollowBatchPart`, decoded lazily.
    part: LazyProto<ProtoHollowBatchPart>,
    // NOTE(review): presumably marks parts kept for auditing filter-pushdown
    // decisions; semantics inferred from the name — confirm at use sites.
    filter_pushdown_audit: bool,
}
1444
impl<T> ExchangeableBatchPart<T> {
    /// Returns the size of the encoded part, in bytes.
    pub fn encoded_size_bytes(&self) -> usize {
        self.encoded_size_bytes
    }
}
1451
/// Format to use when decoding the data in a part.
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
    /// Decode via the Codec-encoded key/value row data.
    Row {
        /// When true, also decode the structured data and validate it against
        /// the rows — NOTE(review): inferred from the name and the
        /// "row_with_validate" string; confirm against the decode path.
        validate_structured: bool,
    },
    /// Decode via the structured (Arrow) columnar data.
    Arrow,
}
1465
1466impl PartDecodeFormat {
1467 pub const fn default() -> Self {
1469 PartDecodeFormat::Arrow
1470 }
1471
1472 pub fn from_str(s: &str) -> Self {
1475 match s {
1476 "row" => PartDecodeFormat::Row {
1477 validate_structured: false,
1478 },
1479 "row_with_validate" => PartDecodeFormat::Row {
1480 validate_structured: true,
1481 },
1482 "arrow" => PartDecodeFormat::Arrow,
1483 x => {
1484 let default = PartDecodeFormat::default();
1485 soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
1486 default
1487 }
1488 }
1489 }
1490
1491 pub const fn as_str(&self) -> &'static str {
1493 match self {
1494 PartDecodeFormat::Row {
1495 validate_structured: false,
1496 } => "row",
1497 PartDecodeFormat::Row {
1498 validate_structured: true,
1499 } => "row_with_validate",
1500 PartDecodeFormat::Arrow => "arrow",
1501 }
1502 }
1503}
1504
1505impl fmt::Display for PartDecodeFormat {
1506 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1507 f.write_str(self.as_str())
1508 }
1509}
1510
1511#[mz_ore::test]
1512fn client_exchange_data() {
1513 fn is_exchange_data<T: timely::ExchangeData>() {}
1517 is_exchange_data::<ExchangeableBatchPart<u64>>();
1518 is_exchange_data::<ExchangeableBatchPart<u64>>();
1519}