1use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
19use arrow::compute::FilterBuilder;
20use differential_dataflow::difference::Semigroup;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use itertools::EitherOrBoth;
24use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::{soft_panic_no_log, soft_panic_or_log};
28use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
29use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
30use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist::metrics::ColumnarMetrics;
33use mz_persist_types::arrow::ArrayOrd;
34use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
35use mz_persist_types::part::Codec64Mut;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::stats::PartStats;
38use mz_persist_types::{Codec, Codec64};
39use mz_proto::RustType;
40use serde::{Deserialize, Serialize};
41use timely::PartialOrder;
42use timely::progress::frontier::AntichainRef;
43use timely::progress::{Antichain, Timestamp};
44use tracing::{Instrument, debug, debug_span, trace_span};
45
46use crate::ShardId;
47use crate::cfg::PersistConfig;
48use crate::error::InvalidUsage;
49use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
50use crate::internal::machine::retry_external;
51use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
52use crate::internal::paths::BlobKey;
53use crate::internal::state::{
54 BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
55};
56use crate::read::LeasedReaderId;
57use crate::schema::{PartMigration, SchemaCache};
58
/// Dynamic config (default `1.2`): a multiplier applied to a part's `encoded_size_bytes`,
/// per the description string below.
///
/// NOTE(review): presumably consumed by the fetch semaphore when computing how many permits a
/// fetch costs — confirm at the use site.
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);
69
/// Dynamic config (default `1.0`): caps the outstanding persist bytes being fetched and
/// parsed, as a multiple of the process memory limit. See the description string for when it
/// applies.
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);
79
/// Dynamic config selecting how fetched parts are decoded. It defaults to the string form of
/// `PartDecodeFormat::default()`. The value is parsed with `PartDecodeFormat::from_str` in
/// `BatchFetcherConfig::part_decode_format`.
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow' (Materialize).",
);
87
/// Dynamic config (default `true`) gating `LeasedBatchPart::maybe_optimize`. When enabled,
/// that method may replace an eligible hollow part with a faked single-update inline part
/// instead of fetching it.
pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);
93
94pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
95 "persist_validate_part_bounds_on_read",
96 true,
97 "Validate the part lower <= the batch lower and the part upper <= batch upper,\
98 for the batch containing that part",
99);
100
/// Fetch-time settings, built from a [`PersistConfig`] by `FetchConfig::from_persist_config`.
#[derive(Debug, Clone)]
pub(crate) struct FetchConfig {
    /// The value of [`VALIDATE_PART_BOUNDS_ON_READ`] when this config was built. It gates the
    /// part-bounds assertions in `EncodedPart::new`.
    pub(crate) validate_bounds_on_read: bool,
}
105
106impl FetchConfig {
107 pub fn from_persist_config(cfg: &PersistConfig) -> Self {
108 Self {
109 validate_bounds_on_read: VALIDATE_PART_BOUNDS_ON_READ.get(cfg),
110 }
111 }
112}
113
/// Settings held by a [`BatchFetcher`]. Its `fetch_config` is copied into each [`FetchedBlob`]
/// the fetcher produces.
#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
    /// Handle on [`PART_DECODE_FORMAT`]; read again on every call to `part_decode_format`.
    pub(crate) part_decode_format: ConfigValHandle<String>,
    /// Fetch settings captured when this config was created.
    pub(crate) fetch_config: FetchConfig,
}
119
120impl BatchFetcherConfig {
121 pub fn new(value: &PersistConfig) -> Self {
122 Self {
123 part_decode_format: PART_DECODE_FORMAT.handle(value),
124 fetch_config: FetchConfig::from_persist_config(value),
125 }
126 }
127
128 pub fn part_decode_format(&self) -> PartDecodeFormat {
129 PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
130 }
131}
132
/// Fetches the data for [`ExchangeableBatchPart`]s belonging to a single shard.
///
/// See `BatchFetcher::fetch_leased_part`.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    K: Debug + Codec,
    V: Debug + Codec,
    D: Semigroup + Codec64 + Send + Sync,
{
    pub(crate) cfg: BatchFetcherConfig,
    /// Blob storage that hollow parts are read from.
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    /// The shard this fetcher reads; parts from any other shard are rejected.
    pub(crate) shard_id: ShardId,
    /// Schemas that fetched parts are decoded (and migrated) into.
    pub(crate) read_schemas: Schemas<K, V>,
    /// Passed to `PartMigration::new` when resolving a part's schema migration.
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    /// When `true`, fetch metrics are recorded under `metrics.read.unindexed` instead of
    /// `metrics.read.batch_fetcher`.
    pub(crate) is_transient: bool,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
156
157impl<K, V, T, D> BatchFetcher<K, V, T, D>
158where
159 K: Debug + Codec,
160 V: Debug + Codec,
161 T: Timestamp + Lattice + Codec64 + Sync,
162 D: Semigroup + Codec64 + Send + Sync,
163{
164 pub async fn fetch_leased_part(
169 &mut self,
170 part: ExchangeableBatchPart<T>,
171 ) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
172 let ExchangeableBatchPart {
173 shard_id,
174 encoded_size_bytes: _,
175 desc,
176 filter,
177 filter_pushdown_audit,
178 part,
179 } = part;
180 let part: BatchPart<T> = part.decode_to().expect("valid part");
181 if shard_id != self.shard_id {
182 return Err(InvalidUsage::BatchNotFromThisShard {
183 batch_shard: shard_id,
184 handle_shard: self.shard_id.clone(),
185 });
186 }
187
188 let migration =
189 PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
190 .await
191 .unwrap_or_else(|read_schemas| {
192 panic!(
193 "could not decode part {:?} with schema: {:?}",
194 part.schema_id(),
195 read_schemas
196 )
197 });
198
199 let (buf, fetch_permit) = match &part {
200 BatchPart::Hollow(x) => {
201 let fetch_permit = self
202 .metrics
203 .semaphore
204 .acquire_fetch_permits(x.encoded_size_bytes)
205 .await;
206 let read_metrics = if self.is_transient {
207 &self.metrics.read.unindexed
208 } else {
209 &self.metrics.read.batch_fetcher
210 };
211 let buf = fetch_batch_part_blob(
212 &shard_id,
213 self.blob.as_ref(),
214 &self.metrics,
215 &self.shard_metrics,
216 read_metrics,
217 x,
218 )
219 .await;
220 let buf = match buf {
221 Ok(buf) => buf,
222 Err(key) => return Ok(Err(key)),
223 };
224 let buf = FetchedBlobBuf::Hollow {
225 buf,
226 part: x.clone(),
227 };
228 (buf, Some(Arc::new(fetch_permit)))
229 }
230 BatchPart::Inline {
231 updates,
232 ts_rewrite,
233 ..
234 } => {
235 let buf = FetchedBlobBuf::Inline {
236 desc: desc.clone(),
237 updates: updates.clone(),
238 ts_rewrite: ts_rewrite.clone(),
239 };
240 (buf, None)
241 }
242 };
243 let fetched_blob = FetchedBlob {
244 metrics: Arc::clone(&self.metrics),
245 read_metrics: self.metrics.read.batch_fetcher.clone(),
246 buf,
247 registered_desc: desc.clone(),
248 migration,
249 filter: filter.clone(),
250 filter_pushdown_audit,
251 structured_part_audit: self.cfg.part_decode_format(),
252 fetch_permit,
253 _phantom: PhantomData,
254 fetch_config: self.cfg.fetch_config.clone(),
255 };
256 Ok(Ok(fetched_blob))
257 }
258}
259
/// Decides which updates of a fetched part are emitted and how their timestamps are rewritten.
///
/// See [`FetchBatchFilter::filter_ts`] for the exact rules.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum FetchBatchFilter<T> {
    /// Drops updates whose time is strictly greater than some element of `as_of`. Advances the
    /// remaining times by `as_of`.
    Snapshot {
        as_of: Antichain<T>,
    },
    /// Keeps only updates whose time is strictly greater than some element of `as_of` and
    /// `>=` some element of `lower`. Times are left unchanged.
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    /// Keeps every update, advancing its time by `since`.
    Compaction {
        since: Antichain<T>,
    },
}
273
274impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
275 pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
276 match self {
277 FetchBatchFilter::Snapshot { as_of } => {
278 if as_of.less_than(t) {
280 return false;
281 }
282 t.advance_by(as_of.borrow());
283 true
284 }
285 FetchBatchFilter::Listen { as_of, lower } => {
286 if !as_of.less_than(t) {
288 return false;
289 }
290
291 if !lower.less_equal(t) {
299 return false;
300 }
301 true
302 }
303 FetchBatchFilter::Compaction { since } => {
304 t.advance_by(since.borrow());
305 true
306 }
307 }
308 }
309}
310
311pub(crate) async fn fetch_leased_part<K, V, T, D>(
316 cfg: &PersistConfig,
317 part: &LeasedBatchPart<T>,
318 blob: &dyn Blob,
319 metrics: Arc<Metrics>,
320 read_metrics: &ReadMetrics,
321 shard_metrics: &ShardMetrics,
322 reader_id: &LeasedReaderId,
323 read_schemas: Schemas<K, V>,
324 schema_cache: &mut SchemaCache<K, V, T, D>,
325) -> FetchedPart<K, V, T, D>
326where
327 K: Debug + Codec,
328 V: Debug + Codec,
329 T: Timestamp + Lattice + Codec64 + Sync,
330 D: Semigroup + Codec64 + Send + Sync,
331{
332 let fetch_config = FetchConfig::from_persist_config(cfg);
333 let encoded_part = EncodedPart::fetch(
334 &fetch_config,
335 &part.shard_id,
336 blob,
337 &metrics,
338 shard_metrics,
339 read_metrics,
340 &part.desc,
341 &part.part,
342 )
343 .await
344 .unwrap_or_else(|blob_key| {
345 panic!("{} could not fetch batch part: {}", reader_id, blob_key)
354 });
355 let part_cfg = BatchFetcherConfig::new(cfg);
356 let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
357 .await
358 .unwrap_or_else(|read_schemas| {
359 panic!(
360 "could not decode part {:?} with schema: {:?}",
361 part.part.schema_id(),
362 read_schemas
363 )
364 });
365 FetchedPart::new(
366 metrics,
367 encoded_part,
368 migration,
369 part.filter.clone(),
370 part.filter_pushdown_audit,
371 part_cfg.part_decode_format(),
372 part.part.stats(),
373 )
374}
375
376pub(crate) async fn fetch_batch_part_blob<T>(
377 shard_id: &ShardId,
378 blob: &dyn Blob,
379 metrics: &Metrics,
380 shard_metrics: &ShardMetrics,
381 read_metrics: &ReadMetrics,
382 part: &HollowBatchPart<T>,
383) -> Result<SegmentedBytes, BlobKey> {
384 let now = Instant::now();
385 let get_span = debug_span!("fetch_batch::get");
386 let blob_key = part.key.complete(shard_id);
387 let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
388 shard_metrics.blob_gets.inc();
389 blob.get(&blob_key).await
390 })
391 .instrument(get_span.clone())
392 .await
393 .ok_or(blob_key)?;
394
395 drop(get_span);
396
397 read_metrics.part_count.inc();
398 read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
399 read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());
400
401 Ok(value)
402}
403
404pub(crate) fn decode_batch_part_blob<T>(
405 cfg: &FetchConfig,
406 metrics: &Metrics,
407 read_metrics: &ReadMetrics,
408 registered_desc: Description<T>,
409 part: &HollowBatchPart<T>,
410 buf: &SegmentedBytes,
411) -> EncodedPart<T>
412where
413 T: Timestamp + Lattice + Codec64,
414{
415 trace_span!("fetch_batch::decode").in_scope(|| {
416 let parsed = metrics
417 .codecs
418 .batch
419 .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
420 .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
421 .expect("internal error: invalid encoded state");
426 read_metrics
427 .part_goodbytes
428 .inc_by(u64::cast_from(parsed.updates.goodbytes()));
429 EncodedPart::from_hollow(cfg, read_metrics.clone(), registered_desc, part, parsed)
430 })
431}
432
433pub(crate) async fn fetch_batch_part<T>(
434 cfg: &FetchConfig,
435 shard_id: &ShardId,
436 blob: &dyn Blob,
437 metrics: &Metrics,
438 shard_metrics: &ShardMetrics,
439 read_metrics: &ReadMetrics,
440 registered_desc: &Description<T>,
441 part: &HollowBatchPart<T>,
442) -> Result<EncodedPart<T>, BlobKey>
443where
444 T: Timestamp + Lattice + Codec64,
445{
446 let buf =
447 fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
448 let part = decode_batch_part_blob(
449 cfg,
450 metrics,
451 read_metrics,
452 registered_desc.clone(),
453 part,
454 &buf,
455 );
456 Ok(part)
457}
458
/// A cloneable, reference-counted token wrapping a [`SeqNo`].
///
/// [`Lease::count`] reports how many clones are currently alive.
/// NOTE(review): presumably held to keep the seqno from being garbage collected while parts
/// are outstanding — confirm with the lease-tracking code.
#[derive(Clone, Debug)]
pub struct Lease(Arc<SeqNo>);
467
468impl Lease {
469 pub fn new(seqno: SeqNo) -> Self {
471 Self(Arc::new(seqno))
472 }
473
474 #[cfg(test)]
476 pub fn seqno(&self) -> SeqNo {
477 *self.0
478 }
479
480 pub fn count(&self) -> usize {
482 Arc::strong_count(&self.0)
483 }
484}
485
/// A part of a batch to be read, together with the [`Lease`] that was handed out with it.
///
/// Dropping it increments `metrics.lease.dropped_part`.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_id: ShardId,
    /// Applied to every update when the part is read.
    pub(crate) filter: FetchBatchFilter<T>,
    /// The description the part is read against. It is passed as `registered_desc` when the
    /// part is fetched.
    pub(crate) desc: Description<T>,
    pub(crate) part: BatchPart<T>,
    pub(crate) lease: Lease,
    /// Whether the part's stats should be retained for a filter pushdown audit once it is
    /// fetched.
    pub(crate) filter_pushdown_audit: bool,
}
523
524impl<T> LeasedBatchPart<T>
525where
526 T: Timestamp + Codec64,
527{
528 pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
538 let lease = self.lease.clone();
540 let part = ExchangeableBatchPart {
541 shard_id: self.shard_id,
542 encoded_size_bytes: self.part.encoded_size_bytes(),
543 desc: self.desc.clone(),
544 filter: self.filter.clone(),
545 part: LazyProto::from(&self.part.into_proto()),
546 filter_pushdown_audit: self.filter_pushdown_audit,
547 };
548 (part, lease)
549 }
550
551 pub fn encoded_size_bytes(&self) -> usize {
553 self.part.encoded_size_bytes()
554 }
555
556 pub fn request_filter_pushdown_audit(&mut self) {
561 self.filter_pushdown_audit = true;
562 }
563
564 pub fn stats(&self) -> Option<PartStats> {
566 self.part.stats().map(|x| x.decode())
567 }
568
569 pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
572 assert_eq!(key.len(), 1, "expect a single-row key array");
573 assert_eq!(val.len(), 1, "expect a single-row val array");
574 let as_of = match &self.filter {
575 FetchBatchFilter::Snapshot { as_of } => as_of,
576 FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
577 };
578 if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
579 return;
580 }
581 let (diffs_sum, _stats) = match &self.part {
582 BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
583 BatchPart::Inline { .. } => return,
584 };
585 debug!(
586 "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
587 diffs_sum.map(i64::decode),
589 as_of.elements(),
590 self.desc.lower().elements(),
591 self.desc.upper().elements()
592 );
593 let as_of = match &as_of.elements() {
594 &[as_of] => as_of,
595 _ => return,
596 };
597 let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
598 if !eligible {
599 return;
600 }
601 let Some(diffs_sum) = diffs_sum else {
602 return;
603 };
604
605 debug!(
606 "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
607 i64::decode(diffs_sum),
609 as_of,
610 self.part.encoded_size_bytes(),
611 );
612 self.metrics.pushdown.parts_faked_count.inc();
613 self.metrics
614 .pushdown
615 .parts_faked_bytes
616 .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
617 let timestamps = {
618 let mut col = Codec64Mut::with_capacity(1);
619 col.push(as_of);
620 col.finish()
621 };
622 let diffs = {
623 let mut col = Codec64Mut::with_capacity(1);
624 col.push_raw(diffs_sum);
625 col.finish()
626 };
627 let updates = BlobTraceUpdates::Structured {
628 key_values: ColumnarRecordsStructuredExt { key, val },
629 timestamps,
630 diffs,
631 };
632 let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
633 desc: Some(self.desc.into_proto()),
634 index: 0,
635 updates: Some(updates.into_proto()),
636 });
637 self.part = BatchPart::Inline {
638 updates: faked_data,
639 ts_rewrite: None,
640 schema_id: None,
641 deprecated_schema_id: None,
642 };
643 }
644}
645
646impl<T> Drop for LeasedBatchPart<T> {
647 fn drop(&mut self) {
649 self.metrics.lease.dropped_part.inc()
650 }
651}
652
/// The not-yet-decoded data of a part, as returned by `BatchFetcher::fetch_leased_part`.
///
/// Call `parse` to turn it into a [`ShardSourcePart`].
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    /// Read metrics that decoding reports to (e.g. `part_goodbytes`).
    read_metrics: ReadMetrics,
    buf: FetchedBlobBuf<T>,
    /// Passed as the registered description when decoding a hollow part.
    registered_desc: Description<T>,
    migration: PartMigration<K, V>,
    filter: FetchBatchFilter<T>,
    filter_pushdown_audit: bool,
    /// The decode format passed to `FetchedPart::new` when parsing.
    structured_part_audit: PartDecodeFormat,
    /// Permit acquired for the fetch (hollow parts only); shared with parsed parts.
    fetch_permit: Option<Arc<MetricsPermits>>,
    fetch_config: FetchConfig,
    _phantom: PhantomData<fn() -> D>,
}
671
/// The contents of a [`FetchedBlob`], before decoding.
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    /// Bytes fetched from blob storage for a hollow part.
    Hollow {
        buf: SegmentedBytes,
        part: HollowBatchPart<T>,
    },
    /// An inline part, which required no blob fetch.
    Inline {
        desc: Description<T>,
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
    },
}
684
685impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
686 fn clone(&self) -> Self {
687 Self {
688 metrics: Arc::clone(&self.metrics),
689 read_metrics: self.read_metrics.clone(),
690 buf: self.buf.clone(),
691 registered_desc: self.registered_desc.clone(),
692 migration: self.migration.clone(),
693 filter: self.filter.clone(),
694 filter_pushdown_audit: self.filter_pushdown_audit.clone(),
695 fetch_permit: self.fetch_permit.clone(),
696 structured_part_audit: self.structured_part_audit.clone(),
697 fetch_config: self.fetch_config.clone(),
698 _phantom: self._phantom.clone(),
699 }
700 }
701}
702
/// A parsed part, plus the fetch permit (if any) acquired to fetch it.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The decoded part, ready to iterate.
    pub part: FetchedPart<K, V, T, D>,
    /// Shared with the originating [`FetchedBlob`]. NOTE(review): presumably held so the
    /// fetch's memory budget stays reserved while this part is alive — confirm.
    fetch_permit: Option<Arc<MetricsPermits>>,
}
710
711impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
712where
713 K: Codec + Debug,
714 <K as Codec>::Storage: Debug,
715 V: Codec + Debug,
716 <V as Codec>::Storage: Debug,
717{
718 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
719 let ShardSourcePart { part, fetch_permit } = self;
720 f.debug_struct("ShardSourcePart")
721 .field("part", part)
722 .field("fetch_permit", fetch_permit)
723 .finish()
724 }
725}
726
727impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
728 pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
730 self.parse_internal(&self.fetch_config)
731 }
732
733 pub(crate) fn parse_internal(&self, cfg: &FetchConfig) -> ShardSourcePart<K, V, T, D> {
735 let (part, stats) = match &self.buf {
736 FetchedBlobBuf::Hollow { buf, part } => {
737 let parsed = decode_batch_part_blob(
738 cfg,
739 &self.metrics,
740 &self.read_metrics,
741 self.registered_desc.clone(),
742 part,
743 buf,
744 );
745 (parsed, part.stats.as_ref())
746 }
747 FetchedBlobBuf::Inline {
748 desc,
749 updates,
750 ts_rewrite,
751 } => {
752 let parsed = EncodedPart::from_inline(
753 cfg,
754 &self.metrics,
755 self.read_metrics.clone(),
756 desc.clone(),
757 updates,
758 ts_rewrite.as_ref(),
759 );
760 (parsed, None)
761 }
762 };
763 let part = FetchedPart::new(
764 Arc::clone(&self.metrics),
765 part,
766 self.migration.clone(),
767 self.filter.clone(),
768 self.filter_pushdown_audit,
769 self.structured_part_audit,
770 stats,
771 );
772 ShardSourcePart {
773 part,
774 fetch_permit: self.fetch_permit.clone(),
775 }
776 }
777
778 pub fn stats(&self) -> Option<PartStats> {
780 match &self.buf {
781 FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
782 FetchedBlobBuf::Inline { .. } => None,
783 }
784 }
785}
786
/// A decoded part that yields its updates via [`Iterator`] or `next_with_storage`.
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    /// Applied to every timestamp before the update is emitted.
    ts_filter: FetchBatchFilter<T>,
    /// Codec-encoded records (`Left`), structured decoders (`Right`), or both. With both, each
    /// update is decoded twice and the results compared.
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    /// Per-update timestamp column, indexed in lockstep with `diffs` and `part`.
    timestamps: Int64Array,
    /// Per-update diff column.
    diffs: Int64Array,
    migration: PartMigration<K, V>,
    /// The part's stats, retained only when a filter pushdown audit was requested.
    filter_pushdown_audit: Option<LazyPartStats>,
    /// An update read ahead during consolidation that hasn't been returned yet.
    peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
    /// Index of the next update to read.
    part_cursor: usize,
    /// Scratch storage reused across codec decodes.
    key_storage: Option<K::Storage>,
    val_storage: Option<V::Storage>,

    _phantom: PhantomData<fn() -> D>,
}
815
816impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
817 pub(crate) fn new(
818 metrics: Arc<Metrics>,
819 part: EncodedPart<T>,
820 migration: PartMigration<K, V>,
821 ts_filter: FetchBatchFilter<T>,
822 filter_pushdown_audit: bool,
823 part_decode_format: PartDecodeFormat,
824 stats: Option<&LazyPartStats>,
825 ) -> Self {
826 let part_len = u64::cast_from(part.part.updates.len());
827 match &migration {
828 PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
829 PartMigration::Schemaless { .. } => {
830 metrics.schema.migration_count_codec.inc();
831 metrics.schema.migration_len_legacy_codec.inc_by(part_len);
832 }
833 PartMigration::Either { .. } => {
834 metrics.schema.migration_count_either.inc();
835 match part_decode_format {
836 PartDecodeFormat::Row {
837 validate_structured: false,
838 } => metrics.schema.migration_len_either_codec.inc_by(part_len),
839 PartDecodeFormat::Row {
840 validate_structured: true,
841 } => {
842 metrics.schema.migration_len_either_codec.inc_by(part_len);
843 metrics.schema.migration_len_either_arrow.inc_by(part_len);
844 }
845 PartDecodeFormat::Arrow => {
846 metrics.schema.migration_len_either_arrow.inc_by(part_len)
847 }
848 }
849 }
850 }
851
852 let filter_pushdown_audit = if filter_pushdown_audit {
853 stats.cloned()
854 } else {
855 None
856 };
857
858 let downcast_structured = |structured: ColumnarRecordsStructuredExt,
859 structured_only: bool| {
860 let key_size_before = ArrayOrd::new(&structured.key).goodbytes();
861
862 let structured = match &migration {
863 PartMigration::SameSchema { .. } => structured,
864 PartMigration::Schemaless { read } if structured_only => {
865 let start = Instant::now();
867 let read_key = data_type::<K>(&*read.key).ok()?;
868 let read_val = data_type::<V>(&*read.val).ok()?;
869 let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
870 let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
871 let key = key_migration.migrate(structured.key);
872 let val = val_migration.migrate(structured.val);
873 metrics
874 .schema
875 .migration_migrate_seconds
876 .inc_by(start.elapsed().as_secs_f64());
877 ColumnarRecordsStructuredExt { key, val }
878 }
879 PartMigration::Schemaless { .. } => return None,
880 PartMigration::Either {
881 write: _,
882 read: _,
883 key_migration,
884 val_migration,
885 } => {
886 let start = Instant::now();
887 let key = key_migration.migrate(structured.key);
888 let val = val_migration.migrate(structured.val);
889 metrics
890 .schema
891 .migration_migrate_seconds
892 .inc_by(start.elapsed().as_secs_f64());
893 ColumnarRecordsStructuredExt { key, val }
894 }
895 };
896
897 let read_schema = migration.codec_read();
898 let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
899 let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);
900
901 match &key {
902 Ok(key_decoder) => {
903 let key_size_after = key_decoder.goodbytes();
904 let key_diff = key_size_before.saturating_sub(key_size_after);
905 metrics
906 .pushdown
907 .parts_projection_trimmed_bytes
908 .inc_by(u64::cast_from(key_diff));
909 }
910 Err(e) => {
911 soft_panic_or_log!("failed to create decoder: {e:#?}");
912 }
913 }
914
915 Some((key.ok()?, val.ok()?))
916 };
917
918 let updates = part.normalize(&metrics.columnar);
919 let timestamps = updates.timestamps().clone();
920 let diffs = updates.diffs().clone();
921 let part = match updates {
922 BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
924 BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
925 downcast_structured(key_values, true).expect("valid schemas for structured data"),
928 ),
929 BlobTraceUpdates::Both(records, ext) => match part_decode_format {
931 PartDecodeFormat::Row {
932 validate_structured: false,
933 } => EitherOrBoth::Left(records),
934 PartDecodeFormat::Row {
935 validate_structured: true,
936 } => match downcast_structured(ext, false) {
937 Some(decoders) => EitherOrBoth::Both(records, decoders),
938 None => EitherOrBoth::Left(records),
939 },
940 PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
941 Some(decoders) => EitherOrBoth::Right(decoders),
942 None => EitherOrBoth::Left(records),
943 },
944 },
945 };
946
947 FetchedPart {
948 metrics,
949 ts_filter,
950 part,
951 peek_stash: None,
952 timestamps,
953 diffs,
954 migration,
955 filter_pushdown_audit,
956 part_cursor: 0,
957 key_storage: None,
958 val_storage: None,
959 _phantom: PhantomData,
960 }
961 }
962
963 pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
969 self.filter_pushdown_audit.clone()
970 }
971}
972
/// A decoded part together with the description it was registered with.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    metrics: ReadMetrics,
    /// The bounds the part was registered with; `normalize` truncates updates to them.
    registered_desc: Description<T>,
    /// The decoded contents of the part, including its own (inline) description.
    part: BlobTraceBatchPart<T>,
    /// Set when the part's inline lower or upper differs from `registered_desc`.
    needs_truncation: bool,
    /// When set, `normalize` advances every timestamp by this frontier.
    ts_rewrite: Option<Antichain<T>>,
}
983
984impl<K, V, T, D> FetchedPart<K, V, T, D>
985where
986 K: Debug + Codec,
987 V: Debug + Codec,
988 T: Timestamp + Lattice + Codec64,
989 D: Semigroup + Codec64 + Send + Sync,
990{
991 pub fn next_with_storage(
996 &mut self,
997 key: &mut Option<K>,
998 val: &mut Option<V>,
999 ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
1000 let mut consolidated = self.peek_stash.take();
1001 loop {
1002 let next = if self.part_cursor < self.timestamps.len() {
1004 let next_idx = self.part_cursor;
1005 self.part_cursor += 1;
1006 let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
1009 if !self.ts_filter.filter_ts(&mut t) {
1010 continue;
1011 }
1012 let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
1013 if d.is_zero() {
1014 continue;
1015 }
1016 let kv = self.decode_kv(next_idx, key, val);
1017 (kv, t, d)
1018 } else {
1019 break;
1020 };
1021
1022 if let Some((kv, t, d)) = &mut consolidated {
1024 let (kv_next, t_next, d_next) = &next;
1025 if kv == kv_next && t == t_next {
1026 d.plus_equals(d_next);
1027 if d.is_zero() {
1028 consolidated = None;
1029 }
1030 } else {
1031 self.peek_stash = Some(next);
1032 break;
1033 }
1034 } else {
1035 consolidated = Some(next);
1036 }
1037 }
1038
1039 let (kv, t, d) = consolidated?;
1040
1041 Some((kv, t, d))
1042 }
1043
1044 fn decode_kv(
1045 &mut self,
1046 index: usize,
1047 key: &mut Option<K>,
1048 val: &mut Option<V>,
1049 ) -> (Result<K, String>, Result<V, String>) {
1050 let decoded = self
1051 .part
1052 .as_ref()
1053 .map_left(|codec| {
1054 let ((ck, cv), _, _) = codec.get(index).expect("valid index");
1055 Self::decode_codec(
1056 &*self.metrics,
1057 self.migration.codec_read(),
1058 ck,
1059 cv,
1060 key,
1061 val,
1062 &mut self.key_storage,
1063 &mut self.val_storage,
1064 )
1065 })
1066 .map_right(|(structured_key, structured_val)| {
1067 self.decode_structured(index, structured_key, structured_val, key, val)
1068 });
1069
1070 match decoded {
1071 EitherOrBoth::Both((k, v), (k_s, v_s)) => {
1072 let is_valid = self
1074 .metrics
1075 .columnar
1076 .arrow()
1077 .key()
1078 .report_valid(|| k_s == k);
1079 if !is_valid {
1080 soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
1081 }
1082 let is_valid = self
1084 .metrics
1085 .columnar
1086 .arrow()
1087 .val()
1088 .report_valid(|| v_s == v);
1089 if !is_valid {
1090 soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
1091 }
1092
1093 (k, v)
1094 }
1095 EitherOrBoth::Left(kv) => kv,
1096 EitherOrBoth::Right(kv) => kv,
1097 }
1098 }
1099
1100 fn decode_codec(
1101 metrics: &Metrics,
1102 read_schemas: &Schemas<K, V>,
1103 key_buf: &[u8],
1104 val_buf: &[u8],
1105 key: &mut Option<K>,
1106 val: &mut Option<V>,
1107 key_storage: &mut Option<K::Storage>,
1108 val_storage: &mut Option<V::Storage>,
1109 ) -> (Result<K, String>, Result<V, String>) {
1110 let k = metrics.codecs.key.decode(|| match key.take() {
1111 Some(mut key) => {
1112 match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
1113 Ok(()) => Ok(key),
1114 Err(err) => Err(err),
1115 }
1116 }
1117 None => K::decode(key_buf, &read_schemas.key),
1118 });
1119 let v = metrics.codecs.val.decode(|| match val.take() {
1120 Some(mut val) => {
1121 match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
1122 Ok(()) => Ok(val),
1123 Err(err) => Err(err),
1124 }
1125 }
1126 None => V::decode(val_buf, &read_schemas.val),
1127 });
1128 (k, v)
1129 }
1130
1131 fn decode_structured(
1132 &self,
1133 idx: usize,
1134 keys: &<K::Schema as Schema<K>>::Decoder,
1135 vals: &<V::Schema as Schema<V>>::Decoder,
1136 key: &mut Option<K>,
1137 val: &mut Option<V>,
1138 ) -> (Result<K, String>, Result<V, String>) {
1139 let mut key = key.take().unwrap_or_default();
1140 keys.decode(idx, &mut key);
1141
1142 let mut val = val.take().unwrap_or_default();
1143 vals.decode(idx, &mut val);
1144
1145 (Ok(key), Ok(val))
1146 }
1147}
1148
1149impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
1150where
1151 K: Debug + Codec,
1152 V: Debug + Codec,
1153 T: Timestamp + Lattice + Codec64,
1154 D: Semigroup + Codec64 + Send + Sync,
1155{
1156 type Item = ((Result<K, String>, Result<V, String>), T, D);
1157
1158 fn next(&mut self) -> Option<Self::Item> {
1159 self.next_with_storage(&mut None, &mut None)
1160 }
1161
1162 fn size_hint(&self) -> (usize, Option<usize>) {
1163 let max_len = self.timestamps.len();
1165 (0, Some(max_len))
1166 }
1167}
1168
1169impl<T> EncodedPart<T>
1170where
1171 T: Timestamp + Lattice + Codec64,
1172{
1173 pub async fn fetch(
1174 cfg: &FetchConfig,
1175 shard_id: &ShardId,
1176 blob: &dyn Blob,
1177 metrics: &Metrics,
1178 shard_metrics: &ShardMetrics,
1179 read_metrics: &ReadMetrics,
1180 registered_desc: &Description<T>,
1181 part: &BatchPart<T>,
1182 ) -> Result<Self, BlobKey> {
1183 match part {
1184 BatchPart::Hollow(x) => {
1185 fetch_batch_part(
1186 cfg,
1187 shard_id,
1188 blob,
1189 metrics,
1190 shard_metrics,
1191 read_metrics,
1192 registered_desc,
1193 x,
1194 )
1195 .await
1196 }
1197 BatchPart::Inline {
1198 updates,
1199 ts_rewrite,
1200 ..
1201 } => Ok(EncodedPart::from_inline(
1202 cfg,
1203 metrics,
1204 read_metrics.clone(),
1205 registered_desc.clone(),
1206 updates,
1207 ts_rewrite.as_ref(),
1208 )),
1209 }
1210 }
1211
1212 pub(crate) fn from_inline(
1213 cfg: &FetchConfig,
1214 metrics: &Metrics,
1215 read_metrics: ReadMetrics,
1216 desc: Description<T>,
1217 x: &LazyInlineBatchPart,
1218 ts_rewrite: Option<&Antichain<T>>,
1219 ) -> Self {
1220 let parsed = x.decode(&metrics.columnar).expect("valid inline part");
1221 Self::new(cfg, read_metrics, desc, "inline", ts_rewrite, parsed)
1222 }
1223
1224 pub(crate) fn from_hollow(
1225 cfg: &FetchConfig,
1226 metrics: ReadMetrics,
1227 registered_desc: Description<T>,
1228 part: &HollowBatchPart<T>,
1229 parsed: BlobTraceBatchPart<T>,
1230 ) -> Self {
1231 Self::new(
1232 cfg,
1233 metrics,
1234 registered_desc,
1235 &part.key.0,
1236 part.ts_rewrite.as_ref(),
1237 parsed,
1238 )
1239 }
1240
1241 pub(crate) fn new(
1242 cfg: &FetchConfig,
1243 metrics: ReadMetrics,
1244 registered_desc: Description<T>,
1245 printable_name: &str,
1246 ts_rewrite: Option<&Antichain<T>>,
1247 parsed: BlobTraceBatchPart<T>,
1248 ) -> Self {
1249 let inline_desc = &parsed.desc;
1261 let needs_truncation = inline_desc.lower() != registered_desc.lower()
1262 || inline_desc.upper() != registered_desc.upper();
1263 if needs_truncation {
1264 if cfg.validate_bounds_on_read {
1265 assert!(
1266 PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
1267 "key={} inline={:?} registered={:?}",
1268 printable_name,
1269 inline_desc,
1270 registered_desc
1271 );
1272
1273 if ts_rewrite.is_none() {
1274 assert!(
1279 PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
1280 "key={} inline={:?} registered={:?}",
1281 printable_name,
1282 inline_desc,
1283 registered_desc
1284 );
1285 }
1286 }
1287 assert_eq!(
1292 inline_desc.since(),
1293 &Antichain::from_elem(T::minimum()),
1294 "key={} inline={:?} registered={:?}",
1295 printable_name,
1296 inline_desc,
1297 registered_desc
1298 );
1299 } else {
1300 assert_eq!(
1301 inline_desc, ®istered_desc,
1302 "key={} inline={:?} registered={:?}",
1303 printable_name, inline_desc, registered_desc
1304 );
1305 }
1306
1307 EncodedPart {
1308 metrics,
1309 registered_desc,
1310 part: parsed,
1311 needs_truncation,
1312 ts_rewrite: ts_rewrite.cloned(),
1313 }
1314 }
1315
1316 pub(crate) fn maybe_unconsolidated(&self) -> bool {
1317 self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
1320 }
1321
1322 pub(crate) fn updates(&self) -> &BlobTraceUpdates {
1323 &self.part.updates
1324 }
1325
1326 pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
1328 let updates = self.part.updates.clone();
1329 if !self.needs_truncation && self.ts_rewrite.is_none() {
1330 return updates;
1331 }
1332
1333 let mut codec = updates
1334 .records()
1335 .map(|r| (r.keys().clone(), r.vals().clone()));
1336 let mut structured = updates.structured().cloned();
1337 let mut timestamps = updates.timestamps().clone();
1338 let mut diffs = updates.diffs().clone();
1339
1340 if let Some(rewrite) = self.ts_rewrite.as_ref() {
1341 timestamps = arrow::compute::unary(×tamps, |i: i64| {
1342 let mut t = T::decode(i.to_le_bytes());
1343 t.advance_by(rewrite.borrow());
1344 i64::from_le_bytes(T::encode(&t))
1345 });
1346 }
1347
1348 let reallocated = if self.needs_truncation {
1349 let filter = BooleanArray::from_unary(×tamps, |i| {
1350 let t = T::decode(i.to_le_bytes());
1351 let truncate_t = {
1352 !self.registered_desc.lower().less_equal(&t)
1353 || self.registered_desc.upper().less_equal(&t)
1354 };
1355 !truncate_t
1356 });
1357 if filter.false_count() == 0 {
1358 false
1360 } else {
1361 let filter = FilterBuilder::new(&filter).optimize().build();
1362 let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
1363 if let Some((keys, vals)) = codec {
1364 codec = Some((
1365 realloc_array(do_filter(&keys).as_binary(), metrics),
1366 realloc_array(do_filter(&vals).as_binary(), metrics),
1367 ));
1368 }
1369 if let Some(ext) = structured {
1370 structured = Some(ColumnarRecordsStructuredExt {
1371 key: realloc_any(do_filter(&*ext.key), metrics),
1372 val: realloc_any(do_filter(&*ext.val), metrics),
1373 });
1374 }
1375 timestamps = realloc_array(do_filter(×tamps).as_primitive(), metrics);
1376 diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
1377 true
1378 }
1379 } else {
1380 false
1381 };
1382
1383 if self.ts_rewrite.is_some() && !reallocated {
1384 timestamps = realloc_array(×tamps, metrics);
1385 }
1386
1387 if self.ts_rewrite.is_some() {
1388 self.metrics
1389 .ts_rewrite
1390 .inc_by(u64::cast_from(timestamps.len()));
1391 }
1392
1393 match (codec, structured) {
1394 (Some((key, value)), None) => {
1395 BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
1396 }
1397 (Some((key, value)), Some(ext)) => {
1398 BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
1399 }
1400 (None, Some(ext)) => BlobTraceUpdates::Structured {
1401 key_values: ext,
1402 timestamps,
1403 diffs,
1404 },
1405 (None, None) => unreachable!(),
1406 }
1407 }
1408}
1409
#[derive(Debug, Serialize, Deserialize, Clone)]
/// A serializable description of a batch part that can be handed between
/// timely workers. The `client_exchange_data` test enforces that it
/// implements `timely::ExchangeData`.
///
/// NOTE(review): field order is observable through the derived
/// `Serialize`/`Deserialize`/`Debug` impls; do not reorder casually.
pub struct ExchangeableBatchPart<T> {
    // The shard this part is associated with (presumably the shard it was
    // listed from; confirm at construction sites).
    shard_id: ShardId,
    // Encoded size of the part in bytes; exposed via `encoded_size_bytes()`.
    encoded_size_bytes: usize,
    // The description (lower/upper/since bounds) carried with this part.
    desc: Description<T>,
    // Filter to apply when the part is fetched (NOTE(review): semantics of
    // `FetchBatchFilter` are defined outside this chunk).
    filter: FetchBatchFilter<T>,
    // The hollow batch part, held in protobuf form; the `LazyProto` wrapper
    // suggests it is decoded only on demand.
    part: LazyProto<ProtoHollowBatchPart>,
    // NOTE(review): meaning of this flag is not visible in this chunk;
    // presumably controls auditing of filter pushdown decisions.
    filter_pushdown_audit: bool,
}
1428
1429impl<T> ExchangeableBatchPart<T> {
1430 pub fn encoded_size_bytes(&self) -> usize {
1432 self.encoded_size_bytes
1433 }
1434}
1435
/// The format used to decode a part's data.
///
/// Each variant has a canonical string name; see `PartDecodeFormat::as_str`
/// and `PartDecodeFormat::from_str`.
#[derive(Clone, Copy, Debug)]
pub enum PartDecodeFormat {
    /// Decode using the row format (string name `"row"` or
    /// `"row_with_validate"`).
    Row {
        /// Selects the `"row_with_validate"` flavor. NOTE(review): what
        /// exactly is validated is decided outside this chunk.
        validate_structured: bool,
    },
    /// Decode using the Arrow format (string name `"arrow"`).
    Arrow,
}
1449
1450impl PartDecodeFormat {
1451 pub const fn default() -> Self {
1453 PartDecodeFormat::Arrow
1454 }
1455
1456 pub fn from_str(s: &str) -> Self {
1459 match s {
1460 "row" => PartDecodeFormat::Row {
1461 validate_structured: false,
1462 },
1463 "row_with_validate" => PartDecodeFormat::Row {
1464 validate_structured: true,
1465 },
1466 "arrow" => PartDecodeFormat::Arrow,
1467 x => {
1468 let default = PartDecodeFormat::default();
1469 soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
1470 default
1471 }
1472 }
1473 }
1474
1475 pub const fn as_str(&self) -> &'static str {
1477 match self {
1478 PartDecodeFormat::Row {
1479 validate_structured: false,
1480 } => "row",
1481 PartDecodeFormat::Row {
1482 validate_structured: true,
1483 } => "row_with_validate",
1484 PartDecodeFormat::Arrow => "arrow",
1485 }
1486 }
1487}
1488
1489impl fmt::Display for PartDecodeFormat {
1490 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1491 f.write_str(self.as_str())
1492 }
1493}
1494
1495#[mz_ore::test]
1496fn client_exchange_data() {
1497 fn is_exchange_data<T: timely::ExchangeData>() {}
1501 is_exchange_data::<ExchangeableBatchPart<u64>>();
1502 is_exchange_data::<ExchangeableBatchPart<u64>>();
1503}