Skip to main content

mz_storage/source/mysql/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the statistics collection of the [`MySqlSourceConnection`] ingestion dataflow.
11
12use futures::StreamExt;
13use timely::container::CapacityContainerBuilder;
14use timely::dataflow::channels::pact::Pipeline;
15use timely::dataflow::operators::vec::Map;
16use timely::dataflow::{Scope, StreamVec};
17use timely::progress::Antichain;
18
19use mz_mysql_util::query_sys_var;
20use mz_ore::future::InTask;
21use mz_storage_types::sources::MySqlSourceConnection;
22use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
23use mz_timely_util::builder_async::{
24    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
25};
26
27use crate::source::types::Probe;
28use crate::source::{RawSourceCreationConfig, probe};
29
30use super::{ReplicationError, TransientError};
31
/// Responsibility key passed to `config.responsible_for` to elect the single
/// worker that runs the statistics collection loop (all other workers exit
/// early after zeroing their statistics).
static STATISTICS: &str = "statistics";
33
34/// Renders the statistics dataflow.
35pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
36    scope: G,
37    config: RawSourceCreationConfig,
38    connection: MySqlSourceConnection,
39    resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
40    replication_errors: StreamVec<G, ReplicationError>,
41) -> (
42    StreamVec<G, ReplicationError>,
43    StreamVec<G, Probe<GtidPartition>>,
44    PressOnDropButton,
45) {
46    let op_name = format!("MySqlStatistics({})", config.id);
47    let mut builder = AsyncOperatorBuilder::new(op_name, scope);
48
49    let mut error_handle = builder.new_disconnected_input(replication_errors, Pipeline);
50
51    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
52
53    // TODO: Add additional metrics
54    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
55        Box::pin(async move {
56            let worker_id = config.worker_id;
57            let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();
58
59            // Only run the replication reader on the worker responsible for it.
60            if !config.responsible_for(STATISTICS) {
61                // Emit 0, to mark this worker as having started up correctly.
62                for stat in config.statistics.values() {
63                    stat.set_offset_known(0);
64                    stat.set_offset_committed(0);
65                }
66                return Ok(());
67            }
68
69            let connection_config = connection
70                .connection
71                .config(
72                    &config.config.connection_context.secrets_reader,
73                    &config.config,
74                    InTask::Yes,
75                )
76                .await?;
77
78            let mut stats_conn = connection_config
79                .connect(
80                    &format!("timely-{worker_id} MySQL replication statistics"),
81                    &config.config.connection_context.ssh_tunnel_manager,
82                )
83                .await?;
84
85            tokio::pin!(resume_uppers);
86            let timestamp_interval = config.timestamp_interval;
87            let mut probe_ticker = probe::Ticker::new(move || timestamp_interval, config.now_fn);
88
89            let probe_loop = async {
90                loop {
91                    let probe_ts = probe_ticker.tick().await;
92
93                    let gtid_executed =
94                        query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
95                    // We don't translate this into a definite error like in snapshotting, but we
96                    // will restart the source.
97                    let upstream_frontier =
98                        gtid_set_frontier(&gtid_executed).map_err(TransientError::from)?;
99
100                    let offset_known = aggregate_mysql_frontier(&upstream_frontier);
101                    for stat in config.statistics.values() {
102                        stat.set_offset_known(offset_known);
103                    }
104                    probe_output.give(
105                        &probe_cap[0],
106                        Probe {
107                            probe_ts,
108                            upstream_frontier,
109                        },
110                    );
111                }
112            };
113            let commit_loop = async {
114                while let Some(committed_frontier) = resume_uppers.next().await {
115                    let offset_committed = aggregate_mysql_frontier(&committed_frontier);
116                    for stat in config.statistics.values() {
117                        stat.set_offset_committed(offset_committed);
118                    }
119                }
120            };
121            let error_loop = async {
122                while let Some(event) = error_handle.next().await {
123                    if let AsyncEvent::Data(ts, err_data) = event {
124                        for err in err_data {
125                            if let ReplicationError::Definite(def_err) = err {
126                                tracing::info!(
127                                    "ts: {:?} Definite replication error detected in statistics operator: {def_err}, exiting", ts
128                                );
129                                break;
130                            }
131                        }
132                    }
133                }
134                tracing::info!("Replication error stream closed, exiting statistics loop");
135                Ok(())
136            };
137            let res = tokio::select! {
138                res = probe_loop => res,
139                res = commit_loop => Ok(res),
140                res = error_loop => res,
141            };
142            tracing::info!("Statistics loop exited, shutting down");
143            res
144        })
145    });
146
147    (
148        transient_errors.map(ReplicationError::from),
149        probe_stream,
150        button.press_on_drop(),
151    )
152}
153
154/// Aggregate a mysql frontier into single number representing the
155/// _number of transactions_ it represents.
156fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
157    let mut progress_stat = 0;
158    for ts in frontier.iter() {
159        if let Some(_uuid) = ts.interval().singleton() {
160            // We assume source id's don't disappear once they appear.
161            let ts = match ts.timestamp() {
162                GtidState::Absent => 0,
163                // Txid's in mysql start at 1, so we subtract 1 from the _frontier_
164                // to get the _number of transactions_.
165                GtidState::Active(id) => id.get().saturating_sub(1),
166            };
167            progress_stat += ts;
168        }
169    }
170    progress_stat
171}