// mz_storage/source/mysql/statistics.rs

use futures::StreamExt;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::vec::Map;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;

use mz_mysql_util::query_sys_var;
use mz_ore::future::InTask;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};

use crate::source::types::Probe;
use crate::source::{RawSourceCreationConfig, probe};

use super::{ReplicationError, TransientError};
28
29static STATISTICS: &str = "statistics";
30
31pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
33 scope: G,
34 config: RawSourceCreationConfig,
35 connection: MySqlSourceConnection,
36 resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
37) -> (
38 StreamVec<G, ReplicationError>,
39 StreamVec<G, Probe<GtidPartition>>,
40 PressOnDropButton,
41) {
42 let op_name = format!("MySqlStatistics({})", config.id);
43 let mut builder = AsyncOperatorBuilder::new(op_name, scope);
44
45 let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
46
47 let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
49 Box::pin(async move {
50 let worker_id = config.worker_id;
51 let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();
52
53 if !config.responsible_for(STATISTICS) {
55 for stat in config.statistics.values() {
57 stat.set_offset_known(0);
58 stat.set_offset_committed(0);
59 }
60 return Ok(());
61 }
62
63 let connection_config = connection
64 .connection
65 .config(
66 &config.config.connection_context.secrets_reader,
67 &config.config,
68 InTask::Yes,
69 )
70 .await?;
71
72 let mut stats_conn = connection_config
73 .connect(
74 &format!("timely-{worker_id} MySQL replication statistics"),
75 &config.config.connection_context.ssh_tunnel_manager,
76 )
77 .await?;
78
79 tokio::pin!(resume_uppers);
80 let timestamp_interval = config.timestamp_interval;
81 let mut probe_ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
82
83 let probe_loop = async {
84 loop {
85 let probe_ts = probe_ticker.tick().await;
86
87 let gtid_executed =
88 query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
89 let upstream_frontier =
92 gtid_set_frontier(>id_executed).map_err(TransientError::from)?;
93
94 let offset_known = aggregate_mysql_frontier(&upstream_frontier);
95 for stat in config.statistics.values() {
96 stat.set_offset_known(offset_known);
97 }
98 probe_output.give(
99 &probe_cap[0],
100 Probe {
101 probe_ts,
102 upstream_frontier,
103 },
104 );
105 }
106 };
107 let commit_loop = async {
108 while let Some(committed_frontier) = resume_uppers.next().await {
109 let offset_committed = aggregate_mysql_frontier(&committed_frontier);
110 for stat in config.statistics.values() {
111 stat.set_offset_committed(offset_committed);
112 }
113 }
114 };
115
116 futures::future::join(probe_loop, commit_loop).await.0
117 })
118 });
119
120 (
121 transient_errors.map(ReplicationError::from),
122 probe_stream,
123 button.press_on_drop(),
124 )
125}
126
127fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
130 let mut progress_stat = 0;
131 for ts in frontier.iter() {
132 if let Some(_uuid) = ts.interval().singleton() {
133 let ts = match ts.timestamp() {
135 GtidState::Absent => 0,
136 GtidState::Active(id) => id.get().saturating_sub(1),
139 };
140 progress_stat += ts;
141 }
142 }
143 progress_stat
144}