mz_persist_client/
fetch.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Fetching batches of data from persist's backing store

use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
use arrow::compute::FilterBuilder;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use itertools::EitherOrBoth;
use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::{soft_panic_no_log, soft_panic_or_log};
use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
use mz_persist_types::part::Codec64Mut;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::stats::PartStats;
use mz_persist_types::{Codec, Codec64};
use mz_proto::RustType;
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, debug_span, trace_span};

use crate::ShardId;
use crate::cfg::PersistConfig;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
use crate::internal::paths::BlobKey;
use crate::internal::state::{
    BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
};
use crate::read::LeasedReaderId;
use crate::schema::{PartMigration, SchemaCache};

pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    // We use `encoded_size_bytes` as the number of permits, but the parsed size
    // is larger than the encoded one, so adjust it. This default value is from
    // eyeballing graphs in experiments that were run on tpch loadgen data.
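    // For example (illustrative arithmetic only): with the default of 1.2, a
    // part with encoded_size_bytes of 100 MiB acquires semaphore permits for
    // roughly 120 MiB.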
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);

pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);

pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow'.",
);

pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);

pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
    "persist_validate_part_bounds_on_read",
    true,
    "Validate the part lower <= the batch lower and the part upper >= batch \
    upper, for the batch containing that part",
);

#[derive(Debug, Clone)]
pub(crate) struct FetchConfig {
    pub(crate) validate_bounds_on_read: bool,
}

impl FetchConfig {
    pub fn from_persist_config(cfg: &PersistConfig) -> Self {
        Self {
            validate_bounds_on_read: VALIDATE_PART_BOUNDS_ON_READ.get(cfg),
        }
    }
}

#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
    pub(crate) part_decode_format: ConfigValHandle<String>,
    pub(crate) fetch_config: FetchConfig,
}

impl BatchFetcherConfig {
    pub fn new(value: &PersistConfig) -> Self {
        Self {
            part_decode_format: PART_DECODE_FORMAT.handle(value),
            fetch_config: FetchConfig::from_persist_config(value),
        }
    }

    pub fn part_decode_format(&self) -> PartDecodeFormat {
        PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
    }
}

/// Capable of fetching [`LeasedBatchPart`] while not holding any capabilities.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    // These are only here so we can use them in the auto-expiring `Drop` impl.
    K: Debug + Codec,
    V: Debug + Codec,
    D: Semigroup + Codec64 + Send + Sync,
{
    pub(crate) cfg: BatchFetcherConfig,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) shard_id: ShardId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    pub(crate) is_transient: bool,

    // Ensures that `BatchFetcher` is of the same type as the `ReadHandle` it's
    // derived from.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> BatchFetcher<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Trade in an exchange-able [ExchangeableBatchPart] for the data it
    /// represents.
    ///
    /// Note: check the `LeasedBatchPart` documentation for how to handle the
    /// returned value.
    pub async fn fetch_leased_part(
        &mut self,
        part: ExchangeableBatchPart<T>,
    ) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
        let ExchangeableBatchPart {
            shard_id,
            encoded_size_bytes: _,
            desc,
            filter,
            filter_pushdown_audit,
            part,
        } = part;
        let part: BatchPart<T> = part.decode_to().expect("valid part");
        if shard_id != self.shard_id {
            return Err(InvalidUsage::BatchNotFromThisShard {
                batch_shard: shard_id,
                handle_shard: self.shard_id.clone(),
            });
        }

        let migration =
            PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
                .await
                .unwrap_or_else(|read_schemas| {
                    panic!(
                        "could not decode part {:?} with schema: {:?}",
                        part.schema_id(),
                        read_schemas
                    )
                });

        let (buf, fetch_permit) = match &part {
            BatchPart::Hollow(x) => {
                let fetch_permit = self
                    .metrics
                    .semaphore
                    .acquire_fetch_permits(x.encoded_size_bytes)
                    .await;
                let read_metrics = if self.is_transient {
                    &self.metrics.read.unindexed
                } else {
                    &self.metrics.read.batch_fetcher
                };
                let buf = fetch_batch_part_blob(
                    &shard_id,
                    self.blob.as_ref(),
                    &self.metrics,
                    &self.shard_metrics,
                    read_metrics,
                    x,
                )
                .await;
                let buf = match buf {
                    Ok(buf) => buf,
                    Err(key) => return Ok(Err(key)),
                };
                let buf = FetchedBlobBuf::Hollow {
                    buf,
                    part: x.clone(),
                };
                (buf, Some(Arc::new(fetch_permit)))
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => {
                let buf = FetchedBlobBuf::Inline {
                    desc: desc.clone(),
                    updates: updates.clone(),
                    ts_rewrite: ts_rewrite.clone(),
                };
                (buf, None)
            }
        };
        let fetched_blob = FetchedBlob {
            metrics: Arc::clone(&self.metrics),
            read_metrics: self.metrics.read.batch_fetcher.clone(),
            buf,
            registered_desc: desc.clone(),
            migration,
            filter: filter.clone(),
            filter_pushdown_audit,
            structured_part_audit: self.cfg.part_decode_format(),
            fetch_permit,
            _phantom: PhantomData,
            fetch_config: self.cfg.fetch_config.clone(),
        };
        Ok(Ok(fetched_blob))
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum FetchBatchFilter<T> {
    Snapshot {
        as_of: Antichain<T>,
    },
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    Compaction {
        since: Antichain<T>,
    },
}

impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
    pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
        match self {
            FetchBatchFilter::Snapshot { as_of } => {
                // This time is covered by a listen
                if as_of.less_than(t) {
                    return false;
                }
                t.advance_by(as_of.borrow());
                true
            }
            FetchBatchFilter::Listen { as_of, lower } => {
                // This time is covered by a snapshot
                if !as_of.less_than(t) {
                    return false;
                }

                // Because of compaction, the next batch we get might also
                // contain updates we've already emitted. For example, we
                // emitted `[1, 2)` and then compaction combined that batch with
                // a `[2, 3)` batch into a new `[1, 3)` batch. If this happens,
                // we just need to filter out anything < the frontier. This
                // frontier was the upper of the last batch (and thus exclusive)
                // so for the == case, we still emit.
                if !lower.less_equal(t) {
                    return false;
                }
                true
            }
            FetchBatchFilter::Compaction { since } => {
                t.advance_by(since.borrow());
                true
            }
        }
    }
}
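
// A minimal sketch of how the variants above partition times between a
// snapshot and a listen. The `u64` timestamps and bounds are illustrative
// values, not from production code.
#[mz_ore::test]
fn fetch_batch_filter_example() {
    let as_of = Antichain::from_elem(5u64);

    // Snapshot: times at or before the as_of are kept (and advanced to it);
    // anything past the as_of is left for a subsequent listen.
    let snapshot = FetchBatchFilter::Snapshot {
        as_of: as_of.clone(),
    };
    let mut t = 3u64;
    assert!(snapshot.filter_ts(&mut t));
    assert_eq!(t, 5);
    let mut t = 7u64;
    assert!(!snapshot.filter_ts(&mut t));

    // Listen: only times past the as_of and at or past `lower` (the upper of
    // the previously-emitted batch) are kept.
    let listen = FetchBatchFilter::Listen {
        as_of,
        lower: Antichain::from_elem(6u64),
    };
    let mut t = 7u64;
    assert!(listen.filter_ts(&mut t));
    let mut t = 5u64;
    assert!(!listen.filter_ts(&mut t));
}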

/// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
///
/// Note: check the `LeasedBatchPart` documentation for how to handle the
/// returned value.
pub(crate) async fn fetch_leased_part<K, V, T, D>(
    cfg: &PersistConfig,
    part: &LeasedBatchPart<T>,
    blob: &dyn Blob,
    metrics: Arc<Metrics>,
    read_metrics: &ReadMetrics,
    shard_metrics: &ShardMetrics,
    reader_id: &LeasedReaderId,
    read_schemas: Schemas<K, V>,
    schema_cache: &mut SchemaCache<K, V, T, D>,
) -> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    let fetch_config = FetchConfig::from_persist_config(cfg);
    let encoded_part = EncodedPart::fetch(
        &fetch_config,
        &part.shard_id,
        blob,
        &metrics,
        shard_metrics,
        read_metrics,
        &part.desc,
        &part.part,
    )
    .await
    .unwrap_or_else(|blob_key| {
        // Ideally, readers should never encounter a missing blob. They place a seqno
        // hold as they consume their snapshot/listen, preventing any blobs they need
        // from being deleted by garbage collection, and all blob implementations are
        // linearizable so there should be no possibility of stale reads.
        //
        // If we do have a bug and a reader does encounter a missing blob, the state
        // cannot be recovered, and our best option is to panic and retry the whole
        // process.
        panic!("{} could not fetch batch part: {}", reader_id, blob_key)
    });
    let part_cfg = BatchFetcherConfig::new(cfg);
    let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
        .await
        .unwrap_or_else(|read_schemas| {
            panic!(
                "could not decode part {:?} with schema: {:?}",
                part.part.schema_id(),
                read_schemas
            )
        });
    FetchedPart::new(
        metrics,
        encoded_part,
        migration,
        part.filter.clone(),
        part.filter_pushdown_audit,
        part_cfg.part_decode_format(),
        part.part.stats(),
    )
}

pub(crate) async fn fetch_batch_part_blob<T>(
    shard_id: &ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
    shard_metrics: &ShardMetrics,
    read_metrics: &ReadMetrics,
    part: &HollowBatchPart<T>,
) -> Result<SegmentedBytes, BlobKey> {
    let now = Instant::now();
    let get_span = debug_span!("fetch_batch::get");
    let blob_key = part.key.complete(shard_id);
    let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
        shard_metrics.blob_gets.inc();
        blob.get(&blob_key).await
    })
    .instrument(get_span.clone())
    .await
    .ok_or(blob_key)?;

    drop(get_span);

    read_metrics.part_count.inc();
    read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
    read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());

    Ok(value)
}

pub(crate) fn decode_batch_part_blob<T>(
    cfg: &FetchConfig,
    metrics: &Metrics,
    read_metrics: &ReadMetrics,
    registered_desc: Description<T>,
    part: &HollowBatchPart<T>,
    buf: &SegmentedBytes,
) -> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    trace_span!("fetch_batch::decode").in_scope(|| {
        let parsed = metrics
            .codecs
            .batch
            .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
            .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
            // We received a batch part that we couldn't decode. This could
            // happen if persist messes up backward/forward compatibility, if
            // the durable data was corrupted, or if operations mess up
            // deployment. In any case, fail loudly.
            .expect("internal error: invalid encoded state");
        read_metrics
            .part_goodbytes
            .inc_by(u64::cast_from(parsed.updates.goodbytes()));
        EncodedPart::from_hollow(cfg, read_metrics.clone(), registered_desc, part, parsed)
    })
}

pub(crate) async fn fetch_batch_part<T>(
    cfg: &FetchConfig,
    shard_id: &ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
    shard_metrics: &ShardMetrics,
    read_metrics: &ReadMetrics,
    registered_desc: &Description<T>,
    part: &HollowBatchPart<T>,
) -> Result<EncodedPart<T>, BlobKey>
where
    T: Timestamp + Lattice + Codec64,
{
    let buf =
        fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
    let part = decode_batch_part_blob(
        cfg,
        metrics,
        read_metrics,
        registered_desc.clone(),
        part,
        &buf,
    );
    Ok(part)
}

/// This represents the lease of a seqno. It's generally paired with some external state,
/// like a hollow part: holding this lease indicates that we may still want to fetch that part,
/// and should hold back GC to keep it around.
///
/// Generally the state and lease are bundled together, as in [LeasedBatchPart]... but sometimes
/// it's necessary to handle them separately, so this struct is exposed as well. Handle with care.
#[derive(Clone, Debug)]
pub struct Lease(Arc<SeqNo>);

impl Lease {
    /// Creates a new [Lease] that holds the given [SeqNo].
    pub fn new(seqno: SeqNo) -> Self {
        Self(Arc::new(seqno))
    }

    /// Returns the inner [SeqNo] of this [Lease].
    #[cfg(test)]
    pub fn seqno(&self) -> SeqNo {
        *self.0
    }

    /// Returns the number of live copies of this lease, including this one.
    pub fn count(&self) -> usize {
        Arc::strong_count(&self.0)
    }
}
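
// A small illustrative check (not part of the public API surface) that `Lease`
// counts live copies via the strong count of its inner `Arc`. Assumes `SeqNo`'s
// inner `u64` is directly constructible, as elsewhere in persist.
#[mz_ore::test]
fn lease_count_example() {
    let lease = Lease::new(SeqNo(42));
    assert_eq!(lease.count(), 1);
    let copy = lease.clone();
    assert_eq!(copy.count(), 2);
    drop(lease);
    assert_eq!(copy.count(), 1);
}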

/// A token representing one fetch-able batch part.
///
/// It is tradeable via `crate::fetch::fetch_leased_part` for the resulting data
/// stored in the part.
///
/// # Exchange
///
/// You can exchange `LeasedBatchPart` by converting it to an
/// [`ExchangeableBatchPart`] through `Self::into_exchangeable_part`.
/// [`ExchangeableBatchPart`] is exchangeable, including over the network.
///
/// n.b. `Self::into_exchangeable_part` also returns the part's [Lease], which
/// cannot travel across process boundaries; the caller must keep that lease
/// alive for as long as the part may still be fetched (see the method's
/// warning for details).
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_id: ShardId,
    pub(crate) filter: FetchBatchFilter<T>,
    pub(crate) desc: Description<T>,
    pub(crate) part: BatchPart<T>,
    /// The lease that prevents this part from being GCed. Code should ensure that this lease
    /// lives as long as the part is needed.
    pub(crate) lease: Lease,
    pub(crate) filter_pushdown_audit: bool,
}

impl<T> LeasedBatchPart<T>
where
    T: Timestamp + Codec64,
{
    /// Takes `self` into a [`ExchangeableBatchPart`], which allows `self` to be
    /// exchanged (potentially across the network).
    ///
    /// !!!WARNING!!!
    ///
    /// This method also returns the [Lease] associated with the given part, since
    /// that can't travel across process boundaries. The caller is responsible for
    /// ensuring that the lease is held for as long as the batch part may be in use:
    /// dropping it too early may cause a fetch to fail.
    pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
        // Clone the lease out of `self` and hand it to the caller; the seqno
        // hold lasts as long as that clone (and any copies of it) stays alive.
        let lease = self.lease.clone();
        let part = ExchangeableBatchPart {
            shard_id: self.shard_id,
            encoded_size_bytes: self.part.encoded_size_bytes(),
            desc: self.desc.clone(),
            filter: self.filter.clone(),
            part: LazyProto::from(&self.part.into_proto()),
            filter_pushdown_audit: self.filter_pushdown_audit,
        };
        (part, lease)
    }

    /// The encoded size of this part in bytes
    pub fn encoded_size_bytes(&self) -> usize {
        self.part.encoded_size_bytes()
    }

    /// The filter has indicated we don't need this part, but we can verify the
    /// ongoing end-to-end correctness of corner cases via an "audit": fetch the
    /// part as normal, and if the MFP keeps anything from it, then something
    /// has gone horribly wrong.
    pub fn request_filter_pushdown_audit(&mut self) {
        self.filter_pushdown_audit = true;
    }

    /// Returns the pushdown stats for this part.
    pub fn stats(&self) -> Option<PartStats> {
        self.part.stats().map(|x| x.decode())
    }

    /// Apply any relevant projection pushdown optimizations, assuming that the data in the part
    /// is equivalent to the provided key and value.
    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
        assert_eq!(key.len(), 1, "expect a single-row key array");
        assert_eq!(val.len(), 1, "expect a single-row val array");
        let as_of = match &self.filter {
            FetchBatchFilter::Snapshot { as_of } => as_of,
            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
        };
        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
            return;
        }
        let (diffs_sum, _stats) = match &self.part {
            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
            BatchPart::Inline { .. } => return,
        };
        debug!(
            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
            // This is only used for debugging, so hack to assume that D is i64.
            diffs_sum.map(i64::decode),
            as_of.elements(),
            self.desc.lower().elements(),
            self.desc.upper().elements()
        );
        let as_of = match &as_of.elements() {
            &[as_of] => as_of,
            _ => return,
        };
        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
        if !eligible {
            return;
        }
        let Some(diffs_sum) = diffs_sum else {
            return;
        };

        debug!(
            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
            // This is only used for debugging, so hack to assume that D is i64.
            i64::decode(diffs_sum),
            as_of,
            self.part.encoded_size_bytes(),
        );
        self.metrics.pushdown.parts_faked_count.inc();
        self.metrics
            .pushdown
            .parts_faked_bytes
            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
        let timestamps = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push(as_of);
            col.finish()
        };
        let diffs = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push_raw(diffs_sum);
            col.finish()
        };
        let updates = BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt { key, val },
            timestamps,
            diffs,
        };
        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
            desc: Some(self.desc.into_proto()),
            index: 0,
            updates: Some(updates.into_proto()),
        });
        self.part = BatchPart::Inline {
            updates: faked_data,
            ts_rewrite: None,
            schema_id: None,
            deprecated_schema_id: None,
        };
    }
}

impl<T> Drop for LeasedBatchPart<T> {
    /// For details, see [`LeasedBatchPart`].
    fn drop(&mut self) {
        self.metrics.lease.dropped_part.inc()
    }
}

/// A [Blob] object that has been fetched, but not at all decoded.
///
/// In contrast to [FetchedPart], this representation hasn't yet done parquet
/// decoding.
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    read_metrics: ReadMetrics,
    buf: FetchedBlobBuf<T>,
    registered_desc: Description<T>,
    migration: PartMigration<K, V>,
    filter: FetchBatchFilter<T>,
    filter_pushdown_audit: bool,
    structured_part_audit: PartDecodeFormat,
    fetch_permit: Option<Arc<MetricsPermits>>,
    fetch_config: FetchConfig,
    _phantom: PhantomData<fn() -> D>,
}

#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    Hollow {
        buf: SegmentedBytes,
        part: HollowBatchPart<T>,
    },
    Inline {
        desc: Description<T>,
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
    },
}

impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            metrics: Arc::clone(&self.metrics),
            read_metrics: self.read_metrics.clone(),
            buf: self.buf.clone(),
            registered_desc: self.registered_desc.clone(),
            migration: self.migration.clone(),
            filter: self.filter.clone(),
            filter_pushdown_audit: self.filter_pushdown_audit.clone(),
            fetch_permit: self.fetch_permit.clone(),
            structured_part_audit: self.structured_part_audit.clone(),
            fetch_config: self.fetch_config.clone(),
            _phantom: self._phantom.clone(),
        }
    }
}

/// [FetchedPart] but with an accompanying permit from the fetch mem/disk
/// semaphore.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The underlying [FetchedPart].
    pub part: FetchedPart<K, V, T, D>,
    fetch_permit: Option<Arc<MetricsPermits>>,
}

impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
where
    K: Codec + Debug,
    <K as Codec>::Storage: Debug,
    V: Codec + Debug,
    <V as Codec>::Storage: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let ShardSourcePart { part, fetch_permit } = self;
        f.debug_struct("ShardSourcePart")
            .field("part", part)
            .field("fetch_permit", fetch_permit)
            .finish()
    }
}

impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
    /// Partially decodes this blob into a [FetchedPart].
    pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
        self.parse_internal(&self.fetch_config)
    }

    /// Partially decodes this blob into a [FetchedPart].
    pub(crate) fn parse_internal(&self, cfg: &FetchConfig) -> ShardSourcePart<K, V, T, D> {
        let (part, stats) = match &self.buf {
            FetchedBlobBuf::Hollow { buf, part } => {
                let parsed = decode_batch_part_blob(
                    cfg,
                    &self.metrics,
                    &self.read_metrics,
                    self.registered_desc.clone(),
                    part,
                    buf,
                );
                (parsed, part.stats.as_ref())
            }
            FetchedBlobBuf::Inline {
                desc,
                updates,
                ts_rewrite,
            } => {
                let parsed = EncodedPart::from_inline(
                    cfg,
                    &self.metrics,
                    self.read_metrics.clone(),
                    desc.clone(),
                    updates,
                    ts_rewrite.as_ref(),
                );
                (parsed, None)
            }
        };
        let part = FetchedPart::new(
            Arc::clone(&self.metrics),
            part,
            self.migration.clone(),
            self.filter.clone(),
            self.filter_pushdown_audit,
            self.structured_part_audit,
            stats,
        );
        ShardSourcePart {
            part,
            fetch_permit: self.fetch_permit.clone(),
        }
    }

    /// Decodes and returns the pushdown stats for this part, if known.
    pub fn stats(&self) -> Option<PartStats> {
        match &self.buf {
            FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
            FetchedBlobBuf::Inline { .. } => None,
        }
    }
}

/// A [Blob] object that has been fetched, but not yet fully decoded.
///
/// In contrast to [FetchedBlob], this representation has already done parquet
/// decoding.
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    ts_filter: FetchBatchFilter<T>,
    // If migration is Either, then the columnar one will have already been
    // applied here on the structured data only.
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    timestamps: Int64Array,
    diffs: Int64Array,
    migration: PartMigration<K, V>,
    filter_pushdown_audit: Option<LazyPartStats>,
    peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
    part_cursor: usize,
    key_storage: Option<K::Storage>,
    val_storage: Option<V::Storage>,

    _phantom: PhantomData<fn() -> D>,
}

impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
    pub(crate) fn new(
        metrics: Arc<Metrics>,
        part: EncodedPart<T>,
        migration: PartMigration<K, V>,
        ts_filter: FetchBatchFilter<T>,
        filter_pushdown_audit: bool,
        part_decode_format: PartDecodeFormat,
        stats: Option<&LazyPartStats>,
    ) -> Self {
        let part_len = u64::cast_from(part.part.updates.len());
        match &migration {
            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
            PartMigration::Schemaless { .. } => {
                metrics.schema.migration_count_codec.inc();
                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
            }
            PartMigration::Either { .. } => {
                metrics.schema.migration_count_either.inc();
                match part_decode_format {
                    PartDecodeFormat::Row {
                        validate_structured: false,
                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
                    PartDecodeFormat::Row {
                        validate_structured: true,
                    } => {
                        metrics.schema.migration_len_either_codec.inc_by(part_len);
                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
                    }
                    PartDecodeFormat::Arrow => {
                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
                    }
                }
            }
        }

        let filter_pushdown_audit = if filter_pushdown_audit {
            stats.cloned()
        } else {
            None
        };

        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
                                   structured_only: bool| {
            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

            let structured = match &migration {
                PartMigration::SameSchema { .. } => structured,
                PartMigration::Schemaless { read } if structured_only => {
                    // We don't know the source schema, but we do know the source datatype; migrate it directly.
                    let start = Instant::now();
                    let read_key = data_type::<K>(&*read.key).ok()?;
                    let read_val = data_type::<V>(&*read.val).ok()?;
                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
                PartMigration::Schemaless { .. } => return None,
                PartMigration::Either {
                    write: _,
                    read: _,
                    key_migration,
                    val_migration,
                } => {
                    let start = Instant::now();
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
            };

            let read_schema = migration.codec_read();
            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

            match &key {
                Ok(key_decoder) => {
                    let key_size_after = key_decoder.goodbytes();
                    let key_diff = key_size_before.saturating_sub(key_size_after);
                    metrics
                        .pushdown
                        .parts_projection_trimmed_bytes
                        .inc_by(u64::cast_from(key_diff));
                }
                Err(e) => {
                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                }
            }

            Some((key.ok()?, val.ok()?))
        };

        let updates = part.normalize(&metrics.columnar);
        let timestamps = updates.timestamps().clone();
        let diffs = updates.diffs().clone();
        let part = match updates {
            // If only one encoding is available, decode via that encoding.
            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                // The structured-only data format was added after schema ids were recorded everywhere,
                // so we expect this data to be present.
                downcast_structured(key_values, true).expect("valid schemas for structured data"),
            ),
            // If both are available, respect the specified part decode format.
            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
                PartDecodeFormat::Row {
                    validate_structured: false,
                } => EitherOrBoth::Left(records),
                PartDecodeFormat::Row {
                    validate_structured: true,
                } => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Both(records, decoders),
                    None => EitherOrBoth::Left(records),
                },
                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Right(decoders),
                    None => EitherOrBoth::Left(records),
                },
            },
        };

        FetchedPart {
            metrics,
            ts_filter,
            part,
            peek_stash: None,
            timestamps,
            diffs,
            migration,
            filter_pushdown_audit,
            part_cursor: 0,
            key_storage: None,
            val_storage: None,
            _phantom: PhantomData,
        }
    }

    /// Returns Some if this part was only fetched as part of a filter pushdown
    /// audit. See [LeasedBatchPart::request_filter_pushdown_audit].
    ///
    /// If set, the value in the Option is for debugging and should be included
    /// in any error messages.
    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
        self.filter_pushdown_audit.clone()
    }
}

/// A [Blob] object that has been fetched, but has no associated decoding
/// logic.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    metrics: ReadMetrics,
    registered_desc: Description<T>,
    part: BlobTraceBatchPart<T>,
    needs_truncation: bool,
    ts_rewrite: Option<Antichain<T>>,
}

impl<K, V, T, D> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// [Self::next] but optionally providing a `K` and `V` for alloc reuse.
    pub fn next_with_storage(
        &mut self,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
        let mut consolidated = self.peek_stash.take();
        loop {
            // Fetch and decode the next tuple in the sequence. (Or break if there is none.)
            let next = if self.part_cursor < self.timestamps.len() {
                let next_idx = self.part_cursor;
                self.part_cursor += 1;
                // These `to_le_bytes` calls were previously encapsulated by `ColumnarRecords`.
                // TODO(structured): re-encapsulate these once we've finished the structured migration.
                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
                if !self.ts_filter.filter_ts(&mut t) {
                    continue;
                }
                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
                if d.is_zero() {
                    continue;
                }
                let kv = self.decode_kv(next_idx, key, val);
                (kv, t, d)
            } else {
                break;
            };

            // Attempt to consolidate in the next tuple, stashing it if that's not possible.
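            // For example, a run of decoded tuples [(a, 1, +1), (a, 1, +1),
            // (b, 1, +1)] consolidates the first two into (a, 1, +2) and
            // stashes (b, 1, +1) in `peek_stash` for the next call.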
            if let Some((kv, t, d)) = &mut consolidated {
                let (kv_next, t_next, d_next) = &next;
                if kv == kv_next && t == t_next {
                    d.plus_equals(d_next);
                    if d.is_zero() {
                        consolidated = None;
                    }
                } else {
                    self.peek_stash = Some(next);
                    break;
                }
            } else {
                consolidated = Some(next);
            }
        }

        let (kv, t, d) = consolidated?;

        Some((kv, t, d))
    }

    fn decode_kv(
        &mut self,
        index: usize,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (Result<K, String>, Result<V, String>) {
        let decoded = self
            .part
            .as_ref()
            .map_left(|codec| {
                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
                Self::decode_codec(
                    &*self.metrics,
                    self.migration.codec_read(),
                    ck,
                    cv,
                    key,
                    val,
                    &mut self.key_storage,
                    &mut self.val_storage,
                )
            })
            .map_right(|(structured_key, structured_val)| {
                self.decode_structured(index, structured_key, structured_val, key, val)
            });

        match decoded {
            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
                // Purposefully do not trace to prevent blowing up Sentry.
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .key()
                    .report_valid(|| k_s == k);
                if !is_valid {
                    soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
                }
                // Purposefully do not trace to prevent blowing up Sentry.
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .val()
                    .report_valid(|| v_s == v);
                if !is_valid {
                    soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
                }

                (k, v)
            }
            EitherOrBoth::Left(kv) => kv,
            EitherOrBoth::Right(kv) => kv,
        }
    }

    fn decode_codec(
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
        key_buf: &[u8],
        val_buf: &[u8],
        key: &mut Option<K>,
        val: &mut Option<V>,
        key_storage: &mut Option<K::Storage>,
        val_storage: &mut Option<V::Storage>,
    ) -> (Result<K, String>, Result<V, String>) {
        let k = metrics.codecs.key.decode(|| match key.take() {
            Some(mut key) => {
                match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
                    Ok(()) => Ok(key),
                    Err(err) => Err(err),
                }
            }
            None => K::decode(key_buf, &read_schemas.key),
        });
        let v = metrics.codecs.val.decode(|| match val.take() {
            Some(mut val) => {
                match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
                    Ok(()) => Ok(val),
                    Err(err) => Err(err),
                }
            }
            None => V::decode(val_buf, &read_schemas.val),
        });
        (k, v)
    }

    fn decode_structured(
        &self,
        idx: usize,
        keys: &<K::Schema as Schema<K>>::Decoder,
        vals: &<V::Schema as Schema<V>>::Decoder,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (Result<K, String>, Result<V, String>) {
        let mut key = key.take().unwrap_or_default();
        keys.decode(idx, &mut key);

        let mut val = val.take().unwrap_or_default();
        vals.decode(idx, &mut val);

        (Ok(key), Ok(val))
    }
}

impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64 + Send + Sync,
{
    type Item = ((Result<K, String>, Result<V, String>), T, D);

    fn next(&mut self) -> Option<Self::Item> {
        self.next_with_storage(&mut None, &mut None)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // We don't know in advance how restrictive the filter will be.
        let max_len = self.timestamps.len();
        (0, Some(max_len))
    }
}

impl<T> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub async fn fetch(
        cfg: &FetchConfig,
        shard_id: &ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
        registered_desc: &Description<T>,
        part: &BatchPart<T>,
    ) -> Result<Self, BlobKey> {
        match part {
            BatchPart::Hollow(x) => {
                fetch_batch_part(
                    cfg,
                    shard_id,
                    blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    registered_desc,
                    x,
                )
                .await
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => Ok(EncodedPart::from_inline(
                cfg,
                metrics,
                read_metrics.clone(),
                registered_desc.clone(),
                updates,
                ts_rewrite.as_ref(),
            )),
        }
    }

    pub(crate) fn from_inline(
        cfg: &FetchConfig,
        metrics: &Metrics,
        read_metrics: ReadMetrics,
        desc: Description<T>,
        x: &LazyInlineBatchPart,
        ts_rewrite: Option<&Antichain<T>>,
    ) -> Self {
        let parsed = x.decode(&metrics.columnar).expect("valid inline part");
        Self::new(cfg, read_metrics, desc, "inline", ts_rewrite, parsed)
    }

    pub(crate) fn from_hollow(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        part: &HollowBatchPart<T>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        Self::new(
            cfg,
            metrics,
            registered_desc,
            &part.key.0,
            part.ts_rewrite.as_ref(),
            parsed,
        )
    }

    pub(crate) fn new(
        cfg: &FetchConfig,
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        printable_name: &str,
        ts_rewrite: Option<&Antichain<T>>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        // There are two types of batches in persist:
        // - Batches written by a persist user (either directly or indirectly
        //   via BatchBuilder). These always have a since of the minimum
        //   timestamp and may be registered in persist state with a tighter set
        //   of bounds than are inline in the batch (truncation). To read one of
        //   these batches, all data physically in the batch but outside of the
        //   truncated bounds must be ignored. Not every user batch is
        //   truncated.
        // - Batches written by compaction. These always have an inline desc
        //   that exactly matches the one they are registered with. The since
        //   can be anything.
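        //
        // For example (illustrative bounds): a user batch physically holding
        // updates at times [0, 10) might be registered with the tighter bounds
        // [3, 7); at read time, the updates at times 0..3 and 7..10 must be
        // filtered out.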
        let inline_desc = &parsed.desc;
        let needs_truncation = inline_desc.lower() != registered_desc.lower()
            || inline_desc.upper() != registered_desc.upper();
        if needs_truncation {
            if cfg.validate_bounds_on_read {
                assert!(
                    PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
                    "key={} inline={:?} registered={:?}",
                    printable_name,
                    inline_desc,
                    registered_desc
                );

                if ts_rewrite.is_none() {
                    // The ts rewrite feature allows us to advance the registered
                    // upper of a batch that's already been staged (the inline
                    // upper), so if it's been used, then there's no useful
                    // invariant that we can assert here.
                    assert!(
                        PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
                        "key={} inline={:?} registered={:?}",
                        printable_name,
                        inline_desc,
                        registered_desc
                    );
                }
            }
            // As mentioned above, batches that need truncation will always have a
            // since of the minimum timestamp. Technically we could truncate any
            // batch where the since is less_than the output_desc's lower, but we're
            // strict here so we don't get any surprises.
            assert_eq!(
                inline_desc.since(),
                &Antichain::from_elem(T::minimum()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        } else {
            assert_eq!(
                inline_desc, &registered_desc,
                "key={} inline={:?} registered={:?}",
                printable_name, inline_desc, registered_desc
            );
        }

        EncodedPart {
            metrics,
            registered_desc,
            part: parsed,
            needs_truncation,
            ts_rewrite: ts_rewrite.cloned(),
        }
    }

    pub(crate) fn maybe_unconsolidated(&self) -> bool {
        // At time of writing, only user parts may be unconsolidated, and they are always
        // written with a since of [T::minimum()].
        self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
    }

    pub(crate) fn updates(&self) -> &BlobTraceUpdates {
        &self.part.updates
    }

    /// Returns the updates with all truncation / timestamp rewriting applied.
    pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
        let updates = self.part.updates.clone();
        if !self.needs_truncation && self.ts_rewrite.is_none() {
            return updates;
        }

        let mut codec = updates
            .records()
            .map(|r| (r.keys().clone(), r.vals().clone()));
        let mut structured = updates.structured().cloned();
        let mut timestamps = updates.timestamps().clone();
        let mut diffs = updates.diffs().clone();

        if let Some(rewrite) = self.ts_rewrite.as_ref() {
            timestamps = arrow::compute::unary(&timestamps, |i: i64| {
                let mut t = T::decode(i.to_le_bytes());
                t.advance_by(rewrite.borrow());
                i64::from_le_bytes(T::encode(&t))
            });
        }

        let reallocated = if self.needs_truncation {
            let filter = BooleanArray::from_unary(&timestamps, |i| {
                let t = T::decode(i.to_le_bytes());
                let truncate_t = {
                    !self.registered_desc.lower().less_equal(&t)
                        || self.registered_desc.upper().less_equal(&t)
                };
                !truncate_t
            });
            if filter.false_count() == 0 {
                // If we're not filtering anything in practice, skip filtering and reallocating.
                false
            } else {
                let filter = FilterBuilder::new(&filter).optimize().build();
                let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
                if let Some((keys, vals)) = codec {
                    codec = Some((
                        realloc_array(do_filter(&keys).as_binary(), metrics),
                        realloc_array(do_filter(&vals).as_binary(), metrics),
                    ));
                }
                if let Some(ext) = structured {
                    structured = Some(ColumnarRecordsStructuredExt {
                        key: realloc_any(do_filter(&*ext.key), metrics),
                        val: realloc_any(do_filter(&*ext.val), metrics),
                    });
                }
                timestamps = realloc_array(do_filter(&timestamps).as_primitive(), metrics);
                diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
                true
            }
        } else {
            false
        };

        if self.ts_rewrite.is_some() && !reallocated {
            timestamps = realloc_array(&timestamps, metrics);
        }

        if self.ts_rewrite.is_some() {
            self.metrics
                .ts_rewrite
                .inc_by(u64::cast_from(timestamps.len()));
        }

        match (codec, structured) {
            (Some((key, value)), None) => {
                BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
            }
            (Some((key, value)), Some(ext)) => {
                BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
            }
            (None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (None, None) => unreachable!(),
        }
    }
}

/// This represents the serde encoding for [`LeasedBatchPart`]. We expose the struct
/// itself (unlike other encodable structs) to attempt to provide stricter drop
/// semantics on `LeasedBatchPart`, i.e. `ExchangeableBatchPart` is exchangeable
/// (including over the network), where `LeasedBatchPart` is not.
///
/// For more details see documentation and comments on:
/// - [`LeasedBatchPart`]
/// - [`LeasedBatchPart::into_exchangeable_part`]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct ExchangeableBatchPart<T> {
    shard_id: ShardId,
    // Duplicated with the one serialized in the proto for use in backpressure.
    encoded_size_bytes: usize,
    desc: Description<T>,
    filter: FetchBatchFilter<T>,
    part: LazyProto<ProtoHollowBatchPart>,
    filter_pushdown_audit: bool,
}

impl<T> ExchangeableBatchPart<T> {
    /// Returns the encoded size of the given part.
    pub fn encoded_size_bytes(&self) -> usize {
        self.encoded_size_bytes
    }
}

/// Format we'll use when decoding a [`Part`].
///
/// [`Part`]: mz_persist_types::part::Part
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
    /// Decode from opaque `Codec` data.
    Row {
        /// Will also decode the structured data, and validate it matches.
        validate_structured: bool,
    },
    /// Decode from arrow data
    Arrow,
}

impl PartDecodeFormat {
    /// Returns a default value for [`PartDecodeFormat`].
    pub const fn default() -> Self {
        PartDecodeFormat::Arrow
    }

    /// Parses a [`PartDecodeFormat`] from the provided string, falling back to the default if the
    /// provided value is unrecognized.
    pub fn from_str(s: &str) -> Self {
        match s {
            "row" => PartDecodeFormat::Row {
                validate_structured: false,
            },
            "row_with_validate" => PartDecodeFormat::Row {
                validate_structured: true,
            },
            "arrow" => PartDecodeFormat::Arrow,
            x => {
                let default = PartDecodeFormat::default();
                soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
                default
            }
        }
    }

    /// Returns a string representation of [`PartDecodeFormat`].
    pub const fn as_str(&self) -> &'static str {
        match self {
            PartDecodeFormat::Row {
                validate_structured: false,
            } => "row",
            PartDecodeFormat::Row {
                validate_structured: true,
            } => "row_with_validate",
            PartDecodeFormat::Arrow => "arrow",
        }
    }
}

impl fmt::Display for PartDecodeFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
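
// A small sketch of the config round-trip: the `persist_part_decode_format`
// dyncfg stores the string form, and `from_str` parses it back. The loop
// values are just the three recognized encodings.
#[mz_ore::test]
fn part_decode_format_roundtrip_example() {
    for s in ["row", "row_with_validate", "arrow"] {
        assert_eq!(PartDecodeFormat::from_str(s).as_str(), s);
    }
}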

#[mz_ore::test]
fn client_exchange_data() {
    // The whole point of ExchangeableBatchPart is that it can be exchanged
    // between timely workers, including over the network. Enforce then that it
    // implements ExchangeData.
    fn is_exchange_data<T: timely::ExchangeData>() {}
    is_exchange_data::<ExchangeableBatchPart<u64>>();
}