1use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
19use arrow::compute::FilterBuilder;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use itertools::EitherOrBoth;
24use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::{soft_assert_or_log, soft_panic_no_log, soft_panic_or_log};
28use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
29use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
30use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist::metrics::ColumnarMetrics;
33use mz_persist_types::arrow::ArrayOrd;
34use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
35use mz_persist_types::part::Codec64Mut;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::stats::PartStats;
38use mz_persist_types::{Codec, Codec64};
39use mz_proto::RustType;
40use serde::{Deserialize, Serialize};
41use timely::PartialOrder;
42use timely::progress::frontier::AntichainRef;
43use timely::progress::{Antichain, Timestamp};
44use tracing::{Instrument, debug, debug_span, trace_span};
45
46use crate::ShardId;
47use crate::cfg::PersistConfig;
48use crate::error::InvalidUsage;
49use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
50use crate::internal::machine::retry_external;
51use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
52use crate::internal::paths::BlobKey;
53use crate::internal::state::{
54 BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
55};
56use crate::read::LeasedReaderId;
57use crate::schema::{PartMigration, SchemaCache};
58
/// Multiplier applied to a part's `encoded_size_bytes` when charging it
/// against the fetch semaphore, approximating the in-memory cost of holding
/// both the encoded and decoded copies.
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);
69
/// Cap on outstanding fetched-and-parsed persist bytes, expressed as a
/// multiple of the process memory limit; per the description, only applied to
/// cc replicas.
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);
79
/// Which decode path to use when reading a part ('row', 'row_with_validate',
/// or 'arrow'); parsed into a [`PartDecodeFormat`] by
/// [`BatchFetcherConfig::part_decode_format`].
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow' (Materialize).",
);
87
/// Escape hatch for the optimization (see `LeasedBatchPart::maybe_optimize`)
/// that replaces an eligible hollow part with a tiny faked inline part
/// instead of fetching data whose contents are ignored.
pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);
93
94pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
95 "persist_validate_part_bounds_on_read",
96 false,
97 "Validate the part lower <= the batch lower and the part upper <= batch upper,\
98 for the batch containing that part",
99);
100
/// Fetch-time knobs, snapshotted out of [`PersistConfig`] so they can travel
/// with fetched data (e.g. inside a [`FetchedBlob`]).
#[derive(Debug, Clone)]
pub(crate) struct FetchConfig {
    /// Whether to validate part bounds against the registered batch
    /// description on read; see [`VALIDATE_PART_BOUNDS_ON_READ`].
    pub(crate) validate_bounds_on_read: bool,
}
105
106impl FetchConfig {
107 pub fn from_persist_config(cfg: &PersistConfig) -> Self {
108 Self {
109 validate_bounds_on_read: VALIDATE_PART_BOUNDS_ON_READ.get(cfg),
110 }
111 }
112}
113
/// Configuration for a [`BatchFetcher`].
#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
    /// Live handle to [`PART_DECODE_FORMAT`]; re-read on each fetch so the
    /// value can change at runtime.
    pub(crate) part_decode_format: ConfigValHandle<String>,
    /// Knobs applied while fetching and decoding parts.
    pub(crate) fetch_config: FetchConfig,
}
119
120impl BatchFetcherConfig {
121 pub fn new(value: &PersistConfig) -> Self {
122 Self {
123 part_decode_format: PART_DECODE_FORMAT.handle(value),
124 fetch_config: FetchConfig::from_persist_config(value),
125 }
126 }
127
128 pub fn part_decode_format(&self) -> PartDecodeFormat {
129 PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
130 }
131}
132
/// Fetches batch parts for a single shard on behalf of some reader.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Codec64 + Send + Sync,
{
    pub(crate) cfg: BatchFetcherConfig,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    /// The shard this fetcher is bound to; parts from any other shard are
    /// rejected with `InvalidUsage::BatchNotFromThisShard`.
    pub(crate) shard_id: ShardId,
    /// The schemas fetched updates are decoded into.
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    /// When true, fetches are accounted under the `unindexed` read metrics
    /// instead of `batch_fetcher`.
    pub(crate) is_transient: bool,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
156
impl<K, V, T, D> BatchFetcher<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Trades in an exchangeable batch part for the data it points to: hollow
    /// parts are fetched from blob storage (gated by the fetch semaphore),
    /// inline parts are taken directly from the part itself.
    ///
    /// Returns `Err(InvalidUsage)` when the part belongs to a different
    /// shard, and `Ok(Err(blob_key))` when the blob wasn't found at the
    /// expected key.
    pub async fn fetch_leased_part(
        &mut self,
        part: ExchangeableBatchPart<T>,
    ) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
        let ExchangeableBatchPart {
            shard_id,
            encoded_size_bytes: _,
            desc,
            filter,
            filter_pushdown_audit,
            part,
        } = part;
        let part: BatchPart<T> = part.decode_to().expect("valid part");
        // A part handed out by a different shard's reader is a usage error.
        if shard_id != self.shard_id {
            return Err(InvalidUsage::BatchNotFromThisShard {
                batch_shard: shard_id,
                handle_shard: self.shard_id.clone(),
            });
        }

        // Resolve how to migrate the part's schema to our read schemas; an
        // undecodable part is an invariant violation, hence the panic.
        let migration =
            PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
                .await
                .unwrap_or_else(|read_schemas| {
                    panic!(
                        "could not decode part {:?} with schema: {:?}",
                        part.schema_id(),
                        read_schemas
                    )
                });

        let (buf, fetch_permit) = match &part {
            BatchPart::Hollow(x) => {
                // Bound concurrent fetch memory via the fetch semaphore.
                let fetch_permit = self
                    .metrics
                    .semaphore
                    .acquire_fetch_permits(x.encoded_size_bytes)
                    .await;
                let read_metrics = if self.is_transient {
                    &self.metrics.read.unindexed
                } else {
                    &self.metrics.read.batch_fetcher
                };
                let buf = fetch_batch_part_blob(
                    &shard_id,
                    self.blob.as_ref(),
                    &self.metrics,
                    &self.shard_metrics,
                    read_metrics,
                    x,
                )
                .await;
                let buf = match buf {
                    Ok(buf) => buf,
                    // Surface a missing blob to the caller instead of
                    // panicking here.
                    Err(key) => return Ok(Err(key)),
                };
                let buf = FetchedBlobBuf::Hollow {
                    buf,
                    part: x.clone(),
                };
                (buf, Some(Arc::new(fetch_permit)))
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => {
                // Inline parts carry their updates with them: no blob get and
                // no semaphore permit needed.
                let buf = FetchedBlobBuf::Inline {
                    desc: desc.clone(),
                    updates: updates.clone(),
                    ts_rewrite: ts_rewrite.clone(),
                };
                (buf, None)
            }
        };
        let fetched_blob = FetchedBlob {
            metrics: Arc::clone(&self.metrics),
            read_metrics: self.metrics.read.batch_fetcher.clone(),
            buf,
            registered_desc: desc.clone(),
            migration,
            filter: filter.clone(),
            filter_pushdown_audit,
            structured_part_audit: self.cfg.part_decode_format(),
            fetch_permit,
            _phantom: PhantomData,
            fetch_config: self.cfg.fetch_config.clone(),
        };
        Ok(Ok(fetched_blob))
    }
}
259
/// How to filter (and possibly advance) the timestamps of fetched updates,
/// depending on the kind of read being served; see
/// [`FetchBatchFilter::filter_ts`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum FetchBatchFilter<T> {
    /// Snapshot read: keep `t <= as_of`, advancing kept times to the as-of.
    Snapshot {
        as_of: Antichain<T>,
    },
    /// Listen read: keep `as_of < t` with `lower <= t`.
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    /// Compaction read: keep everything, advancing times to the since.
    Compaction {
        since: Antichain<T>,
    },
}
273
274impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
275 pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
276 match self {
277 FetchBatchFilter::Snapshot { as_of } => {
278 if as_of.less_than(t) {
280 return false;
281 }
282 t.advance_by(as_of.borrow());
283 true
284 }
285 FetchBatchFilter::Listen { as_of, lower } => {
286 if !as_of.less_than(t) {
288 return false;
289 }
290
291 if !lower.less_equal(t) {
299 return false;
300 }
301 true
302 }
303 FetchBatchFilter::Compaction { since } => {
304 t.advance_by(since.borrow());
305 true
306 }
307 }
308 }
309}
310
/// Trades in a [`LeasedBatchPart`] for a decoded [`FetchedPart`], fetching
/// the blob when the part is hollow and migrating its data to the given read
/// schemas.
///
/// Panics if the blob is missing (the lease is expected to keep the data
/// alive) or if the part can't be decoded with any available schema.
pub(crate) async fn fetch_leased_part<K, V, T, D>(
    cfg: &PersistConfig,
    part: &LeasedBatchPart<T>,
    blob: &dyn Blob,
    metrics: Arc<Metrics>,
    read_metrics: &ReadMetrics,
    shard_metrics: &ShardMetrics,
    reader_id: &LeasedReaderId,
    read_schemas: Schemas<K, V>,
    schema_cache: &mut SchemaCache<K, V, T, D>,
) -> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    let fetch_config = FetchConfig::from_persist_config(cfg);
    let encoded_part = EncodedPart::fetch(
        &fetch_config,
        &part.shard_id,
        blob,
        &metrics,
        shard_metrics,
        read_metrics,
        &part.desc,
        &part.part,
    )
    .await
    .unwrap_or_else(|blob_key| {
        // The part is leased, so the blob it references should always exist.
        panic!("{} could not fetch batch part: {}", reader_id, blob_key)
    });
    let part_cfg = BatchFetcherConfig::new(cfg);
    let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
        .await
        .unwrap_or_else(|read_schemas| {
            panic!(
                "could not decode part {:?} with schema: {:?}",
                part.part.schema_id(),
                read_schemas
            )
        });
    FetchedPart::new(
        metrics,
        encoded_part,
        migration,
        part.filter.clone(),
        part.filter_pushdown_audit,
        part_cfg.part_decode_format(),
        part.part.stats(),
    )
}
375
/// Fetches the raw encoded bytes of a hollow batch part from blob storage,
/// retrying external errors and recording read metrics.
///
/// Returns the completed [`BlobKey`] as the error when the blob doesn't
/// exist.
pub(crate) async fn fetch_batch_part_blob<T>(
    shard_id: &ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
    shard_metrics: &ShardMetrics,
    read_metrics: &ReadMetrics,
    part: &HollowBatchPart<T>,
) -> Result<SegmentedBytes, BlobKey> {
    let now = Instant::now();
    let get_span = debug_span!("fetch_batch::get");
    let blob_key = part.key.complete(shard_id);
    let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
        shard_metrics.blob_gets.inc();
        blob.get(&blob_key).await
    })
    .instrument(get_span.clone())
    .await
    // A `None` get means the blob is gone; hand back the key we looked up.
    .ok_or(blob_key)?;

    // Close out the get span before accounting for the read.
    drop(get_span);

    read_metrics.part_count.inc();
    read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
    read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());

    Ok(value)
}
403
/// Decodes already-fetched blob bytes into an [`EncodedPart`].
///
/// Panics if the bytes don't decode: persisted data is expected to always be
/// valid, so failure here indicates an invariant violation or corruption.
pub(crate) fn decode_batch_part_blob<T>(
    cfg: &FetchConfig,
    metrics: &Metrics,
    read_metrics: &ReadMetrics,
    registered_desc: Description<T>,
    part: &HollowBatchPart<T>,
    buf: &SegmentedBytes,
) -> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    trace_span!("fetch_batch::decode").in_scope(|| {
        let parsed = metrics
            .codecs
            .batch
            .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
            .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
            .expect("internal error: invalid encoded state");
        read_metrics
            .part_goodbytes
            .inc_by(u64::cast_from(parsed.updates.goodbytes()));
        EncodedPart::from_hollow(cfg, read_metrics.clone(), registered_desc, part, parsed)
    })
}
432
433pub(crate) async fn fetch_batch_part<T>(
434 cfg: &FetchConfig,
435 shard_id: &ShardId,
436 blob: &dyn Blob,
437 metrics: &Metrics,
438 shard_metrics: &ShardMetrics,
439 read_metrics: &ReadMetrics,
440 registered_desc: &Description<T>,
441 part: &HollowBatchPart<T>,
442) -> Result<EncodedPart<T>, BlobKey>
443where
444 T: Timestamp + Lattice + Codec64,
445{
446 let buf =
447 fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
448 let part = decode_batch_part_blob(
449 cfg,
450 metrics,
451 read_metrics,
452 registered_desc.clone(),
453 part,
454 &buf,
455 );
456 Ok(part)
457}
458
/// A handle to a [`SeqNo`] that stays "live" for as long as any clone of it
/// exists; liveness is observed via the inner `Arc`'s strong count.
#[derive(Clone, Debug)]
pub struct Lease(Arc<SeqNo>);
467
468impl Lease {
469 pub fn new(seqno: SeqNo) -> Self {
471 Self(Arc::new(seqno))
472 }
473
474 #[cfg(test)]
476 pub fn seqno(&self) -> SeqNo {
477 *self.0
478 }
479
480 pub fn count(&self) -> usize {
482 Arc::strong_count(&self.0)
483 }
484}
485
/// A batch part plus the lease that keeps the data it references alive while
/// the part is outstanding.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_id: ShardId,
    /// Timestamp filtering/advancement to apply when reading this part.
    pub(crate) filter: FetchBatchFilter<T>,
    /// The registered description of the batch this part belongs to.
    pub(crate) desc: Description<T>,
    pub(crate) part: BatchPart<T>,
    /// Holds the SeqNo capability for as long as this part (or an
    /// exchangeable copy of it) is live.
    pub(crate) lease: Lease,
    /// When true, fetching this part records stats for auditing the filter
    /// pushdown decision.
    pub(crate) filter_pushdown_audit: bool,
}
523
impl<T> LeasedBatchPart<T>
where
    T: Timestamp + Codec64,
{
    /// Converts this part into a serializable [`ExchangeableBatchPart`] that
    /// can be sent to another worker, returning the lease separately so the
    /// caller controls how long the underlying data stays alive.
    pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
        let lease = self.lease.clone();
        let part = ExchangeableBatchPart {
            shard_id: self.shard_id,
            encoded_size_bytes: self.part.encoded_size_bytes(),
            desc: self.desc.clone(),
            filter: self.filter.clone(),
            part: LazyProto::from(&self.part.into_proto()),
            filter_pushdown_audit: self.filter_pushdown_audit,
        };
        (part, lease)
    }

    /// The encoded size of this part's data in bytes.
    pub fn encoded_size_bytes(&self) -> usize {
        self.part.encoded_size_bytes()
    }

    /// Requests that stats be recorded when this part is fetched, for
    /// auditing the filter pushdown decision.
    pub fn request_filter_pushdown_audit(&mut self) {
        self.filter_pushdown_audit = true;
    }

    /// Decodes and returns this part's stats, if any.
    pub fn stats(&self) -> Option<PartStats> {
        self.part.stats().map(|x| x.decode())
    }

    /// If eligible, replaces this (hollow) part with a tiny faked inline part
    /// containing a single `(key, val)` update whose diff is the part's
    /// `diffs_sum`, skipping the blob fetch entirely.
    ///
    /// Only applies to snapshot reads with a single-element as-of at or past
    /// the part's upper and since — under those conditions all the part's
    /// updates collapse to the as-of timestamp, so only the summed diff is
    /// observable. `key`/`val` must each be single-row arrays (the caller
    /// supplies the placeholder row).
    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
        assert_eq!(key.len(), 1, "expect a single-row key array");
        assert_eq!(val.len(), 1, "expect a single-row val array");
        let as_of = match &self.filter {
            FetchBatchFilter::Snapshot { as_of } => as_of,
            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
        };
        // Dynamic escape hatch; see OPTIMIZE_IGNORED_DATA_FETCH.
        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
            return;
        }
        let (diffs_sum, _stats) = match &self.part {
            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
            // Inline parts need no fetch; nothing to optimize.
            BatchPart::Inline { .. } => return,
        };
        debug!(
            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
            diffs_sum.map(i64::decode),
            as_of.elements(),
            self.desc.lower().elements(),
            self.desc.upper().elements()
        );
        // Only a single-element as_of can be represented as one faked
        // timestamp below.
        let as_of = match &as_of.elements() {
            &[as_of] => as_of,
            _ => return,
        };
        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
        if !eligible {
            return;
        }
        // Older parts may not have a recorded diffs_sum; can't fake without it.
        let Some(diffs_sum) = diffs_sum else {
            return;
        };

        debug!(
            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
            i64::decode(diffs_sum),
            as_of,
            self.part.encoded_size_bytes(),
        );
        self.metrics.pushdown.parts_faked_count.inc();
        self.metrics
            .pushdown
            .parts_faked_bytes
            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
        // Build single-element timestamp and diff columns for the faked update.
        let timestamps = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push(as_of);
            col.finish()
        };
        let diffs = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push_raw(diffs_sum);
            col.finish()
        };
        let updates = BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt { key, val },
            timestamps,
            diffs,
        };
        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
            desc: Some(self.desc.into_proto()),
            index: 0,
            updates: Some(updates.into_proto()),
        });
        self.part = BatchPart::Inline {
            updates: faked_data,
            ts_rewrite: None,
            schema_id: None,
            deprecated_schema_id: None,
        };
    }
}
645
impl<T> Drop for LeasedBatchPart<T> {
    /// The lease itself is released by dropping the `lease` field; this only
    /// counts the drop for observability.
    fn drop(&mut self) {
        self.metrics.lease.dropped_part.inc()
    }
}
652
/// A fetched part that has not yet been decoded: the raw bytes (or inline
/// updates) plus everything needed to parse them into a [`FetchedPart`].
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    read_metrics: ReadMetrics,
    buf: FetchedBlobBuf<T>,
    /// The batch's registered description, used to bound the part on decode.
    registered_desc: Description<T>,
    /// How to migrate the part's write schema to the read schema.
    migration: PartMigration<K, V>,
    filter: FetchBatchFilter<T>,
    filter_pushdown_audit: bool,
    /// Which decode path to use when parsing; see [`PART_DECODE_FORMAT`].
    structured_part_audit: PartDecodeFormat,
    /// Fetch-semaphore permit held while this data is in memory; forwarded to
    /// the parsed [`ShardSourcePart`].
    fetch_permit: Option<Arc<MetricsPermits>>,
    fetch_config: FetchConfig,
    _phantom: PhantomData<fn() -> D>,
}
671
/// The undecoded payload of a [`FetchedBlob`]: raw blob bytes for a hollow
/// part, or the updates themselves for an inline part.
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    Hollow {
        buf: SegmentedBytes,
        part: HollowBatchPart<T>,
    },
    Inline {
        desc: Description<T>,
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
    },
}
684
685impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
686 fn clone(&self) -> Self {
687 Self {
688 metrics: Arc::clone(&self.metrics),
689 read_metrics: self.read_metrics.clone(),
690 buf: self.buf.clone(),
691 registered_desc: self.registered_desc.clone(),
692 migration: self.migration.clone(),
693 filter: self.filter.clone(),
694 filter_pushdown_audit: self.filter_pushdown_audit.clone(),
695 fetch_permit: self.fetch_permit.clone(),
696 structured_part_audit: self.structured_part_audit.clone(),
697 fetch_config: self.fetch_config.clone(),
698 _phantom: self._phantom.clone(),
699 }
700 }
701}
702
/// A [`FetchedPart`] paired with the fetch-semaphore permit that was acquired
/// for it, so the memory stays accounted for until this value is dropped.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The decoded part itself.
    pub part: FetchedPart<K, V, T, D>,
    fetch_permit: Option<Arc<MetricsPermits>>,
}
710
711impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
712where
713 K: Codec + Debug,
714 <K as Codec>::Storage: Debug,
715 V: Codec + Debug,
716 <V as Codec>::Storage: Debug,
717{
718 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
719 let ShardSourcePart { part, fetch_permit } = self;
720 f.debug_struct("ShardSourcePart")
721 .field("part", part)
722 .field("fetch_permit", fetch_permit)
723 .finish()
724 }
725}
726
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
    /// Parses this blob into a [`FetchedPart`] (wrapped with its fetch permit
    /// as a [`ShardSourcePart`]) using the config captured at fetch time.
    pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
        self.parse_internal(&self.fetch_config)
    }

    /// Decodes the buffered bytes (hollow) or inline updates into a
    /// [`FetchedPart`], carrying along the filter, migration, and audit
    /// state captured when the blob was fetched.
    pub(crate) fn parse_internal(&self, cfg: &FetchConfig) -> ShardSourcePart<K, V, T, D> {
        let (part, stats) = match &self.buf {
            FetchedBlobBuf::Hollow { buf, part } => {
                let parsed = decode_batch_part_blob(
                    cfg,
                    &self.metrics,
                    &self.read_metrics,
                    self.registered_desc.clone(),
                    part,
                    buf,
                );
                (parsed, part.stats.as_ref())
            }
            FetchedBlobBuf::Inline {
                desc,
                updates,
                ts_rewrite,
            } => {
                let parsed = EncodedPart::from_inline(
                    cfg,
                    &self.metrics,
                    self.read_metrics.clone(),
                    desc.clone(),
                    updates,
                    ts_rewrite.as_ref(),
                );
                // Inline parts carry no stats.
                (parsed, None)
            }
        };
        let part = FetchedPart::new(
            Arc::clone(&self.metrics),
            part,
            self.migration.clone(),
            self.filter.clone(),
            self.filter_pushdown_audit,
            self.structured_part_audit,
            stats,
        );
        ShardSourcePart {
            part,
            fetch_permit: self.fetch_permit.clone(),
        }
    }

    /// Decodes and returns this part's stats, if any.
    pub fn stats(&self) -> Option<PartStats> {
        match &self.buf {
            FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
            FetchedBlobBuf::Inline { .. } => None,
        }
    }
}
786
/// A fetched and decoded part, iterable as consolidated `((K, V), T, D)`
/// updates.
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    ts_filter: FetchBatchFilter<T>,
    // The update data: legacy codec-encoded records (Left), structured-column
    // decoders (Right), or both when validating one against the other.
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    timestamps: Int64Array,
    diffs: Int64Array,
    migration: PartMigration<K, V>,
    // Set (to the part's stats) iff a filter-pushdown audit was requested.
    filter_pushdown_audit: Option<LazyPartStats>,
    // One update read ahead of the cursor; used by `next_with_storage` to
    // consolidate consecutive updates with equal key, value, and time.
    peek_stash: Option<((K, V), T, D)>,
    // Index of the next update to decode.
    part_cursor: usize,
    // Reusable decode scratch space for keys and values.
    key_storage: Option<K::Storage>,
    val_storage: Option<V::Storage>,

    _phantom: PhantomData<fn() -> D>,
}
815
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
    /// Converts an [`EncodedPart`] into an iterable [`FetchedPart`], choosing
    /// the codec and/or structured decode path based on what the part carries
    /// and on `part_decode_format`, and migrating structured columns to the
    /// read schema where the migration requires it.
    pub(crate) fn new(
        metrics: Arc<Metrics>,
        part: EncodedPart<T>,
        migration: PartMigration<K, V>,
        ts_filter: FetchBatchFilter<T>,
        filter_pushdown_audit: bool,
        part_decode_format: PartDecodeFormat,
        stats: Option<&LazyPartStats>,
    ) -> Self {
        let part_len = u64::cast_from(part.part.updates.len());
        // Record which migration path this part takes and how many updates
        // flow through each decode strategy.
        match &migration {
            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
            PartMigration::Schemaless { .. } => {
                metrics.schema.migration_count_codec.inc();
                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
            }
            PartMigration::Either { .. } => {
                metrics.schema.migration_count_either.inc();
                match part_decode_format {
                    PartDecodeFormat::Row {
                        validate_structured: false,
                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
                    PartDecodeFormat::Row {
                        validate_structured: true,
                    } => {
                        metrics.schema.migration_len_either_codec.inc_by(part_len);
                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
                    }
                    PartDecodeFormat::Arrow => {
                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
                    }
                }
            }
        }

        // Keep the stats around only if an audit of the filter-pushdown
        // decision was requested for this part.
        let filter_pushdown_audit = if filter_pushdown_audit {
            stats.cloned()
        } else {
            None
        };

        // Builds structured K/V decoders from the columns, applying a schema
        // migration first when needed. Returns None when the columns can't be
        // decoded with the read schema (callers then fall back to the codec
        // path where one exists).
        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
                                   structured_only: bool| {
            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

            let structured = match &migration {
                PartMigration::SameSchema { .. } => structured,
                PartMigration::Schemaless { read } if structured_only => {
                    // No write schema is available, so attempt a best-effort
                    // backward-compatible migration to the read schema.
                    let start = Instant::now();
                    let read_key = data_type::<K>(&*read.key).ok()?;
                    let read_val = data_type::<V>(&*read.val).ok()?;
                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
                PartMigration::Schemaless { .. } => return None,
                PartMigration::Either {
                    write: _,
                    read: _,
                    key_migration,
                    val_migration,
                } => {
                    let start = Instant::now();
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
            };

            let read_schema = migration.codec_read();
            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

            // Track how many key bytes projection trimmed away.
            match &key {
                Ok(key_decoder) => {
                    let key_size_after = key_decoder.goodbytes();
                    let key_diff = key_size_before.saturating_sub(key_size_after);
                    metrics
                        .pushdown
                        .parts_projection_trimmed_bytes
                        .inc_by(u64::cast_from(key_diff));
                }
                Err(e) => {
                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                }
            }

            Some((key.ok()?, val.ok()?))
        };

        // `normalize` applies the ts rewrite and bound truncation (see
        // `EncodedPart::normalize`), so these columns already reflect the
        // registered description.
        let updates = part.normalize(&metrics.columnar);
        let timestamps = updates.timestamps().clone();
        let diffs = updates.diffs().clone();
        let part = match updates {
            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
            // Structured-only data has no codec fallback, so a failure to
            // build decoders is fatal.
            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                downcast_structured(key_values, true).expect("valid schemas for structured data"),
            ),
            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
                PartDecodeFormat::Row {
                    validate_structured: false,
                } => EitherOrBoth::Left(records),
                PartDecodeFormat::Row {
                    validate_structured: true,
                } => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Both(records, decoders),
                    None => EitherOrBoth::Left(records),
                },
                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Right(decoders),
                    None => EitherOrBoth::Left(records),
                },
            },
        };

        FetchedPart {
            metrics,
            ts_filter,
            part,
            peek_stash: None,
            timestamps,
            diffs,
            migration,
            filter_pushdown_audit,
            part_cursor: 0,
            key_storage: None,
            val_storage: None,
            _phantom: PhantomData,
        }
    }

    /// When a filter-pushdown audit was requested for this part, returns the
    /// stats the pushdown decision was based on.
    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
        self.filter_pushdown_audit.clone()
    }
}
972
/// A decoded part, plus the bookkeeping needed to present its updates as
/// belonging to the registered batch description.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    metrics: ReadMetrics,
    registered_desc: Description<T>,
    part: BlobTraceBatchPart<T>,
    // True when the part's inline lower/upper differ from the registered
    // ones; updates outside the registered bounds are then dropped on read.
    needs_truncation: bool,
    // When set, all timestamps are advanced to this frontier on read.
    ts_rewrite: Option<Antichain<T>>,
}
983
impl<K, V, T, D> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Monoid + Codec64 + Send + Sync,
{
    /// [`Iterator::next`] but optionally reusing caller-provided `K`/`V`
    /// allocations.
    ///
    /// Applies the timestamp filter, drops zero diffs, and consolidates runs
    /// of consecutive updates with equal key, value, and (advanced)
    /// timestamp before yielding them.
    pub fn next_with_storage(
        &mut self,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> Option<((K, V), T, D)> {
        // Start from the update stashed by the previous call, if any.
        let mut consolidated = self.peek_stash.take();
        loop {
            // Decode the next surviving update from the part, if any remain.
            let next = if self.part_cursor < self.timestamps.len() {
                let next_idx = self.part_cursor;
                self.part_cursor += 1;
                // Convert the i64 column values into the byte representation
                // Codec64 decodes from.
                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
                if !self.ts_filter.filter_ts(&mut t) {
                    continue;
                }
                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
                // A zero diff is a no-op update; skip decoding the kv.
                if d.is_zero() {
                    continue;
                }
                let kv = self.decode_kv(next_idx, key, val);
                (kv, t, d)
            } else {
                break;
            };

            if let Some((kv, t, d)) = &mut consolidated {
                let (kv_next, t_next, d_next) = &next;
                if kv == kv_next && t == t_next {
                    // Same key/val/time: fold the diffs together; a zero sum
                    // cancels the pending update entirely.
                    d.plus_equals(d_next);
                    if d.is_zero() {
                        consolidated = None;
                    }
                } else {
                    // Different update: stash it for the next call and yield
                    // what we've consolidated so far.
                    self.peek_stash = Some(next);
                    break;
                }
            } else {
                consolidated = Some(next);
            }
        }

        let (kv, t, d) = consolidated?;

        Some((kv, t, d))
    }

    /// Decodes the key/value at `index` using whichever representation(s)
    /// this part carries; when both are present, the structured decode is
    /// checked against the codec decode.
    fn decode_kv(&mut self, index: usize, key: &mut Option<K>, val: &mut Option<V>) -> (K, V) {
        let decoded = self
            .part
            .as_ref()
            .map_left(|codec| {
                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
                let (k, v) = Self::decode_codec(
                    &*self.metrics,
                    self.migration.codec_read(),
                    ck,
                    cv,
                    key,
                    val,
                    &mut self.key_storage,
                    &mut self.val_storage,
                );
                (k.expect("valid legacy key"), v.expect("valid legacy value"))
            })
            .map_right(|(structured_key, structured_val)| {
                self.decode_structured(index, structured_key, structured_val, key, val)
            });

        match decoded {
            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
                // Both paths ran: record agreement and soft-panic (without
                // logging, to avoid leaking data) on mismatch.
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .key()
                    .report_valid(|| k_s == k);
                if !is_valid {
                    soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
                }
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .val()
                    .report_valid(|| v_s == v);
                if !is_valid {
                    soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
                }

                (k, v)
            }
            EitherOrBoth::Left(kv) => kv,
            EitherOrBoth::Right(kv) => kv,
        }
    }

    /// Decodes a key/value pair from their legacy codec-encoded bytes,
    /// reusing the caller-provided instances and scratch storage when
    /// available.
    fn decode_codec(
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
        key_buf: &[u8],
        val_buf: &[u8],
        key: &mut Option<K>,
        val: &mut Option<V>,
        key_storage: &mut Option<K::Storage>,
        val_storage: &mut Option<V::Storage>,
    ) -> (Result<K, String>, Result<V, String>) {
        let k = metrics.codecs.key.decode(|| match key.take() {
            Some(mut key) => {
                match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
                    Ok(()) => Ok(key),
                    Err(err) => Err(err),
                }
            }
            None => K::decode(key_buf, &read_schemas.key),
        });
        let v = metrics.codecs.val.decode(|| match val.take() {
            Some(mut val) => {
                match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
                    Ok(()) => Ok(val),
                    Err(err) => Err(err),
                }
            }
            None => V::decode(val_buf, &read_schemas.val),
        });
        (k, v)
    }

    /// Decodes a key/value pair from the structured columns at `idx`, reusing
    /// the caller-provided instances when available.
    fn decode_structured(
        &self,
        idx: usize,
        keys: &<K::Schema as Schema<K>>::Decoder,
        vals: &<V::Schema as Schema<V>>::Decoder,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (K, V) {
        let mut key = key.take().unwrap_or_default();
        keys.decode(idx, &mut key);

        let mut val = val.take().unwrap_or_default();
        vals.decode(idx, &mut val);

        (key, val)
    }
}
1144
1145impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
1146where
1147 K: Debug + Codec,
1148 V: Debug + Codec,
1149 T: Timestamp + Lattice + Codec64,
1150 D: Monoid + Codec64 + Send + Sync,
1151{
1152 type Item = ((K, V), T, D);
1153
1154 fn next(&mut self) -> Option<Self::Item> {
1155 self.next_with_storage(&mut None, &mut None)
1156 }
1157
1158 fn size_hint(&self) -> (usize, Option<usize>) {
1159 let max_len = self.timestamps.len();
1161 (0, Some(max_len))
1162 }
1163}
1164
1165impl<T> EncodedPart<T>
1166where
1167 T: Timestamp + Lattice + Codec64,
1168{
    /// Fetches (hollow) or unwraps (inline) the given part into an
    /// [`EncodedPart`], returning the blob key when a hollow part's blob is
    /// missing.
    pub async fn fetch(
        cfg: &FetchConfig,
        shard_id: &ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
        registered_desc: &Description<T>,
        part: &BatchPart<T>,
    ) -> Result<Self, BlobKey> {
        match part {
            BatchPart::Hollow(x) => {
                fetch_batch_part(
                    cfg,
                    shard_id,
                    blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    registered_desc,
                    x,
                )
                .await
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => Ok(EncodedPart::from_inline(
                cfg,
                metrics,
                read_metrics.clone(),
                registered_desc.clone(),
                updates,
                ts_rewrite.as_ref(),
            )),
        }
    }
1207
    /// Builds an [`EncodedPart`] from an inline part's updates; panics if the
    /// lazily-encoded updates don't decode (an invariant violation).
    pub(crate) fn from_inline(
        cfg: &FetchConfig,
        metrics: &Metrics,
        read_metrics: ReadMetrics,
        desc: Description<T>,
        x: &LazyInlineBatchPart,
        ts_rewrite: Option<&Antichain<T>>,
    ) -> Self {
        let parsed = x.decode(&metrics.columnar).expect("valid inline part");
        Self::new(cfg, read_metrics, desc, "inline", ts_rewrite, parsed)
    }
1219
    /// Builds an [`EncodedPart`] from an already-decoded hollow part, using
    /// the part's blob key as the printable name for assertion messages.
    pub(crate) fn from_hollow(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        part: &HollowBatchPart<T>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        Self::new(
            cfg,
            metrics,
            registered_desc,
            &part.key.0,
            part.ts_rewrite.as_ref(),
            parsed,
        )
    }
1236
    /// Constructs an [`EncodedPart`], validating the part's inline
    /// description against the batch's registered description and recording
    /// whether read-time truncation to the registered bounds is needed.
    pub(crate) fn new(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        printable_name: &str,
        ts_rewrite: Option<&Antichain<T>>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        // If the inline lower/upper differ from the registered ones, the part
        // covers more than this batch and its extra updates must be dropped
        // at read time.
        let inline_desc = &parsed.desc;
        let needs_truncation = inline_desc.lower() != registered_desc.lower()
            || inline_desc.upper() != registered_desc.upper();
        if needs_truncation {
            if cfg.validate_bounds_on_read {
                // The part may cover a superset of the registered range,
                // never a subset.
                soft_assert_or_log!(
                    PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
                    "key={} inline={:?} registered={:?}",
                    printable_name,
                    inline_desc,
                    registered_desc
                );

                // With a ts rewrite the upper relationship doesn't have to
                // hold, so only check it when no rewrite is present.
                if ts_rewrite.is_none() {
                    soft_assert_or_log!(
                        PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
                        "key={} inline={:?} registered={:?}",
                        printable_name,
                        inline_desc,
                        registered_desc
                    );
                }
            }
            // When truncation is needed, the inline since must still be the
            // minimum antichain (checked unconditionally, even when bounds
            // validation is disabled).
            assert_eq!(
                inline_desc.since(),
                &Antichain::from_elem(T::minimum()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        } else {
            // Bounds match exactly; the inline since may only be at or before
            // the registered since.
            assert!(
                PartialOrder::less_equal(inline_desc.since(), registered_desc.since()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.lower(),
                registered_desc.lower(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.upper(),
                registered_desc.upper(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        }

        EncodedPart {
            metrics,
            registered_desc,
            part: parsed,
            needs_truncation,
            ts_rewrite: ts_rewrite.cloned(),
        }
    }
1332
1333 pub(crate) fn maybe_unconsolidated(&self) -> bool {
1334 self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
1337 }
1338
    /// Returns the raw updates as parsed from the blob, without applying
    /// truncation or timestamp rewriting (see `normalize` for that).
    pub(crate) fn updates(&self) -> &BlobTraceUpdates {
        &self.part.updates
    }
1342
    /// Returns this part's updates with any pending timestamp rewrite and
    /// bounds truncation applied, reassembled into the same
    /// [`BlobTraceUpdates`] variant shape as the input.
    ///
    /// Arrays that are copied as a side effect of filtering/rewriting are
    /// reallocated via `realloc_*` with `metrics` — presumably so the new
    /// buffers are accounted in columnar metrics; exact semantics live in
    /// `mz_persist` (TODO confirm).
    pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
        let updates = self.part.updates.clone();
        // Fast path: nothing to rewrite or truncate, hand back a clone of the
        // parsed updates untouched.
        if !self.needs_truncation && self.ts_rewrite.is_none() {
            return updates;
        }

        // Pull the constituent arrays apart so each transformation below can
        // replace them independently.
        let mut codec = updates
            .records()
            .map(|r| (r.keys().clone(), r.vals().clone()));
        let mut structured = updates.structured().cloned();
        let mut timestamps = updates.timestamps().clone();
        let mut diffs = updates.diffs().clone();

        // Apply the timestamp rewrite first: advance every timestamp by the
        // rewrite frontier, round-tripping through the Codec64 encoding.
        if let Some(rewrite) = self.ts_rewrite.as_ref() {
            timestamps = arrow::compute::unary(&timestamps, |i: i64| {
                let mut t = T::decode(i.to_le_bytes());
                t.advance_by(rewrite.borrow());
                i64::from_le_bytes(T::encode(&t))
            });
        }

        // Then truncate: keep only rows whose (possibly rewritten) timestamp
        // falls within [registered lower, registered upper).
        let reallocated = if self.needs_truncation {
            let filter = BooleanArray::from_unary(&timestamps, |i| {
                let t = T::decode(i.to_le_bytes());
                let truncate_t = {
                    !self.registered_desc.lower().less_equal(&t)
                        || self.registered_desc.upper().less_equal(&t)
                };
                !truncate_t
            });
            // If every row survives, skip the filter (and its reallocation)
            // entirely.
            if filter.false_count() == 0 {
                false
            } else {
                let filter = FilterBuilder::new(&filter).optimize().build();
                let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
                // Filter whichever representations are present; the filtered
                // copies are reallocated against `metrics`.
                if let Some((keys, vals)) = codec {
                    codec = Some((
                        realloc_array(do_filter(&keys).as_binary(), metrics),
                        realloc_array(do_filter(&vals).as_binary(), metrics),
                    ));
                }
                if let Some(ext) = structured {
                    structured = Some(ColumnarRecordsStructuredExt {
                        key: realloc_any(do_filter(&*ext.key), metrics),
                        val: realloc_any(do_filter(&*ext.val), metrics),
                    });
                }
                timestamps = realloc_array(do_filter(&timestamps).as_primitive(), metrics);
                diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
                true
            }
        } else {
            false
        };

        // A rewrite produced a fresh timestamps array; if truncation didn't
        // already realloc it above, do so now.
        if self.ts_rewrite.is_some() && !reallocated {
            timestamps = realloc_array(&timestamps, metrics);
        }

        if self.ts_rewrite.is_some() {
            self.metrics
                .ts_rewrite
                .inc_by(u64::cast_from(timestamps.len()));
        }

        // Reassemble into the variant matching which representations were
        // present in the input.
        match (codec, structured) {
            (Some((key, value)), None) => {
                BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
            }
            (Some((key, value)), Some(ext)) => {
                BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
            }
            (None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            // `BlobTraceUpdates` always carries at least one of the two
            // representations, so this combination cannot occur.
            (None, None) => unreachable!(),
        }
    }
1425}
1426
/// A serializable description of a single batch part, suitable for exchange
/// between timely workers (see the `client_exchange_data` test below).
//
// NOTE(review): field semantics are inferred from names/types where the code
// in view doesn't pin them down — confirm against the constructor.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExchangeableBatchPart<T> {
    /// The shard the part belongs to (presumably; verify at construction).
    shard_id: ShardId,
    /// Encoded size of the part in bytes, exposed via `encoded_size_bytes`.
    encoded_size_bytes: usize,
    /// The description (lower/upper/since) associated with the part.
    desc: Description<T>,
    /// Filter to apply when reading the part's updates.
    filter: FetchBatchFilter<T>,
    /// The proto-encoded hollow batch part, decoded lazily on use.
    part: LazyProto<ProtoHollowBatchPart>,
    /// Whether this fetch should audit filter pushdown.
    filter_pushdown_audit: bool,
}
1445
impl<T> ExchangeableBatchPart<T> {
    /// Returns the stored encoded size of this part, in bytes.
    pub fn encoded_size_bytes(&self) -> usize {
        self.encoded_size_bytes
    }
}
1452
/// Which representation to use when decoding part data.
///
/// String forms (see `from_str`/`as_str`): `"row"`, `"row_with_validate"`,
/// `"arrow"`.
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
    /// Row-based decoding (`"row"` / `"row_with_validate"`).
    Row {
        /// When true, presumably also validates against the structured
        /// representation — confirm at the decode site.
        validate_structured: bool,
    },
    /// Arrow-native decoding (`"arrow"`); the default (see `default`).
    Arrow,
}
1466
1467impl PartDecodeFormat {
1468 pub const fn default() -> Self {
1470 PartDecodeFormat::Arrow
1471 }
1472
1473 pub fn from_str(s: &str) -> Self {
1476 match s {
1477 "row" => PartDecodeFormat::Row {
1478 validate_structured: false,
1479 },
1480 "row_with_validate" => PartDecodeFormat::Row {
1481 validate_structured: true,
1482 },
1483 "arrow" => PartDecodeFormat::Arrow,
1484 x => {
1485 let default = PartDecodeFormat::default();
1486 soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
1487 default
1488 }
1489 }
1490 }
1491
1492 pub const fn as_str(&self) -> &'static str {
1494 match self {
1495 PartDecodeFormat::Row {
1496 validate_structured: false,
1497 } => "row",
1498 PartDecodeFormat::Row {
1499 validate_structured: true,
1500 } => "row_with_validate",
1501 PartDecodeFormat::Arrow => "arrow",
1502 }
1503 }
1504}
1505
1506impl fmt::Display for PartDecodeFormat {
1507 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1508 f.write_str(self.as_str())
1509 }
1510}
1511
1512#[mz_ore::test]
1513fn client_exchange_data() {
1514 fn is_exchange_data<T: timely::ExchangeData>() {}
1518 is_exchange_data::<ExchangeableBatchPart<u64>>();
1519 is_exchange_data::<ExchangeableBatchPart<u64>>();
1520}