mz_storage/source/mysql/statistics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the statistics collection of the [`MySqlSourceConnection`] ingestion dataflow.
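//!
//! The operator rendered here periodically probes the upstream server's `gtid_executed`
//! system variable to report the `offset_known` statistic, and watches the resume uppers
//! to report `offset_committed`. Each probed upstream frontier is also emitted downstream.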

use futures::StreamExt;
use mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;

use mz_mysql_util::query_sys_var;
use mz_ore::future::InTask;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};

use crate::source::types::Probe;
use crate::source::{RawSourceCreationConfig, probe};

use super::{ReplicationError, TransientError};

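/// Identifier passed to `config.responsible_for` to pick the worker that runs the
/// statistics collection.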
static STATISTICS: &str = "statistics";

/// Renders the statistics dataflow.
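///
/// Returns a stream of transient [`ReplicationError`]s, a stream of [`Probe`]s describing
/// the observed upstream frontier, and a [`PressOnDropButton`] that shuts the operator down
/// when dropped.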
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
) -> (
    Stream<G, ReplicationError>,
    Stream<G, Probe<GtidPartition>>,
    PressOnDropButton,
) {
    let op_name = format!("MySqlStatistics({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

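    // The operator's only output: probes of the observed upstream frontier.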
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // TODO: Add additional metrics
    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
        Box::pin(async move {
            let worker_id = config.worker_id;
            let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();

            // Only run the statistics collection on the worker responsible for it.
            if !config.responsible_for(STATISTICS) {
                // Emit 0 to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let mut stats_conn = connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL replication statistics"),
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;

            tokio::pin!(resume_uppers);
            let mut probe_ticker = probe::Ticker::new(
                || MYSQL_OFFSET_KNOWN_INTERVAL.get(config.config.config_set()),
                config.now_fn,
            );

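            // Probe the upstream server every `MYSQL_OFFSET_KNOWN_INTERVAL`, reporting how far
            // the upstream GTID set is known to have advanced as `offset_known`.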
            let probe_loop = async {
                loop {
                    let probe_ts = probe_ticker.tick().await;

                    let gtid_executed =
                        query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
                    // We don't translate this into a definite error like in snapshotting, but we
                    // will restart the source.
                    let upstream_frontier =
                        gtid_set_frontier(&gtid_executed).map_err(TransientError::from)?;

                    let offset_known = aggregate_mysql_frontier(&upstream_frontier);
                    for stat in config.statistics.values() {
                        stat.set_offset_known(offset_known);
                    }
                    probe_output.give(
                        &probe_cap[0],
                        Probe {
                            probe_ts,
                            upstream_frontier,
                        },
                    );
                }
            };
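            // Report each new resume upper (the committed frontier) as `offset_committed`.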
            let commit_loop = async {
                while let Some(committed_frontier) = resume_uppers.next().await {
                    let offset_committed = aggregate_mysql_frontier(&committed_frontier);
                    for stat in config.statistics.values() {
                        stat.set_offset_committed(offset_committed);
                    }
                }
            };

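            // Run both loops concurrently; the operator returns the probe loop's result.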
            futures::future::join(probe_loop, commit_loop).await.0
        })
    });

    (
        transient_errors.map(ReplicationError::from),
        probe_stream,
        button.press_on_drop(),
    )
}

/// Aggregates a MySQL frontier into a single number representing the
/// _number of transactions_ it covers.
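///
/// Contributions are summed across all singleton partitions; for example, a frontier
/// containing a single partition at `GtidState::Active(5)` aggregates to `4`.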
fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
    let mut progress_stat = 0;
    for ts in frontier.iter() {
        if let Some(_uuid) = ts.interval().singleton() {
            // We assume source ids don't disappear once they appear.
            let ts = match ts.timestamp() {
                GtidState::Absent => 0,
                // Txids in MySQL start at 1, so we subtract 1 from the _frontier_
                // to get the _number of transactions_.
                GtidState::Active(id) => id.get().saturating_sub(1),
            };
            progress_stat += ts;
        }
    }
    progress_stat
}