// mz_storage/source/mysql/statistics.rs
use futures::StreamExt;
13use timely::container::CapacityContainerBuilder;
14use timely::dataflow::channels::pact::Pipeline;
15use timely::dataflow::operators::vec::Map;
16use timely::dataflow::{Scope, StreamVec};
17use timely::progress::Antichain;
18
19use mz_mysql_util::query_sys_var;
20use mz_ore::future::InTask;
21use mz_storage_types::sources::MySqlSourceConnection;
22use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
23use mz_timely_util::builder_async::{
24 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
25};
26
27use crate::source::types::Probe;
28use crate::source::{RawSourceCreationConfig, probe};
29
30use super::{ReplicationError, TransientError};
31
/// Key passed to `config.responsible_for` below to elect the single worker
/// that runs this operator's statistics loop; all other workers exit early.
static STATISTICS: &str = "statistics";
33
34pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
36 scope: G,
37 config: RawSourceCreationConfig,
38 connection: MySqlSourceConnection,
39 resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
40 replication_errors: StreamVec<G, ReplicationError>,
41) -> (
42 StreamVec<G, ReplicationError>,
43 StreamVec<G, Probe<GtidPartition>>,
44 PressOnDropButton,
45) {
46 let op_name = format!("MySqlStatistics({})", config.id);
47 let mut builder = AsyncOperatorBuilder::new(op_name, scope);
48
49 let mut error_handle = builder.new_disconnected_input(replication_errors, Pipeline);
50
51 let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
52
53 let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
55 Box::pin(async move {
56 let worker_id = config.worker_id;
57 let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();
58
59 if !config.responsible_for(STATISTICS) {
61 for stat in config.statistics.values() {
63 stat.set_offset_known(0);
64 stat.set_offset_committed(0);
65 }
66 return Ok(());
67 }
68
69 let connection_config = connection
70 .connection
71 .config(
72 &config.config.connection_context.secrets_reader,
73 &config.config,
74 InTask::Yes,
75 )
76 .await?;
77
78 let mut stats_conn = connection_config
79 .connect(
80 &format!("timely-{worker_id} MySQL replication statistics"),
81 &config.config.connection_context.ssh_tunnel_manager,
82 )
83 .await?;
84
85 tokio::pin!(resume_uppers);
86 let timestamp_interval = config.timestamp_interval;
87 let mut probe_ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
88
89 let probe_loop = async {
90 loop {
91 let probe_ts = probe_ticker.tick().await;
92
93 let gtid_executed =
94 query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
95 let upstream_frontier =
98 gtid_set_frontier(>id_executed).map_err(TransientError::from)?;
99
100 let offset_known = aggregate_mysql_frontier(&upstream_frontier);
101 for stat in config.statistics.values() {
102 stat.set_offset_known(offset_known);
103 }
104 probe_output.give(
105 &probe_cap[0],
106 Probe {
107 probe_ts,
108 upstream_frontier,
109 },
110 );
111 }
112 };
113 let commit_loop = async {
114 while let Some(committed_frontier) = resume_uppers.next().await {
115 let offset_committed = aggregate_mysql_frontier(&committed_frontier);
116 for stat in config.statistics.values() {
117 stat.set_offset_committed(offset_committed);
118 }
119 }
120 };
121 let error_loop = async {
122 while let Some(event) = error_handle.next().await {
123 if let AsyncEvent::Data(ts, err_data) = event {
124 for err in err_data {
125 if let ReplicationError::Definite(def_err) = err {
126 tracing::info!(
127 "ts: {:?} Definite replication error detected in statistics operator: {def_err}, exiting", ts
128 );
129 break;
130 }
131 }
132 }
133 }
134 tracing::info!("Replication error stream closed, exiting statistics loop");
135 Ok(())
136 };
137 let res = tokio::select! {
138 res = probe_loop => res,
139 res = commit_loop => Ok(res),
140 res = error_loop => res,
141 };
142 tracing::info!("Statistics loop exited, shutting down");
143 res
144 })
145 });
146
147 (
148 transient_errors.map(ReplicationError::from),
149 probe_stream,
150 button.press_on_drop(),
151 )
152}
153
154fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
157 let mut progress_stat = 0;
158 for ts in frontier.iter() {
159 if let Some(_uuid) = ts.interval().singleton() {
160 let ts = match ts.timestamp() {
162 GtidState::Absent => 0,
163 GtidState::Active(id) => id.get().saturating_sub(1),
166 };
167 progress_stat += ts;
168 }
169 }
170 progress_stat
171}