mz_storage/source/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use futures::StreamExt;
11use mz_repr::GlobalId;
12use mz_storage_types::sources::SourceTimestamp;
13use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder};
14use timely::dataflow::channels::pact::Pipeline;
15use timely::dataflow::{Scope, Stream};
16
17use crate::source::types::ProgressStatisticsUpdate;
18use crate::statistics::SourceStatistics;
19
20pub fn process_statistics<G, FromTime>(
21    scope: G,
22    source_id: GlobalId,
23    worker_id: usize,
24    stats_stream: Stream<G, ProgressStatisticsUpdate>,
25    source_statistics: SourceStatistics,
26) where
27    G: Scope<Timestamp = FromTime>,
28    FromTime: SourceTimestamp,
29{
30    let name = format!("SourceProgressStats({})", source_id);
31    let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
32
33    let mut input = builder.new_disconnected_input(&stats_stream, Pipeline);
34
35    builder.build(move |caps| async move {
36        drop(caps);
37
38        while let Some(event) = input.next().await {
39            let AsyncEvent::Data(_, data) = event else {
40                continue;
41            };
42            tracing::debug!(
43                ?data,
44                %source_id,
45                %worker_id,
46                "timely-{worker_id} received \
47                    source progress statistics update"
48            );
49
50            for d in data {
51                match d {
52                    ProgressStatisticsUpdate::Snapshot {
53                        records_known,
54                        records_staged,
55                    } => {
56                        source_statistics.set_snapshot_records_known(records_known);
57                        source_statistics.set_snapshot_records_staged(records_staged);
58                    }
59                    ProgressStatisticsUpdate::SteadyState {
60                        mut offset_known,
61                        offset_committed,
62                    } => {
63                        // There are two reasons `offset_known` could be below
64                        // `offset_committed`:
65                        // - A source implementation only periodically fetches `offset_known`,
66                        // but drives offset_committed based on the data its received. This is
67                        // the case for sources like Kafka.
68                        // - Some irrecoverable restore has regressed `offset_known`. This is
69                        // possible for sources like Postgres, and the these metrics are NOT
70                        // the intended signal for this failure (source status is).
71                        offset_known = std::cmp::max(offset_known, offset_committed);
72
73                        source_statistics.set_offset_known(offset_known);
74                        source_statistics.set_offset_committed(offset_committed);
75                    }
76                }
77            }
78        }
79    });
80}