mz_storage/source/mysql/
statistics.rs
1use std::cell::{Cell, RefCell};
13
14use futures::StreamExt;
15use mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL;
16use timely::dataflow::operators::Map;
17use timely::dataflow::{Scope, Stream};
18use timely::progress::Antichain;
19
20use mz_mysql_util::query_sys_var;
21use mz_ore::future::InTask;
22use mz_storage_types::sources::MySqlSourceConnection;
23use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
24use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
25
26use crate::source::types::{Probe, ProgressStatisticsUpdate};
27use crate::source::{RawSourceCreationConfig, probe};
28
29use super::{ReplicationError, TransientError};
30
/// Key handed to `config.responsible_for` in `render` to decide whether this worker runs the
/// statistics probing; workers for which that check fails emit one zeroed update and return.
static STATISTICS: &str = "statistics";
32
33pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
35 scope: G,
36 config: RawSourceCreationConfig,
37 connection: MySqlSourceConnection,
38 resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
39) -> (
40 Stream<G, ProgressStatisticsUpdate>,
41 Stream<G, ReplicationError>,
42 Stream<G, Probe<GtidPartition>>,
43 PressOnDropButton,
44) {
45 let op_name = format!("MySqlStatistics({})", config.id);
46 let mut builder = AsyncOperatorBuilder::new(op_name, scope);
47
48 let (stats_output, stats_stream) = builder.new_output();
49 let (probe_output, probe_stream) = builder.new_output();
50
51 let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
54 Box::pin(async move {
55 let worker_id = config.worker_id;
56 let [stats_cap, probe_cap]: &mut [_; 2] = caps.try_into().unwrap();
57
58 if !config.responsible_for(STATISTICS) {
60 stats_output.give(
62 &stats_cap[0],
63 ProgressStatisticsUpdate::SteadyState {
64 offset_known: 0,
65 offset_committed: 0,
66 },
67 );
68 return Ok(());
69 }
70
71 let connection_config = connection
72 .connection
73 .config(
74 &config.config.connection_context.secrets_reader,
75 &config.config,
76 InTask::Yes,
77 )
78 .await?;
79
80 let mut stats_conn = connection_config
81 .connect(
82 &format!("timely-{worker_id} MySQL replication statistics"),
83 &config.config.connection_context.ssh_tunnel_manager,
84 )
85 .await?;
86
87 tokio::pin!(resume_uppers);
88
89 let prev_offset_known = Cell::new(None);
90 let prev_offset_committed = Cell::new(None);
91 let stats_output = RefCell::new(stats_output);
92
93 let mut probe_ticker = probe::Ticker::new(
94 || MYSQL_OFFSET_KNOWN_INTERVAL.get(config.config.config_set()),
95 config.now_fn,
96 );
97 let probe_loop = async {
98 loop {
99 let probe_ts = probe_ticker.tick().await;
100
101 let gtid_executed =
102 query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
103 let upstream_frontier =
106 gtid_set_frontier(>id_executed).map_err(TransientError::from)?;
107
108 let offset_known = aggregate_mysql_frontier(&upstream_frontier);
109 if let Some(offset_committed) = prev_offset_committed.get() {
110 stats_output.borrow_mut().give(
111 &stats_cap[0],
112 ProgressStatisticsUpdate::SteadyState {
113 offset_known,
114 offset_committed,
115 },
116 );
117 }
118 prev_offset_known.set(Some(offset_known));
119
120 probe_output.give(
121 &probe_cap[0],
122 Probe {
123 probe_ts,
124 upstream_frontier,
125 },
126 );
127 }
128 };
129 let commit_loop = async {
130 while let Some(committed_frontier) = resume_uppers.next().await {
131 let offset_committed = aggregate_mysql_frontier(&committed_frontier);
132 if let Some(offset_known) = prev_offset_known.get() {
133 stats_output.borrow_mut().give(
134 &stats_cap[0],
135 ProgressStatisticsUpdate::SteadyState {
136 offset_known,
137 offset_committed,
138 },
139 );
140 }
141 prev_offset_committed.set(Some(offset_committed));
142 }
143 };
144
145 futures::future::join(probe_loop, commit_loop).await.0
146 })
147 });
148
149 (
150 stats_stream,
151 transient_errors.map(ReplicationError::from),
152 probe_stream,
153 button.press_on_drop(),
154 )
155}
156
157fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
160 let mut progress_stat = 0;
161 for ts in frontier.iter() {
162 if let Some(_uuid) = ts.interval().singleton() {
163 let ts = match ts.timestamp() {
165 GtidState::Absent => 0,
166 GtidState::Active(id) => id.get().saturating_sub(1),
169 };
170 progress_stat += ts;
171 }
172 }
173 progress_stat
174}