mz_storage/source/mysql/statistics.rs

use futures::StreamExt;
use mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;

use mz_mysql_util::query_sys_var;
use mz_ore::future::InTask;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};

use crate::source::types::Probe;
use crate::source::{RawSourceCreationConfig, probe};

use super::{ReplicationError, TransientError};

static STATISTICS: &str = "statistics";
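/// Renders the statistics operator for a MySQL source. It periodically probes the
/// upstream server for its executed GTID set to report how far the source could read
/// (`offset_known`), and watches the resume uppers to report how far the source has
/// committed (`offset_committed`).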
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
) -> (
    Stream<G, ReplicationError>,
    Stream<G, Probe<GtidPartition>>,
    PressOnDropButton,
) {
    let op_name = format!("MySqlStatistics({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
        Box::pin(async move {
            let worker_id = config.worker_id;
            let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();

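            // Only one worker is responsible for reporting statistics for this source.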
            if !config.responsible_for(STATISTICS) {
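                // Report zeros from workers that are not responsible so their
                // statistics start out initialized.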
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

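            // Resolve the MySQL connection configuration, reading any secrets it references.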
            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

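            // Open a dedicated connection used only for these statistics queries.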
            let mut stats_conn = connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL replication statistics"),
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;

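            // Pin the resume-upper stream so it can be polled below, and build a ticker
            // that fires at the configured `offset_known` probing interval.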
            tokio::pin!(resume_uppers);
            let mut probe_ticker = probe::Ticker::new(
                || MYSQL_OFFSET_KNOWN_INTERVAL.get(config.config.config_set()),
                config.now_fn,
            );

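            // On every tick, ask the server for its executed GTID set, convert it into a
            // frontier, and report it as both the `offset_known` statistic and a probe.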
            let probe_loop = async {
                loop {
                    let probe_ts = probe_ticker.tick().await;

                    let gtid_executed =
                        query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
                    let upstream_frontier =
                        gtid_set_frontier(&gtid_executed).map_err(TransientError::from)?;

                    let offset_known = aggregate_mysql_frontier(&upstream_frontier);
                    for stat in config.statistics.values() {
                        stat.set_offset_known(offset_known);
                    }
                    probe_output.give(
                        &probe_cap[0],
                        Probe {
                            probe_ts,
                            upstream_frontier,
                        },
                    );
                }
            };
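            // Mirror each committed resume upper into the `offset_committed` statistic.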
            let commit_loop = async {
                while let Some(committed_frontier) = resume_uppers.next().await {
                    let offset_committed = aggregate_mysql_frontier(&committed_frontier);
                    for stat in config.statistics.values() {
                        stat.set_offset_committed(offset_committed);
                    }
                }
            };

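            // Run both loops concurrently. Only the probe loop is fallible, so surface
            // its result.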
            futures::future::join(probe_loop, commit_loop).await.0
        })
    });

    (
        transient_errors.map(ReplicationError::from),
        probe_stream,
        button.press_on_drop(),
    )
}

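/// Aggregates a GTID frontier into a single `u64` progress statistic by summing, over
/// all single-UUID partitions, the number of transactions already executed for that
/// source UUID. For example, a frontier with two such partitions at `Active(5)` and
/// `Active(3)` aggregates to `4 + 2 = 6`.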
fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
    let mut progress_stat = 0;
    for ts in frontier.iter() {
        if let Some(_uuid) = ts.interval().singleton() {
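            // `Active(id)` is the next transaction id to be executed for this source
            // UUID, so `id - 1` transactions have run; `Absent` contributes nothing.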
            let ts = match ts.timestamp() {
                GtidState::Absent => 0,
                GtidState::Active(id) => id.get().saturating_sub(1),
            };
            progress_stat += ts;
        }
    }
    progress_stat
}