mz_storage/source/mysql/statistics.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the statistics collection of the [`MySqlSourceConnection`] ingestion dataflow.
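//!
//! A single worker periodically queries the upstream server's `gtid_executed` system variable
//! to derive the `offset_known` statistic and emit frontier probes, and watches the stream of
//! resume uppers to derive the `offset_committed` statistic.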

use futures::StreamExt;
use mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL;
use timely::dataflow::operators::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;

use mz_mysql_util::query_sys_var;
use mz_ore::future::InTask;
use mz_storage_types::sources::MySqlSourceConnection;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};

use crate::source::types::Probe;
use crate::source::{RawSourceCreationConfig, probe};

use super::{ReplicationError, TransientError};

static STATISTICS: &str = "statistics";

/// Renders the statistics dataflow.
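///
/// Returns a stream of transient replication errors, a stream of upstream frontier probes,
/// and a button that shuts the operator down when dropped.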
pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: MySqlSourceConnection,
    resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
) -> (
    Stream<G, ReplicationError>,
    Stream<G, Probe<GtidPartition>>,
    PressOnDropButton,
) {
    let op_name = format!("MySqlStatistics({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope);

    let (probe_output, probe_stream) = builder.new_output();

    // TODO: Add additional metrics
    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
        Box::pin(async move {
            let worker_id = config.worker_id;
            let [probe_cap]: &mut [_; 1] = caps.try_into().unwrap();

            // Only run the statistics collection on the worker responsible for it.
            if !config.responsible_for(STATISTICS) {
                // Emit 0 to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let mut stats_conn = connection_config
                .connect(
                    &format!("timely-{worker_id} MySQL replication statistics"),
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;

            tokio::pin!(resume_uppers);
            let mut probe_ticker = probe::Ticker::new(
                || MYSQL_OFFSET_KNOWN_INTERVAL.get(config.config.config_set()),
                config.now_fn,
            );

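            // Periodically probe the upstream server for its `gtid_executed` frontier, record
            // it as the `offset_known` statistic, and emit it downstream as a probe.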
            let probe_loop = async {
                loop {
                    let probe_ts = probe_ticker.tick().await;

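                    // `gtid_executed` is the set of GTIDs of all transactions that have been
                    // committed on the upstream server, so its frontier reflects how far the
                    // upstream has progressed.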
                    let gtid_executed =
                        query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
                    // Unlike during snapshotting, we don't translate a failure here into a
                    // definite error, but the transient error will still restart the source.
                    let upstream_frontier =
                        gtid_set_frontier(&gtid_executed).map_err(TransientError::from)?;

                    let offset_known = aggregate_mysql_frontier(&upstream_frontier);
                    for stat in config.statistics.values() {
                        stat.set_offset_known(offset_known);
                    }
                    probe_output.give(
                        &probe_cap[0],
                        Probe {
                            probe_ts,
                            upstream_frontier,
                        },
                    );
                }
            };
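            // Record each new resume upper as the `offset_committed` statistic, i.e. how far
            // this source has durably committed data.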
            let commit_loop = async {
                while let Some(committed_frontier) = resume_uppers.next().await {
                    let offset_committed = aggregate_mysql_frontier(&committed_frontier);
                    for stat in config.statistics.values() {
                        stat.set_offset_committed(offset_committed);
                    }
                }
            };

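            // Run both loops concurrently. `probe_loop` only terminates with an error, and its
            // result is the one reported for the operator.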
            futures::future::join(probe_loop, commit_loop).await.0
        })
    });

    (
        transient_errors.map(ReplicationError::from),
        probe_stream,
        button.press_on_drop(),
    )
}

/// Aggregates a MySQL frontier into a single number representing the
/// _number of transactions_ it contains.
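///
/// Only partitions whose interval covers a single source UUID contribute to the sum;
/// partitions covering a range of UUIDs are skipped.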
fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
    let mut progress_stat = 0;
    for ts in frontier.iter() {
        if let Some(_uuid) = ts.interval().singleton() {
            // We assume source ids don't disappear once they appear.
            let ts = match ts.timestamp() {
                GtidState::Absent => 0,
                // Transaction ids in MySQL GTIDs start at 1, so we subtract 1 from the
                // _frontier_ to get the _number of transactions_.
                GtidState::Active(id) => id.get().saturating_sub(1),
            };
            progress_stat += ts;
        }
    }
    progress_stat
}