1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
910use futures::StreamExt;
11use mz_repr::GlobalId;
12use mz_storage_types::sources::SourceTimestamp;
13use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder};
14use timely::dataflow::channels::pact::Pipeline;
15use timely::dataflow::{Scope, Stream};
1617use crate::source::types::ProgressStatisticsUpdate;
18use crate::statistics::SourceStatistics;
1920pub fn process_statistics<G, FromTime>(
21 scope: G,
22 source_id: GlobalId,
23 worker_id: usize,
24 stats_stream: Stream<G, ProgressStatisticsUpdate>,
25 source_statistics: SourceStatistics,
26) where
27G: Scope<Timestamp = FromTime>,
28 FromTime: SourceTimestamp,
29{
30let name = format!("SourceProgressStats({})", source_id);
31let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
3233let mut input = builder.new_disconnected_input(&stats_stream, Pipeline);
3435 builder.build(move |caps| async move {
36 drop(caps);
3738while let Some(event) = input.next().await {
39let AsyncEvent::Data(_, data) = event else {
40continue;
41 };
42tracing::debug!(
43?data,
44 %source_id,
45 %worker_id,
46"timely-{worker_id} received \
47 source progress statistics update"
48);
4950for d in data {
51match d {
52 ProgressStatisticsUpdate::Snapshot {
53 records_known,
54 records_staged,
55 } => {
56 source_statistics.set_snapshot_records_known(records_known);
57 source_statistics.set_snapshot_records_staged(records_staged);
58 }
59 ProgressStatisticsUpdate::SteadyState {
60mut offset_known,
61 offset_committed,
62 } => {
63// There are two reasons `offset_known` could be below
64 // `offset_committed`:
65 // - A source implementation only periodically fetches `offset_known`,
66 // but drives offset_committed based on the data its received. This is
67 // the case for sources like Kafka.
68 // - Some irrecoverable restore has regressed `offset_known`. This is
69 // possible for sources like Postgres, and the these metrics are NOT
70 // the intended signal for this failure (source status is).
71offset_known = std::cmp::max(offset_known, offset_committed);
7273 source_statistics.set_offset_known(offset_known);
74 source_statistics.set_offset_committed(offset_committed);
75 }
76 }
77 }
78 }
79 });
80}