mz_storage_operators/
stats.rs1use mz_expr::{ResultSpec, SafeMfpPlan};
13use mz_persist_client::metrics::Metrics;
14use mz_persist_client::read::{Cursor, LazyPartStats, ReadHandle, Since};
15use mz_repr::{RelationDesc, Row, Timestamp};
16use mz_storage_types::StorageDiff;
17use mz_storage_types::controller::TxnsCodecRow;
18use mz_storage_types::errors::DataflowError;
19use mz_storage_types::sources::SourceData;
20use mz_storage_types::stats::RelationPartStats;
21use mz_txn_wal::txn_cache::TxnsCache;
22use timely::progress::Antichain;
23
24pub struct StatsCursor {
32 errors: Cursor<SourceData, (), Timestamp, StorageDiff>,
33 data: Cursor<SourceData, (), Timestamp, StorageDiff>,
34}
35
36impl StatsCursor {
37 pub async fn new(
38 handle: &mut ReadHandle<SourceData, (), Timestamp, StorageDiff>,
39 txns_read: Option<&mut TxnsCache<Timestamp, TxnsCodecRow>>,
43 metrics: &Metrics,
44 mfp_plan: &SafeMfpPlan,
45 desc: &RelationDesc,
46 as_of: Antichain<Timestamp>,
47 ) -> Result<StatsCursor, Since<Timestamp>> {
48 let should_fetch = |name: &'static str, errors: bool| {
49 move |stats: Option<&LazyPartStats>| {
50 let Some(stats) = stats else { return true };
51 let stats = stats.decode();
52 let metrics = &metrics.pushdown.part_stats;
53 let relation_stats = RelationPartStats::new(name, metrics, desc, &stats);
54 if errors {
55 relation_stats.err_count().map_or(true, |e| e > 0)
56 } else {
57 relation_stats.may_match_mfp(ResultSpec::value_all(), mfp_plan)
58 }
59 }
60 };
61 let (errors, data) = match txns_read {
62 None => {
63 let errors = handle
64 .snapshot_cursor(as_of.clone(), should_fetch("errors", true))
65 .await?;
66 let data = handle
67 .snapshot_cursor(as_of.clone(), should_fetch("data", false))
68 .await?;
69 (errors, data)
70 }
71 Some(txns_read) => {
72 let as_of = as_of
73 .as_option()
74 .expect("reads as_of empty antichain block forever")
75 .clone();
76 let _ = txns_read.update_gt(&as_of).await;
77 let data_snapshot = txns_read.data_snapshot(handle.shard_id(), as_of);
78 let errors: Cursor<SourceData, (), Timestamp, i64> = data_snapshot
79 .snapshot_cursor(handle, should_fetch("errors", true))
80 .await?;
81 let data = data_snapshot
82 .snapshot_cursor(handle, should_fetch("data", false))
83 .await?;
84 (errors, data)
85 }
86 };
87
88 Ok(StatsCursor { errors, data })
89 }
90
91 pub async fn next(
92 &mut self,
93 ) -> Option<impl Iterator<Item = (Result<Row, DataflowError>, Timestamp, StorageDiff)> + '_>
94 {
95 fn expect_decode(
96 raw: impl Iterator<
97 Item = (
98 (Result<SourceData, String>, Result<(), String>),
99 Timestamp,
100 StorageDiff,
101 ),
102 >,
103 is_err: bool,
104 ) -> impl Iterator<Item = (Result<Row, DataflowError>, Timestamp, StorageDiff)> {
105 raw.map(|((k, v), t, d)| {
106 let SourceData(row) = k.expect("decode error");
108 let () = v.expect("decode error");
109 (row, t, d)
110 })
111 .filter(move |(r, _, _)| if is_err { r.is_err() } else { r.is_ok() })
112 }
113
114 if let Some(errors) = self.errors.next().await {
115 Some(expect_decode(errors, true))
116 } else if let Some(data) = self.data.next().await {
117 Some(expect_decode(data, false))
118 } else {
119 None
120 }
121 }
122}