Skip to main content

mz_persist_client/
fetch.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Fetching batches of data from persist's backing store
11
12use std::fmt::{self, Debug};
13use std::marker::PhantomData;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use anyhow::anyhow;
18use arrow::array::{Array, ArrayRef, AsArray, BooleanArray, Int64Array};
19use arrow::compute::FilterBuilder;
20use differential_dataflow::difference::Monoid;
21use differential_dataflow::lattice::Lattice;
22use differential_dataflow::trace::Description;
23use itertools::EitherOrBoth;
24use mz_dyncfg::{Config, ConfigSet, ConfigValHandle};
25use mz_ore::bytes::SegmentedBytes;
26use mz_ore::cast::CastFrom;
27use mz_ore::{soft_assert_or_log, soft_panic_no_log, soft_panic_or_log};
28use mz_persist::indexed::columnar::arrow::{realloc_any, realloc_array};
29use mz_persist::indexed::columnar::{ColumnarRecords, ColumnarRecordsStructuredExt};
30use mz_persist::indexed::encoding::{BlobTraceBatchPart, BlobTraceUpdates};
31use mz_persist::location::{Blob, SeqNo};
32use mz_persist::metrics::ColumnarMetrics;
33use mz_persist_types::arrow::ArrayOrd;
34use mz_persist_types::columnar::{ColumnDecoder, Schema, data_type};
35use mz_persist_types::part::Codec64Mut;
36use mz_persist_types::schema::backward_compatible;
37use mz_persist_types::stats::PartStats;
38use mz_persist_types::{Codec, Codec64};
39use mz_proto::RustType;
40use serde::{Deserialize, Serialize};
41use timely::PartialOrder;
42use timely::progress::frontier::AntichainRef;
43use timely::progress::{Antichain, Timestamp};
44use tracing::{Instrument, debug, debug_span, trace_span};
45
46use crate::ShardId;
47use crate::cfg::PersistConfig;
48use crate::error::InvalidUsage;
49use crate::internal::apply::Applier;
50use crate::internal::encoding::{LazyInlineBatchPart, LazyPartStats, LazyProto, Schemas};
51use crate::internal::machine::retry_external;
52use crate::internal::metrics::{Metrics, MetricsPermits, ReadMetrics, ShardMetrics};
53use crate::internal::paths::BlobKey;
54use crate::internal::state::{
55    BatchPart, HollowBatchPart, ProtoHollowBatchPart, ProtoInlineBatchPart,
56};
57use crate::read::LeasedReaderId;
58use crate::schema::{PartMigration, SchemaCache};
59
/// Dyncfg: multiplier applied to a part's `encoded_size_bytes` to approximate
/// its total footprint (encoded plus decoded) when sizing fetch semaphore
/// permits.
pub(crate) const FETCH_SEMAPHORE_COST_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_cost_adjustment",
    // We use `encoded_size_bytes` as the number of permits, but the parsed size
    // is larger than the encoded one, so adjust it. This default value is from
    // eyeballing graphs in experiments that were run on tpch loadgen data.
    1.2,
    "\
    An adjustment multiplied by encoded_size_bytes to approximate an upper \
    bound on the size in lgalloc, which includes the decoded version.",
);
70
/// Dyncfg: cap on the bytes concurrently being fetched and parsed, expressed
/// as a multiplier of the process's memory limit.
pub(crate) const FETCH_SEMAPHORE_PERMIT_ADJUSTMENT: Config<f64> = Config::new(
    "persist_fetch_semaphore_permit_adjustment",
    1.0,
    "\
    A limit on the number of outstanding persist bytes being fetched and \
    parsed, expressed as a multiplier of the process's memory limit. This data \
    all spills to lgalloc, so values > 1.0 are safe. Only applied to cc \
    replicas.",
);
80
/// Dyncfg: string name of the [`PartDecodeFormat`] used to decode parts.
///
/// Defaults to the string form of `PartDecodeFormat::default()`; the value is
/// parsed back via `PartDecodeFormat::from_str` in
/// `BatchFetcherConfig::part_decode_format`.
pub(crate) const PART_DECODE_FORMAT: Config<&'static str> = Config::new(
    "persist_part_decode_format",
    PartDecodeFormat::default().as_str(),
    "\
    Format we'll use to decode a Persist Part, either 'row', \
    'row_with_validate', or 'arrow' (Materialize).",
);
88
/// Dyncfg: escape hatch for the fetch-skipping optimization in
/// `LeasedBatchPart::maybe_optimize`. When `false`, that method returns without
/// modifying the part.
pub(crate) const OPTIMIZE_IGNORED_DATA_FETCH: Config<bool> = Config::new(
    "persist_optimize_ignored_data_fetch",
    true,
    "CYA to allow opt-out of a performance optimization to skip fetching ignored data",
);
94
95pub(crate) const VALIDATE_PART_BOUNDS_ON_READ: Config<bool> = Config::new(
96    "persist_validate_part_bounds_on_read",
97    false,
98    "Validate the part lower <= the batch lower and the part upper <= batch upper,\
99    for the batch containing that part",
100);
101
102#[derive(Debug, Clone)]
103pub(crate) struct FetchConfig {
104    pub(crate) validate_bounds_on_read: bool,
105}
106
107impl FetchConfig {
108    pub fn from_persist_config(cfg: &PersistConfig) -> Self {
109        Self {
110            validate_bounds_on_read: VALIDATE_PART_BOUNDS_ON_READ.get(cfg),
111        }
112    }
113}
114
115#[derive(Debug, Clone)]
116pub(crate) struct BatchFetcherConfig {
117    pub(crate) part_decode_format: ConfigValHandle<String>,
118    pub(crate) fetch_config: FetchConfig,
119}
120
121impl BatchFetcherConfig {
122    pub fn new(value: &PersistConfig) -> Self {
123        Self {
124            part_decode_format: PART_DECODE_FORMAT.handle(value),
125            fetch_config: FetchConfig::from_persist_config(value),
126        }
127    }
128
129    pub fn part_decode_format(&self) -> PartDecodeFormat {
130        PartDecodeFormat::from_str(self.part_decode_format.get().as_str())
131    }
132}
133
/// Capable of fetching [`LeasedBatchPart`] while not holding any capabilities.
///
/// Parts are handed to [`BatchFetcher::fetch_leased_part`] in their
/// [`ExchangeableBatchPart`] form.
#[derive(Debug)]
pub struct BatchFetcher<K, V, T, D>
where
    T: Timestamp + Lattice + Codec64,
    // These are only here so we can use them in the auto-expiring `Drop` impl.
    K: Debug + Codec,
    V: Debug + Codec,
    D: Monoid + Codec64 + Send + Sync,
{
    /// Decode-format handle and validation settings used for fetched parts.
    pub(crate) cfg: BatchFetcherConfig,
    /// The blob store that hollow parts are fetched from.
    pub(crate) blob: Arc<dyn Blob>,
    /// Process-wide persist metrics; also the owner of the fetch semaphore.
    pub(crate) metrics: Arc<Metrics>,
    /// Per-shard metrics, passed through to the blob fetch.
    pub(crate) shard_metrics: Arc<ShardMetrics>,
    /// The shard this fetcher serves; parts from any other shard are rejected.
    pub(crate) shard_id: ShardId,
    /// Schemas the fetched data should be read with; input to the per-part
    /// [`PartMigration`].
    pub(crate) read_schemas: Schemas<K, V>,
    /// Cache consulted when building a [`PartMigration`]; also provides the
    /// `Applier` used for missing-blob diagnostics.
    pub(crate) schema_cache: SchemaCache<K, V, T, D>,
    /// Selects which read metrics a blob fetch is attributed to:
    /// `read.unindexed` when `true`, `read.batch_fetcher` otherwise.
    pub(crate) is_transient: bool,

    // Ensures that `BatchFetcher` is of the same type as the `ReadHandle` it's
    // derived from.
    pub(crate) _phantom: PhantomData<fn() -> (K, V, T, D)>,
}
157
158impl<K, V, T, D> BatchFetcher<K, V, T, D>
159where
160    K: Debug + Codec,
161    V: Debug + Codec,
162    T: Timestamp + Lattice + Codec64 + Sync,
163    D: Monoid + Codec64 + Send + Sync,
164{
165    /// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
166    ///
167    /// Note to check the `LeasedBatchPart` documentation for how to handle the
168    /// returned value.
169    pub async fn fetch_leased_part(
170        &mut self,
171        part: ExchangeableBatchPart<T>,
172    ) -> Result<Result<FetchedBlob<K, V, T, D>, BlobKey>, InvalidUsage<T>> {
173        let ExchangeableBatchPart {
174            shard_id,
175            encoded_size_bytes: _,
176            desc,
177            filter,
178            filter_pushdown_audit,
179            part,
180            reader_id: _,
181        } = part;
182        let part: BatchPart<T> = part.decode_to().expect("valid part");
183        if shard_id != self.shard_id {
184            return Err(InvalidUsage::BatchNotFromThisShard {
185                batch_shard: shard_id,
186                handle_shard: self.shard_id.clone(),
187            });
188        }
189
190        let migration =
191            PartMigration::new(&part, self.read_schemas.clone(), &mut self.schema_cache)
192                .await
193                .unwrap_or_else(|read_schemas| {
194                    panic!(
195                        "could not decode part {:?} with schema: {:?}",
196                        part.schema_id(),
197                        read_schemas
198                    )
199                });
200
201        let (buf, fetch_permit) = match &part {
202            BatchPart::Hollow(x) => {
203                let fetch_permit = self
204                    .metrics
205                    .semaphore
206                    .acquire_fetch_permits(x.encoded_size_bytes)
207                    .await;
208                let read_metrics = if self.is_transient {
209                    &self.metrics.read.unindexed
210                } else {
211                    &self.metrics.read.batch_fetcher
212                };
213                let buf = fetch_batch_part_blob(
214                    &shard_id,
215                    self.blob.as_ref(),
216                    &self.metrics,
217                    &self.shard_metrics,
218                    read_metrics,
219                    x,
220                )
221                .await;
222                let buf = match buf {
223                    Ok(buf) => buf,
224                    Err(key) => return Ok(Err(key)),
225                };
226                let buf = FetchedBlobBuf::Hollow {
227                    buf,
228                    part: x.clone(),
229                };
230                (buf, Some(Arc::new(fetch_permit)))
231            }
232            BatchPart::Inline {
233                updates,
234                ts_rewrite,
235                ..
236            } => {
237                let buf = FetchedBlobBuf::Inline {
238                    desc: desc.clone(),
239                    updates: updates.clone(),
240                    ts_rewrite: ts_rewrite.clone(),
241                };
242                (buf, None)
243            }
244        };
245        let fetched_blob = FetchedBlob {
246            metrics: Arc::clone(&self.metrics),
247            read_metrics: self.metrics.read.batch_fetcher.clone(),
248            buf,
249            registered_desc: desc.clone(),
250            migration,
251            filter: filter.clone(),
252            filter_pushdown_audit,
253            structured_part_audit: self.cfg.part_decode_format(),
254            fetch_permit,
255            _phantom: PhantomData,
256            fetch_config: self.cfg.fetch_config.clone(),
257        };
258        Ok(Ok(fetched_blob))
259    }
260
261    /// Diagnoses a missing-blob fetch failure for a part leased by the given
262    /// reader. See the free function `missing_blob_diagnostics`.
263    pub async fn missing_blob_diagnostics(&self, reader_id: &LeasedReaderId) -> String {
264        missing_blob_diagnostics(self.schema_cache.applier(), reader_id).await
265    }
266}
267
268/// Diagnoses a missing-blob fetch failure: refreshes the shard state and
269/// reports whether the reader that leased the part is still present in it.
270///
271/// A missing blob means garbage collection deleted a blob that the part's
272/// lease (a seqno hold in shard state) should have protected. If the reader
273/// has been expired out of state, the lease was lost: this can happen when the
274/// process fails to heartbeat the reader for longer than the lease duration,
275/// e.g. because the machine went to sleep, was starved of CPU or memory, or
276/// was partitioned from consensus. If the reader is still present, the hold
277/// did not protect the blob, which points at a GC or lease-tracking bug.
278pub(crate) async fn missing_blob_diagnostics<K, V, T, D>(
279    applier: &Applier<K, V, T, D>,
280    reader_id: &LeasedReaderId,
281) -> String
282where
283    K: Debug + Codec,
284    V: Debug + Codec,
285    T: Timestamp + Lattice + Codec64 + Sync,
286    D: Monoid + Codec64,
287{
288    // Refreshing state talks to consensus; this runs on an already-fatal path
289    // and a partition from consensus may be the very reason the lease was
290    // lost, so don't let the diagnosis block the restart indefinitely.
291    let refresh = applier.fetch_and_update_state(None);
292    if tokio::time::timeout(Duration::from_secs(30), refresh)
293        .await
294        .is_err()
295    {
296        return format!(
297            "reader {reader_id}: could not refresh state within 30s to diagnose the lease; \
298             partitioned from consensus?"
299        );
300    }
301    match applier.reader_lease(reader_id.clone()) {
302        Some(lease_state) => format!(
303            "reader {reader_id} is still present in state ({lease_state:?}); \
304             a missing blob despite a live lease indicates a GC bug"
305        ),
306        None => format!(
307            "reader {reader_id} has been expired out of state; \
308             the process likely failed to heartbeat it within the lease duration \
309             (machine sleep, CPU/memory starvation, or a partition from consensus?)"
310        ),
311    }
312}
313
/// Decides which updates of a fetched part are kept, based on their timestamp.
///
/// See `FetchBatchFilter::filter_ts` for the exact rule each variant applies.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) enum FetchBatchFilter<T> {
    /// Keeps updates at times not beyond `as_of` and advances their times to
    /// `as_of`; later times are dropped (they are covered by a listen).
    Snapshot {
        as_of: Antichain<T>,
    },
    /// Keeps updates at times strictly beyond `as_of` that are also at or
    /// beyond `lower`; times are left unchanged.
    Listen {
        as_of: Antichain<T>,
        lower: Antichain<T>,
    },
    /// Keeps every update, advancing its time by `since`.
    Compaction {
        since: Antichain<T>,
    },
}
327
328impl<T: Timestamp + Lattice> FetchBatchFilter<T> {
329    pub(crate) fn filter_ts(&self, t: &mut T) -> bool {
330        match self {
331            FetchBatchFilter::Snapshot { as_of } => {
332                // This time is covered by a listen
333                if as_of.less_than(t) {
334                    return false;
335                }
336                t.advance_by(as_of.borrow());
337                true
338            }
339            FetchBatchFilter::Listen { as_of, lower } => {
340                // This time is covered by a snapshot
341                if !as_of.less_than(t) {
342                    return false;
343                }
344
345                // Because of compaction, the next batch we get might also
346                // contain updates we've already emitted. For example, we
347                // emitted `[1, 2)` and then compaction combined that batch with
348                // a `[2, 3)` batch into a new `[1, 3)` batch. If this happens,
349                // we just need to filter out anything < the frontier. This
350                // frontier was the upper of the last batch (and thus exclusive)
351                // so for the == case, we still emit.
352                if !lower.less_equal(t) {
353                    return false;
354                }
355                true
356            }
357            FetchBatchFilter::Compaction { since } => {
358                t.advance_by(since.borrow());
359                true
360            }
361        }
362    }
363}
364
365/// Trade in an exchange-able [LeasedBatchPart] for the data it represents.
366///
367/// Note to check the `LeasedBatchPart` documentation for how to handle the
368/// returned value.
369pub(crate) async fn fetch_leased_part<K, V, T, D>(
370    cfg: &PersistConfig,
371    part: &LeasedBatchPart<T>,
372    blob: &dyn Blob,
373    metrics: Arc<Metrics>,
374    read_metrics: &ReadMetrics,
375    shard_metrics: &ShardMetrics,
376    reader_id: &LeasedReaderId,
377    read_schemas: Schemas<K, V>,
378    schema_cache: &mut SchemaCache<K, V, T, D>,
379) -> FetchedPart<K, V, T, D>
380where
381    K: Debug + Codec,
382    V: Debug + Codec,
383    T: Timestamp + Lattice + Codec64 + Sync,
384    D: Monoid + Codec64 + Send + Sync,
385{
386    let fetch_config = FetchConfig::from_persist_config(cfg);
387    let encoded_part = match EncodedPart::fetch(
388        &fetch_config,
389        &part.shard_id,
390        blob,
391        &metrics,
392        shard_metrics,
393        read_metrics,
394        &part.desc,
395        &part.part,
396    )
397    .await
398    {
399        Ok(x) => x,
400        Err(blob_key) => {
401            // Ideally, readers should never encounter a missing blob. They place a seqno
402            // hold as they consume their snapshot/listen, preventing any blobs they need
403            // from being deleted by garbage collection, and all blob implementations are
404            // linearizable so there should be no possibility of stale reads.
405            //
406            // If we do have a bug and a reader does encounter a missing blob, the state
407            // cannot be recovered, and our best option is to panic and retry the whole
408            // process.
409            let diagnostics = missing_blob_diagnostics(schema_cache.applier(), reader_id).await;
410            panic!("could not fetch batch part {}: {}", blob_key, diagnostics)
411        }
412    };
413    let part_cfg = BatchFetcherConfig::new(cfg);
414    let migration = PartMigration::new(&part.part, read_schemas, schema_cache)
415        .await
416        .unwrap_or_else(|read_schemas| {
417            panic!(
418                "could not decode part {:?} with schema: {:?}",
419                part.part.schema_id(),
420                read_schemas
421            )
422        });
423    FetchedPart::new(
424        metrics,
425        encoded_part,
426        migration,
427        part.filter.clone(),
428        part.filter_pushdown_audit,
429        part_cfg.part_decode_format(),
430        part.part.stats(),
431    )
432}
433
434pub(crate) async fn fetch_batch_part_blob<T>(
435    shard_id: &ShardId,
436    blob: &dyn Blob,
437    metrics: &Metrics,
438    shard_metrics: &ShardMetrics,
439    read_metrics: &ReadMetrics,
440    part: &HollowBatchPart<T>,
441) -> Result<SegmentedBytes, BlobKey> {
442    let now = Instant::now();
443    let get_span = debug_span!("fetch_batch::get");
444    let blob_key = part.key.complete(shard_id);
445    let value = retry_external(&metrics.retries.external.fetch_batch_get, || async {
446        shard_metrics.blob_gets.inc();
447        blob.get(&blob_key).await
448    })
449    .instrument(get_span.clone())
450    .await
451    .ok_or(blob_key)?;
452
453    drop(get_span);
454
455    read_metrics.part_count.inc();
456    read_metrics.part_bytes.inc_by(u64::cast_from(value.len()));
457    read_metrics.seconds.inc_by(now.elapsed().as_secs_f64());
458
459    Ok(value)
460}
461
462pub(crate) fn decode_batch_part_blob<T>(
463    cfg: &FetchConfig,
464    metrics: &Metrics,
465    read_metrics: &ReadMetrics,
466    registered_desc: Description<T>,
467    part: &HollowBatchPart<T>,
468    buf: &SegmentedBytes,
469) -> EncodedPart<T>
470where
471    T: Timestamp + Lattice + Codec64,
472{
473    trace_span!("fetch_batch::decode").in_scope(|| {
474        let parsed = metrics
475            .codecs
476            .batch
477            .decode(|| BlobTraceBatchPart::decode(buf, &metrics.columnar))
478            .map_err(|err| anyhow!("couldn't decode batch at key {}: {}", part.key, err))
479            // We received a State that we couldn't decode. This could happen if
480            // persist messes up backward/forward compatibility, if the durable
481            // data was corrupted, or if operations messes up deployment. In any
482            // case, fail loudly.
483            .expect("internal error: invalid encoded state");
484        read_metrics
485            .part_goodbytes
486            .inc_by(u64::cast_from(parsed.updates.goodbytes()));
487        EncodedPart::from_hollow(cfg, read_metrics.clone(), registered_desc, part, parsed)
488    })
489}
490
491pub(crate) async fn fetch_batch_part<T>(
492    cfg: &FetchConfig,
493    shard_id: &ShardId,
494    blob: &dyn Blob,
495    metrics: &Metrics,
496    shard_metrics: &ShardMetrics,
497    read_metrics: &ReadMetrics,
498    registered_desc: &Description<T>,
499    part: &HollowBatchPart<T>,
500) -> Result<EncodedPart<T>, BlobKey>
501where
502    T: Timestamp + Lattice + Codec64,
503{
504    let buf =
505        fetch_batch_part_blob(shard_id, blob, metrics, shard_metrics, read_metrics, part).await?;
506    let part = decode_batch_part_blob(
507        cfg,
508        metrics,
509        read_metrics,
510        registered_desc.clone(),
511        part,
512        &buf,
513    );
514    Ok(part)
515}
516
517/// This represents the lease of a seqno. It's generally paired with some external state,
518/// like a hollow part: holding this lease indicates that we may still want to fetch that part,
519/// and should hold back GC to keep it around.
520///
521/// Generally the state and lease are bundled together, as in [LeasedBatchPart]... but sometimes
522/// it's necessary to handle them separately, so this struct is exposed as well. Handle with care.
523#[derive(Clone, Debug)]
524pub struct Lease(Arc<SeqNo>);
525
526impl Lease {
527    /// Creates a new [Lease] that holds the given [SeqNo].
528    pub fn new(seqno: SeqNo) -> Self {
529        Self(Arc::new(seqno))
530    }
531
532    /// Returns the inner [SeqNo] of this [Lease].
533    pub fn seqno(&self) -> SeqNo {
534        *self.0
535    }
536
537    /// Returns the number of live copies of this lease, including this one.
538    pub fn count(&self) -> usize {
539        Arc::strong_count(&self.0)
540    }
541}
542
/// A token representing one fetch-able batch part.
///
/// It is tradeable via `crate::fetch::fetch_leased_part` for the resulting
/// data stored in the part.
///
/// # Exchange
///
/// A `LeasedBatchPart` can be exchanged, including over the network, by
/// converting it to an [`ExchangeableBatchPart`] through
/// `Self::into_exchangeable_part`. That conversion also returns the [`Lease`],
/// and the caller becomes responsible for keeping it alive for as long as the
/// part may still be fetched.
///
/// # Drop
///
/// Dropping a `LeasedBatchPart` increments the `lease.dropped_part` metric.
///
/// NOTE(review): this doc previously described a panic-on-drop invariant tied
/// to a `leased_seqno` field. The struct has no such field and its `Drop` impl
/// does not panic, so that text was removed.
#[derive(Debug)]
pub struct LeasedBatchPart<T> {
    /// Metrics handle; used by `maybe_optimize` and by the `Drop` impl.
    pub(crate) metrics: Arc<Metrics>,
    /// The shard this part belongs to.
    pub(crate) shard_id: ShardId,
    /// Timestamp filter to apply to the part's updates once fetched.
    pub(crate) filter: FetchBatchFilter<T>,
    /// Description (lower, upper, since) registered for this part.
    pub(crate) desc: Description<T>,
    /// The part itself: either a hollow reference to a blob or inline data.
    pub(crate) part: BatchPart<T>,
    /// The lease that prevents this part from being GCed. Code should ensure that this lease
    /// lives as long as the part is needed.
    pub(crate) lease: Lease,
    /// The id of the reader that leased this part, for diagnostics: if the
    /// blob backing the part goes missing, knowing whether this reader is
    /// still present in state distinguishes a lost lease from a GC bug.
    pub(crate) reader_id: LeasedReaderId,
    /// Whether a filter-pushdown audit was requested for this part; set by
    /// `request_filter_pushdown_audit`.
    pub(crate) filter_pushdown_audit: bool,
}
584
585impl<T> LeasedBatchPart<T>
586where
587    T: Timestamp + Codec64,
588{
589    /// Takes `self` into a [`ExchangeableBatchPart`], which allows `self` to be
590    /// exchanged (potentially across the network).
591    ///
592    /// !!!WARNING!!!
593    ///
594    /// This method also returns the [Lease] associated with the given part, since
595    /// that can't travel across process boundaries. The caller is responsible for
596    /// ensuring that the lease is held for as long as the batch part may be in use:
597    /// dropping it too early may cause a fetch to fail.
598    pub(crate) fn into_exchangeable_part(self) -> (ExchangeableBatchPart<T>, Lease) {
599        // If `x` has a lease, we've effectively transferred it to `r`.
600        let lease = self.lease.clone();
601        let part = ExchangeableBatchPart {
602            shard_id: self.shard_id,
603            encoded_size_bytes: self.part.encoded_size_bytes(),
604            desc: self.desc.clone(),
605            filter: self.filter.clone(),
606            part: LazyProto::from(&self.part.into_proto()),
607            reader_id: self.reader_id.clone(),
608            filter_pushdown_audit: self.filter_pushdown_audit,
609        };
610        (part, lease)
611    }
612
613    /// The encoded size of this part in bytes
614    pub fn encoded_size_bytes(&self) -> usize {
615        self.part.encoded_size_bytes()
616    }
617
618    /// The filter has indicated we don't need this part, we can verify the
619    /// ongoing end-to-end correctness of corner cases via "audit". This means
620    /// we fetch the part like normal and if the MFP keeps anything from it,
621    /// then something has gone horribly wrong.
622    pub fn request_filter_pushdown_audit(&mut self) {
623        self.filter_pushdown_audit = true;
624    }
625
626    /// Returns the pushdown stats for this part.
627    pub fn stats(&self) -> Option<PartStats> {
628        self.part.stats().map(|x| x.decode())
629    }
630
631    /// Apply any relevant projection pushdown optimizations, assuming that the data in the part
632    /// is equivalent to the provided key and value.
633    pub fn maybe_optimize(&mut self, cfg: &ConfigSet, key: ArrayRef, val: ArrayRef) {
634        assert_eq!(key.len(), 1, "expect a single-row key array");
635        assert_eq!(val.len(), 1, "expect a single-row val array");
636        let as_of = match &self.filter {
637            FetchBatchFilter::Snapshot { as_of } => as_of,
638            FetchBatchFilter::Listen { .. } | FetchBatchFilter::Compaction { .. } => return,
639        };
640        if !OPTIMIZE_IGNORED_DATA_FETCH.get(cfg) {
641            return;
642        }
643        let (diffs_sum, _stats) = match &self.part {
644            BatchPart::Hollow(x) => (x.diffs_sum, x.stats.as_ref()),
645            BatchPart::Inline { .. } => return,
646        };
647        debug!(
648            "try_optimize_ignored_data_fetch diffs_sum={:?} as_of={:?} lower={:?} upper={:?}",
649            // This is only used for debugging, so hack to assume that D is i64.
650            diffs_sum.map(i64::decode),
651            as_of.elements(),
652            self.desc.lower().elements(),
653            self.desc.upper().elements()
654        );
655        let as_of = match &as_of.elements() {
656            &[as_of] => as_of,
657            _ => return,
658        };
659        let eligible = self.desc.upper().less_equal(as_of) && self.desc.since().less_equal(as_of);
660        if !eligible {
661            return;
662        }
663        let Some(diffs_sum) = diffs_sum else {
664            return;
665        };
666
667        debug!(
668            "try_optimize_ignored_data_fetch faked {:?} diffs at ts {:?} skipping fetch of {} bytes",
669            // This is only used for debugging, so hack to assume that D is i64.
670            i64::decode(diffs_sum),
671            as_of,
672            self.part.encoded_size_bytes(),
673        );
674        self.metrics.pushdown.parts_faked_count.inc();
675        self.metrics
676            .pushdown
677            .parts_faked_bytes
678            .inc_by(u64::cast_from(self.part.encoded_size_bytes()));
679        let timestamps = {
680            let mut col = Codec64Mut::with_capacity(1);
681            col.push(as_of);
682            col.finish()
683        };
684        let diffs = {
685            let mut col = Codec64Mut::with_capacity(1);
686            col.push_raw(diffs_sum);
687            col.finish()
688        };
689        let updates = BlobTraceUpdates::Structured {
690            key_values: ColumnarRecordsStructuredExt { key, val },
691            timestamps,
692            diffs,
693        };
694        let faked_data = LazyInlineBatchPart::from(&ProtoInlineBatchPart {
695            desc: Some(self.desc.into_proto()),
696            index: 0,
697            updates: Some(updates.into_proto()),
698        });
699        self.part = BatchPart::Inline {
700            updates: faked_data,
701            ts_rewrite: None,
702            schema_id: None,
703            deprecated_schema_id: None,
704        };
705    }
706}
707
708impl<T> Drop for LeasedBatchPart<T> {
709    /// For details, see [`LeasedBatchPart`].
710    fn drop(&mut self) {
711        self.metrics.lease.dropped_part.inc()
712    }
713}
714
/// A [Blob] object that has been fetched, but not at all decoded.
///
/// In contrast to [FetchedPart], this representation hasn't yet done parquet
/// decoding.
#[derive(Debug)]
pub struct FetchedBlob<K: Codec, V: Codec, T, D> {
    /// Process-wide persist metrics, used while decoding.
    metrics: Arc<Metrics>,
    /// Read metrics that decoding is attributed to.
    read_metrics: ReadMetrics,
    /// The fetched bytes (hollow part) or the inline updates.
    buf: FetchedBlobBuf<T>,
    /// The description the part was registered with; handed to the decoder
    /// for hollow parts.
    registered_desc: Description<T>,
    /// Migration from the part's schema to the read schema.
    migration: PartMigration<K, V>,
    /// Timestamp filter forwarded to the resulting [FetchedPart].
    filter: FetchBatchFilter<T>,
    /// Whether a filter-pushdown audit was requested for this part.
    filter_pushdown_audit: bool,
    /// The decode format forwarded to `FetchedPart::new` as its
    /// `part_decode_format` argument.
    structured_part_audit: PartDecodeFormat,
    /// Permits taken from the fetch semaphore for hollow parts; `None` for
    /// inline parts. Shared with every [ShardSourcePart] that `parse` returns.
    fetch_permit: Option<Arc<MetricsPermits>>,
    /// Validation settings used by `parse`.
    fetch_config: FetchConfig,
    _phantom: PhantomData<fn() -> D>,
}
733
/// The undecoded payload of a [`FetchedBlob`].
#[derive(Debug, Clone)]
enum FetchedBlobBuf<T> {
    /// A part whose data lives in blob storage.
    Hollow {
        /// The raw bytes fetched from the blob store.
        buf: SegmentedBytes,
        /// The hollow part metadata the bytes were fetched for.
        part: HollowBatchPart<T>,
    },
    /// A part whose updates are stored inline, so nothing was fetched.
    Inline {
        /// The description passed along with the part.
        desc: Description<T>,
        /// The lazily decoded inline updates.
        updates: LazyInlineBatchPart,
        /// Optional timestamp rewrite carried by the inline part.
        ts_rewrite: Option<Antichain<T>>,
    },
}
746
747impl<K: Codec, V: Codec, T: Clone, D> Clone for FetchedBlob<K, V, T, D> {
748    fn clone(&self) -> Self {
749        Self {
750            metrics: Arc::clone(&self.metrics),
751            read_metrics: self.read_metrics.clone(),
752            buf: self.buf.clone(),
753            registered_desc: self.registered_desc.clone(),
754            migration: self.migration.clone(),
755            filter: self.filter.clone(),
756            filter_pushdown_audit: self.filter_pushdown_audit.clone(),
757            fetch_permit: self.fetch_permit.clone(),
758            structured_part_audit: self.structured_part_audit.clone(),
759            fetch_config: self.fetch_config.clone(),
760            _phantom: self._phantom.clone(),
761        }
762    }
763}
764
/// [FetchedPart] but with an accompanying permit from the fetch mem/disk
/// semaphore.
pub struct ShardSourcePart<K: Codec, V: Codec, T, D> {
    /// The underlying [FetchedPart].
    pub part: FetchedPart<K, V, T, D>,
    /// The permits acquired when the part was fetched, if any; `None` for
    /// inline parts, which are never fetched.
    // NOTE(review): presumably kept here only so the permits stay held while
    // this part is alive — confirm against `MetricsPermits`.
    fetch_permit: Option<Arc<MetricsPermits>>,
}
772
773impl<K, V, T: Debug, D: Debug> Debug for ShardSourcePart<K, V, T, D>
774where
775    K: Codec + Debug,
776    <K as Codec>::Storage: Debug,
777    V: Codec + Debug,
778    <V as Codec>::Storage: Debug,
779{
780    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
781        let ShardSourcePart { part, fetch_permit } = self;
782        f.debug_struct("ShardSourcePart")
783            .field("part", part)
784            .field("fetch_permit", fetch_permit)
785            .finish()
786    }
787}
788
789impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedBlob<K, V, T, D> {
790    /// Partially decodes this blob into a [FetchedPart].
791    pub fn parse(&self) -> ShardSourcePart<K, V, T, D> {
792        self.parse_internal(&self.fetch_config)
793    }
794
795    /// Partially decodes this blob into a [FetchedPart].
796    pub(crate) fn parse_internal(&self, cfg: &FetchConfig) -> ShardSourcePart<K, V, T, D> {
797        let (part, stats) = match &self.buf {
798            FetchedBlobBuf::Hollow { buf, part } => {
799                let parsed = decode_batch_part_blob(
800                    cfg,
801                    &self.metrics,
802                    &self.read_metrics,
803                    self.registered_desc.clone(),
804                    part,
805                    buf,
806                );
807                (parsed, part.stats.as_ref())
808            }
809            FetchedBlobBuf::Inline {
810                desc,
811                updates,
812                ts_rewrite,
813            } => {
814                let parsed = EncodedPart::from_inline(
815                    cfg,
816                    &self.metrics,
817                    self.read_metrics.clone(),
818                    desc.clone(),
819                    updates,
820                    ts_rewrite.as_ref(),
821                );
822                (parsed, None)
823            }
824        };
825        let part = FetchedPart::new(
826            Arc::clone(&self.metrics),
827            part,
828            self.migration.clone(),
829            self.filter.clone(),
830            self.filter_pushdown_audit,
831            self.structured_part_audit,
832            stats,
833        );
834        ShardSourcePart {
835            part,
836            fetch_permit: self.fetch_permit.clone(),
837        }
838    }
839
840    /// Decodes and returns the pushdown stats for this part, if known.
841    pub fn stats(&self) -> Option<PartStats> {
842        match &self.buf {
843            FetchedBlobBuf::Hollow { part, .. } => part.stats.as_ref().map(|x| x.decode()),
844            FetchedBlobBuf::Inline { .. } => None,
845        }
846    }
847}
848
849/// A [Blob] object that has been fetched, but not yet fully decoded.
850///
851/// In contrast to [FetchedBlob], this representation has already done parquet
852/// decoding.
853#[derive(Debug)]
854pub struct FetchedPart<K: Codec, V: Codec, T, D> {
855    metrics: Arc<Metrics>,
856    ts_filter: FetchBatchFilter<T>,
857    // If migration is Either, then the columnar one will have already been
858    // applied here on the structured data only.
859    part: EitherOrBoth<
860        ColumnarRecords,
861        (
862            <K::Schema as Schema<K>>::Decoder,
863            <V::Schema as Schema<V>>::Decoder,
864        ),
865    >,
866    timestamps: Int64Array,
867    diffs: Int64Array,
868    migration: PartMigration<K, V>,
869    filter_pushdown_audit: Option<LazyPartStats>,
870    peek_stash: Option<((K, V), T, D)>,
871    part_cursor: usize,
872    key_storage: Option<K::Storage>,
873    val_storage: Option<V::Storage>,
874
875    _phantom: PhantomData<fn() -> D>,
876}
877
878impl<K: Codec, V: Codec, T: Timestamp + Lattice + Codec64, D> FetchedPart<K, V, T, D> {
879    pub(crate) fn new(
880        metrics: Arc<Metrics>,
881        part: EncodedPart<T>,
882        migration: PartMigration<K, V>,
883        ts_filter: FetchBatchFilter<T>,
884        filter_pushdown_audit: bool,
885        part_decode_format: PartDecodeFormat,
886        stats: Option<&LazyPartStats>,
887    ) -> Self {
888        let part_len = u64::cast_from(part.part.updates.len());
889        match &migration {
890            PartMigration::SameSchema { .. } => metrics.schema.migration_count_same.inc(),
891            PartMigration::Schemaless { .. } => {
892                metrics.schema.migration_count_codec.inc();
893                metrics.schema.migration_len_legacy_codec.inc_by(part_len);
894            }
895            PartMigration::Either { .. } => {
896                metrics.schema.migration_count_either.inc();
897                match part_decode_format {
898                    PartDecodeFormat::Row {
899                        validate_structured: false,
900                    } => metrics.schema.migration_len_either_codec.inc_by(part_len),
901                    PartDecodeFormat::Row {
902                        validate_structured: true,
903                    } => {
904                        metrics.schema.migration_len_either_codec.inc_by(part_len);
905                        metrics.schema.migration_len_either_arrow.inc_by(part_len);
906                    }
907                    PartDecodeFormat::Arrow => {
908                        metrics.schema.migration_len_either_arrow.inc_by(part_len)
909                    }
910                }
911            }
912        }
913
914        let filter_pushdown_audit = if filter_pushdown_audit {
915            stats.cloned()
916        } else {
917            None
918        };
919
920        let downcast_structured = |structured: ColumnarRecordsStructuredExt,
921                                   structured_only: bool| {
922            let key_size_before = ArrayOrd::new(&structured.key).goodbytes();
923
924            let structured = match &migration {
925                PartMigration::SameSchema { .. } => structured,
926                PartMigration::Schemaless { read } if structured_only => {
927                    // We don't know the source schema, but we do know the source datatype; migrate it directly.
928                    let start = Instant::now();
929                    let read_key = data_type::<K>(&*read.key).ok()?;
930                    let read_val = data_type::<V>(&*read.val).ok()?;
931                    let key_migration = backward_compatible(structured.key.data_type(), &read_key)?;
932                    let val_migration = backward_compatible(structured.val.data_type(), &read_val)?;
933                    let key = key_migration.migrate(structured.key);
934                    let val = val_migration.migrate(structured.val);
935                    metrics
936                        .schema
937                        .migration_migrate_seconds
938                        .inc_by(start.elapsed().as_secs_f64());
939                    ColumnarRecordsStructuredExt { key, val }
940                }
941                PartMigration::Schemaless { .. } => return None,
942                PartMigration::Either {
943                    write: _,
944                    read: _,
945                    key_migration,
946                    val_migration,
947                } => {
948                    let start = Instant::now();
949                    let key = key_migration.migrate(structured.key);
950                    let val = val_migration.migrate(structured.val);
951                    metrics
952                        .schema
953                        .migration_migrate_seconds
954                        .inc_by(start.elapsed().as_secs_f64());
955                    ColumnarRecordsStructuredExt { key, val }
956                }
957            };
958
959            let read_schema = migration.codec_read();
960            let key = K::Schema::decoder_any(&*read_schema.key, &*structured.key);
961            let val = V::Schema::decoder_any(&*read_schema.val, &*structured.val);
962
963            match &key {
964                Ok(key_decoder) => {
965                    let key_size_after = key_decoder.goodbytes();
966                    let key_diff = key_size_before.saturating_sub(key_size_after);
967                    metrics
968                        .pushdown
969                        .parts_projection_trimmed_bytes
970                        .inc_by(u64::cast_from(key_diff));
971                }
972                Err(e) => {
973                    soft_panic_or_log!("failed to create decoder: {e:#?}");
974                }
975            }
976
977            Some((key.ok()?, val.ok()?))
978        };
979
980        let updates = part.normalize(&metrics.columnar);
981        let timestamps = updates.timestamps().clone();
982        let diffs = updates.diffs().clone();
983        let part = match updates {
984            // If only one encoding is available, decode via that encoding.
985            BlobTraceUpdates::Row(records) => EitherOrBoth::Left(records),
986            BlobTraceUpdates::Structured { key_values, .. } => EitherOrBoth::Right(
987                // The structured-only data format was added after schema ids were recorded everywhere,
988                // so we expect this data to be present.
989                downcast_structured(key_values, true).expect("valid schemas for structured data"),
990            ),
991            // If both are available, respect the specified part decode format.
992            BlobTraceUpdates::Both(records, ext) => match part_decode_format {
993                PartDecodeFormat::Row {
994                    validate_structured: false,
995                } => EitherOrBoth::Left(records),
996                PartDecodeFormat::Row {
997                    validate_structured: true,
998                } => match downcast_structured(ext, false) {
999                    Some(decoders) => EitherOrBoth::Both(records, decoders),
1000                    None => EitherOrBoth::Left(records),
1001                },
1002                PartDecodeFormat::Arrow => match downcast_structured(ext, false) {
1003                    Some(decoders) => EitherOrBoth::Right(decoders),
1004                    None => EitherOrBoth::Left(records),
1005                },
1006            },
1007        };
1008
1009        FetchedPart {
1010            metrics,
1011            ts_filter,
1012            part,
1013            peek_stash: None,
1014            timestamps,
1015            diffs,
1016            migration,
1017            filter_pushdown_audit,
1018            part_cursor: 0,
1019            key_storage: None,
1020            val_storage: None,
1021            _phantom: PhantomData,
1022        }
1023    }
1024
1025    /// Returns Some if this part was only fetched as part of a filter pushdown
1026    /// audit. See [LeasedBatchPart::request_filter_pushdown_audit].
1027    ///
1028    /// If set, the value in the Option is for debugging and should be included
1029    /// in any error messages.
1030    pub fn is_filter_pushdown_audit(&self) -> Option<impl std::fmt::Debug + use<K, V, T, D>> {
1031        self.filter_pushdown_audit.clone()
1032    }
1033}
1034
1035/// A [Blob] object that has been fetched, but has no associated decoding
1036/// logic.
1037#[derive(Debug)]
1038pub(crate) struct EncodedPart<T> {
1039    metrics: ReadMetrics,
1040    registered_desc: Description<T>,
1041    part: BlobTraceBatchPart<T>,
1042    needs_truncation: bool,
1043    ts_rewrite: Option<Antichain<T>>,
1044}
1045
1046impl<K, V, T, D> FetchedPart<K, V, T, D>
1047where
1048    K: Debug + Codec,
1049    V: Debug + Codec,
1050    T: Timestamp + Lattice + Codec64,
1051    D: Monoid + Codec64 + Send + Sync,
1052{
1053    /// [Self::next] but optionally providing a `K` and `V` for alloc reuse.
1054    ///
1055    /// When `result_override` is specified, return it instead of decoding data.
1056    /// This is used when we know the decoded result will be ignored.
1057    pub fn next_with_storage(
1058        &mut self,
1059        key: &mut Option<K>,
1060        val: &mut Option<V>,
1061    ) -> Option<((K, V), T, D)> {
1062        let mut consolidated = self.peek_stash.take();
1063        loop {
1064            // Fetch and decode the next tuple in the sequence. (Or break if there is none.)
1065            let next = if self.part_cursor < self.timestamps.len() {
1066                let next_idx = self.part_cursor;
1067                self.part_cursor += 1;
1068                // These `to_le_bytes` calls were previously encapsulated by `ColumnarRecords`.
1069                // TODO(structured): re-encapsulate these once we've finished the structured migration.
1070                let mut t = T::decode(self.timestamps.values()[next_idx].to_le_bytes());
1071                if !self.ts_filter.filter_ts(&mut t) {
1072                    continue;
1073                }
1074                let d = D::decode(self.diffs.values()[next_idx].to_le_bytes());
1075                if d.is_zero() {
1076                    continue;
1077                }
1078                let kv = self.decode_kv(next_idx, key, val);
1079                (kv, t, d)
1080            } else {
1081                break;
1082            };
1083
1084            // Attempt to consolidate in the next tuple, stashing it if that's not possible.
1085            if let Some((kv, t, d)) = &mut consolidated {
1086                let (kv_next, t_next, d_next) = &next;
1087                if kv == kv_next && t == t_next {
1088                    d.plus_equals(d_next);
1089                    if d.is_zero() {
1090                        consolidated = None;
1091                    }
1092                } else {
1093                    self.peek_stash = Some(next);
1094                    break;
1095                }
1096            } else {
1097                consolidated = Some(next);
1098            }
1099        }
1100
1101        let (kv, t, d) = consolidated?;
1102
1103        Some((kv, t, d))
1104    }
1105
1106    fn decode_kv(&mut self, index: usize, key: &mut Option<K>, val: &mut Option<V>) -> (K, V) {
1107        let decoded = self
1108            .part
1109            .as_ref()
1110            .map_left(|codec| {
1111                let ((ck, cv), _, _) = codec.get(index).expect("valid index");
1112                let (k, v) = Self::decode_codec(
1113                    &*self.metrics,
1114                    self.migration.codec_read(),
1115                    ck,
1116                    cv,
1117                    key,
1118                    val,
1119                    &mut self.key_storage,
1120                    &mut self.val_storage,
1121                );
1122                (k.expect("valid legacy key"), v.expect("valid legacy value"))
1123            })
1124            .map_right(|(structured_key, structured_val)| {
1125                self.decode_structured(index, structured_key, structured_val, key, val)
1126            });
1127
1128        match decoded {
1129            EitherOrBoth::Both((k, v), (k_s, v_s)) => {
1130                // Purposefully do not trace to prevent blowing up Sentry.
1131                let is_valid = self
1132                    .metrics
1133                    .columnar
1134                    .arrow()
1135                    .key()
1136                    .report_valid(|| k_s == k);
1137                if !is_valid {
1138                    soft_panic_no_log!("structured key did not match, {k_s:?} != {k:?}");
1139                }
1140                // Purposefully do not trace to prevent blowing up Sentry.
1141                let is_valid = self
1142                    .metrics
1143                    .columnar
1144                    .arrow()
1145                    .val()
1146                    .report_valid(|| v_s == v);
1147                if !is_valid {
1148                    soft_panic_no_log!("structured val did not match, {v_s:?} != {v:?}");
1149                }
1150
1151                (k, v)
1152            }
1153            EitherOrBoth::Left(kv) => kv,
1154            EitherOrBoth::Right(kv) => kv,
1155        }
1156    }
1157
1158    fn decode_codec(
1159        metrics: &Metrics,
1160        read_schemas: &Schemas<K, V>,
1161        key_buf: &[u8],
1162        val_buf: &[u8],
1163        key: &mut Option<K>,
1164        val: &mut Option<V>,
1165        key_storage: &mut Option<K::Storage>,
1166        val_storage: &mut Option<V::Storage>,
1167    ) -> (Result<K, String>, Result<V, String>) {
1168        let k = metrics.codecs.key.decode(|| match key.take() {
1169            Some(mut key) => {
1170                match K::decode_from(&mut key, key_buf, key_storage, &read_schemas.key) {
1171                    Ok(()) => Ok(key),
1172                    Err(err) => Err(err),
1173                }
1174            }
1175            None => K::decode(key_buf, &read_schemas.key),
1176        });
1177        let v = metrics.codecs.val.decode(|| match val.take() {
1178            Some(mut val) => {
1179                match V::decode_from(&mut val, val_buf, val_storage, &read_schemas.val) {
1180                    Ok(()) => Ok(val),
1181                    Err(err) => Err(err),
1182                }
1183            }
1184            None => V::decode(val_buf, &read_schemas.val),
1185        });
1186        (k, v)
1187    }
1188
1189    fn decode_structured(
1190        &self,
1191        idx: usize,
1192        keys: &<K::Schema as Schema<K>>::Decoder,
1193        vals: &<V::Schema as Schema<V>>::Decoder,
1194        key: &mut Option<K>,
1195        val: &mut Option<V>,
1196    ) -> (K, V) {
1197        let mut key = key.take().unwrap_or_default();
1198        keys.decode(idx, &mut key);
1199
1200        let mut val = val.take().unwrap_or_default();
1201        vals.decode(idx, &mut val);
1202
1203        (key, val)
1204    }
1205}
1206
1207impl<K, V, T, D> Iterator for FetchedPart<K, V, T, D>
1208where
1209    K: Debug + Codec,
1210    V: Debug + Codec,
1211    T: Timestamp + Lattice + Codec64,
1212    D: Monoid + Codec64 + Send + Sync,
1213{
1214    type Item = ((K, V), T, D);
1215
1216    fn next(&mut self) -> Option<Self::Item> {
1217        self.next_with_storage(&mut None, &mut None)
1218    }
1219
1220    fn size_hint(&self) -> (usize, Option<usize>) {
1221        // We don't know in advance how restrictive the filter will be.
1222        let max_len = self.timestamps.len();
1223        (0, Some(max_len))
1224    }
1225}
1226
1227impl<T> EncodedPart<T>
1228where
1229    T: Timestamp + Lattice + Codec64,
1230{
1231    pub async fn fetch(
1232        cfg: &FetchConfig,
1233        shard_id: &ShardId,
1234        blob: &dyn Blob,
1235        metrics: &Metrics,
1236        shard_metrics: &ShardMetrics,
1237        read_metrics: &ReadMetrics,
1238        registered_desc: &Description<T>,
1239        part: &BatchPart<T>,
1240    ) -> Result<Self, BlobKey> {
1241        match part {
1242            BatchPart::Hollow(x) => {
1243                fetch_batch_part(
1244                    cfg,
1245                    shard_id,
1246                    blob,
1247                    metrics,
1248                    shard_metrics,
1249                    read_metrics,
1250                    registered_desc,
1251                    x,
1252                )
1253                .await
1254            }
1255            BatchPart::Inline {
1256                updates,
1257                ts_rewrite,
1258                ..
1259            } => Ok(EncodedPart::from_inline(
1260                cfg,
1261                metrics,
1262                read_metrics.clone(),
1263                registered_desc.clone(),
1264                updates,
1265                ts_rewrite.as_ref(),
1266            )),
1267        }
1268    }
1269
1270    pub(crate) fn from_inline(
1271        cfg: &FetchConfig,
1272        metrics: &Metrics,
1273        read_metrics: ReadMetrics,
1274        desc: Description<T>,
1275        x: &LazyInlineBatchPart,
1276        ts_rewrite: Option<&Antichain<T>>,
1277    ) -> Self {
1278        let parsed = x.decode(&metrics.columnar).expect("valid inline part");
1279        Self::new(cfg, read_metrics, desc, "inline", ts_rewrite, parsed)
1280    }
1281
1282    pub(crate) fn from_hollow(
1283        cfg: &FetchConfig,
1284        metrics: ReadMetrics,
1285        registered_desc: Description<T>,
1286        part: &HollowBatchPart<T>,
1287        parsed: BlobTraceBatchPart<T>,
1288    ) -> Self {
1289        Self::new(
1290            cfg,
1291            metrics,
1292            registered_desc,
1293            &part.key.0,
1294            part.ts_rewrite.as_ref(),
1295            parsed,
1296        )
1297    }
1298
1299    pub(crate) fn new(
1300        cfg: &FetchConfig,
1301        metrics: ReadMetrics,
1302        registered_desc: Description<T>,
1303        printable_name: &str,
1304        ts_rewrite: Option<&Antichain<T>>,
1305        parsed: BlobTraceBatchPart<T>,
1306    ) -> Self {
1307        // There are two types of batches in persist:
1308        // - Batches written by a persist user (either directly or indirectly
1309        //   via BatchBuilder). These always have a since of the minimum
1310        //   timestamp and may be registered in persist state with a tighter set
1311        //   of bounds than are inline in the batch (truncation). To read one of
1312        //   these batches, all data physically in the batch but outside of the
1313        //   truncated bounds must be ignored. Not every user batch is
1314        //   truncated.
1315        // - Batches written by compaction. These always have an inline desc
1316        //   lower and upper that matches the registered desc lower and upper,
1317        //   and a since that is less than or equal to the registered desc.
1318        //   The inline since may be less than the registered desc since,
1319        //   this is because of incremental compaction, where we might rewrite
1320        //   certain runs in a batch but not others.
1321        let inline_desc = &parsed.desc;
1322        let needs_truncation = inline_desc.lower() != registered_desc.lower()
1323            || inline_desc.upper() != registered_desc.upper();
1324        if needs_truncation {
1325            if cfg.validate_bounds_on_read {
1326                soft_assert_or_log!(
1327                    PartialOrder::less_equal(inline_desc.lower(), registered_desc.lower()),
1328                    "key={} inline={:?} registered={:?}",
1329                    printable_name,
1330                    inline_desc,
1331                    registered_desc
1332                );
1333
1334                if ts_rewrite.is_none() {
1335                    // The ts rewrite feature allows us to advance the registered
1336                    // upper of a batch that's already been staged (the inline
1337                    // upper), so if it's been used, then there's no useful
1338                    // invariant that we can assert here.
1339                    soft_assert_or_log!(
1340                        PartialOrder::less_equal(registered_desc.upper(), inline_desc.upper()),
1341                        "key={} inline={:?} registered={:?}",
1342                        printable_name,
1343                        inline_desc,
1344                        registered_desc
1345                    );
1346                }
1347            }
1348            // As mentioned above, batches that needs truncation will always have a
1349            // since of the minimum timestamp. Technically we could truncate any
1350            // batch where the since is less_than the output_desc's lower, but we're
1351            // strict here so we don't get any surprises.
1352            assert_eq!(
1353                inline_desc.since(),
1354                &Antichain::from_elem(T::minimum()),
1355                "key={} inline={:?} registered={:?}",
1356                printable_name,
1357                inline_desc,
1358                registered_desc
1359            );
1360        } else {
1361            soft_assert_or_log!(
1362                PartialOrder::less_equal(inline_desc.since(), registered_desc.since()),
1363                "key={} inline={:?} registered={:?}",
1364                printable_name,
1365                inline_desc,
1366                registered_desc
1367            );
1368            assert_eq!(
1369                inline_desc.lower(),
1370                registered_desc.lower(),
1371                "key={} inline={:?} registered={:?}",
1372                printable_name,
1373                inline_desc,
1374                registered_desc
1375            );
1376            assert_eq!(
1377                inline_desc.upper(),
1378                registered_desc.upper(),
1379                "key={} inline={:?} registered={:?}",
1380                printable_name,
1381                inline_desc,
1382                registered_desc
1383            );
1384        }
1385
1386        EncodedPart {
1387            metrics,
1388            registered_desc,
1389            part: parsed,
1390            needs_truncation,
1391            ts_rewrite: ts_rewrite.cloned(),
1392        }
1393    }
1394
1395    pub(crate) fn maybe_unconsolidated(&self) -> bool {
1396        // At time of writing, only user parts may be unconsolidated, and they are always
1397        // written with a since of [T::minimum()].
1398        self.part.desc.since().borrow() == AntichainRef::new(&[T::minimum()])
1399    }
1400
1401    pub(crate) fn updates(&self) -> &BlobTraceUpdates {
1402        &self.part.updates
1403    }
1404
1405    /// Returns the updates with all truncation / timestamp rewriting applied.
1406    pub(crate) fn normalize(&self, metrics: &ColumnarMetrics) -> BlobTraceUpdates {
1407        let updates = self.part.updates.clone();
1408        if !self.needs_truncation && self.ts_rewrite.is_none() {
1409            return updates;
1410        }
1411
1412        let mut codec = updates
1413            .records()
1414            .map(|r| (r.keys().clone(), r.vals().clone()));
1415        let mut structured = updates.structured().cloned();
1416        let mut timestamps = updates.timestamps().clone();
1417        let mut diffs = updates.diffs().clone();
1418
1419        if let Some(rewrite) = self.ts_rewrite.as_ref() {
1420            timestamps = arrow::compute::unary(&timestamps, |i: i64| {
1421                let mut t = T::decode(i.to_le_bytes());
1422                t.advance_by(rewrite.borrow());
1423                i64::from_le_bytes(T::encode(&t))
1424            });
1425        }
1426
1427        let reallocated = if self.needs_truncation {
1428            let filter = BooleanArray::from_unary(&timestamps, |i| {
1429                let t = T::decode(i.to_le_bytes());
1430                let truncate_t = {
1431                    !self.registered_desc.lower().less_equal(&t)
1432                        || self.registered_desc.upper().less_equal(&t)
1433                };
1434                !truncate_t
1435            });
1436            if filter.false_count() == 0 {
1437                // If we're not filtering anything in practice, skip filtering and reallocating.
1438                false
1439            } else {
1440                let filter = FilterBuilder::new(&filter).optimize().build();
1441                let do_filter = |array: &dyn Array| filter.filter(array).expect("valid filter len");
1442                if let Some((keys, vals)) = codec {
1443                    codec = Some((
1444                        realloc_array(do_filter(&keys).as_binary(), metrics),
1445                        realloc_array(do_filter(&vals).as_binary(), metrics),
1446                    ));
1447                }
1448                if let Some(ext) = structured {
1449                    structured = Some(ColumnarRecordsStructuredExt {
1450                        key: realloc_any(do_filter(&*ext.key), metrics),
1451                        val: realloc_any(do_filter(&*ext.val), metrics),
1452                    });
1453                }
1454                timestamps = realloc_array(do_filter(&timestamps).as_primitive(), metrics);
1455                diffs = realloc_array(do_filter(&diffs).as_primitive(), metrics);
1456                true
1457            }
1458        } else {
1459            false
1460        };
1461
1462        if self.ts_rewrite.is_some() && !reallocated {
1463            timestamps = realloc_array(&timestamps, metrics);
1464        }
1465
1466        if self.ts_rewrite.is_some() {
1467            self.metrics
1468                .ts_rewrite
1469                .inc_by(u64::cast_from(timestamps.len()));
1470        }
1471
1472        match (codec, structured) {
1473            (Some((key, value)), None) => {
1474                BlobTraceUpdates::Row(ColumnarRecords::new(key, value, timestamps, diffs))
1475            }
1476            (Some((key, value)), Some(ext)) => {
1477                BlobTraceUpdates::Both(ColumnarRecords::new(key, value, timestamps, diffs), ext)
1478            }
1479            (None, Some(ext)) => BlobTraceUpdates::Structured {
1480                key_values: ext,
1481                timestamps,
1482                diffs,
1483            },
1484            (None, None) => unreachable!(),
1485        }
1486    }
1487}
1488
1489/// This represents the serde encoding for [`LeasedBatchPart`]. We expose the struct
1490/// itself (unlike other encodable structs) to attempt to provide stricter drop
1491/// semantics on `LeasedBatchPart`, i.e. `SerdeLeasedBatchPart` is exchangeable
1492/// (including over the network), where `LeasedBatchPart` is not.
1493///
1494/// For more details see documentation and comments on:
1495/// - [`LeasedBatchPart`]
1496/// - `From<SerdeLeasedBatchPart>` for `LeasedBatchPart<T>`
1497#[derive(Debug, Serialize, Deserialize, Clone)]
1498pub struct ExchangeableBatchPart<T> {
1499    shard_id: ShardId,
1500    // Duplicated with the one serialized in the proto for use in backpressure.
1501    encoded_size_bytes: usize,
1502    desc: Description<T>,
1503    filter: FetchBatchFilter<T>,
1504    part: LazyProto<ProtoHollowBatchPart>,
1505    /// The id of the reader that leased this part. See the corresponding field
1506    /// on [LeasedBatchPart].
1507    reader_id: LeasedReaderId,
1508    filter_pushdown_audit: bool,
1509}
1510
1511impl<T> ExchangeableBatchPart<T> {
1512    /// Returns the encoded size of the given part.
1513    pub fn encoded_size_bytes(&self) -> usize {
1514        self.encoded_size_bytes
1515    }
1516
1517    /// Returns the id of the reader that leased this part.
1518    pub fn reader_id(&self) -> &LeasedReaderId {
1519        &self.reader_id
1520    }
1521}
1522
/// Format we'll use when decoding a [`Part`].
///
/// Each variant has a string name (`"row"`, `"row_with_validate"`, `"arrow"`)
/// used when parsing and printing.
///
/// [`Part`]: mz_persist_types::part::Part
#[derive(Debug, Copy, Clone)]
pub enum PartDecodeFormat {
    /// Decode from opaque `Codec` data.
    Row {
        /// Will also decode the structured data, and validate it matches.
        validate_structured: bool,
    },
    /// Decode from arrow data
    Arrow,
}
1536
1537impl PartDecodeFormat {
1538    /// Returns a default value for [`PartDecodeFormat`].
1539    pub const fn default() -> Self {
1540        PartDecodeFormat::Arrow
1541    }
1542
1543    /// Parses a [`PartDecodeFormat`] from the provided string, falling back to the default if the
1544    /// provided value is unrecognized.
1545    pub fn from_str(s: &str) -> Self {
1546        match s {
1547            "row" => PartDecodeFormat::Row {
1548                validate_structured: false,
1549            },
1550            "row_with_validate" => PartDecodeFormat::Row {
1551                validate_structured: true,
1552            },
1553            "arrow" => PartDecodeFormat::Arrow,
1554            x => {
1555                let default = PartDecodeFormat::default();
1556                soft_panic_or_log!("Invalid part decode format: '{x}', falling back to {default}");
1557                default
1558            }
1559        }
1560    }
1561
1562    /// Returns a string representation of [`PartDecodeFormat`].
1563    pub const fn as_str(&self) -> &'static str {
1564        match self {
1565            PartDecodeFormat::Row {
1566                validate_structured: false,
1567            } => "row",
1568            PartDecodeFormat::Row {
1569                validate_structured: true,
1570            } => "row_with_validate",
1571            PartDecodeFormat::Arrow => "arrow",
1572        }
1573    }
1574}
1575
1576impl fmt::Display for PartDecodeFormat {
1577    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1578        f.write_str(self.as_str())
1579    }
1580}
1581
1582#[mz_ore::test]
1583fn client_exchange_data() {
1584    // The whole point of SerdeLeasedBatchPart is that it can be exchanged
1585    // between timely workers, including over the network. Enforce then that it
1586    // implements ExchangeData.
1587    fn is_exchange_data<T: timely::ExchangeData>() {}
1588    is_exchange_data::<ExchangeableBatchPart<u64>>();
1589    is_exchange_data::<ExchangeableBatchPart<u64>>();
1590}