1use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
19use arrow::compute::FilterBuilder;
20use differential_dataflow::difference::Semigroup;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use itertools::EitherOrBoth;
24use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::{soft_panic_no_log, soft_panic_or_log};
28use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
29use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
30use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist::metrics::ColumnarMetrics;
33use mz_persist_types::arrow::ArrayOrd;
34use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
35use mz_persist_types::part::Codec64Mut;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::stats::PartStats;
38use mz_persist_types::{Codec, Codec64};
39use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
40use serde::{Deserialize, Serialize};
41use timely::PartialOrder;
42use timely::progress::frontier::AntichainRef;
43use timely::progress::{Antichain, Timestamp};
44use tracing::{Instrument, debug, debug_span, trace_span};
45
46use crate::ShardId;
47use crate::batch::{
48 ProtoFetchBatchFilter, ProtoFetchBatchFilterListen, ProtoLease, ProtoLeasedBatchPart,
49 proto_fetch_batch_filter,
50};
51use crate::cfg::PersistConfig;
52use crate::error::InvalidUsage;
53use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
54use crate::internal::machine::retry_external;
55use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
56use crate::internal::paths::BlobKey;
57use crate::internal::state::{BatchPart, HollowBatchPart, ProtoInlineBatchPart};
58use crate::read::LeasedReaderId;
59use crate::schema::{PartMigration, SchemaCache};
60
/// Dynamic config `persist_fetch_semaphore_cost_adjustment`.
///
/// A multiplier applied to a part's `encoded_size_bytes` to approximate an
/// upper bound on the part's size in lgalloc (which also holds the decoded
/// version). Constructed here with the value `1.2`.
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);
71
/// Dynamic config `persist_fetch_semaphore_permit_adjustment`.
///
/// Limits the number of outstanding persist bytes being fetched and parsed,
/// expressed as a multiplier of the process's memory limit. Constructed here
/// with the value `1.0`.
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);
81
/// Dynamic config `persist_part_decode_format`.
///
/// Selects the format used to decode a Persist part: `row`,
/// `row_with_validate`, or `arrow`. Constructed here with the string form of
/// `PartDecodeFormat::default()`.
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow' (Materialize).",
);
89
/// Dynamic config `persist_optimize_ignored_data_fetch`.
///
/// Escape hatch for the optimization that skips fetching ignored data; when it
/// reads `false`, `LeasedBatchPart::maybe_optimize` returns without changing
/// the part. Constructed here with the value `true`.
pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);
95
/// Configuration consulted when fetched parts are decoded.
#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
    /// Handle onto the value of the `persist_part_decode_format` config.
    pub(crate) part_decode_format: ConfigValHandle<String>,
}
100
101impl BatchFetcherConfig {
102 pub fn new(value: &PersistConfig) -> Self {
103 BatchFetcherConfig {
104 part_decode_format: PART_DECODE_FORMAT.handle(value),
105 }
106 }
107
108 pub fn part_decode_format(&self) -> PartDecodeFormat {
109 PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
110 }
111}
112
/// Fetches batch parts belonging to a single shard and prepares them for
/// decoding.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    // These are only here so we can use them in the auto-expiring `Drop` impl.
    // NOTE(review): the comment above is a guess at why the bounds live on the
    // struct; no `Drop` impl for this type is visible here — confirm or remove.
    K: Debug + Codec,
    V: Debug + Codec,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Source of the part decode format handed to fetched blobs.
    pub(crate) cfg: BatchFetcherConfig,
    /// Blob store that hollow parts are read from.
    pub(crate) blob: Arc<dyn Blob>,
    /// Process-wide persist metrics (also provides the fetch semaphore).
    pub(crate) metrics: Arc<Metrics>,
    /// Per-shard metrics, passed along when fetching a part's blob.
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    /// The shard this fetcher serves; parts from any other shard are rejected.
    pub(crate) shard_id: ShardId,
    /// Schemas the caller wants to read with; input to part migration.
    pub(crate) read_schemas: Schemas<K, V>,
    /// Cache handed to `PartMigration::new` when resolving a part's schema.
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    /// When true, blob fetches are accounted to the `read.unindexed` metrics
    /// instead of `read.batch_fetcher`.
    pub(crate) is_transient: bool,

    /// Ties the otherwise unused type parameters to the struct.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
136
137impl<K, V, T, D> BatchFetcher<K, V, T, D>
138where
139 K: Debug + Codec,
140 V: Debug + Codec,
141 T: Timestamp + Lattice + Codec64 + Sync,
142 D: Semigroup + Codec64 + Send + Sync,
143{
144 pub fn leased_part_from_exchangeable(&self, x: SerdeLeasedBatchPart) -> LeasedBatchPart<T> {
146 x.decode(Arc::clone(&self.metrics))
147 }
148
149 pub async fn fetch_leased_part(
154 &mut self,
155 part: &LeasedBatchPart<T>,
156 ) -> Result<FetchedBlob<K, V, T, D>, InvalidUsage<T>> {
157 if &part.shard_id != &self.shard_id {
158 let batch_shard = part.shard_id.clone();
159 return Err(InvalidUsage::BatchNotFromThisShard {
160 batch_shard,
161 handle_shard: self.shard_id.clone(),
162 });
163 }
164
165 let migration = PartMigration::new(
166 &part.part,
167 self.read_schemas.clone(),
168 &mut self.schema_cache,
169 )
170 .await
171 .unwrap_or_else(|read_schemas| {
172 panic!(
173 "could not decode part {:?} with schema: {:?}",
174 part.part.schema_id(),
175 read_schemas
176 )
177 });
178
179 let (buf, fetch_permit) = match &part.part {
180 BatchPart::Hollow(x) => {
181 let fetch_permit = self
182 .metrics
183 .semaphore
184 .acquire_fetch_permits(x.encoded_size_bytes)
185 .await;
186 let read_metrics = if self.is_transient {
187 &self.metrics.read.unindexed
188 } else {
189 &self.metrics.read.batch_fetcher
190 };
191 let buf = fetch_batch_part_blob(
192 &part.shard_id,
193 self.blob.as_ref(),
194 &self.metrics,
195 &self.shard_metrics,
196 read_metrics,
197 x,
198 )
199 .await
200 .unwrap_or_else(|blob_key| {
201 panic!("batch fetcher could not fetch batch part: {}", blob_key)
210 });
211 let buf = FetchedBlobBuf::Hollow {
212 buf,
213 part: x.clone(),
214 };
215 (buf, Some(Arc::new(fetch_permit)))
216 }
217 BatchPart::Inline {
218 updates,
219 ts_rewrite,
220 ..
221 } => {
222 let buf = FetchedBlobBuf::Inline {
223 desc: part.desc.clone(),
224 updates: updates.clone(),
225 ts_rewrite: ts_rewrite.clone(),
226 };
227 (buf, None)
228 }
229 };
230 let fetched_blob = FetchedBlob {
231 metrics: Arc::clone(&self.metrics),
232 read_metrics: self.metrics.read.batch_fetcher.clone(),
233 buf,
234 registered_desc: part.desc.clone(),
235 migration,
236 filter: part.filter.clone(),
237 filter_pushdown_audit: part.filter_pushdown_audit,
238 structured_part_audit: self.cfg.part_decode_format(),
239 fetch_permit,
240 _phantom: PhantomData,
241 };
242 Ok(fetched_blob)
243 }
244}
245
/// Decides which update timestamps of a fetched part are kept, and how kept
/// timestamps are advanced (see `filter_ts`).
#[derive(Debug, Clone)]
pub(crate) enum FetchBatchFilter<T> {
    /// Keeps timestamps that are not beyond `as_of` and advances them by
    /// `as_of`.
    Snapshot {
        as_of: Antichain<T>,
    },
    /// Keeps timestamps that are strictly beyond `as_of` and at or beyond
    /// `lower`; timestamps are left unchanged.
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    /// Keeps every timestamp, advancing each by `since`.
    Compaction {
        since: Antichain<T>,
    },
}
259
260impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
261 pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
262 match self {
263 FetchBatchFilter::Snapshot { as_of } => {
264 if as_of.less_than(t) {
266 return false;
267 }
268 t.advance_by(as_of.borrow());
269 true
270 }
271 FetchBatchFilter::Listen { as_of, lower } => {
272 if !as_of.less_than(t) {
274 return false;
275 }
276
277 if !lower.less_equal(t) {
285 return false;
286 }
287 true
288 }
289 FetchBatchFilter::Compaction { since } => {
290 t.advance_by(since.borrow());
291 true
292 }
293 }
294 }
295}
296
297impl<T: Timestamp + Codec64> RustType<ProtoFetchBatchFilter> for FetchBatchFilter<T> {
298 fn into_proto(&self) -> ProtoFetchBatchFilter {
299 let kind = match self {
300 FetchBatchFilter::Snapshot { as_of } => {
301 proto_fetch_batch_filter::Kind::Snapshot(as_of.into_proto())
302 }
303 FetchBatchFilter::Listen { as_of, lower } => {
304 proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
305 as_of: Some(as_of.into_proto()),
306 lower: Some(lower.into_proto()),
307 })
308 }
309 FetchBatchFilter::Compaction { .. } => unreachable!("not serialized"),
310 };
311 ProtoFetchBatchFilter { kind: Some(kind) }
312 }
313
314 fn from_proto(proto: ProtoFetchBatchFilter) -> Result<Self, TryFromProtoError> {
315 let kind = proto
316 .kind
317 .ok_or_else(|| TryFromProtoError::missing_field("ProtoFetchBatchFilter::kind"))?;
318 match kind {
319 proto_fetch_batch_filter::Kind::Snapshot(as_of) => Ok(FetchBatchFilter::Snapshot {
320 as_of: as_of.into_rust()?,
321 }),
322 proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
323 as_of,
324 lower,
325 }) => Ok(FetchBatchFilter::Listen {
326 as_of: as_of.into_rust_if_some("ProtoFetchBatchFilterListen::as_of")?,
327 lower: lower.into_rust_if_some("ProtoFetchBatchFilterListen::lower")?,
328 }),
329 }
330 }
331}
332
333pub(crate) async fn fetch_leased_part<K, V, T, D>(
338 cfg: &PersistConfig,
339 part: &LeasedBatchPart<T>,
340 blob: &dyn Blob,
341 metrics: Arc<Metrics>,
342 read_metrics: &ReadMetrics,
343 shard_metrics: &ShardMetrics,
344 reader_id: &LeasedReaderId,
345 read_schemas: Schemas<K, V>,
346 schema_cache: &mut SchemaCache<K, V, T, D>,
347) -> FetchedPart<K, V, T, D>
348where
349 K: Debug + Codec,
350 V: Debug + Codec,
351 T: Timestamp + Lattice + Codec64 + Sync,
352 D: Semigroup + Codec64 + Send + Sync,
353{
354 let encoded_part = EncodedPart::fetch(
355 &part.shard_id,
356 blob,
357 &metrics,
358 shard_metrics,
359 read_metrics,
360 &part.desc,
361 &part.part,
362 )
363 .await
364 .unwrap_or_else(|blob_key| {
365 panic!("{} could not fetch batch part: {}", reader_id, blob_key)
374 });
375 let part_cfg = BatchFetcherConfig::new(cfg);
376 let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
377 .await
378 .unwrap_or_else(|read_schemas| {
379 panic!(
380 "could not decode part {:?} with schema: {:?}",
381 part.part.schema_id(),
382 read_schemas
383 )
384 });
385 FetchedPart::new(
386 metrics,
387 encoded_part,
388 migration,
389 part.filter.clone(),
390 part.filter_pushdown_audit,
391 part_cfg.part_decode_format(),
392 part.part.stats(),
393 )
394}
395
396pub(crate) async fn fetch_batch_part_blob<T>(
397 shard_id: &ShardId,
398 blob: &dyn Blob,
399 metrics: &Metrics,
400 shard_metrics: &ShardMetrics,
401 read_metrics: &ReadMetrics,
402 part: &HollowBatchPart<T>,
403) -> Result<SegmentedBytes, BlobKey> {
404 let now = Instant::now();
405 let get_span = debug_span!("fetch_batch::get");
406 let blob_key = part.key.complete(shard_id);
407 let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
408 shard_metrics.blob_gets.inc();
409 blob.get(&blob_key).await
410 })
411 .instrument(get_span.clone())
412 .await
413 .ok_or(blob_key)?;
414
415 drop(get_span);
416
417 read_metrics.part_count.inc();
418 read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
419 read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());
420
421 Ok(value)
422}
423
424pub(crate) fn decode_batch_part_blob<T>(
425 metrics: &Metrics,
426 read_metrics: &ReadMetrics,
427 registered_desc: Description<T>,
428 part: &HollowBatchPart<T>,
429 buf: &SegmentedBytes,
430) -> EncodedPart<T>
431where
432 T: Timestamp + Lattice + Codec64,
433{
434 trace_span!("fetch_batch::decode").in_scope(|| {
435 let parsed = metrics
436 .codecs
437 .batch
438 .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
439 .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
440 .expect("internal error: invalid encoded state");
445 read_metrics
446 .part_goodbytes
447 .inc_by(u64::cast_from(parsed.updates.goodbytes()));
448 EncodedPart::from_hollow(read_metrics.clone(), registered_desc, part, parsed)
449 })
450}
451
452pub(crate) async fn fetch_batch_part<T>(
453 shard_id: &ShardId,
454 blob: &dyn Blob,
455 metrics: &Metrics,
456 shard_metrics: &ShardMetrics,
457 read_metrics: &ReadMetrics,
458 registered_desc: &Description<T>,
459 part: &HollowBatchPart<T>,
460) -> Result<EncodedPart<T>, BlobKey>
461where
462 T: Timestamp + Lattice + Codec64,
463{
464 let buf =
465 fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
466 let part = decode_batch_part_blob(metrics, read_metrics, registered_desc.clone(), part, &buf);
467 Ok(part)
468}
469
/// A cheaply cloneable token; all clones share one reference-counted
/// allocation, so the number of live clones can be observed via [`Lease::count`].
#[derive(Clone, Debug, Default)]
pub(crate) struct Lease(Arc<()>);

impl Lease {
    /// Returns how many `Lease` values (this one included) currently share the
    /// same underlying allocation.
    pub fn count(&self) -> usize {
        let Self(token) = self;
        Arc::strong_count(token)
    }
}
485
/// A batch part handed out to a reader, together with everything needed to
/// fetch and filter it.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    /// Persist metrics; used for pushdown counters and the dropped-part counter.
    pub(crate) metrics: Arc<Metrics>,
    /// Shard the part belongs to.
    pub(crate) shard_id: ShardId,
    /// Reader the part was leased to.
    pub(crate) reader_id: LeasedReaderId,
    /// Timestamp filter applied to the part's updates when it is read.
    pub(crate) filter: FetchBatchFilter<T>,
    /// Description registered for the batch this part belongs to.
    pub(crate) desc: Description<T>,
    /// The part itself, either hollow (stored in blob) or inline.
    pub(crate) part: BatchPart<T>,
    /// NOTE(review): presumably the seqno the lease was taken at — not used in
    /// the code visible here; confirm against the lease bookkeeping.
    pub(crate) leased_seqno: SeqNo,
    /// The lease token, if still attached; `into_exchangeable_part` detaches it.
    pub(crate) lease: Option<Lease>,
    /// Whether a filter-pushdown audit was requested for this part.
    pub(crate) filter_pushdown_audit: bool,
}
528
impl<T> LeasedBatchPart<T>
where
    T: Timestamp + Codec64,
{
    /// Converts the part into its serializable form, returning the detached
    /// lease (if any) alongside it.
    pub(crate) fn into_exchangeable_part(mut self) -> (SerdeLeasedBatchPart, Option<Lease>) {
        // The proto is built before the lease is taken out of `self`.
        let (proto, _metrics) = self.into_proto();
        let lease = self.lease.take();
        let part = SerdeLeasedBatchPart {
            encoded_size_bytes: self.part.encoded_size_bytes(),
            proto: LazyProto::from(&proto),
        };
        (part, lease)
    }

    /// Returns the encoded size of the underlying part, in bytes.
    pub fn encoded_size_bytes(&self) -> usize {
        self.part.encoded_size_bytes()
    }

    /// Marks this part so that a filter-pushdown audit is performed when it is
    /// fetched.
    pub fn request_filter_pushdown_audit(&mut self) {
        self.filter_pushdown_audit = true;
    }

    /// Returns the decoded stats of the underlying part, if it has any.
    pub fn stats(&self) -> Option<PartStats> {
        self.part.stats().map(|x| x.decode())
    }

    /// Tries to replace a hollow part with a faked single-row inline part so
    /// that the part's blob never has to be fetched.
    ///
    /// The replacement only happens when all of the following hold; otherwise
    /// the part is left untouched:
    /// - the filter is a `Snapshot`,
    /// - `OPTIMIZE_IGNORED_DATA_FETCH` is enabled in `cfg`,
    /// - the part is hollow and carries a `diffs_sum`,
    /// - the snapshot `as_of` has exactly one element,
    /// - both the description's `upper` and `since` are `less_equal` that
    ///   `as_of` element.
    ///
    /// The faked part contains the single row `(key, val)` at `as_of` with the
    /// part's `diffs_sum` as its diff.
    ///
    /// # Panics
    ///
    /// Panics if `key` or `val` does not contain exactly one row.
    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
        assert_eq!(key.len(), 1, "expect a single-row key array");
        assert_eq!(val.len(), 1, "expect a single-row val array");
        let as_of = match &self.filter {
            FetchBatchFilter::Snapshot { as_of } => as_of,
            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
        };
        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
            return;
        }
        let (diffs_sum, _stats) = match &self.part {
            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
            BatchPart::Inline { .. } => return,
        };
        debug!(
            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
            diffs_sum.map(i64::decode),
            as_of.elements(),
            self.desc.lower().elements(),
            self.desc.upper().elements()
        );
        // Only a single-element as_of is supported.
        let as_of = match &as_of.elements() {
            &[as_of] => as_of,
            _ => return,
        };
        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
        if !eligible {
            return;
        }
        let Some(diffs_sum) = diffs_sum else {
            return;
        };

        debug!(
            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
            i64::decode(diffs_sum),
            as_of,
            self.part.encoded_size_bytes(),
        );
        self.metrics.pushdown.parts_faked_count.inc();
        self.metrics
            .pushdown
            .parts_faked_bytes
            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
        // Build one-row timestamp and diff columns for the faked update.
        let timestamps = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push(as_of);
            col.finish()
        };
        let diffs = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push_raw(diffs_sum);
            col.finish()
        };
        let updates = BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt { key, val },
            timestamps,
            diffs,
        };
        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
            desc: Some(self.desc.into_proto()),
            index: 0,
            updates: Some(updates.into_proto()),
        });
        // Swap the hollow part for the faked inline one.
        self.part = BatchPart::Inline {
            updates: faked_data,
            ts_rewrite: None,
            schema_id: None,
            deprecated_schema_id: None,
        };
    }
}
647
648impl<T> Drop for LeasedBatchPart<T> {
649 fn drop(&mut self) {
651 self.metrics.lease.dropped_part.inc()
652 }
653}
654
/// The fetched but not yet decoded data of a batch part, plus everything
/// needed to decode it later via `parse`.
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    /// Persist metrics handed to the decoding routines.
    metrics: Arc<Metrics>,
    /// Read metrics recorded against when the part is decoded.
    read_metrics: ReadMetrics,
    /// The fetched bytes (hollow part) or the inline updates.
    buf: FetchedBlobBuf<T>,
    /// Description registered for the batch the part belongs to.
    registered_desc: Description<T>,
    /// Schema migration to apply while decoding.
    migration: PartMigration<K, V>,
    /// Timestamp filter applied to the decoded updates.
    filter: FetchBatchFilter<T>,
    /// Whether a filter-pushdown audit was requested for the part.
    filter_pushdown_audit: bool,
    /// Decode format passed to `FetchedPart::new`.
    structured_part_audit: PartDecodeFormat,
    /// Fetch permits held for hollow parts; `None` for inline parts.
    fetch_permit: Option<Arc<MetricsPermits>>,
    /// Ties the diff type parameter to the struct.
    _phantom: PhantomData<fn() -> D>,
}
672
/// The undecoded payload held by a `FetchedBlob`.
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    /// Bytes fetched from blob storage for a hollow part.
    Hollow {
        /// The fetched, still encoded bytes.
        buf: SegmentedBytes,
        /// The hollow part the bytes were fetched for.
        part: HollowBatchPart<T>,
    },
    /// Updates that were stored inline in the part.
    Inline {
        /// Description of the batch the inline part belongs to.
        desc: Description<T>,
        /// The lazily decoded inline updates.
        updates: LazyInlineBatchPart,
        /// Optional frontier passed on as the part's timestamp rewrite.
        ts_rewrite: Option<Antichain<T>>,
    },
}
685
686impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
687 fn clone(&self) -> Self {
688 Self {
689 metrics: Arc::clone(&self.metrics),
690 read_metrics: self.read_metrics.clone(),
691 buf: self.buf.clone(),
692 registered_desc: self.registered_desc.clone(),
693 migration: self.migration.clone(),
694 filter: self.filter.clone(),
695 filter_pushdown_audit: self.filter_pushdown_audit.clone(),
696 fetch_permit: self.fetch_permit.clone(),
697 structured_part_audit: self.structured_part_audit.clone(),
698 _phantom: self._phantom.clone(),
699 }
700 }
701}
702
/// A decoded part bundled with the fetch permits that were acquired for it.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The decoded part.
    pub part: FetchedPart<K, V, T, D>,
    /// Fetch permits kept alive for as long as this value exists; `None` when
    /// the part was inline.
    fetch_permit: Option<Arc<MetricsPermits>>,
}
710
711impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
712where
713 K: Codec + Debug,
714 <K as Codec>::Storage: Debug,
715 V: Codec + Debug,
716 <V as Codec>::Storage: Debug,
717{
718 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
719 let ShardSourcePart { part, fetch_permit } = self;
720 f.debug_struct("ShardSourcePart")
721 .field("part", part)
722 .field("fetch_permit", fetch_permit)
723 .finish()
724 }
725}
726
727impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
728 pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
730 let (part, stats) = match &self.buf {
731 FetchedBlobBuf::Hollow { buf, part } => {
732 let parsed = decode_batch_part_blob(
733 &self.metrics,
734 &self.read_metrics,
735 self.registered_desc.clone(),
736 part,
737 buf,
738 );
739 (parsed, part.stats.as_ref())
740 }
741 FetchedBlobBuf::Inline {
742 desc,
743 updates,
744 ts_rewrite,
745 } => {
746 let parsed = EncodedPart::from_inline(
747 &self.metrics,
748 self.read_metrics.clone(),
749 desc.clone(),
750 updates,
751 ts_rewrite.as_ref(),
752 );
753 (parsed, None)
754 }
755 };
756 let part = FetchedPart::new(
757 Arc::clone(&self.metrics),
758 part,
759 self.migration.clone(),
760 self.filter.clone(),
761 self.filter_pushdown_audit,
762 self.structured_part_audit,
763 stats,
764 );
765 ShardSourcePart {
766 part,
767 fetch_permit: self.fetch_permit.clone(),
768 }
769 }
770
771 pub fn stats(&self) -> Option<PartStats> {
773 match &self.buf {
774 FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
775 FetchedBlobBuf::Inline { .. } => None,
776 }
777 }
778}
779
/// A decoded batch part that can be iterated to yield its (filtered and
/// locally consolidated) updates.
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    /// Persist metrics used while decoding rows.
    metrics: Arc<Metrics>,
    /// Filter applied to each row's timestamp during iteration.
    ts_filter: FetchBatchFilter<T>,
    /// The key/value data: codec-encoded records (left), structured decoders
    /// (right), or both when structured data is validated against the codec
    /// data.
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    /// Timestamp column; each value is decoded with `T::decode`.
    timestamps: Int64Array,
    /// Diff column; each value is decoded with `D::decode`.
    diffs: Int64Array,
    /// Schema migration; also provides the read schemas for codec decoding.
    migration: PartMigration<K, V>,
    /// The part's stats, retained only when a filter-pushdown audit was
    /// requested.
    filter_pushdown_audit: Option<LazyPartStats>,
    /// An update that was decoded while consolidating but belongs to the next
    /// call of `next_with_storage`.
    peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
    /// Index of the next row to read from the columns.
    part_cursor: usize,
    /// Scratch storage passed to `K::decode_from`.
    key_storage: Option<K::Storage>,
    /// Scratch storage passed to `V::decode_from`.
    val_storage: Option<V::Storage>,

    /// Ties the diff type parameter to the struct.
    _phantom: PhantomData<fn() -> D>,
}
808
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
    /// Builds an iterable part from an encoded part.
    ///
    /// Records schema-migration metrics, normalizes the part's updates, and
    /// picks which representation (codec records, structured decoders, or
    /// both) is used for decoding based on the data available and
    /// `part_decode_format`. `stats` is retained only when
    /// `filter_pushdown_audit` is true.
    ///
    /// # Panics
    ///
    /// Panics if the part contains only structured data and no decoders can be
    /// built for it.
    pub(crate) fn new(
        metrics: Arc<Metrics>,
        part: EncodedPart<T>,
        migration: PartMigration<K, V>,
        ts_filter: FetchBatchFilter<T>,
        filter_pushdown_audit: bool,
        part_decode_format: PartDecodeFormat,
        stats: Option<&LazyPartStats>,
    ) -> Self {
        // Account for which kind of migration this part goes through.
        let part_len = u64::cast_from(part.part.updates.len());
        match &migration {
            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
            PartMigration::Schemaless { .. } => {
                metrics.schema.migration_count_codec.inc();
                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
            }
            PartMigration::Either { .. } => {
                metrics.schema.migration_count_either.inc();
                match part_decode_format {
                    PartDecodeFormat::Row {
                        validate_structured: false,
                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
                    PartDecodeFormat::Row {
                        validate_structured: true,
                    } => {
                        metrics.schema.migration_len_either_codec.inc_by(part_len);
                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
                    }
                    PartDecodeFormat::Arrow => {
                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
                    }
                }
            }
        }

        let filter_pushdown_audit = if filter_pushdown_audit {
            stats.cloned()
        } else {
            None
        };

        // Migrates structured data to the read schema and builds key/val
        // decoders for it. Returns `None` when that is not possible.
        // `structured_only` allows a `Schemaless` migration to attempt a
        // backward-compatible migration instead of giving up.
        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
                                   structured_only: bool| {
            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

            let structured = match &migration {
                PartMigration::SameSchema { .. } => structured,
                PartMigration::Schemaless { read } if structured_only => {
                    let start = Instant::now();
                    let read_key = data_type::<K>(&*read.key).ok()?;
                    let read_val = data_type::<V>(&*read.val).ok()?;
                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
                PartMigration::Schemaless { .. } => return None,
                PartMigration::Either {
                    write: _,
                    read: _,
                    key_migration,
                    val_migration,
                } => {
                    let start = Instant::now();
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
            };

            let read_schema = migration.codec_read();
            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

            // Record how many key bytes the decoder trimmed relative to the
            // array as stored.
            match &key {
                Ok(key_decoder) => {
                    let key_size_after = key_decoder.goodbytes();
                    let key_diff = key_size_before.saturating_sub(key_size_after);
                    metrics
                        .pushdown
                        .parts_projection_trimmed_bytes
                        .inc_by(u64::cast_from(key_diff));
                }
                Err(e) => {
                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                }
            }

            Some((key.ok()?, val.ok()?))
        };

        let updates = part.normalize(&metrics.columnar);
        let timestamps = updates.timestamps().clone();
        let diffs = updates.diffs().clone();
        let part = match updates {
            // Only codec data is available.
            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
            // Only structured data is available, so decoders are mandatory.
            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                downcast_structured(key_values, true).expect("valid schemas for structured data"),
            ),
            // Both are available: the decode format decides, falling back to
            // the codec data when no decoders can be built.
            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
                PartDecodeFormat::Row {
                    validate_structured: false,
                } => EitherOrBoth::Left(records),
                PartDecodeFormat::Row {
                    validate_structured: true,
                } => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Both(records, decoders),
                    None => EitherOrBoth::Left(records),
                },
                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Right(decoders),
                    None => EitherOrBoth::Left(records),
                },
            },
        };

        FetchedPart {
            metrics,
            ts_filter,
            part,
            peek_stash: None,
            timestamps,
            diffs,
            migration,
            filter_pushdown_audit,
            part_cursor: 0,
            key_storage: None,
            val_storage: None,
            _phantom: PhantomData,
        }
    }

    /// Returns the stats retained for a filter-pushdown audit, if an audit was
    /// requested for this part and the part had stats.
    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
        self.filter_pushdown_audit.clone()
    }
}
965
/// A decoded-from-bytes batch part whose updates have not yet been truncated
/// to the registered description or had their timestamps rewritten.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    /// Read metrics associated with this part.
    metrics: ReadMetrics,
    /// Description registered for the batch this part belongs to.
    registered_desc: Description<T>,
    /// The parsed part, including the description stored with it.
    part: BlobTraceBatchPart<T>,
    /// True when the stored description's lower or upper differs from the
    /// registered one.
    needs_truncation: bool,
    /// Optional frontier that timestamps are advanced by when normalizing.
    ts_rewrite: Option<Antichain<T>>,
}
976
impl<K, V, T, D> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Returns the next update of the part, or `None` when it is exhausted.
    ///
    /// Rows rejected by the timestamp filter and rows with a zero diff are
    /// skipped. Consecutive rows with equal key, value and (filtered)
    /// timestamp are merged by summing their diffs; a merged update whose diff
    /// becomes zero is dropped.
    ///
    /// `key` and `val` may hold previously returned values; if present they
    /// are taken and handed to the decoders for reuse.
    pub fn next_with_storage(
        &mut self,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
        // Start from whatever the previous call decoded but did not return.
        let mut consolidated = self.peek_stash.take();
        loop {
            let next = if self.part_cursor < self.timestamps.len() {
                let next_idx = self.part_cursor;
                self.part_cursor += 1;
                // Filter on the timestamp first so rejected rows are never
                // decoded.
                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
                if !self.ts_filter.filter_ts(&mut t) {
                    continue;
                }
                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
                if d.is_zero() {
                    continue;
                }
                let kv = self.decode_kv(next_idx, key, val);
                (kv, t, d)
            } else {
                break;
            };

            if let Some((kv, t, d)) = &mut consolidated {
                let (kv_next, t_next, d_next) = &next;
                if kv == kv_next && t == t_next {
                    d.plus_equals(d_next);
                    if d.is_zero() {
                        consolidated = None;
                    }
                } else {
                    // A different update: keep it for the next call and return
                    // what has been accumulated so far.
                    self.peek_stash = Some(next);
                    break;
                }
            } else {
                consolidated = Some(next);
            }
        }

        let (kv, t, d) = consolidated?;

        Some((kv, t, d))
    }

    /// Decodes the key and value of the row at `index` from whichever
    /// representation(s) the part holds.
    ///
    /// When both representations are present, the structured result is
    /// compared with the codec result (reported through the columnar metrics,
    /// with a soft panic on mismatch) and the codec result is returned.
    fn decode_kv(
        &mut self,
        index: usize,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (Result<K, String>, Result<V, String>) {
        let decoded = self
            .part
            .as_ref()
            .map_left(|codec| {
                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
                Self::decode_codec(
                    &*self.metrics,
                    self.migration.codec_read(),
                    ck,
                    cv,
                    key,
                    val,
                    &mut self.key_storage,
                    &mut self.val_storage,
                )
            })
            .map_right(|(structured_key, structured_val)| {
                self.decode_structured(index, structured_key, structured_val, key, val)
            });

        match decoded {
            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .key()
                    .report_valid(|| k_s == k);
                if !is_valid {
                    soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
                }
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .val()
                    .report_valid(|| v_s == v);
                if !is_valid {
                    soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
                }

                (k, v)
            }
            EitherOrBoth::Left(kv) => kv,
            EitherOrBoth::Right(kv) => kv,
        }
    }

    /// Decodes a codec-encoded key and value.
    ///
    /// If `key` (resp. `val`) holds a value it is taken and decoded into via
    /// `decode_from`, using the given storage; otherwise a fresh value is
    /// produced with `decode`. Both decodes are timed through the codec
    /// metrics.
    fn decode_codec(
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
        key_buf: &[u8],
        val_buf: &[u8],
        key: &mut Option<K>,
        val: &mut Option<V>,
        key_storage: &mut Option<K::Storage>,
        val_storage: &mut Option<V::Storage>,
    ) -> (Result<K, String>, Result<V, String>) {
        let k = metrics.codecs.key.decode(|| match key.take() {
            Some(mut key) => {
                match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
                    Ok(()) => Ok(key),
                    Err(err) => Err(err),
                }
            }
            None => K::decode(key_buf, &read_schemas.key),
        });
        let v = metrics.codecs.val.decode(|| match val.take() {
            Some(mut val) => {
                match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
                    Ok(()) => Ok(val),
                    Err(err) => Err(err),
                }
            }
            None => V::decode(val_buf, &read_schemas.val),
        });
        (k, v)
    }

    /// Decodes the key and value at `idx` using the structured decoders.
    ///
    /// Values are decoded into the ones taken from `key`/`val` when present,
    /// otherwise into default values. Always returns `Ok` for both.
    fn decode_structured(
        &self,
        idx: usize,
        keys: &<K::Schema as Schema<K>>::Decoder,
        vals: &<V::Schema as Schema<V>>::Decoder,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (Result<K, String>, Result<V, String>) {
        let mut key = key.take().unwrap_or_default();
        keys.decode(idx, &mut key);

        let mut val = val.take().unwrap_or_default();
        vals.decode(idx, &mut val);

        (Ok(key), Ok(val))
    }
}
1141
1142impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
1143where
1144 K: Debug + Codec,
1145 V: Debug + Codec,
1146 T: Timestamp + Lattice + Codec64,
1147 D: Semigroup + Codec64 + Send + Sync,
1148{
1149 type Item = ((Result<K, String>, Result<V, String>), T, D);
1150
1151 fn next(&mut self) -> Option<Self::Item> {
1152 self.next_with_storage(&mut None, &mut None)
1153 }
1154
1155 fn size_hint(&self) -> (usize, Option<usize>) {
1156 let max_len = self.timestamps.len();
1158 (0, Some(max_len))
1159 }
1160}
1161
1162impl<T> EncodedPart<T>
1163where
1164 T: Timestamp + Lattice + Codec64,
1165{
1166 pub async fn fetch(
1167 shard_id: &ShardId,
1168 blob: &dyn Blob,
1169 metrics: &Metrics,
1170 shard_metrics: &ShardMetrics,
1171 read_metrics: &ReadMetrics,
1172 registered_desc: &Description<T>,
1173 part: &BatchPart<T>,
1174 ) -> Result<Self, BlobKey> {
1175 match part {
1176 BatchPart::Hollow(x) => {
1177 fetch_batch_part(
1178 shard_id,
1179 blob,
1180 metrics,
1181 shard_metrics,
1182 read_metrics,
1183 registered_desc,
1184 x,
1185 )
1186 .await
1187 }
1188 BatchPart::Inline {
1189 updates,
1190 ts_rewrite,
1191 ..
1192 } => Ok(EncodedPart::from_inline(
1193 metrics,
1194 read_metrics.clone(),
1195 registered_desc.clone(),
1196 updates,
1197 ts_rewrite.as_ref(),
1198 )),
1199 }
1200 }
1201
1202 pub(crate) fn from_inline(
1203 metrics: &Metrics,
1204 read_metrics: ReadMetrics,
1205 desc: Description<T>,
1206 x: &LazyInlineBatchPart,
1207 ts_rewrite: Option<&Antichain<T>>,
1208 ) -> Self {
1209 let parsed = x.decode(&metrics.columnar).expect("valid inline part");
1210 Self::new(read_metrics, desc, "inline", ts_rewrite, parsed)
1211 }
1212
1213 pub(crate) fn from_hollow(
1214 metrics: ReadMetrics,
1215 registered_desc: Description<T>,
1216 part: &HollowBatchPart<T>,
1217 parsed: BlobTraceBatchPart<T>,
1218 ) -> Self {
1219 Self::new(
1220 metrics,
1221 registered_desc,
1222 &part.key.0,
1223 part.ts_rewrite.as_ref(),
1224 parsed,
1225 )
1226 }
1227
1228 pub(crate) fn new(
1229 metrics: ReadMetrics,
1230 registered_desc: Description<T>,
1231 printable_name: &str,
1232 ts_rewrite: Option<&Antichain<T>>,
1233 parsed: BlobTraceBatchPart<T>,
1234 ) -> Self {
1235 let inline_desc = &parsed.desc;
1247 let needs_truncation = inline_desc.lower() != registered_desc.lower()
1248 || inline_desc.upper() != registered_desc.upper();
1249 if needs_truncation {
1250 assert!(
1251 PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
1252 "key={} inline={:?} registered={:?}",
1253 printable_name,
1254 inline_desc,
1255 registered_desc
1256 );
1257 if ts_rewrite.is_none() {
1258 assert!(
1263 PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
1264 "key={} inline={:?} registered={:?}",
1265 printable_name,
1266 inline_desc,
1267 registered_desc
1268 );
1269 }
1270 assert_eq!(
1275 inline_desc.since(),
1276 &Antichain::from_elem(T::minimum()),
1277 "key={} inline={:?} registered={:?}",
1278 printable_name,
1279 inline_desc,
1280 registered_desc
1281 );
1282 } else {
1283 assert_eq!(
1284 inline_desc, ®istered_desc,
1285 "key={} inline={:?} registered={:?}",
1286 printable_name, inline_desc, registered_desc
1287 );
1288 }
1289
1290 EncodedPart {
1291 metrics,
1292 registered_desc,
1293 part: parsed,
1294 needs_truncation,
1295 ts_rewrite: ts_rewrite.cloned(),
1296 }
1297 }
1298
1299 pub(crate) fn maybe_unconsolidated(&self) -> bool {
1300 self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
1303 }
1304
1305 pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
1307 let updates = self.part.updates.clone();
1308 if !self.needs_truncation && self.ts_rewrite.is_none() {
1309 return updates;
1310 }
1311
1312 let mut codec = updates
1313 .records()
1314 .map(|r| (r.keys().clone(), r.vals().clone()));
1315 let mut structured = updates.structured().cloned();
1316 let mut timestamps = updates.timestamps().clone();
1317 let mut diffs = updates.diffs().clone();
1318
1319 if let Some(rewrite) = self.ts_rewrite.as_ref() {
1320 timestamps = arrow::compute::unary(×tamps, |i: i64| {
1321 let mut t = T::decode(i.to_le_bytes());
1322 t.advance_by(rewrite.borrow());
1323 i64::from_le_bytes(T::encode(&t))
1324 });
1325 }
1326
1327 let reallocated = if self.needs_truncation {
1328 let filter = BooleanArray::from_unary(×tamps, |i| {
1329 let t = T::decode(i.to_le_bytes());
1330 let truncate_t = {
1331 !self.registered_desc.lower().less_equal(&t)
1332 || self.registered_desc.upper().less_equal(&t)
1333 };
1334 !truncate_t
1335 });
1336 if filter.false_count() == 0 {
1337 false
1339 } else {
1340 let filter = FilterBuilder::new(&filter).optimize().build();
1341 let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
1342 if let Some((keys, vals)) = codec {
1343 codec = Some((
1344 realloc_array(do_filter(&keys).as_binary(), metrics),
1345 realloc_array(do_filter(&vals).as_binary(), metrics),
1346 ));
1347 }
1348 if let Some(ext) = structured {
1349 structured = Some(ColumnarRecordsStructuredExt {
1350 key: realloc_any(do_filter(&*ext.key), metrics),
1351 val: realloc_any(do_filter(&*ext.val), metrics),
1352 });
1353 }
1354 timestamps = realloc_array(do_filter(×tamps).as_primitive(), metrics);
1355 diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
1356 true
1357 }
1358 } else {
1359 false
1360 };
1361
1362 if self.ts_rewrite.is_some() && !reallocated {
1363 timestamps = realloc_array(×tamps, metrics);
1364 }
1365
1366 if self.ts_rewrite.is_some() {
1367 self.metrics
1368 .ts_rewrite
1369 .inc_by(u64::cast_from(timestamps.len()));
1370 }
1371
1372 match (codec, structured) {
1373 (Some((key, value)), None) => {
1374 BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
1375 }
1376 (Some((key, value)), Some(ext)) => {
1377 BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
1378 }
1379 (None, Some(ext)) => BlobTraceUpdates::Structured {
1380 key_values: ext,
1381 timestamps,
1382 diffs,
1383 },
1384 (None, None) => unreachable!(),
1385 }
1386 }
1387}
1388
1389#[derive(Debug, Serialize, Deserialize, Clone)]
1398pub struct SerdeLeasedBatchPart {
1399 encoded_size_bytes: usize,
1401 proto: LazyProto<ProtoLeasedBatchPart>,
1404}
1405
1406impl SerdeLeasedBatchPart {
1407 pub fn encoded_size_bytes(&self) -> usize {
1409 self.encoded_size_bytes
1410 }
1411
1412 pub(crate) fn decode<T: Timestamp + Codec64>(
1413 &self,
1414 metrics: Arc<Metrics>,
1415 ) -> LeasedBatchPart<T> {
1416 let proto = self.proto.decode().expect("valid leased batch part");
1417 (proto, metrics)
1418 .into_rust()
1419 .expect("valid leased batch part")
1420 }
1421}
1422
1423impl<T: Timestamp + Codec64> RustType<(ProtoLeasedBatchPart, Arc<Metrics>)> for LeasedBatchPart<T> {
1427 fn into_proto(&self) -> (ProtoLeasedBatchPart, Arc<Metrics>) {
1428 let proto = ProtoLeasedBatchPart {
1429 shard_id: self.shard_id.into_proto(),
1430 filter: Some(self.filter.into_proto()),
1431 desc: Some(self.desc.into_proto()),
1432 part: Some(self.part.into_proto()),
1433 lease: Some(ProtoLease {
1434 reader_id: self.reader_id.into_proto(),
1435 seqno: Some(self.leased_seqno.into_proto()),
1436 }),
1437 filter_pushdown_audit: self.filter_pushdown_audit,
1438 };
1439 (proto, Arc::clone(&self.metrics))
1440 }
1441
1442 fn from_proto(proto: (ProtoLeasedBatchPart, Arc<Metrics>)) -> Result<Self, TryFromProtoError> {
1443 let (proto, metrics) = proto;
1444 let lease = proto
1445 .lease
1446 .ok_or_else(|| TryFromProtoError::missing_field("ProtoLeasedBatchPart::lease"))?;
1447 Ok(LeasedBatchPart {
1448 metrics,
1449 shard_id: proto.shard_id.into_rust()?,
1450 filter: proto
1451 .filter
1452 .into_rust_if_some("ProtoLeasedBatchPart::filter")?,
1453 desc: proto.desc.into_rust_if_some("ProtoLeasedBatchPart::desc")?,
1454 part: proto.part.into_rust_if_some("ProtoLeasedBatchPart::part")?,
1455 reader_id: lease.reader_id.into_rust()?,
1456 leased_seqno: lease.seqno.into_rust_if_some("ProtoLease::seqno")?,
1457 lease: None,
1458 filter_pushdown_audit: proto.filter_pushdown_audit,
1459 })
1460 }
1461}
1462
/// Selects the format used when decoding a part.
///
/// Each value has a stable string name, see `as_str` and `from_str`.
#[derive(Clone, Copy, Debug)]
pub enum PartDecodeFormat {
    /// The "row" format (string names `"row"` and `"row_with_validate"`).
    Row {
        /// Selects the `"row_with_validate"` flavor when `true`.
        ///
        /// NOTE(review): presumably this additionally decodes the structured
        /// data and validates it against the row data -- confirm at the use
        /// site.
        validate_structured: bool,
    },
    /// The "arrow" format (string name `"arrow"`).
    Arrow,
}
1476
1477impl PartDecodeFormat {
1478 pub const fn default() -> Self {
1480 PartDecodeFormat::Arrow
1481 }
1482
1483 pub fn from_str(s: &str) -> Self {
1486 match s {
1487 "row" => PartDecodeFormat::Row {
1488 validate_structured: false,
1489 },
1490 "row_with_validate" => PartDecodeFormat::Row {
1491 validate_structured: true,
1492 },
1493 "arrow" => PartDecodeFormat::Arrow,
1494 x => {
1495 let default = PartDecodeFormat::default();
1496 soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
1497 default
1498 }
1499 }
1500 }
1501
1502 pub const fn as_str(&self) -> &'static str {
1504 match self {
1505 PartDecodeFormat::Row {
1506 validate_structured: false,
1507 } => "row",
1508 PartDecodeFormat::Row {
1509 validate_structured: true,
1510 } => "row_with_validate",
1511 PartDecodeFormat::Arrow => "arrow",
1512 }
1513 }
1514}
1515
1516impl fmt::Display for PartDecodeFormat {
1517 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1518 f.write_str(self.as_str())
1519 }
1520}
1521
1522#[mz_ore::test]
1523fn client_exchange_data() {
1524 fn is_exchange_data<T: timely::ExchangeData>() {}
1528 is_exchange_data::<SerdeLeasedBatchPart>();
1529 is_exchange_data::<SerdeLeasedBatchPart>();
1530}