mz_storage/source/mysql/
statistics.rs1use futures::StreamExt;
13use mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL;
14use timely::dataflow::operators::Map;
15use timely::dataflow::{Scope, Stream};
16use timely::progress::Antichain;
17
18use mz_mysql_util::query_sys_var;
19use mz_ore::future::InTask;
20use mz_storage_types::sources::MySqlSourceConnection;
21use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
22use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
23
24use crate::source::types::Probe;
25use crate::source::{RawSourceCreationConfig, probe};
26
27use super::{ReplicationError, TransientError};
28
/// Key handed to `config.responsible_for` in [`render`] to decide which worker runs the
/// statistics loops.
static STATISTICS: &str = "statistics";
30
31pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
33 scope: G,
34 config: RawSourceCreationConfig,
35 connection: MySqlSourceConnection,
36 resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
37) -> (
38 Stream<G, ReplicationError>,
39 Stream<G, Probe<GtidPartition>>,
40 PressOnDropButton,
41) {
42 let op_name = format!("MySqlStatistics({})", config.id);
43 let mut builder = AsyncOperatorBuilder::new(op_name, scope);
44
45 let (probe_output, probe_stream) = builder.new_output();
46
47 let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
49 Box::pin(async move {
50 let worker_id = config.worker_id;
51 let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();
52
53 if !config.responsible_for(STATISTICS) {
55 for stat in config.statistics.values() {
57 stat.set_offset_known(0);
58 stat.set_offset_committed(0);
59 }
60 return Ok(());
61 }
62
63 let connection_config = connection
64 .connection
65 .config(
66 &config.config.connection_context.secrets_reader,
67 &config.config,
68 InTask::Yes,
69 )
70 .await?;
71
72 let mut stats_conn = connection_config
73 .connect(
74 &format!("timely-{worker_id} MySQL replication statistics"),
75 &config.config.connection_context.ssh_tunnel_manager,
76 )
77 .await?;
78
79 tokio::pin!(resume_uppers);
80 let mut probe_ticker = probe::Ticker::new(
81 || MYSQL_OFFSET_KNOWN_INTERVAL.get(config.config.config_set()),
82 config.now_fn,
83 );
84
85 let probe_loop = async {
86 loop {
87 let probe_ts = probe_ticker.tick().await;
88
89 let gtid_executed =
90 query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
91 let upstream_frontier =
94 gtid_set_frontier(>id_executed).map_err(TransientError::from)?;
95
96 let offset_known = aggregate_mysql_frontier(&upstream_frontier);
97 for stat in config.statistics.values() {
98 stat.set_offset_known(offset_known);
99 }
100 probe_output.give(
101 &probe_cap[0],
102 Probe {
103 probe_ts,
104 upstream_frontier,
105 },
106 );
107 }
108 };
109 let commit_loop = async {
110 while let Some(committed_frontier) = resume_uppers.next().await {
111 let offset_committed = aggregate_mysql_frontier(&committed_frontier);
112 for stat in config.statistics.values() {
113 stat.set_offset_committed(offset_committed);
114 }
115 }
116 };
117
118 futures::future::join(probe_loop, commit_loop).await.0
119 })
120 });
121
122 (
123 transient_errors.map(ReplicationError::from),
124 probe_stream,
125 button.press_on_drop(),
126 )
127}
128
129fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
132 let mut progress_stat = 0;
133 for ts in frontier.iter() {
134 if let Some(_uuid) = ts.interval().singleton() {
135 let ts = match ts.timestamp() {
137 GtidState::Absent => 0,
138 GtidState::Active(id) => id.get().saturating_sub(1),
141 };
142 progress_stat += ts;
143 }
144 }
145 progress_stat
146}