// mz_storage/source/statistics.rs
use futures::StreamExt;

use mz_repr::GlobalId;
use mz_storage_types::sources::SourceTimestamp;
use mz_timely_util::builder_async::{Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder};
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::{Scope, Stream};

use crate::source::types::ProgressStatisticsUpdate;
use crate::statistics::SourceStatistics;
20pub fn process_statistics<G, FromTime>(
21 scope: G,
22 source_id: GlobalId,
23 worker_id: usize,
24 stats_stream: Stream<G, ProgressStatisticsUpdate>,
25 source_statistics: SourceStatistics,
26) where
27 G: Scope<Timestamp = FromTime>,
28 FromTime: SourceTimestamp,
29{
30 let name = format!("SourceProgressStats({})", source_id);
31 let mut builder = AsyncOperatorBuilder::new(name, scope.clone());
32
33 let mut input = builder.new_disconnected_input(&stats_stream, Pipeline);
34
35 builder.build(move |caps| async move {
36 drop(caps);
37
38 while let Some(event) = input.next().await {
39 let AsyncEvent::Data(_, data) = event else {
40 continue;
41 };
42 tracing::debug!(
43 ?data,
44 %source_id,
45 %worker_id,
46 "timely-{worker_id} received \
47 source progress statistics update"
48 );
49
50 for d in data {
51 match d {
52 ProgressStatisticsUpdate::Snapshot {
53 records_known,
54 records_staged,
55 } => {
56 source_statistics.set_snapshot_records_known(records_known);
57 source_statistics.set_snapshot_records_staged(records_staged);
58 }
59 ProgressStatisticsUpdate::SteadyState {
60 mut offset_known,
61 offset_committed,
62 } => {
63 offset_known = std::cmp::max(offset_known, offset_committed);
72
73 source_statistics.set_offset_known(offset_known);
74 source_statistics.set_offset_committed(offset_committed);
75 }
76 }
77 }
78 }
79 });
80}