Skip to main content

mz_persist_client/
fetch.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Fetching batches of data from persist's backing store
11
12use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::Instant;
16
17use anyhow::anyhow;
18use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
19use arrow::compute::FilterBuilder;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use itertools::EitherOrBoth;
24use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::{soft_assert_or_log, soft_panic_no_log, soft_panic_or_log};
28use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
29use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
30use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist::metrics::ColumnarMetrics;
33use mz_persist_types::arrow::ArrayOrd;
34use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
35use mz_persist_types::part::Codec64Mut;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::stats::PartStats;
38use mz_persist_types::{Codec, Codec64};
39use mz_proto::RustType;
40use serde::{Deserialize, Serialize};
41use timely::PartialOrder;
42use timely::progress::frontier::AntichainRef;
43use timely::progress::{Antichain, Timestamp};
44use tracing::{Instrument, debug, debug_span, trace_span};
45
46use crate::ShardId;
47use crate::cfg::PersistConfig;
48use crate::error::InvalidUsage;
49use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
50use crate::internal::machine::retry_external;
51use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
52use crate::internal::paths::BlobKey;
53use crate::internal::state::{
54    BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
55};
56use crate::read::LeasedReaderId;
57use crate::schema::{PartMigration, SchemaCache};
58
/// Multiplier applied to a part's `encoded_size_bytes` when charging it
/// against the fetch semaphore (see [FETCH_SEMAPHORE_PERMIT_ADJUSTMENT]).
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    // We use `encoded_size_bytes` as the number of permits, but the parsed size
    // is larger than the encoded one, so adjust it. This default value is from
    // eyeballing graphs in experiments that were run on tpch loadgen data.
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);
69
/// Cap on outstanding fetched-and-parsed persist bytes, expressed as a
/// multiple of the process's memory limit; see the description string for the
/// lgalloc spill rationale.
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);
79
/// Which representation to decode a fetched part into; parsed into a
/// [PartDecodeFormat] via [BatchFetcherConfig::part_decode_format].
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow' (Materialize).",
);
87
/// Escape hatch for the optimization in [LeasedBatchPart::maybe_optimize]
/// that replaces an ignorable hollow part with a tiny faked inline part
/// instead of fetching it.
pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);
93
94pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
95    "persist_validate_part_bounds_on_read",
96    false,
97    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
98    for the batch containing that part",
99);
100
/// Knobs controlling how parts are fetched/validated, snapshotted out of the
/// dyncfg system by [FetchConfig::from_persist_config].
#[derive(Debug, Clone)]
pub(crate) struct FetchConfig {
    /// Whether to validate part bounds against the enclosing batch's bounds on
    /// read; see [VALIDATE_PART_BOUNDS_ON_READ].
    pub(crate) validate_bounds_on_read: bool,
}
105
106impl FetchConfig {
107    pub fn from_persist_config(cfg: &PersistConfig) -> Self {
108        Self {
109            validate_bounds_on_read: VALIDATE_PART_BOUNDS_ON_READ.get(cfg),
110        }
111    }
112}
113
/// Config used by [BatchFetcher] when decoding fetched parts.
#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
    /// Live handle to [PART_DECODE_FORMAT], re-read by
    /// [Self::part_decode_format].
    pub(crate) part_decode_format: ConfigValHandle<String>,
    /// Fetch-time validation knobs, snapshotted at construction.
    pub(crate) fetch_config: FetchConfig,
}
119
120impl BatchFetcherConfig {
121    pub fn new(value: &PersistConfig) -> Self {
122        Self {
123            part_decode_format: PART_DECODE_FORMAT.handle(value),
124            fetch_config: FetchConfig::from_persist_config(value),
125        }
126    }
127
128    pub fn part_decode_format(&self) -> PartDecodeFormat {
129        PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
130    }
131}
132
/// Capable of fetching [`LeasedBatchPart`] while not holding any capabilities.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    // These are only here so we can use them in the auto-expiring `Drop` impl.
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Codec64 + Send + Sync,
{
    pub(crate) cfg: BatchFetcherConfig,
    /// The blob store the parts' data is fetched from.
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    /// The shard this fetcher reads; parts leased from any other shard are
    /// rejected with [InvalidUsage::BatchNotFromThisShard].
    pub(crate) shard_id: ShardId,
    /// The schemas fetched data is decoded into.
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    /// When true, fetches are counted against the `unindexed` read metrics
    /// instead of `batch_fetcher`.
    pub(crate) is_transient: bool,

    // Ensures that `BatchFetcher` is of the same type as the `ReadHandle` it's
    // derived from.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
156
157impl<K, V, T, D> BatchFetcher<K, V, T, D>
158where
159    K: Debug + Codec,
160    V: Debug + Codec,
161    T: Timestamp + Lattice + Codec64 + Sync,
162    D: Monoid + Codec64 + Send + Sync,
163{
164    /// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
165    ///
166    /// Note to check the `LeasedBatchPart` documentation for how to handle the
167    /// returned value.
168    pub async fn fetch_leased_part(
169        &mut self,
170        part: ExchangeableBatchPart<T>,
171    ) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
172        let ExchangeableBatchPart {
173            shard_id,
174            encoded_size_bytes: _,
175            desc,
176            filter,
177            filter_pushdown_audit,
178            part,
179        } = part;
180        let part: BatchPart<T> = part.decode_to().expect("valid part");
181        if shard_id != self.shard_id {
182            return Err(InvalidUsage::BatchNotFromThisShard {
183                batch_shard: shard_id,
184                handle_shard: self.shard_id.clone(),
185            });
186        }
187
188        let migration =
189            PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
190                .await
191                .unwrap_or_else(|read_schemas| {
192                    panic!(
193                        "could not decode part {:?} with schema: {:?}",
194                        part.schema_id(),
195                        read_schemas
196                    )
197                });
198
199        let (buf, fetch_permit) = match &part {
200            BatchPart::Hollow(x) => {
201                let fetch_permit = self
202                    .metrics
203                    .semaphore
204                    .acquire_fetch_permits(x.encoded_size_bytes)
205                    .await;
206                let read_metrics = if self.is_transient {
207                    &self.metrics.read.unindexed
208                } else {
209                    &self.metrics.read.batch_fetcher
210                };
211                let buf = fetch_batch_part_blob(
212                    &shard_id,
213                    self.blob.as_ref(),
214                    &self.metrics,
215                    &self.shard_metrics,
216                    read_metrics,
217                    x,
218                )
219                .await;
220                let buf = match buf {
221                    Ok(buf) => buf,
222                    Err(key) => return Ok(Err(key)),
223                };
224                let buf = FetchedBlobBuf::Hollow {
225                    buf,
226                    part: x.clone(),
227                };
228                (buf, Some(Arc::new(fetch_permit)))
229            }
230            BatchPart::Inline {
231                updates,
232                ts_rewrite,
233                ..
234            } => {
235                let buf = FetchedBlobBuf::Inline {
236                    desc: desc.clone(),
237                    updates: updates.clone(),
238                    ts_rewrite: ts_rewrite.clone(),
239                };
240                (buf, None)
241            }
242        };
243        let fetched_blob = FetchedBlob {
244            metrics: Arc::clone(&self.metrics),
245            read_metrics: self.metrics.read.batch_fetcher.clone(),
246            buf,
247            registered_desc: desc.clone(),
248            migration,
249            filter: filter.clone(),
250            filter_pushdown_audit,
251            structured_part_audit: self.cfg.part_decode_format(),
252            fetch_permit,
253            _phantom: PhantomData,
254            fetch_config: self.cfg.fetch_config.clone(),
255        };
256        Ok(Ok(fetched_blob))
257    }
258}
259
/// A timestamp filter applied to fetched updates, determined by the kind of
/// read that produced the part; see [FetchBatchFilter::filter_ts].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum FetchBatchFilter<T> {
    /// Keep updates at or before `as_of`, advancing their times to `as_of`.
    Snapshot {
        as_of: Antichain<T>,
    },
    /// Keep updates strictly after `as_of` and at or after `lower`.
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    /// Keep everything, advancing times by `since`.
    Compaction {
        since: Antichain<T>,
    },
}
273
274impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
275    pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
276        match self {
277            FetchBatchFilter::Snapshot { as_of } => {
278                // This time is covered by a listen
279                if as_of.less_than(t) {
280                    return false;
281                }
282                t.advance_by(as_of.borrow());
283                true
284            }
285            FetchBatchFilter::Listen { as_of, lower } => {
286                // This time is covered by a snapshot
287                if !as_of.less_than(t) {
288                    return false;
289                }
290
291                // Because of compaction, the next batch we get might also
292                // contain updates we've already emitted. For example, we
293                // emitted `[1, 2)` and then compaction combined that batch with
294                // a `[2, 3)` batch into a new `[1, 3)` batch. If this happens,
295                // we just need to filter out anything < the frontier. This
296                // frontier was the upper of the last batch (and thus exclusive)
297                // so for the == case, we still emit.
298                if !lower.less_equal(t) {
299                    return false;
300                }
301                true
302            }
303            FetchBatchFilter::Compaction { since } => {
304                t.advance_by(since.borrow());
305                true
306            }
307        }
308    }
309}
310
/// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
///
/// Note to check the `LeasedBatchPart` documentation for how to handle the
/// returned value.
///
/// # Panics
/// Panics if the part's blob is missing from the store or if its schema
/// cannot be migrated to `read_schemas`; both indicate bugs (see the inline
/// comments).
pub(crate) async fn fetch_leased_part<K, V, T, D>(
    cfg: &PersistConfig,
    part: &LeasedBatchPart<T>,
    blob: &dyn Blob,
    metrics: Arc<Metrics>,
    read_metrics: &ReadMetrics,
    shard_metrics: &ShardMetrics,
    reader_id: &LeasedReaderId,
    read_schemas: Schemas<K, V>,
    schema_cache: &mut SchemaCache<K, V, T, D>,
) -> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Monoid + Codec64 + Send + Sync,
{
    let fetch_config = FetchConfig::from_persist_config(cfg);
    // Fetch and parquet-decode the part's data.
    let encoded_part = EncodedPart::fetch(
        &fetch_config,
        &part.shard_id,
        blob,
        &metrics,
        shard_metrics,
        read_metrics,
        &part.desc,
        &part.part,
    )
    .await
    .unwrap_or_else(|blob_key| {
        // Ideally, readers should never encounter a missing blob. They place a seqno
        // hold as they consume their snapshot/listen, preventing any blobs they need
        // from being deleted by garbage collection, and all blob implementations are
        // linearizable so there should be no possibility of stale reads.
        //
        // If we do have a bug and a reader does encounter a missing blob, the state
        // cannot be recovered, and our best option is to panic and retry the whole
        // process.
        panic!("{} could not fetch batch part: {}", reader_id, blob_key)
    });
    let part_cfg = BatchFetcherConfig::new(cfg);
    // Resolve the migration from the part's write schema to the caller's read
    // schema; an incompatibility here is a bug, so fail loudly.
    let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
        .await
        .unwrap_or_else(|read_schemas| {
            panic!(
                "could not decode part {:?} with schema: {:?}",
                part.part.schema_id(),
                read_schemas
            )
        });
    FetchedPart::new(
        metrics,
        encoded_part,
        migration,
        part.filter.clone(),
        part.filter_pushdown_audit,
        part_cfg.part_decode_format(),
        part.part.stats(),
    )
}
375
/// Fetches the raw, still-encoded bytes of a single [HollowBatchPart] from
/// blob storage, recording read metrics.
///
/// Returns `Err` with the completed [BlobKey] if the blob is not present in
/// the store.
pub(crate) async fn fetch_batch_part_blob<T>(
    shard_id: &ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
    shard_metrics: &ShardMetrics,
    read_metrics: &ReadMetrics,
    part: &HollowBatchPart<T>,
) -> Result<SegmentedBytes, BlobKey> {
    let now = Instant::now();
    let get_span = debug_span!("fetch_batch::get");
    let blob_key = part.key.complete(shard_id);
    // Retry the get against transient external errors; a missing key comes
    // back as `None` and is surfaced as `Err(blob_key)` below.
    let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
        shard_metrics.blob_gets.inc();
        blob.get(&blob_key).await
    })
    .instrument(get_span.clone())
    .await
    .ok_or(blob_key)?;

    drop(get_span);

    read_metrics.part_count.inc();
    read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
    read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());

    Ok(value)
}
403
/// Decodes the fetched bytes of a batch part into an [EncodedPart],
/// recording decode metrics.
///
/// # Panics
/// Panics if the bytes cannot be decoded; see the inline comment for why that
/// is unrecoverable here.
pub(crate) fn decode_batch_part_blob<T>(
    cfg: &FetchConfig,
    metrics: &Metrics,
    read_metrics: &ReadMetrics,
    registered_desc: Description<T>,
    part: &HollowBatchPart<T>,
    buf: &SegmentedBytes,
) -> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    trace_span!("fetch_batch::decode").in_scope(|| {
        let parsed = metrics
            .codecs
            .batch
            .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
            .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
            // We received a State that we couldn't decode. This could happen if
            // persist messes up backward/forward compatibility, if the durable
            // data was corrupted, or if operations messes up deployment. In any
            // case, fail loudly.
            .expect("internal error: invalid encoded state");
        read_metrics
            .part_goodbytes
            .inc_by(u64::cast_from(parsed.updates.goodbytes()));
        EncodedPart::from_hollow(cfg, read_metrics.clone(), registered_desc, part, parsed)
    })
}
432
433pub(crate) async fn fetch_batch_part<T>(
434    cfg: &FetchConfig,
435    shard_id: &ShardId,
436    blob: &dyn Blob,
437    metrics: &Metrics,
438    shard_metrics: &ShardMetrics,
439    read_metrics: &ReadMetrics,
440    registered_desc: &Description<T>,
441    part: &HollowBatchPart<T>,
442) -> Result<EncodedPart<T>, BlobKey>
443where
444    T: Timestamp + Lattice + Codec64,
445{
446    let buf =
447        fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
448    let part = decode_batch_part_blob(
449        cfg,
450        metrics,
451        read_metrics,
452        registered_desc.clone(),
453        part,
454        &buf,
455    );
456    Ok(part)
457}
458
/// This represents the lease of a seqno. It's generally paired with some external state,
/// like a hollow part: holding this lease indicates that we may still want to fetch that part,
/// and should hold back GC to keep it around.
///
/// Generally the state and lease are bundled together, as in [LeasedBatchPart]... but sometimes
/// it's necessary to handle them separately, so this struct is exposed as well. Handle with care.
///
/// Clones share the underlying [Arc], so [Lease::count] reports how many live
/// copies of this lease exist.
#[derive(Clone, Debug)]
pub struct Lease(Arc<SeqNo>);
467
468impl Lease {
469    /// Creates a new [Lease] that holds the given [SeqNo].
470    pub fn new(seqno: SeqNo) -> Self {
471        Self(Arc::new(seqno))
472    }
473
474    /// Returns the inner [SeqNo] of this [Lease].
475    pub fn seqno(&self) -> SeqNo {
476        *self.0
477    }
478
479    /// Returns the number of live copies of this lease, including this one.
480    pub fn count(&self) -> usize {
481        Arc::strong_count(&self.0)
482    }
483}
484
/// A token representing one fetch-able batch part.
///
/// It is tradeable via `crate::fetch::fetch_batch` for the resulting data
/// stored in the part.
///
/// # Exchange
///
/// You can exchange `LeasedBatchPart` by converting it to an
/// [`ExchangeableBatchPart`] through `Self::into_exchangeable_part`.
/// [`ExchangeableBatchPart`] is exchangeable, including over the network.
///
/// Note that `Self::into_exchangeable_part` also returns the [Lease], which
/// cannot travel across process boundaries; the caller must keep it alive for
/// as long as the part may still be fetched.
///
/// # Dropping
///
/// NOTE(review): older docs here claimed dropping a `LeasedBatchPart` panics
/// unless a leased `SeqNo` was consumed, and referenced a
/// `SerdeLeasedBatchPart` type and `leased_seqno` field that no longer exist.
/// The `Drop` impl in this file only increments a metric; the seqno hold is
/// managed by the [Lease] in the `lease` field.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_id: ShardId,
    /// The timestamp filter to apply when reading this part.
    pub(crate) filter: FetchBatchFilter<T>,
    /// The description of the batch this part belongs to.
    pub(crate) desc: Description<T>,
    pub(crate) part: BatchPart<T>,
    /// The lease that prevents this part from being GCed. Code should ensure that this lease
    /// lives as long as the part is needed.
    pub(crate) lease: Lease,
    /// When true, the part is fetched only to audit that filter pushdown
    /// would not have discarded needed data; see
    /// [Self::request_filter_pushdown_audit].
    pub(crate) filter_pushdown_audit: bool,
}
522
impl<T> LeasedBatchPart<T>
where
    T: Timestamp + Codec64,
{
    /// Takes `self` into a [`ExchangeableBatchPart`], which allows `self` to be
    /// exchanged (potentially across the network).
    ///
    /// !!!WARNING!!!
    ///
    /// This method also returns the [Lease] associated with the given part, since
    /// that can't travel across process boundaries. The caller is responsible for
    /// ensuring that the lease is held for as long as the batch part may be in use:
    /// dropping it too early may cause a fetch to fail.
    pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
        // If `x` has a lease, we've effectively transferred it to `r`.
        let lease = self.lease.clone();
        let part = ExchangeableBatchPart {
            shard_id: self.shard_id,
            encoded_size_bytes: self.part.encoded_size_bytes(),
            desc: self.desc.clone(),
            filter: self.filter.clone(),
            // Proto-encode the part so it can cross process boundaries.
            part: LazyProto::from(&self.part.into_proto()),
            filter_pushdown_audit: self.filter_pushdown_audit,
        };
        (part, lease)
    }

    /// The encoded size of this part in bytes
    pub fn encoded_size_bytes(&self) -> usize {
        self.part.encoded_size_bytes()
    }

    /// The filter has indicated we don't need this part, we can verify the
    /// ongoing end-to-end correctness of corner cases via "audit". This means
    /// we fetch the part like normal and if the MFP keeps anything from it,
    /// then something has gone horribly wrong.
    pub fn request_filter_pushdown_audit(&mut self) {
        self.filter_pushdown_audit = true;
    }

    /// Returns the pushdown stats for this part.
    pub fn stats(&self) -> Option<PartStats> {
        self.part.stats().map(|x| x.decode())
    }

    /// Apply any relevant projection pushdown optimizations, assuming that the data in the part
    /// is equivalent to the provided key and value.
    ///
    /// When eligible, replaces a hollow part with a tiny faked inline part
    /// containing just the provided (key, val) at the snapshot `as_of` with
    /// the part's precomputed `diffs_sum`, skipping the blob fetch entirely.
    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
        assert_eq!(key.len(), 1, "expect a single-row key array");
        assert_eq!(val.len(), 1, "expect a single-row val array");
        // Only snapshot reads can be faked this way.
        let as_of = match &self.filter {
            FetchBatchFilter::Snapshot { as_of } => as_of,
            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
        };
        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
            return;
        }
        // Inline parts carry their data in state; there's no fetch to skip.
        let (diffs_sum, _stats) = match &self.part {
            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
            BatchPart::Inline { .. } => return,
        };
        debug!(
            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
            // This is only used for debugging, so hack to assume that D is i64.
            diffs_sum.map(i64::decode),
            as_of.elements(),
            self.desc.lower().elements(),
            self.desc.upper().elements()
        );
        // The faked part writes all data at a single timestamp, so this only
        // works for a single-element as_of frontier.
        let as_of = match &as_of.elements() {
            &[as_of] => as_of,
            _ => return,
        };
        // Eligible only when the as_of is at or past both the part's upper
        // and its since.
        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
        if !eligible {
            return;
        }
        // Without a precomputed diffs sum we'd have to fetch the part to know
        // the consolidated diff; bail.
        let Some(diffs_sum) = diffs_sum else {
            return;
        };

        debug!(
            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
            // This is only used for debugging, so hack to assume that D is i64.
            i64::decode(diffs_sum),
            as_of,
            self.part.encoded_size_bytes(),
        );
        self.metrics.pushdown.parts_faked_count.inc();
        self.metrics
            .pushdown
            .parts_faked_bytes
            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
        // Build single-row timestamp/diff columns: (as_of, diffs_sum).
        let timestamps = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push(as_of);
            col.finish()
        };
        let diffs = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push_raw(diffs_sum);
            col.finish()
        };
        let updates = BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt { key, val },
            timestamps,
            diffs,
        };
        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
            desc: Some(self.desc.into_proto()),
            index: 0,
            updates: Some(updates.into_proto()),
        });
        // Swap the hollow part for the faked inline one; the fetch path will
        // decode it without touching blob storage.
        self.part = BatchPart::Inline {
            updates: faked_data,
            ts_rewrite: None,
            schema_id: None,
            deprecated_schema_id: None,
        };
    }
}
644
645impl<T> Drop for LeasedBatchPart<T> {
646    /// For details, see [`LeasedBatchPart`].
647    fn drop(&mut self) {
648        self.metrics.lease.dropped_part.inc()
649    }
650}
651
/// A [Blob] object that has been fetched, but not at all decoded.
///
/// In contrast to [FetchedPart], this representation hasn't yet done parquet
/// decoding.
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    read_metrics: ReadMetrics,
    /// The raw bytes (or inline updates) plus the metadata needed to decode
    /// them.
    buf: FetchedBlobBuf<T>,
    /// The batch description this part was fetched under.
    registered_desc: Description<T>,
    /// How to migrate the part's write schema to the read schema.
    migration: PartMigration<K, V>,
    /// Timestamp filter applied as the part is read.
    filter: FetchBatchFilter<T>,
    filter_pushdown_audit: bool,
    structured_part_audit: PartDecodeFormat,
    /// Permit from the fetch semaphore; shared by clones via the [Arc].
    fetch_permit: Option<Arc<MetricsPermits>>,
    fetch_config: FetchConfig,
    _phantom: PhantomData<fn() -> D>,
}
670
/// The undecoded payload of a [FetchedBlob]: either raw bytes fetched from
/// blob storage or updates stored inline in state.
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    Hollow {
        /// Raw encoded bytes fetched from blob storage.
        buf: SegmentedBytes,
        part: HollowBatchPart<T>,
    },
    Inline {
        desc: Description<T>,
        updates: LazyInlineBatchPart,
        // NOTE(review): presumably a timestamp-rewrite frontier applied at
        // decode time — confirm against EncodedPart::from_inline.
        ts_rewrite: Option<Antichain<T>>,
    },
}
683
684impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
685    fn clone(&self) -> Self {
686        Self {
687            metrics: Arc::clone(&self.metrics),
688            read_metrics: self.read_metrics.clone(),
689            buf: self.buf.clone(),
690            registered_desc: self.registered_desc.clone(),
691            migration: self.migration.clone(),
692            filter: self.filter.clone(),
693            filter_pushdown_audit: self.filter_pushdown_audit.clone(),
694            fetch_permit: self.fetch_permit.clone(),
695            structured_part_audit: self.structured_part_audit.clone(),
696            fetch_config: self.fetch_config.clone(),
697            _phantom: self._phantom.clone(),
698        }
699    }
700}
701
/// [FetchedPart] but with an accompanying permit from the fetch mem/disk
/// semaphore.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The underlying [FetchedPart].
    pub part: FetchedPart<K, V, T, D>,
    /// Fetch-semaphore permit, shared with the originating [FetchedBlob] (and
    /// any clones) via the [Arc].
    fetch_permit: Option<Arc<MetricsPermits>>,
}
709
710impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
711where
712    K: Codec + Debug,
713    <K as Codec>::Storage: Debug,
714    V: Codec + Debug,
715    <V as Codec>::Storage: Debug,
716{
717    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
718        let ShardSourcePart { part, fetch_permit } = self;
719        f.debug_struct("ShardSourcePart")
720            .field("part", part)
721            .field("fetch_permit", fetch_permit)
722            .finish()
723    }
724}
725
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
    /// Partially decodes this blob into a [FetchedPart].
    pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
        self.parse_internal(&self.fetch_config)
    }

    /// Partially decodes this blob into a [FetchedPart], using the given
    /// [FetchConfig] rather than the one captured at fetch time.
    pub(crate) fn parse_internal(&self, cfg: &FetchConfig) -> ShardSourcePart<K, V, T, D> {
        // Decode to an EncodedPart: parquet-decode hollow parts, proto-decode
        // inline ones. Pushdown stats only exist for hollow parts.
        let (part, stats) = match &self.buf {
            FetchedBlobBuf::Hollow { buf, part } => {
                let parsed = decode_batch_part_blob(
                    cfg,
                    &self.metrics,
                    &self.read_metrics,
                    self.registered_desc.clone(),
                    part,
                    buf,
                );
                (parsed, part.stats.as_ref())
            }
            FetchedBlobBuf::Inline {
                desc,
                updates,
                ts_rewrite,
            } => {
                let parsed = EncodedPart::from_inline(
                    cfg,
                    &self.metrics,
                    self.read_metrics.clone(),
                    desc.clone(),
                    updates,
                    ts_rewrite.as_ref(),
                );
                (parsed, None)
            }
        };
        let part = FetchedPart::new(
            Arc::clone(&self.metrics),
            part,
            self.migration.clone(),
            self.filter.clone(),
            self.filter_pushdown_audit,
            self.structured_part_audit,
            stats,
        );
        ShardSourcePart {
            part,
            // Share the permit with the returned part so it's held until all
            // holders are dropped.
            fetch_permit: self.fetch_permit.clone(),
        }
    }

    /// Decodes and returns the pushdown stats for this part, if known.
    pub fn stats(&self) -> Option<PartStats> {
        match &self.buf {
            FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
            FetchedBlobBuf::Inline { .. } => None,
        }
    }
}
785
/// A [Blob] object that has been fetched, but not yet fully decoded.
///
/// In contrast to [FetchedBlob], this representation has already done parquet
/// decoding.
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    /// Timestamp filter/advancement applied to updates as they're read.
    ts_filter: FetchBatchFilter<T>,
    // If migration is Either, then the columnar one will have already been
    // applied here on the structured data only.
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    timestamps: Int64Array,
    diffs: Int64Array,
    migration: PartMigration<K, V>,
    /// When set, this part was fetched only to audit filter pushdown; see
    /// [LeasedBatchPart::request_filter_pushdown_audit].
    filter_pushdown_audit: Option<LazyPartStats>,
    // NOTE(review): presumably a decoded-but-not-yet-yielded update buffered
    // during iteration — confirm against the iterator impl (not in view).
    peek_stash: Option<((K, V), T, D)>,
    // NOTE(review): presumably the index of the next update to decode —
    // confirm against the iterator impl (not in view).
    part_cursor: usize,
    key_storage: Option<K::Storage>,
    val_storage: Option<V::Storage>,

    _phantom: PhantomData<fn() -> D>,
}
814
impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
    /// Builds a `FetchedPart` from an already-fetched [`EncodedPart`].
    ///
    /// Records schema-migration metrics, applies any structured-data
    /// migration, and chooses the decode path (codec rows, structured arrow
    /// decoders, or both for validation) based on `part_decode_format` and
    /// which encodings are actually present in the part.
    pub(crate) fn new(
        metrics: Arc<Metrics>,
        part: EncodedPart<T>,
        migration: PartMigration<K, V>,
        ts_filter: FetchBatchFilter<T>,
        filter_pushdown_audit: bool,
        part_decode_format: PartDecodeFormat,
        stats: Option<&LazyPartStats>,
    ) -> Self {
        let part_len = u64::cast_from(part.part.updates.len());
        // Attribute this part's updates to the relevant schema-migration
        // metrics, split by which decode path(s) will be exercised.
        match &migration {
            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
            PartMigration::Schemaless { .. } => {
                metrics.schema.migration_count_codec.inc();
                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
            }
            PartMigration::Either { .. } => {
                metrics.schema.migration_count_either.inc();
                match part_decode_format {
                    PartDecodeFormat::Row {
                        validate_structured: false,
                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
                    PartDecodeFormat::Row {
                        validate_structured: true,
                    } => {
                        metrics.schema.migration_len_either_codec.inc_by(part_len);
                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
                    }
                    PartDecodeFormat::Arrow => {
                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
                    }
                }
            }
        }

        // Only retain the stats when this fetch is a filter pushdown audit, so
        // they can be surfaced in any resulting error messages.
        let filter_pushdown_audit = if filter_pushdown_audit {
            stats.cloned()
        } else {
            None
        };

        // Migrates the structured columns (if necessary) and builds `Schema`
        // decoders for them. Returns `None` when decoding via the structured
        // representation isn't possible, e.g. a `Schemaless` migration when
        // codec data is also available (`structured_only == false`).
        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
                                   structured_only: bool| {
            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

            let structured = match &migration {
                PartMigration::SameSchema { .. } => structured,
                PartMigration::Schemaless { read } if structured_only => {
                    // We don't know the source schema, but we do know the source datatype; migrate it directly.
                    let start = Instant::now();
                    let read_key = data_type::<K>(&*read.key).ok()?;
                    let read_val = data_type::<V>(&*read.val).ok()?;
                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
                PartMigration::Schemaless { .. } => return None,
                PartMigration::Either {
                    write: _,
                    read: _,
                    key_migration,
                    val_migration,
                } => {
                    let start = Instant::now();
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
            };

            let read_schema = migration.codec_read();
            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

            // Record how many key bytes the decoder's projection trimmed, for
            // filter-pushdown observability.
            match &key {
                Ok(key_decoder) => {
                    let key_size_after = key_decoder.goodbytes();
                    let key_diff = key_size_before.saturating_sub(key_size_after);
                    metrics
                        .pushdown
                        .parts_projection_trimmed_bytes
                        .inc_by(u64::cast_from(key_diff));
                }
                Err(e) => {
                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                }
            }

            Some((key.ok()?, val.ok()?))
        };

        // Apply truncation / timestamp rewriting up front, then pull out the
        // timestamp and diff columns that every decode path shares.
        let updates = part.normalize(&metrics.columnar);
        let timestamps = updates.timestamps().clone();
        let diffs = updates.diffs().clone();
        let part = match updates {
            // If only one encoding is available, decode via that encoding.
            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                // The structured-only data format was added after schema ids were recorded everywhere,
                // so we expect this data to be present.
                downcast_structured(key_values, true).expect("valid schemas for structured data"),
            ),
            // If both are available, respect the specified part decode format.
            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
                PartDecodeFormat::Row {
                    validate_structured: false,
                } => EitherOrBoth::Left(records),
                PartDecodeFormat::Row {
                    validate_structured: true,
                } => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Both(records, decoders),
                    None => EitherOrBoth::Left(records),
                },
                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Right(decoders),
                    None => EitherOrBoth::Left(records),
                },
            },
        };

        FetchedPart {
            metrics,
            ts_filter,
            part,
            peek_stash: None,
            timestamps,
            diffs,
            migration,
            filter_pushdown_audit,
            part_cursor: 0,
            key_storage: None,
            val_storage: None,
            _phantom: PhantomData,
        }
    }

    /// Returns Some if this part was only fetched as part of a filter pushdown
    /// audit. See [LeasedBatchPart::request_filter_pushdown_audit].
    ///
    /// If set, the value in the Option is for debugging and should be included
    /// in any error messages.
    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
        self.filter_pushdown_audit.clone()
    }
}
971
/// A [Blob] object that has been fetched, but has no associated decoding
/// logic.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    // Read metrics to which this part's decoding work is attributed.
    metrics: ReadMetrics,
    // The description this part is registered under in persist state; may be
    // tighter than the description stored inline in the blob (see `new`).
    registered_desc: Description<T>,
    // The parsed contents of the blob.
    part: BlobTraceBatchPart<T>,
    // True when the inline bounds differ from `registered_desc`, in which case
    // updates outside the registered bounds must be filtered out on read.
    needs_truncation: bool,
    // If set, timestamps in the part are advanced to this frontier on read
    // (applied in `normalize`).
    ts_rewrite: Option<Antichain<T>>,
}
982
983impl<K, V, T, D> FetchedPart<K, V, T, D>
984where
985    K: Debug + Codec,
986    V: Debug + Codec,
987    T: Timestamp + Lattice + Codec64,
988    D: Monoid + Codec64 + Send + Sync,
989{
990    /// [Self::next] but optionally providing a `K` and `V` for alloc reuse.
991    ///
992    /// When `result_override` is specified, return it instead of decoding data.
993    /// This is used when we know the decoded result will be ignored.
994    pub fn next_with_storage(
995        &mut self,
996        key: &mut Option<K>,
997        val: &mut Option<V>,
998    ) -> Option<((K, V), T, D)> {
999        let mut consolidated = self.peek_stash.take();
1000        loop {
1001            // Fetch and decode the next tuple in the sequence. (Or break if there is none.)
1002            let next = if self.part_cursor < self.timestamps.len() {
1003                let next_idx = self.part_cursor;
1004                self.part_cursor += 1;
1005                // These `to_le_bytes` calls were previously encapsulated by `ColumnarRecords`.
1006                // TODO(structured): re-encapsulate these once we've finished the structured migration.
1007                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
1008                if !self.ts_filter.filter_ts(&mut t) {
1009                    continue;
1010                }
1011                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
1012                if d.is_zero() {
1013                    continue;
1014                }
1015                let kv = self.decode_kv(next_idx, key, val);
1016                (kv, t, d)
1017            } else {
1018                break;
1019            };
1020
1021            // Attempt to consolidate in the next tuple, stashing it if that's not possible.
1022            if let Some((kv, t, d)) = &mut consolidated {
1023                let (kv_next, t_next, d_next) = &next;
1024                if kv == kv_next && t == t_next {
1025                    d.plus_equals(d_next);
1026                    if d.is_zero() {
1027                        consolidated = None;
1028                    }
1029                } else {
1030                    self.peek_stash = Some(next);
1031                    break;
1032                }
1033            } else {
1034                consolidated = Some(next);
1035            }
1036        }
1037
1038        let (kv, t, d) = consolidated?;
1039
1040        Some((kv, t, d))
1041    }
1042
1043    fn decode_kv(&mut self, index: usize, key: &mut Option<K>, val: &mut Option<V>) -> (K, V) {
1044        let decoded = self
1045            .part
1046            .as_ref()
1047            .map_left(|codec| {
1048                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
1049                let (k, v) = Self::decode_codec(
1050                    &*self.metrics,
1051                    self.migration.codec_read(),
1052                    ck,
1053                    cv,
1054                    key,
1055                    val,
1056                    &mut self.key_storage,
1057                    &mut self.val_storage,
1058                );
1059                (k.expect("valid legacy key"), v.expect("valid legacy value"))
1060            })
1061            .map_right(|(structured_key, structured_val)| {
1062                self.decode_structured(index, structured_key, structured_val, key, val)
1063            });
1064
1065        match decoded {
1066            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
1067                // Purposefully do not trace to prevent blowing up Sentry.
1068                let is_valid = self
1069                    .metrics
1070                    .columnar
1071                    .arrow()
1072                    .key()
1073                    .report_valid(|| k_s == k);
1074                if !is_valid {
1075                    soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
1076                }
1077                // Purposefully do not trace to prevent blowing up Sentry.
1078                let is_valid = self
1079                    .metrics
1080                    .columnar
1081                    .arrow()
1082                    .val()
1083                    .report_valid(|| v_s == v);
1084                if !is_valid {
1085                    soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
1086                }
1087
1088                (k, v)
1089            }
1090            EitherOrBoth::Left(kv) => kv,
1091            EitherOrBoth::Right(kv) => kv,
1092        }
1093    }
1094
1095    fn decode_codec(
1096        metrics: &Metrics,
1097        read_schemas: &Schemas<K, V>,
1098        key_buf: &[u8],
1099        val_buf: &[u8],
1100        key: &mut Option<K>,
1101        val: &mut Option<V>,
1102        key_storage: &mut Option<K::Storage>,
1103        val_storage: &mut Option<V::Storage>,
1104    ) -> (Result<K, String>, Result<V, String>) {
1105        let k = metrics.codecs.key.decode(|| match key.take() {
1106            Some(mut key) => {
1107                match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
1108                    Ok(()) => Ok(key),
1109                    Err(err) => Err(err),
1110                }
1111            }
1112            None => K::decode(key_buf, &read_schemas.key),
1113        });
1114        let v = metrics.codecs.val.decode(|| match val.take() {
1115            Some(mut val) => {
1116                match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
1117                    Ok(()) => Ok(val),
1118                    Err(err) => Err(err),
1119                }
1120            }
1121            None => V::decode(val_buf, &read_schemas.val),
1122        });
1123        (k, v)
1124    }
1125
1126    fn decode_structured(
1127        &self,
1128        idx: usize,
1129        keys: &<K::Schema as Schema<K>>::Decoder,
1130        vals: &<V::Schema as Schema<V>>::Decoder,
1131        key: &mut Option<K>,
1132        val: &mut Option<V>,
1133    ) -> (K, V) {
1134        let mut key = key.take().unwrap_or_default();
1135        keys.decode(idx, &mut key);
1136
1137        let mut val = val.take().unwrap_or_default();
1138        vals.decode(idx, &mut val);
1139
1140        (key, val)
1141    }
1142}
1143
1144impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
1145where
1146    K: Debug + Codec,
1147    V: Debug + Codec,
1148    T: Timestamp + Lattice + Codec64,
1149    D: Monoid + Codec64 + Send + Sync,
1150{
1151    type Item = ((K, V), T, D);
1152
1153    fn next(&mut self) -> Option<Self::Item> {
1154        self.next_with_storage(&mut None, &mut None)
1155    }
1156
1157    fn size_hint(&self) -> (usize, Option<usize>) {
1158        // We don't know in advance how restrictive the filter will be.
1159        let max_len = self.timestamps.len();
1160        (0, Some(max_len))
1161    }
1162}
1163
impl<T> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    /// Fetches and parses a batch part: hollow parts are read from `blob` via
    /// `fetch_batch_part`, inline parts are decoded directly from state. On
    /// failure the `Err` carries the blob key that could not be fetched.
    pub async fn fetch(
        cfg: &FetchConfig,
        shard_id: &ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
        registered_desc: &Description<T>,
        part: &BatchPart<T>,
    ) -> Result<Self, BlobKey> {
        match part {
            BatchPart::Hollow(x) => {
                fetch_batch_part(
                    cfg,
                    shard_id,
                    blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    registered_desc,
                    x,
                )
                .await
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => Ok(EncodedPart::from_inline(
                cfg,
                metrics,
                read_metrics.clone(),
                registered_desc.clone(),
                updates,
                ts_rewrite.as_ref(),
            )),
        }
    }

    /// Decodes an inline part (one whose updates are stored directly in
    /// persist state rather than in blob storage).
    pub(crate) fn from_inline(
        cfg: &FetchConfig,
        metrics: &Metrics,
        read_metrics: ReadMetrics,
        desc: Description<T>,
        x: &LazyInlineBatchPart,
        ts_rewrite: Option<&Antichain<T>>,
    ) -> Self {
        let parsed = x.decode(&metrics.columnar).expect("valid inline part");
        Self::new(cfg, read_metrics, desc, "inline", ts_rewrite, parsed)
    }

    /// Wraps the already-parsed blob contents of a hollow part.
    pub(crate) fn from_hollow(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        part: &HollowBatchPart<T>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        Self::new(
            cfg,
            metrics,
            registered_desc,
            &part.key.0,
            part.ts_rewrite.as_ref(),
            parsed,
        )
    }

    /// Constructs an [`EncodedPart`], validating that the part's inline
    /// description is consistent with the description it's registered under
    /// (`registered_desc`) and determining whether reads must truncate the
    /// data to the registered bounds.
    ///
    /// `printable_name` is only used in assertion/log messages.
    pub(crate) fn new(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        printable_name: &str,
        ts_rewrite: Option<&Antichain<T>>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        // There are two types of batches in persist:
        // - Batches written by a persist user (either directly or indirectly
        //   via BatchBuilder). These always have a since of the minimum
        //   timestamp and may be registered in persist state with a tighter set
        //   of bounds than are inline in the batch (truncation). To read one of
        //   these batches, all data physically in the batch but outside of the
        //   truncated bounds must be ignored. Not every user batch is
        //   truncated.
        // - Batches written by compaction. These always have an inline desc
        //   lower and upper that matches the registered desc lower and upper,
        //   and a since that is less than or equal to the registered desc.
        //   The inline since may be less than the registered desc since,
        //   this is because of incremental compaction, where we might rewrite
        //   certain runs in a batch but not others.
        let inline_desc = &parsed.desc;
        let needs_truncation = inline_desc.lower() != registered_desc.lower()
            || inline_desc.upper() != registered_desc.upper();
        if needs_truncation {
            if cfg.validate_bounds_on_read {
                // The registered bounds must lie within the physical bounds,
                // otherwise truncation would drop data the registration claims.
                soft_assert_or_log!(
                    PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
                    "key={} inline={:?} registered={:?}",
                    printable_name,
                    inline_desc,
                    registered_desc
                );

                if ts_rewrite.is_none() {
                    // The ts rewrite feature allows us to advance the registered
                    // upper of a batch that's already been staged (the inline
                    // upper), so if it's been used, then there's no useful
                    // invariant that we can assert here.
                    soft_assert_or_log!(
                        PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
                        "key={} inline={:?} registered={:?}",
                        printable_name,
                        inline_desc,
                        registered_desc
                    );
                }
            }
            // As mentioned above, batches that needs truncation will always have a
            // since of the minimum timestamp. Technically we could truncate any
            // batch where the since is less_than the output_desc's lower, but we're
            // strict here so we don't get any surprises.
            assert_eq!(
                inline_desc.since(),
                &Antichain::from_elem(T::minimum()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        } else {
            // No truncation: this should be a compaction batch, whose inline
            // bounds exactly match the registered bounds.
            assert!(
                PartialOrder::less_equal(inline_desc.since(), registered_desc.since()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.lower(),
                registered_desc.lower(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            assert_eq!(
                inline_desc.upper(),
                registered_desc.upper(),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        }

        EncodedPart {
            metrics,
            registered_desc,
            part: parsed,
            needs_truncation,
            ts_rewrite: ts_rewrite.cloned(),
        }
    }

    /// Returns whether this part may contain unconsolidated data.
    pub(crate) fn maybe_unconsolidated(&self) -> bool {
        // At time of writing, only user parts may be unconsolidated, and they are always
        // written with a since of [T::minimum()].
        self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
    }

    /// Returns the raw updates, without truncation or timestamp rewriting
    /// applied (see [`Self::normalize`] for the adjusted view).
    pub(crate) fn updates(&self) -> &BlobTraceUpdates {
        &self.part.updates
    }

    /// Returns the updates with all truncation / timestamp rewriting applied.
    pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
        let updates = self.part.updates.clone();
        // Fast path: nothing to rewrite or truncate, return the data as-is.
        if !self.needs_truncation && self.ts_rewrite.is_none() {
            return updates;
        }

        // Pull apart whichever encodings are present so each can be filtered
        // and reallocated independently.
        let mut codec = updates
            .records()
            .map(|r| (r.keys().clone(), r.vals().clone()));
        let mut structured = updates.structured().cloned();
        let mut timestamps = updates.timestamps().clone();
        let mut diffs = updates.diffs().clone();

        // Advance every timestamp to the rewrite frontier, if one is set.
        if let Some(rewrite) = self.ts_rewrite.as_ref() {
            timestamps = arrow::compute::unary(&timestamps, |i: i64| {
                let mut t = T::decode(i.to_le_bytes());
                t.advance_by(rewrite.borrow());
                i64::from_le_bytes(T::encode(&t))
            });
        }

        // Drop updates whose (possibly rewritten) timestamps fall outside the
        // registered bounds. Tracks whether the columns were reallocated.
        let reallocated = if self.needs_truncation {
            let filter = BooleanArray::from_unary(&timestamps, |i| {
                let t = T::decode(i.to_le_bytes());
                let truncate_t = {
                    !self.registered_desc.lower().less_equal(&t)
                        || self.registered_desc.upper().less_equal(&t)
                };
                !truncate_t
            });
            if filter.false_count() == 0 {
                // If we're not filtering anything in practice, skip filtering and reallocating.
                false
            } else {
                let filter = FilterBuilder::new(&filter).optimize().build();
                let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
                if let Some((keys, vals)) = codec {
                    codec = Some((
                        realloc_array(do_filter(&keys).as_binary(), metrics),
                        realloc_array(do_filter(&vals).as_binary(), metrics),
                    ));
                }
                if let Some(ext) = structured {
                    structured = Some(ColumnarRecordsStructuredExt {
                        key: realloc_any(do_filter(&*ext.key), metrics),
                        val: realloc_any(do_filter(&*ext.val), metrics),
                    });
                }
                timestamps = realloc_array(do_filter(&timestamps).as_primitive(), metrics);
                diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
                true
            }
        } else {
            false
        };

        // The rewrite above produced a fresh timestamp array; reallocate it
        // (tracked by `metrics`) unless the truncation pass already did.
        if self.ts_rewrite.is_some() && !reallocated {
            timestamps = realloc_array(&timestamps, metrics);
        }

        if self.ts_rewrite.is_some() {
            self.metrics
                .ts_rewrite
                .inc_by(u64::cast_from(timestamps.len()));
        }

        // Reassemble into the same encoding shape the part started with.
        match (codec, structured) {
            (Some((key, value)), None) => {
                BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
            }
            (Some((key, value)), Some(ext)) => {
                BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
            }
            (None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (None, None) => unreachable!(),
        }
    }
}
1425
/// This represents the serde encoding for [`LeasedBatchPart`]. We expose the struct
/// itself (unlike other encodable structs) to attempt to provide stricter drop
/// semantics on `LeasedBatchPart`, i.e. `ExchangeableBatchPart` is exchangeable
/// (including over the network), where `LeasedBatchPart` is not.
///
/// For more details see documentation and comments on:
/// - [`LeasedBatchPart`]
/// - `From<ExchangeableBatchPart>` for `LeasedBatchPart<T>`
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExchangeableBatchPart<T> {
    // The shard this part belongs to.
    shard_id: ShardId,
    // Duplicated with the one serialized in the proto for use in backpressure.
    encoded_size_bytes: usize,
    // The description this part was leased under.
    desc: Description<T>,
    // The timestamp filter to apply when reading the part back.
    filter: FetchBatchFilter<T>,
    // The proto-encoded part itself, decoded lazily on the receiving side.
    part: LazyProto<ProtoHollowBatchPart>,
    // Whether this fetch should be audited for filter pushdown correctness.
    filter_pushdown_audit: bool,
}
1444
1445impl<T> ExchangeableBatchPart<T> {
1446    /// Returns the encoded size of the given part.
1447    pub fn encoded_size_bytes(&self) -> usize {
1448        self.encoded_size_bytes
1449    }
1450}
1451
/// Format we'll use when decoding a [`Part`].
///
/// The string names accepted by [`PartDecodeFormat::from_str`] round-trip with
/// [`PartDecodeFormat::as_str`].
///
/// [`Part`]: mz_persist_types::part::Part
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
    /// Decode from opaque `Codec` data.
    Row {
        /// Will also decode the structured data, and validate it matches.
        validate_structured: bool,
    },
    /// Decode from arrow data.
    Arrow,
}
1465
1466impl PartDecodeFormat {
1467    /// Returns a default value for [`PartDecodeFormat`].
1468    pub const fn default() -> Self {
1469        PartDecodeFormat::Arrow
1470    }
1471
1472    /// Parses a [`PartDecodeFormat`] from the provided string, falling back to the default if the
1473    /// provided value is unrecognized.
1474    pub fn from_str(s: &str) -> Self {
1475        match s {
1476            "row" => PartDecodeFormat::Row {
1477                validate_structured: false,
1478            },
1479            "row_with_validate" => PartDecodeFormat::Row {
1480                validate_structured: true,
1481            },
1482            "arrow" => PartDecodeFormat::Arrow,
1483            x => {
1484                let default = PartDecodeFormat::default();
1485                soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
1486                default
1487            }
1488        }
1489    }
1490
1491    /// Returns a string representation of [`PartDecodeFormat`].
1492    pub const fn as_str(&self) -> &'static str {
1493        match self {
1494            PartDecodeFormat::Row {
1495                validate_structured: false,
1496            } => "row",
1497            PartDecodeFormat::Row {
1498                validate_structured: true,
1499            } => "row_with_validate",
1500            PartDecodeFormat::Arrow => "arrow",
1501        }
1502    }
1503}
1504
1505impl fmt::Display for PartDecodeFormat {
1506    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1507        f.write_str(self.as_str())
1508    }
1509}
1510
1511#[mz_ore::test]
1512fn client_exchange_data() {
1513    // The whole point of SerdeLeasedBatchPart is that it can be exchanged
1514    // between timely workers, including over the network. Enforce then that it
1515    // implements ExchangeData.
1516    fn is_exchange_data<T: timely::ExchangeData>() {}
1517    is_exchange_data::<ExchangeableBatchPart<u64>>();
1518    is_exchange_data::<ExchangeableBatchPart<u64>>();
1519}