mz_persist_client/
fetch.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Fetching batches of data from persist's backing store

use std::fmt::{self, Debug};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
use arrow::compute::FilterBuilder;
use differential_dataflow::difference::Semigroup;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::Description;
use itertools::EitherOrBoth;
use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
use mz_ore::bytes::SegmentedBytes;
use mz_ore::cast::CastFrom;
use mz_ore::{soft_panic_no_log, soft_panic_or_log};
use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
use mz_persist::location::{Blob, SeqNo};
use mz_persist::metrics::ColumnarMetrics;
use mz_persist_types::arrow::ArrayOrd;
use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
use mz_persist_types::part::Codec64Mut;
use mz_persist_types::schema::backward_compatible;
use mz_persist_types::stats::PartStats;
use mz_persist_types::{Codec, Codec64};
use mz_proto::{IntoRustIfSome, ProtoType, RustType, TryFromProtoError};
use serde::{Deserialize, Serialize};
use timely::PartialOrder;
use timely::progress::frontier::AntichainRef;
use timely::progress::{Antichain, Timestamp};
use tracing::{Instrument, debug, debug_span, trace_span};

use crate::ShardId;
use crate::batch::{
    ProtoFetchBatchFilter, ProtoFetchBatchFilterListen, ProtoLease, ProtoLeasedBatchPart,
    proto_fetch_batch_filter,
};
use crate::cfg::PersistConfig;
use crate::error::InvalidUsage;
use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
use crate::internal::machine::retry_external;
use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
use crate::internal::paths::BlobKey;
use crate::internal::state::{BatchPart, HollowBatchPart, ProtoInlineBatchPart};
use crate::read::LeasedReaderId;
use crate::schema::{PartMigration, SchemaCache};

pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    // We use `encoded_size_bytes` as the number of permits, but the parsed size
    // is larger than the encoded one, so adjust it. This default value is from
    // eyeballing graphs in experiments that were run on tpch loadgen data.
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);

pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);

pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow'.",
);

pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);

#[derive(Debug, Clone)]
pub(crate) struct BatchFetcherConfig {
    pub(crate) part_decode_format: ConfigValHandle<String>,
}

impl BatchFetcherConfig {
    pub fn new(value: &PersistConfig) -> Self {
        BatchFetcherConfig {
            part_decode_format: PART_DECODE_FORMAT.handle(value),
        }
    }

    pub fn part_decode_format(&self) -> PartDecodeFormat {
        PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
    }
}
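
// A minimal sketch of reading the decode-format knob, assuming a
// `PersistConfig` named `persist_cfg` is in scope:
//
//   let cfg = BatchFetcherConfig::new(&persist_cfg);
//   match cfg.part_decode_format() {
//       PartDecodeFormat::Arrow => { /* decode from structured arrow data */ }
//       PartDecodeFormat::Row { validate_structured } => { /* decode via Codec */ }
//   }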

/// Capable of fetching [`LeasedBatchPart`] while not holding any capabilities.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    // These are only here so we can use them in the auto-expiring `Drop` impl.
    K: Debug + Codec,
    V: Debug + Codec,
    D: Semigroup + Codec64 + Send + Sync,
{
    pub(crate) cfg: BatchFetcherConfig,
    pub(crate) blob: Arc<dyn Blob>,
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    pub(crate) shard_id: ShardId,
    pub(crate) read_schemas: Schemas<K, V>,
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    pub(crate) is_transient: bool,

    // Ensures that `BatchFetcher` is of the same type as the `ReadHandle` it's
    // derived from.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}

impl<K, V, T, D> BatchFetcher<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// Converts a [`SerdeLeasedBatchPart`] back into a [`LeasedBatchPart`].
    pub fn leased_part_from_exchangeable(&self, x: SerdeLeasedBatchPart) -> LeasedBatchPart<T> {
        x.decode(Arc::clone(&self.metrics))
    }

    /// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
    ///
    /// See the `LeasedBatchPart` documentation for how to handle the
    /// returned value.
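    ///
    /// A hedged usage sketch (`fetcher` and `serde_part` are illustrative
    /// names, not part of this API):
    ///
    /// ```ignore
    /// let leased = fetcher.leased_part_from_exchangeable(serde_part);
    /// let blob = fetcher.fetch_leased_part(&leased).await?;
    /// // Parquet decoding happens later, in `FetchedBlob::parse`.
    /// let fetched = blob.parse();
    /// ```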
    pub async fn fetch_leased_part(
        &mut self,
        part: &LeasedBatchPart<T>,
    ) -> Result<FetchedBlob<K, V, T, D>, InvalidUsage<T>> {
        if part.shard_id != self.shard_id {
            let batch_shard = part.shard_id.clone();
            return Err(InvalidUsage::BatchNotFromThisShard {
                batch_shard,
                handle_shard: self.shard_id.clone(),
            });
        }

        let migration = PartMigration::new(
            &part.part,
            self.read_schemas.clone(),
            &mut self.schema_cache,
        )
        .await
        .unwrap_or_else(|read_schemas| {
            panic!(
                "could not decode part {:?} with schema: {:?}",
                part.part.schema_id(),
                read_schemas
            )
        });

        let (buf, fetch_permit) = match &part.part {
            BatchPart::Hollow(x) => {
                let fetch_permit = self
                    .metrics
                    .semaphore
                    .acquire_fetch_permits(x.encoded_size_bytes)
                    .await;
                let read_metrics = if self.is_transient {
                    &self.metrics.read.unindexed
                } else {
                    &self.metrics.read.batch_fetcher
                };
                let buf = fetch_batch_part_blob(
                    &part.shard_id,
                    self.blob.as_ref(),
                    &self.metrics,
                    &self.shard_metrics,
                    read_metrics,
                    x,
                )
                .await
                .unwrap_or_else(|blob_key| {
                    // Ideally, readers should never encounter a missing blob. They place a seqno
                    // hold as they consume their snapshot/listen, preventing any blobs they need
                    // from being deleted by garbage collection, and all blob implementations are
                    // linearizable so there should be no possibility of stale reads.
                    //
                    // If we do have a bug and a reader does encounter a missing blob, the state
                    // cannot be recovered, and our best option is to panic and retry the whole
                    // process.
                    panic!("batch fetcher could not fetch batch part: {}", blob_key)
                });
                let buf = FetchedBlobBuf::Hollow {
                    buf,
                    part: x.clone(),
                };
                (buf, Some(Arc::new(fetch_permit)))
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => {
                let buf = FetchedBlobBuf::Inline {
                    desc: part.desc.clone(),
                    updates: updates.clone(),
                    ts_rewrite: ts_rewrite.clone(),
                };
                (buf, None)
            }
        };
        let fetched_blob = FetchedBlob {
            metrics: Arc::clone(&self.metrics),
            read_metrics: self.metrics.read.batch_fetcher.clone(),
            buf,
            registered_desc: part.desc.clone(),
            migration,
            filter: part.filter.clone(),
            filter_pushdown_audit: part.filter_pushdown_audit,
            structured_part_audit: self.cfg.part_decode_format(),
            fetch_permit,
            _phantom: PhantomData,
        };
        Ok(fetched_blob)
    }
}

#[derive(Debug, Clone)]
pub(crate) enum FetchBatchFilter<T> {
    Snapshot {
        as_of: Antichain<T>,
    },
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    Compaction {
        since: Antichain<T>,
    },
}

impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
    pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
        match self {
            FetchBatchFilter::Snapshot { as_of } => {
                // This time is covered by a listen
                if as_of.less_than(t) {
                    return false;
                }
                t.advance_by(as_of.borrow());
                true
            }
            FetchBatchFilter::Listen { as_of, lower } => {
                // This time is covered by a snapshot
                if !as_of.less_than(t) {
                    return false;
                }

                // Because of compaction, the next batch we get might also
                // contain updates we've already emitted. For example, we
                // emitted `[1, 2)` and then compaction combined that batch with
                // a `[2, 3)` batch into a new `[1, 3)` batch. If this happens,
                // we just need to filter out anything < the frontier. This
                // frontier was the upper of the last batch (and thus exclusive)
                // so for the == case, we still emit.
                if !lower.less_equal(t) {
                    return false;
                }
                true
            }
            FetchBatchFilter::Compaction { since } => {
                t.advance_by(since.borrow());
                true
            }
        }
    }
}
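
// A hedged illustration of `filter_ts` semantics, assuming `T = u64` for
// readability:
//
//   let snap = FetchBatchFilter::Snapshot { as_of: Antichain::from_elem(5u64) };
//   let mut t = 3;
//   assert!(snap.filter_ts(&mut t)); // kept, and advanced to the as_of: t == 5
//   let mut t = 7;
//   assert!(!snap.filter_ts(&mut t)); // beyond the as_of: left for a listen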

impl<T: Timestamp + Codec64> RustType<ProtoFetchBatchFilter> for FetchBatchFilter<T> {
    fn into_proto(&self) -> ProtoFetchBatchFilter {
        let kind = match self {
            FetchBatchFilter::Snapshot { as_of } => {
                proto_fetch_batch_filter::Kind::Snapshot(as_of.into_proto())
            }
            FetchBatchFilter::Listen { as_of, lower } => {
                proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
                    as_of: Some(as_of.into_proto()),
                    lower: Some(lower.into_proto()),
                })
            }
            FetchBatchFilter::Compaction { .. } => unreachable!("not serialized"),
        };
        ProtoFetchBatchFilter { kind: Some(kind) }
    }

    fn from_proto(proto: ProtoFetchBatchFilter) -> Result<Self, TryFromProtoError> {
        let kind = proto
            .kind
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoFetchBatchFilter::kind"))?;
        match kind {
            proto_fetch_batch_filter::Kind::Snapshot(as_of) => Ok(FetchBatchFilter::Snapshot {
                as_of: as_of.into_rust()?,
            }),
            proto_fetch_batch_filter::Kind::Listen(ProtoFetchBatchFilterListen {
                as_of,
                lower,
            }) => Ok(FetchBatchFilter::Listen {
                as_of: as_of.into_rust_if_some("ProtoFetchBatchFilterListen::as_of")?,
                lower: lower.into_rust_if_some("ProtoFetchBatchFilterListen::lower")?,
            }),
        }
    }
}

/// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
///
/// See the `LeasedBatchPart` documentation for how to handle the
/// returned value.
pub(crate) async fn fetch_leased_part<K, V, T, D>(
    cfg: &PersistConfig,
    part: &LeasedBatchPart<T>,
    blob: &dyn Blob,
    metrics: Arc<Metrics>,
    read_metrics: &ReadMetrics,
    shard_metrics: &ShardMetrics,
    reader_id: &LeasedReaderId,
    read_schemas: Schemas<K, V>,
    schema_cache: &mut SchemaCache<K, V, T, D>,
) -> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64 + Sync,
    D: Semigroup + Codec64 + Send + Sync,
{
    let encoded_part = EncodedPart::fetch(
        &part.shard_id,
        blob,
        &metrics,
        shard_metrics,
        read_metrics,
        &part.desc,
        &part.part,
    )
    .await
    .unwrap_or_else(|blob_key| {
        // Ideally, readers should never encounter a missing blob. They place a seqno
        // hold as they consume their snapshot/listen, preventing any blobs they need
        // from being deleted by garbage collection, and all blob implementations are
        // linearizable so there should be no possibility of stale reads.
        //
        // If we do have a bug and a reader does encounter a missing blob, the state
        // cannot be recovered, and our best option is to panic and retry the whole
        // process.
        panic!("{} could not fetch batch part: {}", reader_id, blob_key)
    });
    let part_cfg = BatchFetcherConfig::new(cfg);
    let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
        .await
        .unwrap_or_else(|read_schemas| {
            panic!(
                "could not decode part {:?} with schema: {:?}",
                part.part.schema_id(),
                read_schemas
            )
        });
    FetchedPart::new(
        metrics,
        encoded_part,
        migration,
        part.filter.clone(),
        part.filter_pushdown_audit,
        part_cfg.part_decode_format(),
        part.part.stats(),
    )
}

pub(crate) async fn fetch_batch_part_blob<T>(
    shard_id: &ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
    shard_metrics: &ShardMetrics,
    read_metrics: &ReadMetrics,
    part: &HollowBatchPart<T>,
) -> Result<SegmentedBytes, BlobKey> {
    let now = Instant::now();
    let get_span = debug_span!("fetch_batch::get");
    let blob_key = part.key.complete(shard_id);
    let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
        shard_metrics.blob_gets.inc();
        blob.get(&blob_key).await
    })
    .instrument(get_span.clone())
    .await
    .ok_or(blob_key)?;

    drop(get_span);

    read_metrics.part_count.inc();
    read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
    read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());

    Ok(value)
}

pub(crate) fn decode_batch_part_blob<T>(
    metrics: &Metrics,
    read_metrics: &ReadMetrics,
    registered_desc: Description<T>,
    part: &HollowBatchPart<T>,
    buf: &SegmentedBytes,
) -> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    trace_span!("fetch_batch::decode").in_scope(|| {
        let parsed = metrics
            .codecs
            .batch
            .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
            .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
            // We fetched a batch part that we couldn't decode. This could happen
            // if persist messes up backward/forward compatibility, if the durable
            // data was corrupted, or if operations messed up the deployment. In
            // any case, fail loudly.
            .expect("internal error: invalid encoded state");
        read_metrics
            .part_goodbytes
            .inc_by(u64::cast_from(parsed.updates.goodbytes()));
        EncodedPart::from_hollow(read_metrics.clone(), registered_desc, part, parsed)
    })
}

pub(crate) async fn fetch_batch_part<T>(
    shard_id: &ShardId,
    blob: &dyn Blob,
    metrics: &Metrics,
    shard_metrics: &ShardMetrics,
    read_metrics: &ReadMetrics,
    registered_desc: &Description<T>,
    part: &HollowBatchPart<T>,
) -> Result<EncodedPart<T>, BlobKey>
where
    T: Timestamp + Lattice + Codec64,
{
    let buf =
        fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
    let part = decode_batch_part_blob(metrics, read_metrics, registered_desc.clone(), part, &buf);
    Ok(part)
}

/// This represents the lease of a seqno. It's generally paired with some external state,
/// like a hollow part: holding this lease indicates that we may still want to fetch that part,
/// and should hold back GC to keep it around.
///
/// Generally the state and lease are bundled together, as in [LeasedBatchPart]... but sometimes
/// it's necessary to handle them separately, so this struct is exposed as well. Handle with care.
#[derive(Clone, Debug, Default)]
pub(crate) struct Lease(Arc<()>);

impl Lease {
    /// Returns the number of live copies of this lease, including this one.
    pub fn count(&self) -> usize {
        Arc::strong_count(&self.0)
    }
}
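
// For example (hedged, since `Lease` is just a refcounted marker): cloning
// bumps the live count and dropping decrements it.
//
//   let lease = Lease::default();
//   let copy = lease.clone();
//   assert_eq!(lease.count(), 2);
//   drop(copy);
//   assert_eq!(lease.count(), 1);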

/// A token representing one fetch-able batch part.
///
/// It is tradeable via `crate::fetch::fetch_leased_part` for the resulting data
/// stored in the part.
///
/// # Exchange
///
/// You can exchange `LeasedBatchPart`:
/// - If `leased_seqno.is_none()`
/// - By converting it to [`SerdeLeasedBatchPart`] through
///   `Self::into_exchangeable_part`. [`SerdeLeasedBatchPart`] is exchangeable,
///   including over the network.
///
/// n.b. `Self::into_exchangeable_part` is known to be equivalent to
/// `SerdeLeasedBatchPart::from(self)`, but we want the additional warning message to
/// be visible and sufficiently scary.
///
/// # Panics
/// `LeasedBatchPart` panics when dropped unless a very strict set of invariants is
/// held:
///
/// `LeasedBatchPart` may only be dropped if it:
/// - Does not have a leased `SeqNo` (i.e. `self.leased_seqno.is_none()`)
///
/// In any other circumstance, dropping `LeasedBatchPart` panics.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    pub(crate) metrics: Arc<Metrics>,
    pub(crate) shard_id: ShardId,
    pub(crate) reader_id: LeasedReaderId,
    pub(crate) filter: FetchBatchFilter<T>,
    pub(crate) desc: Description<T>,
    pub(crate) part: BatchPart<T>,
    /// The `SeqNo` from which this part originated; we track this value to
    /// ensure the `SeqNo` isn't garbage collected while a read still depends
    /// on it.
    pub(crate) leased_seqno: SeqNo,
    /// The lease that prevents this part from being GCed. Code should ensure that this lease
    /// lives as long as the part is needed.
    pub(crate) lease: Option<Lease>,
    pub(crate) filter_pushdown_audit: bool,
}

impl<T> LeasedBatchPart<T>
where
    T: Timestamp + Codec64,
{
    /// Takes `self` into a [`SerdeLeasedBatchPart`], which allows `self` to be
    /// exchanged (potentially across the network).
    ///
    /// !!!WARNING!!!
    ///
    /// This method also returns the [Lease] associated with the given part, since
    /// that can't travel across process boundaries. The caller is responsible for
    /// ensuring that the lease is held for as long as the batch part may be in use:
    /// dropping it too early may cause a fetch to fail.
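    ///
    /// A hedged sketch of the intended pattern (`worker_output` and
    /// `held_leases` are illustrative names, not part of this API):
    ///
    /// ```ignore
    /// let (serde_part, lease) = leased_part.into_exchangeable_part();
    /// held_leases.push(lease); // keep the seqno hold alive until fetched
    /// worker_output.give(serde_part);
    /// ```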
    pub(crate) fn into_exchangeable_part(mut self) -> (SerdeLeasedBatchPart, Option<Lease>) {
        let (proto, _metrics) = self.into_proto();
        // Any lease on this part is effectively transferred to the caller.
        let lease = self.lease.take();
        let part = SerdeLeasedBatchPart {
            encoded_size_bytes: self.part.encoded_size_bytes(),
            proto: LazyProto::from(&proto),
        };
        (part, lease)
    }

    /// The encoded size of this part in bytes
    pub fn encoded_size_bytes(&self) -> usize {
        self.part.encoded_size_bytes()
    }

    /// The filter has indicated we don't need this part, but we can verify the
    /// ongoing end-to-end correctness of corner cases via an "audit": fetch the
    /// part as normal, and if the MFP keeps anything from it, then something
    /// has gone horribly wrong.
    pub fn request_filter_pushdown_audit(&mut self) {
        self.filter_pushdown_audit = true;
    }

    /// Returns the pushdown stats for this part.
    pub fn stats(&self) -> Option<PartStats> {
        self.part.stats().map(|x| x.decode())
    }

    /// Apply any relevant projection pushdown optimizations, assuming that the data in the part
    /// is equivalent to the provided key and value.
    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
        assert_eq!(key.len(), 1, "expect a single-row key array");
        assert_eq!(val.len(), 1, "expect a single-row val array");
        let as_of = match &self.filter {
            FetchBatchFilter::Snapshot { as_of } => as_of,
            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
        };
        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
            return;
        }
        let (diffs_sum, _stats) = match &self.part {
            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
            BatchPart::Inline { .. } => return,
        };
        debug!(
            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
            // This is only used for debugging, so hack to assume that D is i64.
            diffs_sum.map(i64::decode),
            as_of.elements(),
            self.desc.lower().elements(),
            self.desc.upper().elements()
        );
        let as_of = match &as_of.elements() {
            &[as_of] => as_of,
            _ => return,
        };
        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
        if !eligible {
            return;
        }
        let Some(diffs_sum) = diffs_sum else {
            return;
        };

        debug!(
            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
            // This is only used for debugging, so hack to assume that D is i64.
            i64::decode(diffs_sum),
            as_of,
            self.part.encoded_size_bytes(),
        );
        self.metrics.pushdown.parts_faked_count.inc();
        self.metrics
            .pushdown
            .parts_faked_bytes
            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
        let timestamps = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push(as_of);
            col.finish()
        };
        let diffs = {
            let mut col = Codec64Mut::with_capacity(1);
            col.push_raw(diffs_sum);
            col.finish()
        };
        let updates = BlobTraceUpdates::Structured {
            key_values: ColumnarRecordsStructuredExt { key, val },
            timestamps,
            diffs,
        };
        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
            desc: Some(self.desc.into_proto()),
            index: 0,
            updates: Some(updates.into_proto()),
        });
        self.part = BatchPart::Inline {
            updates: faked_data,
            ts_rewrite: None,
            schema_id: None,
            deprecated_schema_id: None,
        };
    }
}

impl<T> Drop for LeasedBatchPart<T> {
    /// For details, see [`LeasedBatchPart`].
    fn drop(&mut self) {
        self.metrics.lease.dropped_part.inc()
    }
}

/// A [Blob] object that has been fetched, but not at all decoded.
///
/// In contrast to [FetchedPart], this representation hasn't yet done parquet
/// decoding.
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    read_metrics: ReadMetrics,
    buf: FetchedBlobBuf<T>,
    registered_desc: Description<T>,
    migration: PartMigration<K, V>,
    filter: FetchBatchFilter<T>,
    filter_pushdown_audit: bool,
    structured_part_audit: PartDecodeFormat,
    fetch_permit: Option<Arc<MetricsPermits>>,
    _phantom: PhantomData<fn() -> D>,
}

#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    Hollow {
        buf: SegmentedBytes,
        part: HollowBatchPart<T>,
    },
    Inline {
        desc: Description<T>,
        updates: LazyInlineBatchPart,
        ts_rewrite: Option<Antichain<T>>,
    },
}

impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
    fn clone(&self) -> Self {
        Self {
            metrics: Arc::clone(&self.metrics),
            read_metrics: self.read_metrics.clone(),
            buf: self.buf.clone(),
            registered_desc: self.registered_desc.clone(),
            migration: self.migration.clone(),
            filter: self.filter.clone(),
            filter_pushdown_audit: self.filter_pushdown_audit.clone(),
            fetch_permit: self.fetch_permit.clone(),
            structured_part_audit: self.structured_part_audit.clone(),
            _phantom: self._phantom.clone(),
        }
    }
}

/// [FetchedPart] but with an accompanying permit from the fetch mem/disk
/// semaphore.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The underlying [FetchedPart].
    pub part: FetchedPart<K, V, T, D>,
    fetch_permit: Option<Arc<MetricsPermits>>,
}

impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
where
    K: Codec + Debug,
    <K as Codec>::Storage: Debug,
    V: Codec + Debug,
    <V as Codec>::Storage: Debug,
{
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let ShardSourcePart { part, fetch_permit } = self;
        f.debug_struct("ShardSourcePart")
            .field("part", part)
            .field("fetch_permit", fetch_permit)
            .finish()
    }
}

impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
    /// Partially decodes this blob into a [FetchedPart].
    pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
        let (part, stats) = match &self.buf {
            FetchedBlobBuf::Hollow { buf, part } => {
                let parsed = decode_batch_part_blob(
                    &self.metrics,
                    &self.read_metrics,
                    self.registered_desc.clone(),
                    part,
                    buf,
                );
                (parsed, part.stats.as_ref())
            }
            FetchedBlobBuf::Inline {
                desc,
                updates,
                ts_rewrite,
            } => {
                let parsed = EncodedPart::from_inline(
                    &self.metrics,
                    self.read_metrics.clone(),
                    desc.clone(),
                    updates,
                    ts_rewrite.as_ref(),
                );
                (parsed, None)
            }
        };
        let part = FetchedPart::new(
            Arc::clone(&self.metrics),
            part,
            self.migration.clone(),
            self.filter.clone(),
            self.filter_pushdown_audit,
            self.structured_part_audit,
            stats,
        );
        ShardSourcePart {
            part,
            fetch_permit: self.fetch_permit.clone(),
        }
    }

    /// Decodes and returns the pushdown stats for this part, if known.
    pub fn stats(&self) -> Option<PartStats> {
        match &self.buf {
            FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
            FetchedBlobBuf::Inline { .. } => None,
        }
    }
}

/// A [Blob] object that has been fetched, but not yet fully decoded.
///
/// In contrast to [FetchedBlob], this representation has already done parquet
/// decoding.
#[derive(Debug)]
pub struct FetchedPart<K: Codec, V: Codec, T, D> {
    metrics: Arc<Metrics>,
    ts_filter: FetchBatchFilter<T>,
    // If migration is Either, then the columnar one will have already been
    // applied here on the structured data only.
    part: EitherOrBoth<
        ColumnarRecords,
        (
            <K::Schema as Schema<K>>::Decoder,
            <V::Schema as Schema<V>>::Decoder,
        ),
    >,
    timestamps: Int64Array,
    diffs: Int64Array,
    migration: PartMigration<K, V>,
    filter_pushdown_audit: Option<LazyPartStats>,
    peek_stash: Option<((Result<K, String>, Result<V, String>), T, D)>,
    part_cursor: usize,
    key_storage: Option<K::Storage>,
    val_storage: Option<V::Storage>,

    _phantom: PhantomData<fn() -> D>,
}

impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
    pub(crate) fn new(
        metrics: Arc<Metrics>,
        part: EncodedPart<T>,
        migration: PartMigration<K, V>,
        ts_filter: FetchBatchFilter<T>,
        filter_pushdown_audit: bool,
        part_decode_format: PartDecodeFormat,
        stats: Option<&LazyPartStats>,
    ) -> Self {
        let part_len = u64::cast_from(part.part.updates.len());
        match &migration {
            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
            PartMigration::Schemaless { .. } => {
                metrics.schema.migration_count_codec.inc();
                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
            }
            PartMigration::Either { .. } => {
                metrics.schema.migration_count_either.inc();
                match part_decode_format {
                    PartDecodeFormat::Row {
                        validate_structured: false,
                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
                    PartDecodeFormat::Row {
                        validate_structured: true,
                    } => {
                        metrics.schema.migration_len_either_codec.inc_by(part_len);
                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
                    }
                    PartDecodeFormat::Arrow => {
                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
                    }
                }
            }
        }

        let filter_pushdown_audit = if filter_pushdown_audit {
            stats.cloned()
        } else {
            None
        };

        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
                                   structured_only: bool| {
            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();

            let structured = match &migration {
                PartMigration::SameSchema { .. } => structured,
                PartMigration::Schemaless { read } if structured_only => {
                    // We don't know the source schema, but we do know the source datatype; migrate it directly.
                    let start = Instant::now();
                    let read_key = data_type::<K>(&*read.key).ok()?;
                    let read_val = data_type::<V>(&*read.val).ok()?;
                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
                PartMigration::Schemaless { .. } => return None,
                PartMigration::Either {
                    write: _,
                    read: _,
                    key_migration,
                    val_migration,
                } => {
                    let start = Instant::now();
                    let key = key_migration.migrate(structured.key);
                    let val = val_migration.migrate(structured.val);
                    metrics
                        .schema
                        .migration_migrate_seconds
                        .inc_by(start.elapsed().as_secs_f64());
                    ColumnarRecordsStructuredExt { key, val }
                }
            };

            let read_schema = migration.codec_read();
            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);

            match &key {
                Ok(key_decoder) => {
                    let key_size_after = key_decoder.goodbytes();
                    let key_diff = key_size_before.saturating_sub(key_size_after);
                    metrics
                        .pushdown
                        .parts_projection_trimmed_bytes
                        .inc_by(u64::cast_from(key_diff));
                }
                Err(e) => {
                    soft_panic_or_log!("failed to create decoder: {e:#?}");
                }
            }

            Some((key.ok()?, val.ok()?))
        };

        let updates = part.normalize(&metrics.columnar);
        let timestamps = updates.timestamps().clone();
        let diffs = updates.diffs().clone();
        let part = match updates {
            // If only one encoding is available, decode via that encoding.
            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
                // The structured-only data format was added after schema ids were recorded everywhere,
                // so we expect this data to be present.
                downcast_structured(key_values, true).expect("valid schemas for structured data"),
            ),
            // If both are available, respect the specified part decode format.
            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
                PartDecodeFormat::Row {
                    validate_structured: false,
                } => EitherOrBoth::Left(records),
                PartDecodeFormat::Row {
                    validate_structured: true,
                } => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Both(records, decoders),
                    None => EitherOrBoth::Left(records),
                },
                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
                    Some(decoders) => EitherOrBoth::Right(decoders),
                    None => EitherOrBoth::Left(records),
                },
            },
        };

        FetchedPart {
            metrics,
            ts_filter,
            part,
            peek_stash: None,
            timestamps,
            diffs,
            migration,
            filter_pushdown_audit,
            part_cursor: 0,
            key_storage: None,
            val_storage: None,
            _phantom: PhantomData,
        }
    }

    /// Returns Some if this part was only fetched as part of a filter pushdown
    /// audit. See [LeasedBatchPart::request_filter_pushdown_audit].
    ///
    /// If set, the value in the Option is for debugging and should be included
    /// in any error messages.
    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
        self.filter_pushdown_audit.clone()
    }
}

/// A [Blob] object that has been fetched, but has no associated decoding
/// logic.
#[derive(Debug)]
pub(crate) struct EncodedPart<T> {
    metrics: ReadMetrics,
    registered_desc: Description<T>,
    part: BlobTraceBatchPart<T>,
    needs_truncation: bool,
    ts_rewrite: Option<Antichain<T>>,
}

impl<K, V, T, D> FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64 + Send + Sync,
{
    /// [Self::next] but optionally providing a `K` and `V` for alloc reuse.
    pub fn next_with_storage(
        &mut self,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> Option<((Result<K, String>, Result<V, String>), T, D)> {
        let mut consolidated = self.peek_stash.take();
        loop {
            // Fetch and decode the next tuple in the sequence. (Or break if there is none.)
            let next = if self.part_cursor < self.timestamps.len() {
                let next_idx = self.part_cursor;
                self.part_cursor += 1;
                // These `to_le_bytes` calls were previously encapsulated by `ColumnarRecords`.
                // TODO(structured): re-encapsulate these once we've finished the structured migration.
                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
                if !self.ts_filter.filter_ts(&mut t) {
                    continue;
                }
                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
                if d.is_zero() {
                    continue;
                }
                let kv = self.decode_kv(next_idx, key, val);
                (kv, t, d)
            } else {
                break;
            };

            // Attempt to consolidate in the next tuple, stashing it if that's not possible.
            if let Some((kv, t, d)) = &mut consolidated {
                let (kv_next, t_next, d_next) = &next;
                if kv == kv_next && t == t_next {
                    d.plus_equals(d_next);
                    if d.is_zero() {
                        consolidated = None;
                    }
                } else {
                    self.peek_stash = Some(next);
                    break;
                }
            } else {
                consolidated = Some(next);
            }
        }

        let (kv, t, d) = consolidated?;

        Some((kv, t, d))
    }
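
    // Consolidation in `next_with_storage`, as a hedged example with
    // `T = u64` and `D = i64`: physically adjacent updates
    //   ((k, v), 3, +1) and ((k, v), 3, -1)
    // sum to a zero diff and are never yielded, while
    //   ((k, v), 3, +1) followed by ((k, v), 4, +1)
    // can't consolidate (different times), so the second is stashed in
    // `peek_stash` and the first is returned.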

    fn decode_kv(
        &mut self,
        index: usize,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (Result<K, String>, Result<V, String>) {
        let decoded = self
            .part
            .as_ref()
            .map_left(|codec| {
                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
                Self::decode_codec(
                    &*self.metrics,
                    self.migration.codec_read(),
                    ck,
                    cv,
                    key,
                    val,
                    &mut self.key_storage,
                    &mut self.val_storage,
                )
            })
            .map_right(|(structured_key, structured_val)| {
                self.decode_structured(index, structured_key, structured_val, key, val)
            });

        match decoded {
            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
                // Purposefully do not trace to prevent blowing up Sentry.
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .key()
                    .report_valid(|| k_s == k);
                if !is_valid {
                    soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
                }
                // Purposefully do not trace to prevent blowing up Sentry.
                let is_valid = self
                    .metrics
                    .columnar
                    .arrow()
                    .val()
                    .report_valid(|| v_s == v);
                if !is_valid {
                    soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
                }

                (k, v)
            }
            EitherOrBoth::Left(kv) => kv,
            EitherOrBoth::Right(kv) => kv,
        }
    }

    fn decode_codec(
        metrics: &Metrics,
        read_schemas: &Schemas<K, V>,
        key_buf: &[u8],
        val_buf: &[u8],
        key: &mut Option<K>,
        val: &mut Option<V>,
        key_storage: &mut Option<K::Storage>,
        val_storage: &mut Option<V::Storage>,
    ) -> (Result<K, String>, Result<V, String>) {
        let k = metrics.codecs.key.decode(|| match key.take() {
            Some(mut key) => {
                match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
                    Ok(()) => Ok(key),
                    Err(err) => Err(err),
                }
            }
            None => K::decode(key_buf, &read_schemas.key),
        });
        let v = metrics.codecs.val.decode(|| match val.take() {
            Some(mut val) => {
                match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
                    Ok(()) => Ok(val),
                    Err(err) => Err(err),
                }
            }
            None => V::decode(val_buf, &read_schemas.val),
        });
        (k, v)
    }

    fn decode_structured(
        &self,
        idx: usize,
        keys: &<K::Schema as Schema<K>>::Decoder,
        vals: &<V::Schema as Schema<V>>::Decoder,
        key: &mut Option<K>,
        val: &mut Option<V>,
    ) -> (Result<K, String>, Result<V, String>) {
        let mut key = key.take().unwrap_or_default();
        keys.decode(idx, &mut key);

        let mut val = val.take().unwrap_or_default();
        vals.decode(idx, &mut val);

        (Ok(key), Ok(val))
    }
}

impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
where
    K: Debug + Codec,
    V: Debug + Codec,
    T: Timestamp + Lattice + Codec64,
    D: Semigroup + Codec64 + Send + Sync,
{
    type Item = ((Result<K, String>, Result<V, String>), T, D);

    fn next(&mut self) -> Option<Self::Item> {
        self.next_with_storage(&mut None, &mut None)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        // We don't know in advance how restrictive the filter will be.
        let max_len = self.timestamps.len();
        (0, Some(max_len))
    }
}

impl<T> EncodedPart<T>
where
    T: Timestamp + Lattice + Codec64,
{
    pub async fn fetch(
        shard_id: &ShardId,
        blob: &dyn Blob,
        metrics: &Metrics,
        shard_metrics: &ShardMetrics,
        read_metrics: &ReadMetrics,
        registered_desc: &Description<T>,
        part: &BatchPart<T>,
    ) -> Result<Self, BlobKey> {
        match part {
            BatchPart::Hollow(x) => {
                fetch_batch_part(
                    shard_id,
                    blob,
                    metrics,
                    shard_metrics,
                    read_metrics,
                    registered_desc,
                    x,
                )
                .await
            }
            BatchPart::Inline {
                updates,
                ts_rewrite,
                ..
            } => Ok(EncodedPart::from_inline(
                metrics,
                read_metrics.clone(),
                registered_desc.clone(),
                updates,
                ts_rewrite.as_ref(),
            )),
        }
    }

    pub(crate) fn from_inline(
        metrics: &Metrics,
        read_metrics: ReadMetrics,
        desc: Description<T>,
        x: &LazyInlineBatchPart,
        ts_rewrite: Option<&Antichain<T>>,
    ) -> Self {
        let parsed = x.decode(&metrics.columnar).expect("valid inline part");
        Self::new(read_metrics, desc, "inline", ts_rewrite, parsed)
    }

    pub(crate) fn from_hollow(
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        part: &HollowBatchPart<T>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        Self::new(
            metrics,
            registered_desc,
            &part.key.0,
            part.ts_rewrite.as_ref(),
            parsed,
        )
    }

    pub(crate) fn new(
        metrics: ReadMetrics,
        registered_desc: Description<T>,
        printable_name: &str,
        ts_rewrite: Option<&Antichain<T>>,
        parsed: BlobTraceBatchPart<T>,
    ) -> Self {
        // There are two types of batches in persist:
        // - Batches written by a persist user (either directly or indirectly
        //   via BatchBuilder). These always have a since of the minimum
        //   timestamp and may be registered in persist state with a tighter set
        //   of bounds than are inline in the batch (truncation). To read one of
        //   these batches, all data physically in the batch but outside of the
        //   truncated bounds must be ignored. Not every user batch is
        //   truncated.
        // - Batches written by compaction. These always have an inline desc
        //   that exactly matches the one they are registered with. The since
        //   can be anything.
        let inline_desc = &parsed.desc;
        let needs_truncation = inline_desc.lower() != registered_desc.lower()
            || inline_desc.upper() != registered_desc.upper();
        if needs_truncation {
            assert!(
                PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
            if ts_rewrite.is_none() {
                // The ts rewrite feature allows us to advance the registered
                // upper of a batch that's already been staged (the inline
                // upper), so if it's been used, then there's no useful
                // invariant that we can assert here.
                assert!(
                    PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
                    "key={} inline={:?} registered={:?}",
                    printable_name,
                    inline_desc,
                    registered_desc
                );
            }
            // As mentioned above, batches that need truncation will always have a
            // since of the minimum timestamp. Technically we could truncate any
            // batch where the since is less_than the output_desc's lower, but we're
            // strict here so we don't get any surprises.
            assert_eq!(
                inline_desc.since(),
                &Antichain::from_elem(T::minimum()),
                "key={} inline={:?} registered={:?}",
                printable_name,
                inline_desc,
                registered_desc
            );
        } else {
            assert_eq!(
                inline_desc, &registered_desc,
                "key={} inline={:?} registered={:?}",
                printable_name, inline_desc, registered_desc
            );
        }

        EncodedPart {
            metrics,
            registered_desc,
            part: parsed,
            needs_truncation,
            ts_rewrite: ts_rewrite.cloned(),
        }
    }
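
    // A hedged truncation example: a user batch may physically contain
    // updates at times [0, 5) while being registered with lower=[2] and
    // upper=[4]; `normalize` below then drops every update with t < 2 or
    // t >= 4 before the data is handed out.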

    pub(crate) fn maybe_unconsolidated(&self) -> bool {
        // At time of writing, only user parts may be unconsolidated, and they are always
        // written with a since of [T::minimum()].
        self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
    }

    /// Returns the updates with all truncation / timestamp rewriting applied.
    pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
        let updates = self.part.updates.clone();
        if !self.needs_truncation && self.ts_rewrite.is_none() {
            return updates;
        }

        let mut codec = updates
            .records()
            .map(|r| (r.keys().clone(), r.vals().clone()));
        let mut structured = updates.structured().cloned();
        let mut timestamps = updates.timestamps().clone();
        let mut diffs = updates.diffs().clone();

        if let Some(rewrite) = self.ts_rewrite.as_ref() {
            timestamps = arrow::compute::unary(&timestamps, |i: i64| {
                let mut t = T::decode(i.to_le_bytes());
                t.advance_by(rewrite.borrow());
                i64::from_le_bytes(T::encode(&t))
            });
        }

        let reallocated = if self.needs_truncation {
            let filter = BooleanArray::from_unary(&timestamps, |i| {
                let t = T::decode(i.to_le_bytes());
                let truncate_t = {
                    !self.registered_desc.lower().less_equal(&t)
                        || self.registered_desc.upper().less_equal(&t)
                };
                !truncate_t
            });
            if filter.false_count() == 0 {
                // If we're not filtering anything in practice, skip filtering and reallocating.
                false
            } else {
                let filter = FilterBuilder::new(&filter).optimize().build();
                let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
                if let Some((keys, vals)) = codec {
                    codec = Some((
                        realloc_array(do_filter(&keys).as_binary(), metrics),
                        realloc_array(do_filter(&vals).as_binary(), metrics),
                    ));
                }
                if let Some(ext) = structured {
                    structured = Some(ColumnarRecordsStructuredExt {
                        key: realloc_any(do_filter(&*ext.key), metrics),
                        val: realloc_any(do_filter(&*ext.val), metrics),
                    });
                }
                timestamps = realloc_array(do_filter(&timestamps).as_primitive(), metrics);
                diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
                true
            }
        } else {
            false
        };

        if self.ts_rewrite.is_some() && !reallocated {
            timestamps = realloc_array(&timestamps, metrics);
        }

        if self.ts_rewrite.is_some() {
            self.metrics
                .ts_rewrite
                .inc_by(u64::cast_from(timestamps.len()));
        }

        match (codec, structured) {
            (Some((key, value)), None) => {
                BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
            }
            (Some((key, value)), Some(ext)) => {
                BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
            }
            (None, Some(ext)) => BlobTraceUpdates::Structured {
                key_values: ext,
                timestamps,
                diffs,
            },
            (None, None) => unreachable!(),
        }
    }
}

/// This represents the serde encoding for [`LeasedBatchPart`]. We expose the struct
/// itself (unlike other encodable structs) to attempt to provide stricter drop
/// semantics on `LeasedBatchPart`, i.e. `SerdeLeasedBatchPart` is exchangeable
/// (including over the network), where `LeasedBatchPart` is not.
///
/// For more details see documentation and comments on:
/// - [`LeasedBatchPart`]
/// - `From<SerdeLeasedBatchPart>` for `LeasedBatchPart<T>`
#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct SerdeLeasedBatchPart {
    // Duplicated with the one serialized in the proto for use in backpressure.
    encoded_size_bytes: usize,
    // We wrap this in a LazyProto because it guarantees that we use the proto
    // encoding for the serde impls.
    proto: LazyProto<ProtoLeasedBatchPart>,
}

impl SerdeLeasedBatchPart {
    /// Returns the encoded size of the given part.
    pub fn encoded_size_bytes(&self) -> usize {
        self.encoded_size_bytes
    }

    pub(crate) fn decode<T: Timestamp + Codec64>(
        &self,
        metrics: Arc<Metrics>,
    ) -> LeasedBatchPart<T> {
        let proto = self.proto.decode().expect("valid leased batch part");
        (proto, metrics)
            .into_rust()
            .expect("valid leased batch part")
    }
}

// TODO: The way we're smuggling the metrics through here is a bit odd. Perhaps
// we could refactor `LeasedBatchPart` into some proto-able struct plus the
// metrics for the Drop bit?
impl<T: Timestamp + Codec64> RustType<(ProtoLeasedBatchPart, Arc<Metrics>)> for LeasedBatchPart<T> {
    fn into_proto(&self) -> (ProtoLeasedBatchPart, Arc<Metrics>) {
        let proto = ProtoLeasedBatchPart {
            shard_id: self.shard_id.into_proto(),
            filter: Some(self.filter.into_proto()),
            desc: Some(self.desc.into_proto()),
            part: Some(self.part.into_proto()),
            lease: Some(ProtoLease {
                reader_id: self.reader_id.into_proto(),
                seqno: Some(self.leased_seqno.into_proto()),
            }),
            filter_pushdown_audit: self.filter_pushdown_audit,
        };
        (proto, Arc::clone(&self.metrics))
    }

    fn from_proto(proto: (ProtoLeasedBatchPart, Arc<Metrics>)) -> Result<Self, TryFromProtoError> {
        let (proto, metrics) = proto;
        let lease = proto
            .lease
            .ok_or_else(|| TryFromProtoError::missing_field("ProtoLeasedBatchPart::lease"))?;
        Ok(LeasedBatchPart {
            metrics,
            shard_id: proto.shard_id.into_rust()?,
            filter: proto
                .filter
                .into_rust_if_some("ProtoLeasedBatchPart::filter")?,
            desc: proto.desc.into_rust_if_some("ProtoLeasedBatchPart::desc")?,
            part: proto.part.into_rust_if_some("ProtoLeasedBatchPart::part")?,
            reader_id: lease.reader_id.into_rust()?,
            leased_seqno: lease.seqno.into_rust_if_some("ProtoLease::seqno")?,
            lease: None,
            filter_pushdown_audit: proto.filter_pushdown_audit,
        })
    }
}

/// Format we'll use when decoding a [`Part`].
///
/// [`Part`]: mz_persist_types::part::Part
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
    /// Decode from opaque `Codec` data.
    Row {
        /// Will also decode the structured data, and validate it matches.
        validate_structured: bool,
    },
    /// Decode from arrow data
    Arrow,
}

impl PartDecodeFormat {
    /// Returns a default value for [`PartDecodeFormat`].
    pub const fn default() -> Self {
        PartDecodeFormat::Arrow
    }

    /// Parses a [`PartDecodeFormat`] from the provided string, falling back to the default if the
    /// provided value is unrecognized.
    pub fn from_str(s: &str) -> Self {
        match s {
            "row" => PartDecodeFormat::Row {
                validate_structured: false,
            },
            "row_with_validate" => PartDecodeFormat::Row {
                validate_structured: true,
            },
            "arrow" => PartDecodeFormat::Arrow,
            x => {
                let default = PartDecodeFormat::default();
                soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
                default
            }
        }
    }

    /// Returns a string representation of [`PartDecodeFormat`].
    pub const fn as_str(&self) -> &'static str {
        match self {
            PartDecodeFormat::Row {
                validate_structured: false,
            } => "row",
            PartDecodeFormat::Row {
                validate_structured: true,
            } => "row_with_validate",
            PartDecodeFormat::Arrow => "arrow",
        }
    }
}
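
// Round-trip sanity, as a hedged example:
//
//   assert_eq!(PartDecodeFormat::from_str("arrow").as_str(), "arrow");
//   assert_eq!(
//       PartDecodeFormat::from_str("row_with_validate").as_str(),
//       "row_with_validate",
//   );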

impl fmt::Display for PartDecodeFormat {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}

#[mz_ore::test]
fn client_exchange_data() {
    // The whole point of SerdeLeasedBatchPart is that it can be exchanged
    // between timely workers, including over the network. Enforce then that it
    // implements ExchangeData.
    fn is_exchange_data<T: timely::ExchangeData>() {}
    is_exchange_data::<SerdeLeasedBatchPart>();
}