1use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
19use arrow::compute::FilterBuilder;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use itertools::EitherOrBoth;
24use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::{soft_assert_or_log, soft_panic_no_log, soft_panic_or_log};
28use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
29use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
30use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist::metrics::ColumnarMetrics;
33use mz_persist_types::arrow::ArrayOrd;
34use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
35use mz_persist_types::part::Codec64Mut;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::stats::PartStats;
38use mz_persist_types::{Codec, Codec64};
39use mz_proto::RustType;
40use serde::{Deserialize, Serialize};
41use timely::PartialOrder;
42use timely::progress::frontier::AntichainRef;
43use timely::progress::{Antichain, Timestamp};
44use tracing::{Instrument, debug, debug_span, trace_span};
45
46use crate::ShardId;
47use crate::cfg::PersistConfig;
48use crate::error::InvalidUsage;
49use crate::internal::apply::Applier;
50use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
51use crate::internal::machine::retry_external;
52use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
53use crate::internal::paths::BlobKey;
54use crate::internal::state::{
55 BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
56};
57use crate::read::LeasedReaderId;
58use crate::schema::{PartMigration, SchemaCache};
59
/// Dyncfg `persist_fetch_semaphore_cost_adjustment` (default `1.2`); see the
/// description string for its intended meaning.
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);

/// Dyncfg `persist_fetch_semaphore_permit_adjustment` (default `1.0`); see the
/// description string for its intended meaning.
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);

/// Dyncfg `persist_part_decode_format`. The raw string is parsed into a
/// `PartDecodeFormat` by `BatchFetcherConfig::part_decode_format`.
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow' (Materialize).",
);

/// Dyncfg gating `LeasedBatchPart::maybe_optimize`. When `false`, that method
/// returns without touching the part.
pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);
94
95pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
96 "persist_validate_part_bounds_on_read",
97 false,
98 "Validate the part lower <= the batch lower and the part upper <= batch upper,\
99 for the batch containing that part",
100);
101
102#[derive(Debug, Clone)]
103pub(crate) struct FetchConfig {
104 pub(crate) validate_bounds_on_read: bool,
105}
106
107impl FetchConfig {
108 pub fn from_persist_config(cfg: &PersistConfig) -> Self {
109 Self {
110 validate_bounds_on_read: VALIDATE_PART_BOUNDS_ON_READ.get(cfg),
111 }
112 }
113}
114
115#[derive(Debug, Clone)]
116pub(crate) struct BatchFetcherConfig {
117 pub(crate) part_decode_format: ConfigValHandle<String>,
118 pub(crate) fetch_config: FetchConfig,
119}
120
121impl BatchFetcherConfig {
122 pub fn new(value: &PersistConfig) -> Self {
123 Self {
124 part_decode_format: PART_DECODE_FORMAT.handle(value),
125 fetch_config: FetchConfig::from_persist_config(value),
126 }
127 }
128
129 pub fn part_decode_format(&self) -> PartDecodeFormat {
130 PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
131 }
132}
133
/// Fetches the blobs backing [`ExchangeableBatchPart`]s of a single shard
/// (`shard_id`) and wraps them as [`FetchedBlob`]s for later decoding.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Codec64 + Send + Sync,
{
    pub(crate) cfg: BatchFetcherConfig,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    // Parts from any other shard are rejected with `BatchNotFromThisShard`.
    pub(crate) shard_id: ShardId,
    // Schemas that fetched parts are migrated to (via `PartMigration`).
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    // When true, fetches report into `metrics.read.unindexed` rather than
    // `metrics.read.batch_fetcher`.
    pub(crate) is_transient: bool,

    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
157
158impl<K, V, T, D> BatchFetcher<K, V, T, D>
159where
160 K: Debug + Codec,
161 V: Debug + Codec,
162 T: Timestamp + Lattice + Codec64 + Sync,
163 D: Monoid + Codec64 + Send + Sync,
164{
165 pub async fn fetch_leased_part(
170 &mut self,
171 part: ExchangeableBatchPart<T>,
172 ) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
173 let ExchangeableBatchPart {
174 shard_id,
175 encoded_size_bytes: _,
176 desc,
177 filter,
178 filter_pushdown_audit,
179 part,
180 reader_id: _,
181 } = part;
182 let part: BatchPart<T> = part.decode_to().expect("valid part");
183 if shard_id != self.shard_id {
184 return Err(InvalidUsage::BatchNotFromThisShard {
185 batch_shard: shard_id,
186 handle_shard: self.shard_id.clone(),
187 });
188 }
189
190 let migration =
191 PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
192 .await
193 .unwrap_or_else(|read_schemas| {
194 panic!(
195 "could not decode part {:?} with schema: {:?}",
196 part.schema_id(),
197 read_schemas
198 )
199 });
200
201 let (buf, fetch_permit) = match &part {
202 BatchPart::Hollow(x) => {
203 let fetch_permit = self
204 .metrics
205 .semaphore
206 .acquire_fetch_permits(x.encoded_size_bytes)
207 .await;
208 let read_metrics = if self.is_transient {
209 &self.metrics.read.unindexed
210 } else {
211 &self.metrics.read.batch_fetcher
212 };
213 let buf = fetch_batch_part_blob(
214 &shard_id,
215 self.blob.as_ref(),
216 &self.metrics,
217 &self.shard_metrics,
218 read_metrics,
219 x,
220 )
221 .await;
222 let buf = match buf {
223 Ok(buf) => buf,
224 Err(key) => return Ok(Err(key)),
225 };
226 let buf = FetchedBlobBuf::Hollow {
227 buf,
228 part: x.clone(),
229 };
230 (buf, Some(Arc::new(fetch_permit)))
231 }
232 BatchPart::Inline {
233 updates,
234 ts_rewrite,
235 ..
236 } => {
237 let buf = FetchedBlobBuf::Inline {
238 desc: desc.clone(),
239 updates: updates.clone(),
240 ts_rewrite: ts_rewrite.clone(),
241 };
242 (buf, None)
243 }
244 };
245 let fetched_blob = FetchedBlob {
246 metrics: Arc::clone(&self.metrics),
247 read_metrics: self.metrics.read.batch_fetcher.clone(),
248 buf,
249 registered_desc: desc.clone(),
250 migration,
251 filter: filter.clone(),
252 filter_pushdown_audit,
253 structured_part_audit: self.cfg.part_decode_format(),
254 fetch_permit,
255 _phantom: PhantomData,
256 fetch_config: self.cfg.fetch_config.clone(),
257 };
258 Ok(Ok(fetched_blob))
259 }
260
261 pub async fn missing_blob_diagnostics(&self, reader_id: &LeasedReaderId) -> String {
264 missing_blob_diagnostics(self.schema_cache.applier(), reader_id).await
265 }
266}
267
268pub(crate) async fn missing_blob_diagnostics<K, V, T, D>(
279 applier: &Applier<K, V, T, D>,
280 reader_id: &LeasedReaderId,
281) -> String
282where
283 K: Debug + Codec,
284 V: Debug + Codec,
285 T: Timestamp + Lattice + Codec64 + Sync,
286 D: Monoid + Codec64,
287{
288 let refresh = applier.fetch_and_update_state(None);
292 if tokio::time::timeout(Duration::from_secs(30), refresh)
293 .await
294 .is_err()
295 {
296 return format!(
297 "reader {reader_id}: could not refresh state within 30s to diagnose the lease; \
298 partitioned from consensus?"
299 );
300 }
301 match applier.reader_lease(reader_id.clone()) {
302 Some(lease_state) => format!(
303 "reader {reader_id} is still present in state ({lease_state:?}); \
304 a missing blob despite a live lease indicates a GC bug"
305 ),
306 None => format!(
307 "reader {reader_id} has been expired out of state; \
308 the process likely failed to heartbeat it within the lease duration \
309 (machine sleep, CPU/memory starvation, or a partition from consensus?)"
310 ),
311 }
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
315pub(crate) enum FetchBatchFilter<T> {
316 Snapshot {
317 as_of: Antichain<T>,
318 },
319 Listen {
320 as_of: Antichain<T>,
321 lower: Antichain<T>,
322 },
323 Compaction {
324 since: Antichain<T>,
325 },
326}
327
328impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
329 pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
330 match self {
331 FetchBatchFilter::Snapshot { as_of } => {
332 if as_of.less_than(t) {
334 return false;
335 }
336 t.advance_by(as_of.borrow());
337 true
338 }
339 FetchBatchFilter::Listen { as_of, lower } => {
340 if !as_of.less_than(t) {
342 return false;
343 }
344
345 if !lower.less_equal(t) {
353 return false;
354 }
355 true
356 }
357 FetchBatchFilter::Compaction { since } => {
358 t.advance_by(since.borrow());
359 true
360 }
361 }
362 }
363}
364
365pub(crate) async fn fetch_leased_part<K, V, T, D>(
370 cfg: &PersistConfig,
371 part: &LeasedBatchPart<T>,
372 blob: &dyn Blob,
373 metrics: Arc<Metrics>,
374 read_metrics: &ReadMetrics,
375 shard_metrics: &ShardMetrics,
376 reader_id: &LeasedReaderId,
377 read_schemas: Schemas<K, V>,
378 schema_cache: &mut SchemaCache<K, V, T, D>,
379) -> FetchedPart<K, V, T, D>
380where
381 K: Debug + Codec,
382 V: Debug + Codec,
383 T: Timestamp + Lattice + Codec64 + Sync,
384 D: Monoid + Codec64 + Send + Sync,
385{
386 let fetch_config = FetchConfig::from_persist_config(cfg);
387 let encoded_part = match EncodedPart::fetch(
388 &fetch_config,
389 &part.shard_id,
390 blob,
391 &metrics,
392 shard_metrics,
393 read_metrics,
394 &part.desc,
395 &part.part,
396 )
397 .await
398 {
399 Ok(x) => x,
400 Err(blob_key) => {
401 let diagnostics = missing_blob_diagnostics(schema_cache.applier(), reader_id).await;
410 panic!("could not fetch batch part {}: {}", blob_key, diagnostics)
411 }
412 };
413 let part_cfg = BatchFetcherConfig::new(cfg);
414 let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
415 .await
416 .unwrap_or_else(|read_schemas| {
417 panic!(
418 "could not decode part {:?} with schema: {:?}",
419 part.part.schema_id(),
420 read_schemas
421 )
422 });
423 FetchedPart::new(
424 metrics,
425 encoded_part,
426 migration,
427 part.filter.clone(),
428 part.filter_pushdown_audit,
429 part_cfg.part_decode_format(),
430 part.part.stats(),
431 )
432}
433
434pub(crate) async fn fetch_batch_part_blob<T>(
435 shard_id: &ShardId,
436 blob: &dyn Blob,
437 metrics: &Metrics,
438 shard_metrics: &ShardMetrics,
439 read_metrics: &ReadMetrics,
440 part: &HollowBatchPart<T>,
441) -> Result<SegmentedBytes, BlobKey> {
442 let now = Instant::now();
443 let get_span = debug_span!("fetch_batch::get");
444 let blob_key = part.key.complete(shard_id);
445 let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
446 shard_metrics.blob_gets.inc();
447 blob.get(&blob_key).await
448 })
449 .instrument(get_span.clone())
450 .await
451 .ok_or(blob_key)?;
452
453 drop(get_span);
454
455 read_metrics.part_count.inc();
456 read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
457 read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());
458
459 Ok(value)
460}
461
462pub(crate) fn decode_batch_part_blob<T>(
463 cfg: &FetchConfig,
464 metrics: &Metrics,
465 read_metrics: &ReadMetrics,
466 registered_desc: Description<T>,
467 part: &HollowBatchPart<T>,
468 buf: &SegmentedBytes,
469) -> EncodedPart<T>
470where
471 T: Timestamp + Lattice + Codec64,
472{
473 trace_span!("fetch_batch::decode").in_scope(|| {
474 let parsed = metrics
475 .codecs
476 .batch
477 .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
478 .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
479 .expect("internal error: invalid encoded state");
484 read_metrics
485 .part_goodbytes
486 .inc_by(u64::cast_from(parsed.updates.goodbytes()));
487 EncodedPart::from_hollow(cfg, read_metrics.clone(), registered_desc, part, parsed)
488 })
489}
490
491pub(crate) async fn fetch_batch_part<T>(
492 cfg: &FetchConfig,
493 shard_id: &ShardId,
494 blob: &dyn Blob,
495 metrics: &Metrics,
496 shard_metrics: &ShardMetrics,
497 read_metrics: &ReadMetrics,
498 registered_desc: &Description<T>,
499 part: &HollowBatchPart<T>,
500) -> Result<EncodedPart<T>, BlobKey>
501where
502 T: Timestamp + Lattice + Codec64,
503{
504 let buf =
505 fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
506 let part = decode_batch_part_blob(
507 cfg,
508 metrics,
509 read_metrics,
510 registered_desc.clone(),
511 part,
512 &buf,
513 );
514 Ok(part)
515}
516
517#[derive(Clone, Debug)]
524pub struct Lease(Arc<SeqNo>);
525
526impl Lease {
527 pub fn new(seqno: SeqNo) -> Self {
529 Self(Arc::new(seqno))
530 }
531
532 pub fn seqno(&self) -> SeqNo {
534 *self.0
535 }
536
537 pub fn count(&self) -> usize {
539 Arc::strong_count(&self.0)
540 }
541}
542
/// A [`BatchPart`] together with what is needed to read it (shard, registered
/// description, timestamp filter) and its associated [`Lease`].
///
/// Dropping a `LeasedBatchPart` bumps the `lease.dropped_part` metric.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_id: ShardId,
    // Applied to each update's timestamp when the part is read.
    pub(crate) filter: FetchBatchFilter<T>,
    // The description of the batch this part was registered in.
    pub(crate) desc: Description<T>,
    pub(crate) part: BatchPart<T>,
    // A clone of this is handed out by `into_exchangeable_part`.
    pub(crate) lease: Lease,
    pub(crate) reader_id: LeasedReaderId,
    // When set, the resulting `FetchedPart` keeps the part's stats so filter
    // pushdown decisions can be audited.
    pub(crate) filter_pushdown_audit: bool,
}
584
impl<T> LeasedBatchPart<T>
where
    T: Timestamp + Codec64,
{
    /// Converts this part into its serializable [`ExchangeableBatchPart`]
    /// form, returning a clone of the [`Lease`] alongside it.
    ///
    /// `self` is dropped at the end of this call, which runs the `Drop` impl
    /// (bumping `lease.dropped_part`).
    pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
        // `LeasedBatchPart` implements `Drop`, so non-`Copy` fields cannot be
        // moved out of `self`; they are cloned instead.
        let lease = self.lease.clone();
        let part = ExchangeableBatchPart {
            shard_id: self.shard_id,
            encoded_size_bytes: self.part.encoded_size_bytes(),
            desc: self.desc.clone(),
            filter: self.filter.clone(),
            part: LazyProto::from(&self.part.into_proto()),
            reader_id: self.reader_id.clone(),
            filter_pushdown_audit: self.filter_pushdown_audit,
        };
        (part, lease)
    }

    /// The encoded size of the underlying part, in bytes.
    pub fn encoded_size_bytes(&self) -> usize {
        self.part.encoded_size_bytes()
    }

    /// Requests that the part's stats be retained when it is decoded, for
    /// auditing filter pushdown.
    pub fn request_filter_pushdown_audit(&mut self) {
        self.filter_pushdown_audit = true;
    }

    /// The decoded stats of the underlying part, if it has any.
    pub fn stats(&self) -> Option<PartStats> {
        self.part.stats().map(|x| x.decode())
    }

    /// If eligible, replaces a hollow part with a one-row inline part, so the
    /// blob fetch is skipped entirely. The inline row is (`key`, `val`) at the
    /// snapshot `as_of`, with the part's `diffs_sum` as its diff.
    ///
    /// Eligibility requires all of the following:
    /// - the filter is `Snapshot` with a single-element `as_of`;
    /// - `OPTIMIZE_IGNORED_DATA_FETCH` is enabled;
    /// - the part is hollow and has a `diffs_sum`;
    /// - both the batch upper and the batch since are `less_equal` to `as_of`.
    ///
    /// NOTE(review): presumably callers only use this when the contents of the
    /// key/val columns don't matter to them (hence "ignored data"); confirm.
    ///
    /// # Panics
    ///
    /// Panics if `key` or `val` doesn't contain exactly one row.
    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
        assert_eq!(key.len(), 1, "expect a single-row key array");
        assert_eq!(val.len(), 1, "expect a single-row val array");
        let as_of = match &self.filter {
            FetchBatchFilter::Snapshot { as_of } => as_of,
            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
        };
        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
            return;
        }
        let (diffs_sum, _stats) = match &self.part {
            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
            BatchPart::Inline { .. } => return,
        };
        debug!(
            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
            diffs_sum.map(i64::decode),
            as_of.elements(),
            self.desc.lower().elements(),
            self.desc.upper().elements()
        );
        // Only single-element as_ofs are handled.
        let as_of = match &as_of.elements() {
            &[as_of] => as_of,
            _ => return,
        };
        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
        if !eligible {
            return;
        }
        let Some(diffs_sum) = diffs_sum else {
            return;
        };

        debug!(
            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
            i64::decode(diffs_sum),
            as_of,
            self.part.encoded_size_bytes(),
        );
        self.metrics.pushdown.parts_faked_count.inc();
        self.metrics
            .pushdown
            .parts_faked_bytes
            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
        // Build the single fake update: (key, val) at `as_of` with the raw
        // `diffs_sum` bytes as its diff.
        let timestamps = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push(as_of);
            col.finish()
        };
        let diffs = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push_raw(diffs_sum);
            col.finish()
        };
        let updates = BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt { key, val },
            timestamps,
            diffs,
        };
        // The inline part embeds the registered desc, so `EncodedPart::new`
        // sees matching bounds and no truncation is needed.
        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
            desc: Some(self.desc.into_proto()),
            index: 0,
            updates: Some(updates.into_proto()),
        });
        self.part = BatchPart::Inline {
            updates: faked_data,
            ts_rewrite: None,
            schema_id: None,
            deprecated_schema_id: None,
        };
    }
}
707
708impl<T> Drop for LeasedBatchPart<T> {
709 fn drop(&mut self) {
711 self.metrics.lease.dropped_part.inc()
712 }
713}
714
/// A batch part whose bytes have been fetched (or, for inline parts, copied
/// out of the part) but not yet decoded. Call `parse` to decode it into a
/// [`ShardSourcePart`].
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    // Metrics bucket that decode-time goodbytes are recorded into.
    read_metrics: ReadMetrics,
    buf: FetchedBlobBuf<T>,
    // The description of the batch this part was registered in.
    registered_desc: Description<T>,
    migration: PartMigration<K, V>,
    filter: FetchBatchFilter<T>,
    filter_pushdown_audit: bool,
    // Despite the name, this is the `PartDecodeFormat` used when parsing.
    structured_part_audit: PartDecodeFormat,
    // Fetch permits acquired for hollow parts (`None` for inline parts);
    // shared with every `ShardSourcePart` parsed from this blob.
    fetch_permit: Option<Arc<MetricsPermits>>,
    fetch_config: FetchConfig,
    _phantom: PhantomData<fn() -> D>,
}

/// The undecoded contents of a [`FetchedBlob`].
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    /// Bytes read from blob storage for a hollow part.
    Hollow {
        buf: SegmentedBytes,
        part: HollowBatchPart<T>,
    },
    /// An inline part, copied from the batch part itself.
    Inline {
        desc: Description<T>,
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
    },
}
746
747impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
748 fn clone(&self) -> Self {
749 Self {
750 metrics: Arc::clone(&self.metrics),
751 read_metrics: self.read_metrics.clone(),
752 buf: self.buf.clone(),
753 registered_desc: self.registered_desc.clone(),
754 migration: self.migration.clone(),
755 filter: self.filter.clone(),
756 filter_pushdown_audit: self.filter_pushdown_audit.clone(),
757 fetch_permit: self.fetch_permit.clone(),
758 structured_part_audit: self.structured_part_audit.clone(),
759 fetch_config: self.fetch_config.clone(),
760 _phantom: self._phantom.clone(),
761 }
762 }
763}
764
765pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
768 pub part: FetchedPart<K, V, T, D>,
770 fetch_permit: Option<Arc<MetricsPermits>>,
771}
772
773impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
774where
775 K: Codec + Debug,
776 <K as Codec>::Storage: Debug,
777 V: Codec + Debug,
778 <V as Codec>::Storage: Debug,
779{
780 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
781 let ShardSourcePart { part, fetch_permit } = self;
782 f.debug_struct("ShardSourcePart")
783 .field("part", part)
784 .field("fetch_permit", fetch_permit)
785 .finish()
786 }
787}
788
789impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
790 pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
792 self.parse_internal(&self.fetch_config)
793 }
794
795 pub(crate) fn parse_internal(&self, cfg: &FetchConfig) -> ShardSourcePart<K, V, T, D> {
797 let (part, stats) = match &self.buf {
798 FetchedBlobBuf::Hollow { buf, part } => {
799 let parsed = decode_batch_part_blob(
800 cfg,
801 &self.metrics,
802 &self.read_metrics,
803 self.registered_desc.clone(),
804 part,
805 buf,
806 );
807 (parsed, part.stats.as_ref())
808 }
809 FetchedBlobBuf::Inline {
810 desc,
811 updates,
812 ts_rewrite,
813 } => {
814 let parsed = EncodedPart::from_inline(
815 cfg,
816 &self.metrics,
817 self.read_metrics.clone(),
818 desc.clone(),
819 updates,
820 ts_rewrite.as_ref(),
821 );
822 (parsed, None)
823 }
824 };
825 let part = FetchedPart::new(
826 Arc::clone(&self.metrics),
827 part,
828 self.migration.clone(),
829 self.filter.clone(),
830 self.filter_pushdown_audit,
831 self.structured_part_audit,
832 stats,
833 );
834 ShardSourcePart {
835 part,
836 fetch_permit: self.fetch_permit.clone(),
837 }
838 }
839
840 pub fn stats(&self) -> Option<PartStats> {
842 match &self.buf {
843 FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
844 FetchedBlobBuf::Inline { .. } => None,
845 }
846 }
847}
848
/// A decoded batch part, iterable as `((K, V), T, D)` updates (see the
/// `Iterator` impl and `next_with_storage`).
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    // Applied to each update's timestamp during iteration.
    ts_filter: FetchBatchFilter<T>,
    // Key/val data as codec-encoded records (`Left`), structured decoders
    // (`Right`), or both, in which case reads cross-check the two.
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    timestamps: Int64Array,
    diffs: Int64Array,
    migration: PartMigration<K, V>,
    filter_pushdown_audit: Option<LazyPartStats>,
    // The update that ended the previous consolidation run; it seeds the next one.
    peek_stash: Option<((K, V), T, D)>,
    // Index of the next update to read.
    part_cursor: usize,
    key_storage: Option<K::Storage>,
    val_storage: Option<V::Storage>,

    _phantom: PhantomData<fn() -> D>,
}
877
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
    /// Builds a [`FetchedPart`] from a decoded [`EncodedPart`].
    ///
    /// This does three things:
    /// - records schema-migration metrics;
    /// - normalizes the part's updates;
    /// - chooses which representation(s) of the key/val data to keep:
    ///   codec-encoded [`ColumnarRecords`] (`Left`), structured decoders
    ///   (`Right`), or both. The choice depends on what the part holds and on
    ///   `part_decode_format`.
    ///
    /// `stats` is retained only when `filter_pushdown_audit` is set.
    ///
    /// # Panics
    ///
    /// Panics if the part holds only structured data and decoders for the read
    /// schemas cannot be built for it.
    pub(crate) fn new(
        metrics: Arc<Metrics>,
        part: EncodedPart<T>,
        migration: PartMigration<K, V>,
        ts_filter: FetchBatchFilter<T>,
        filter_pushdown_audit: bool,
        part_decode_format: PartDecodeFormat,
        stats: Option<&LazyPartStats>,
    ) -> Self {
        let part_len = u64::cast_from(part.part.updates.len());
        // Count which kind of migration this part needed and, by update count,
        // which decode path(s) it will take.
        match &migration {
            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
            PartMigration::Schemaless { .. } => {
                metrics.schema.migration_count_codec.inc();
                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
            }
            PartMigration::Either { .. } => {
                metrics.schema.migration_count_either.inc();
                match part_decode_format {
                    PartDecodeFormat::Row {
                        validate_structured: false,
                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
                    PartDecodeFormat::Row {
                        validate_structured: true,
                    } => {
                        metrics.schema.migration_len_either_codec.inc_by(part_len);
                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
                    }
                    PartDecodeFormat::Arrow => {
                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
                    }
                }
            }
        }

        let filter_pushdown_audit = if filter_pushdown_audit {
            stats.cloned()
        } else {
            None
        };

        // Turns structured key/val columns into decoders for the read schemas,
        // migrating the columns first when needed. Returns `None` when that is
        // not possible. `structured_only` means there is no codec data to fall
        // back on; only then is a `Schemaless` migration attempted.
        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
                                   structured_only: bool| {
            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

            let structured = match &migration {
                PartMigration::SameSchema { .. } => structured,
                PartMigration::Schemaless { read } if structured_only => {
                    let start = Instant::now();
                    let read_key = data_type::<K>(&*read.key).ok()?;
                    let read_val = data_type::<V>(&*read.val).ok()?;
                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
                PartMigration::Schemaless { .. } => return None,
                PartMigration::Either {
                    write: _,
                    read: _,
                    key_migration,
                    val_migration,
                } => {
                    let start = Instant::now();
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
            };

            let read_schema = migration.codec_read();
            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

            // Key goodbytes lost between the raw column and its decoder are
            // recorded as projection-trimmed bytes.
            match &key {
                Ok(key_decoder) => {
                    let key_size_after = key_decoder.goodbytes();
                    let key_diff = key_size_before.saturating_sub(key_size_after);
                    metrics
                        .pushdown
                        .parts_projection_trimmed_bytes
                        .inc_by(u64::cast_from(key_diff));
                }
                Err(e) => {
                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                }
            }

            Some((key.ok()?, val.ok()?))
        };

        let updates = part.normalize(&metrics.columnar);
        let timestamps = updates.timestamps().clone();
        let diffs = updates.diffs().clone();
        let part = match updates {
            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
            // With no codec fallback, structured data must be decodable.
            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                downcast_structured(key_values, true).expect("valid schemas for structured data"),
            ),
            // With both representations, `part_decode_format` picks; if the
            // structured side can't be decoded, fall back to codec-only.
            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
                PartDecodeFormat::Row {
                    validate_structured: false,
                } => EitherOrBoth::Left(records),
                PartDecodeFormat::Row {
                    validate_structured: true,
                } => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Both(records, decoders),
                    None => EitherOrBoth::Left(records),
                },
                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Right(decoders),
                    None => EitherOrBoth::Left(records),
                },
            },
        };

        FetchedPart {
            metrics,
            ts_filter,
            part,
            peek_stash: None,
            timestamps,
            diffs,
            migration,
            filter_pushdown_audit,
            part_cursor: 0,
            key_storage: None,
            val_storage: None,
            _phantom: PhantomData,
        }
    }

    /// Returns the part's stats if a filter pushdown audit was requested for
    /// this part, `None` otherwise.
    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
        self.filter_pushdown_audit.clone()
    }
}
1034
/// A decoded [`BlobTraceBatchPart`], paired with the description of the batch
/// it was registered in.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    metrics: ReadMetrics,
    registered_desc: Description<T>,
    part: BlobTraceBatchPart<T>,
    // Set by `EncodedPart::new` when the part's own lower or upper differs
    // from `registered_desc`'s.
    needs_truncation: bool,
    ts_rewrite: Option<Antichain<T>>,
}
1045
1046impl<K, V, T, D> FetchedPart<K, V, T, D>
1047where
1048 K: Debug + Codec,
1049 V: Debug + Codec,
1050 T: Timestamp + Lattice + Codec64,
1051 D: Monoid + Codec64 + Send + Sync,
1052{
1053 pub fn next_with_storage(
1058 &mut self,
1059 key: &mut Option<K>,
1060 val: &mut Option<V>,
1061 ) -> Option<((K, V), T, D)> {
1062 let mut consolidated = self.peek_stash.take();
1063 loop {
1064 let next = if self.part_cursor < self.timestamps.len() {
1066 let next_idx = self.part_cursor;
1067 self.part_cursor += 1;
1068 let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
1071 if !self.ts_filter.filter_ts(&mut t) {
1072 continue;
1073 }
1074 let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
1075 if d.is_zero() {
1076 continue;
1077 }
1078 let kv = self.decode_kv(next_idx, key, val);
1079 (kv, t, d)
1080 } else {
1081 break;
1082 };
1083
1084 if let Some((kv, t, d)) = &mut consolidated {
1086 let (kv_next, t_next, d_next) = &next;
1087 if kv == kv_next && t == t_next {
1088 d.plus_equals(d_next);
1089 if d.is_zero() {
1090 consolidated = None;
1091 }
1092 } else {
1093 self.peek_stash = Some(next);
1094 break;
1095 }
1096 } else {
1097 consolidated = Some(next);
1098 }
1099 }
1100
1101 let (kv, t, d) = consolidated?;
1102
1103 Some((kv, t, d))
1104 }
1105
1106 fn decode_kv(&mut self, index: usize, key: &mut Option<K>, val: &mut Option<V>) -> (K, V) {
1107 let decoded = self
1108 .part
1109 .as_ref()
1110 .map_left(|codec| {
1111 let ((ck, cv), _, _) = codec.get(index).expect("valid index");
1112 let (k, v) = Self::decode_codec(
1113 &*self.metrics,
1114 self.migration.codec_read(),
1115 ck,
1116 cv,
1117 key,
1118 val,
1119 &mut self.key_storage,
1120 &mut self.val_storage,
1121 );
1122 (k.expect("valid legacy key"), v.expect("valid legacy value"))
1123 })
1124 .map_right(|(structured_key, structured_val)| {
1125 self.decode_structured(index, structured_key, structured_val, key, val)
1126 });
1127
1128 match decoded {
1129 EitherOrBoth::Both((k, v), (k_s, v_s)) => {
1130 let is_valid = self
1132 .metrics
1133 .columnar
1134 .arrow()
1135 .key()
1136 .report_valid(|| k_s == k);
1137 if !is_valid {
1138 soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
1139 }
1140 let is_valid = self
1142 .metrics
1143 .columnar
1144 .arrow()
1145 .val()
1146 .report_valid(|| v_s == v);
1147 if !is_valid {
1148 soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
1149 }
1150
1151 (k, v)
1152 }
1153 EitherOrBoth::Left(kv) => kv,
1154 EitherOrBoth::Right(kv) => kv,
1155 }
1156 }
1157
1158 fn decode_codec(
1159 metrics: &Metrics,
1160 read_schemas: &Schemas<K, V>,
1161 key_buf: &[u8],
1162 val_buf: &[u8],
1163 key: &mut Option<K>,
1164 val: &mut Option<V>,
1165 key_storage: &mut Option<K::Storage>,
1166 val_storage: &mut Option<V::Storage>,
1167 ) -> (Result<K, String>, Result<V, String>) {
1168 let k = metrics.codecs.key.decode(|| match key.take() {
1169 Some(mut key) => {
1170 match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
1171 Ok(()) => Ok(key),
1172 Err(err) => Err(err),
1173 }
1174 }
1175 None => K::decode(key_buf, &read_schemas.key),
1176 });
1177 let v = metrics.codecs.val.decode(|| match val.take() {
1178 Some(mut val) => {
1179 match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
1180 Ok(()) => Ok(val),
1181 Err(err) => Err(err),
1182 }
1183 }
1184 None => V::decode(val_buf, &read_schemas.val),
1185 });
1186 (k, v)
1187 }
1188
1189 fn decode_structured(
1190 &self,
1191 idx: usize,
1192 keys: &<K::Schema as Schema<K>>::Decoder,
1193 vals: &<V::Schema as Schema<V>>::Decoder,
1194 key: &mut Option<K>,
1195 val: &mut Option<V>,
1196 ) -> (K, V) {
1197 let mut key = key.take().unwrap_or_default();
1198 keys.decode(idx, &mut key);
1199
1200 let mut val = val.take().unwrap_or_default();
1201 vals.decode(idx, &mut val);
1202
1203 (key, val)
1204 }
1205}
1206
1207impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
1208where
1209 K: Debug + Codec,
1210 V: Debug + Codec,
1211 T: Timestamp + Lattice + Codec64,
1212 D: Monoid + Codec64 + Send + Sync,
1213{
1214 type Item = ((K, V), T, D);
1215
1216 fn next(&mut self) -> Option<Self::Item> {
1217 self.next_with_storage(&mut None, &mut None)
1218 }
1219
1220 fn size_hint(&self) -> (usize, Option<usize>) {
1221 let max_len = self.timestamps.len();
1223 (0, Some(max_len))
1224 }
1225}
1226
1227impl<T> EncodedPart<T>
1228where
1229 T: Timestamp + Lattice + Codec64,
1230{
1231 pub async fn fetch(
1232 cfg: &FetchConfig,
1233 shard_id: &ShardId,
1234 blob: &dyn Blob,
1235 metrics: &Metrics,
1236 shard_metrics: &ShardMetrics,
1237 read_metrics: &ReadMetrics,
1238 registered_desc: &Description<T>,
1239 part: &BatchPart<T>,
1240 ) -> Result<Self, BlobKey> {
1241 match part {
1242 BatchPart::Hollow(x) => {
1243 fetch_batch_part(
1244 cfg,
1245 shard_id,
1246 blob,
1247 metrics,
1248 shard_metrics,
1249 read_metrics,
1250 registered_desc,
1251 x,
1252 )
1253 .await
1254 }
1255 BatchPart::Inline {
1256 updates,
1257 ts_rewrite,
1258 ..
1259 } => Ok(EncodedPart::from_inline(
1260 cfg,
1261 metrics,
1262 read_metrics.clone(),
1263 registered_desc.clone(),
1264 updates,
1265 ts_rewrite.as_ref(),
1266 )),
1267 }
1268 }
1269
1270 pub(crate) fn from_inline(
1271 cfg: &FetchConfig,
1272 metrics: &Metrics,
1273 read_metrics: ReadMetrics,
1274 desc: Description<T>,
1275 x: &LazyInlineBatchPart,
1276 ts_rewrite: Option<&Antichain<T>>,
1277 ) -> Self {
1278 let parsed = x.decode(&metrics.columnar).expect("valid inline part");
1279 Self::new(cfg, read_metrics, desc, "inline", ts_rewrite, parsed)
1280 }
1281
1282 pub(crate) fn from_hollow(
1283 cfg: &FetchConfig,
1284 metrics: ReadMetrics,
1285 registered_desc: Description<T>,
1286 part: &HollowBatchPart<T>,
1287 parsed: BlobTraceBatchPart<T>,
1288 ) -> Self {
1289 Self::new(
1290 cfg,
1291 metrics,
1292 registered_desc,
1293 &part.key.0,
1294 part.ts_rewrite.as_ref(),
1295 parsed,
1296 )
1297 }
1298
    /// Wraps a decoded [`BlobTraceBatchPart`] after checking its embedded
    /// description against `registered_desc` (the description of the batch
    /// the part was registered in).
    ///
    /// Case 1: the lower or upper differ. The part is marked
    /// `needs_truncation`, and its embedded since must be the minimum
    /// timestamp (hard assert). If `cfg.validate_bounds_on_read` is set, two
    /// soft asserts also run:
    /// - the part's lower is `<=` the registered lower;
    /// - unless `ts_rewrite` is set, the registered upper is `<=` the part's
    ///   upper.
    ///
    /// Case 2: the bounds match. The lower and upper must be exactly equal
    /// (hard asserts), and the embedded since must be `<=` the registered
    /// since (soft assert).
    ///
    /// `printable_name` is only used in assertion messages.
    pub(crate) fn new(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        printable_name: &str,
        ts_rewrite: Option<&Antichain<T>>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        let inline_desc = &parsed.desc;
        let needs_truncation = inline_desc.lower() != registered_desc.lower()
            || inline_desc.upper() != registered_desc.upper();
        if needs_truncation {
            if cfg.validate_bounds_on_read {
                soft_assert_or_log!(
                    PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
                    "key={} inline={:?} registered={:?}",
                    printable_name,
                    inline_desc,
                    registered_desc
                );

                // Skipped when the part has a ts_rewrite. NOTE(review):
                // presumably the rewrite can advance the registered upper
                // beyond the part's own upper; confirm.
                if ts_rewrite.is_none() {
                    soft_assert_or_log!(
                        PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
                        "key={} inline={:?} registered={:?}",
                        printable_name,
                        inline_desc,
                        registered_desc
                    );
                }
            }
            assert_eq!(
                inline_desc.since(),
                &Antichain::from_elem(T::minimum()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        } else {
            soft_assert_or_log!(
                PartialOrder::less_equal(inline_desc.since(), registered_desc.since()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.lower(),
                registered_desc.lower(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.upper(),
                registered_desc.upper(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        }

        EncodedPart {
            metrics,
            registered_desc,
            part: parsed,
            needs_truncation,
            ts_rewrite: ts_rewrite.cloned(),
        }
    }
1394
1395 pub(crate) fn maybe_unconsolidated(&self) -> bool {
1396 self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
1399 }
1400
1401 pub(crate) fn updates(&self) -> &BlobTraceUpdates {
1402 &self.part.updates
1403 }
1404
1405 pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
1407 let updates = self.part.updates.clone();
1408 if !self.needs_truncation && self.ts_rewrite.is_none() {
1409 return updates;
1410 }
1411
1412 let mut codec = updates
1413 .records()
1414 .map(|r| (r.keys().clone(), r.vals().clone()));
1415 let mut structured = updates.structured().cloned();
1416 let mut timestamps = updates.timestamps().clone();
1417 let mut diffs = updates.diffs().clone();
1418
1419 if let Some(rewrite) = self.ts_rewrite.as_ref() {
1420 timestamps = arrow::compute::unary(×tamps, |i: i64| {
1421 let mut t = T::decode(i.to_le_bytes());
1422 t.advance_by(rewrite.borrow());
1423 i64::from_le_bytes(T::encode(&t))
1424 });
1425 }
1426
1427 let reallocated = if self.needs_truncation {
1428 let filter = BooleanArray::from_unary(×tamps, |i| {
1429 let t = T::decode(i.to_le_bytes());
1430 let truncate_t = {
1431 !self.registered_desc.lower().less_equal(&t)
1432 || self.registered_desc.upper().less_equal(&t)
1433 };
1434 !truncate_t
1435 });
1436 if filter.false_count() == 0 {
1437 false
1439 } else {
1440 let filter = FilterBuilder::new(&filter).optimize().build();
1441 let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
1442 if let Some((keys, vals)) = codec {
1443 codec = Some((
1444 realloc_array(do_filter(&keys).as_binary(), metrics),
1445 realloc_array(do_filter(&vals).as_binary(), metrics),
1446 ));
1447 }
1448 if let Some(ext) = structured {
1449 structured = Some(ColumnarRecordsStructuredExt {
1450 key: realloc_any(do_filter(&*ext.key), metrics),
1451 val: realloc_any(do_filter(&*ext.val), metrics),
1452 });
1453 }
1454 timestamps = realloc_array(do_filter(×tamps).as_primitive(), metrics);
1455 diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
1456 true
1457 }
1458 } else {
1459 false
1460 };
1461
1462 if self.ts_rewrite.is_some() && !reallocated {
1463 timestamps = realloc_array(×tamps, metrics);
1464 }
1465
1466 if self.ts_rewrite.is_some() {
1467 self.metrics
1468 .ts_rewrite
1469 .inc_by(u64::cast_from(timestamps.len()));
1470 }
1471
1472 match (codec, structured) {
1473 (Some((key, value)), None) => {
1474 BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
1475 }
1476 (Some((key, value)), Some(ext)) => {
1477 BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
1478 }
1479 (None, Some(ext)) => BlobTraceUpdates::Structured {
1480 key_values: ext,
1481 timestamps,
1482 diffs,
1483 },
1484 (None, None) => unreachable!(),
1485 }
1486 }
1487}
1488
/// A batch part in a form that can be handed between timely workers.
///
/// It derives `Serialize`/`Deserialize`, and the `client_exchange_data` test
/// checks that it implements `timely::ExchangeData`.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExchangeableBatchPart<T> {
    // Id of the shard this part is associated with.
    shard_id: ShardId,
    // Encoded size of the part in bytes; exposed via `encoded_size_bytes()`.
    encoded_size_bytes: usize,
    // Batch description carried alongside the part.
    desc: Description<T>,
    // Fetch-time filter carried alongside the part.
    filter: FetchBatchFilter<T>,
    // The hollow part itself, kept in its `LazyProto` protobuf form.
    part: LazyProto<ProtoHollowBatchPart>,
    // Id of the leased reader associated with this part; exposed via `reader_id()`.
    reader_id: LeasedReaderId,
    // NOTE(review): presumably toggles a filter-pushdown audit when the part
    // is fetched — TODO confirm against the consumer of this field.
    filter_pushdown_audit: bool,
}
1510
1511impl<T> ExchangeableBatchPart<T> {
1512 pub fn encoded_size_bytes(&self) -> usize {
1514 self.encoded_size_bytes
1515 }
1516
1517 pub fn reader_id(&self) -> &LeasedReaderId {
1519 &self.reader_id
1520 }
1521}
1522
/// The format used to decode parts.
///
/// It is configurable by name: see [`PartDecodeFormat::from_str`] and
/// [`PartDecodeFormat::as_str`]. [`PartDecodeFormat::default`] is `Arrow`.
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
    /// The `"row"` format, or `"row_with_validate"` when `validate_structured`
    /// is set.
    Row {
        /// Selects the `"row_with_validate"` variant. NOTE(review): presumably
        /// also validates the structured data on decode — TODO confirm.
        validate_structured: bool,
    },
    /// The `"arrow"` format; this is the default.
    Arrow,
}
1536
1537impl PartDecodeFormat {
1538 pub const fn default() -> Self {
1540 PartDecodeFormat::Arrow
1541 }
1542
1543 pub fn from_str(s: &str) -> Self {
1546 match s {
1547 "row" => PartDecodeFormat::Row {
1548 validate_structured: false,
1549 },
1550 "row_with_validate" => PartDecodeFormat::Row {
1551 validate_structured: true,
1552 },
1553 "arrow" => PartDecodeFormat::Arrow,
1554 x => {
1555 let default = PartDecodeFormat::default();
1556 soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
1557 default
1558 }
1559 }
1560 }
1561
1562 pub const fn as_str(&self) -> &'static str {
1564 match self {
1565 PartDecodeFormat::Row {
1566 validate_structured: false,
1567 } => "row",
1568 PartDecodeFormat::Row {
1569 validate_structured: true,
1570 } => "row_with_validate",
1571 PartDecodeFormat::Arrow => "arrow",
1572 }
1573 }
1574}
1575
1576impl fmt::Display for PartDecodeFormat {
1577 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1578 f.write_str(self.as_str())
1579 }
1580}
1581
1582#[mz_ore::test]
1583fn client_exchange_data() {
1584 fn is_exchange_data<T: timely::ExchangeData>() {}
1588 is_exchange_data::<ExchangeableBatchPart<u64>>();
1589 is_exchange_data::<ExchangeableBatchPart<u64>>();
1590}