mz_storage_operators/
stats.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types and traits that connect up our mz-repr types with the stats that persist maintains.
11
12use mz_expr::{ResultSpec, SafeMfpPlan};
13use mz_persist_client::metrics::Metrics;
14use mz_persist_client::read::{Cursor, LazyPartStats, ReadHandle, Since};
15use mz_repr::{RelationDesc, Row, Timestamp};
16use mz_storage_types::StorageDiff;
17use mz_storage_types::controller::TxnsCodecRow;
18use mz_storage_types::errors::DataflowError;
19use mz_storage_types::sources::SourceData;
20use mz_storage_types::stats::RelationPartStats;
21use mz_txn_wal::txn_cache::TxnsCache;
22use timely::progress::Antichain;
23
/// A streaming-consolidating cursor type specialized to `RelationDesc`.
///
/// Internally this maintains two separate cursors: one for errors and one for data.
/// This is necessary so that errors are presented before data, which matches our usual
/// lookup semantics. To avoid being ludicrously inefficient, each cursor pushes a filter
/// down onto the part stats, so parts that cannot contribute to it are not fetched.
/// (In particular, in the common case of no errors, we don't do any extra fetching.)
pub struct StatsCursor {
    /// Cursor whose batches are narrowed to `Err` rows; it is polled before `data`.
    errors: Cursor<SourceData, (), Timestamp, StorageDiff>,
    /// Cursor whose batches are narrowed to `Ok` rows; it is polled only when
    /// `errors` has nothing to return.
    data: Cursor<SourceData, (), Timestamp, StorageDiff>,
}
35
36impl StatsCursor {
37    pub async fn new(
38        handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
39        // If and only if we are using txn-wal to manage this shard, then
40        // this must be Some. This is because the upper might be advanced lazily
41        // and we have to go through txn-wal for reads.
42        txns_read: Option<&mut TxnsCache<Timestamp, TxnsCodecRow>>,
43        metrics: &Metrics,
44        mfp_plan: &SafeMfpPlan,
45        desc: &RelationDesc,
46        as_of: Antichain<Timestamp>,
47    ) -> Result<StatsCursor, Since<Timestamp>> {
48        let should_fetch = |name: &'static str, errors: bool| {
49            move |stats: Option<&LazyPartStats>| {
50                let Some(stats) = stats else { return true };
51                let stats = stats.decode();
52                let metrics = &metrics.pushdown.part_stats;
53                let relation_stats = RelationPartStats::new(name, metrics, desc, &stats);
54                if errors {
55                    relation_stats.err_count().map_or(true, |e| e > 0)
56                } else {
57                    relation_stats.may_match_mfp(ResultSpec::value_all(), mfp_plan)
58                }
59            }
60        };
61        let (errors, data) = match txns_read {
62            None => {
63                let errors = handle
64                    .snapshot_cursor(as_of.clone(), should_fetch("errors", true))
65                    .await?;
66                let data = handle
67                    .snapshot_cursor(as_of.clone(), should_fetch("data", false))
68                    .await?;
69                (errors, data)
70            }
71            Some(txns_read) => {
72                let as_of = as_of
73                    .as_option()
74                    .expect("reads as_of empty antichain block forever")
75                    .clone();
76                let _ = txns_read.update_gt(&as_of).await;
77                let data_snapshot = txns_read.data_snapshot(handle.shard_id(), as_of);
78                let errors: Cursor<SourceData, (), Timestamp, i64> = data_snapshot
79                    .snapshot_cursor(handle, should_fetch("errors", true))
80                    .await?;
81                let data = data_snapshot
82                    .snapshot_cursor(handle, should_fetch("data", false))
83                    .await?;
84                (errors, data)
85            }
86        };
87
88        Ok(StatsCursor { errors, data })
89    }
90
91    pub async fn next(
92        &mut self,
93    ) -> Option<impl Iterator<Item = (Result<Row, DataflowError>, Timestamp, StorageDiff)> + '_>
94    {
95        fn expect_decode(
96            raw: impl Iterator<
97                Item = (
98                    (Result<SourceData, String>, Result<(), String>),
99                    Timestamp,
100                    StorageDiff,
101                ),
102            >,
103            is_err: bool,
104        ) -> impl Iterator<Item = (Result<Row, DataflowError>, Timestamp, StorageDiff)> {
105            raw.map(|((k, v), t, d)| {
106                // NB: this matches the decode behaviour in sources
107                let SourceData(row) = k.expect("decode error");
108                let () = v.expect("decode error");
109                (row, t, d)
110            })
111            .filter(move |(r, _, _)| if is_err { r.is_err() } else { r.is_ok() })
112        }
113
114        if let Some(errors) = self.errors.next().await {
115            Some(expect_decode(errors, true))
116        } else if let Some(data) = self.data.next().await {
117            Some(expect_decode(data, false))
118        } else {
119            None
120        }
121    }
122}