mz_storage/source/mysql/
statistics.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the statistics collection of the [`MySqlSourceConnection`] ingestion dataflow.
11
12use std::cell::{Cell, RefCell};
13
14use futures::StreamExt;
15use mz_storage_types::dyncfgs::MYSQL_OFFSET_KNOWN_INTERVAL;
16use timely::dataflow::operators::Map;
17use timely::dataflow::{Scope, Stream};
18use timely::progress::Antichain;
19
20use mz_mysql_util::query_sys_var;
21use mz_ore::future::InTask;
22use mz_storage_types::sources::MySqlSourceConnection;
23use mz_storage_types::sources::mysql::{GtidPartition, GtidState, gtid_set_frontier};
24use mz_timely_util::builder_async::{OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton};
25
26use crate::source::types::{Probe, ProgressStatisticsUpdate};
27use crate::source::{RawSourceCreationConfig, probe};
28
29use super::{ReplicationError, TransientError};
30
31static STATISTICS: &str = "statistics";
32
33/// Renders the statistics dataflow.
34pub(crate) fn render<G: Scope<Timestamp = GtidPartition>>(
35    scope: G,
36    config: RawSourceCreationConfig,
37    connection: MySqlSourceConnection,
38    resume_uppers: impl futures::Stream<Item = Antichain<GtidPartition>> + 'static,
39) -> (
40    Stream<G, ProgressStatisticsUpdate>,
41    Stream<G, ReplicationError>,
42    Stream<G, Probe<GtidPartition>>,
43    PressOnDropButton,
44) {
45    let op_name = format!("MySqlStatistics({})", config.id);
46    let mut builder = AsyncOperatorBuilder::new(op_name, scope);
47
48    let (stats_output, stats_stream) = builder.new_output();
49    let (probe_output, probe_stream) = builder.new_output();
50
51    // TODO: Add additional metrics
52
53    let (button, transient_errors) = builder.build_fallible::<TransientError, _>(move |caps| {
54        Box::pin(async move {
55            let worker_id = config.worker_id;
56            let [stats_cap, probe_cap]: &mut [_; 2] = caps.try_into().unwrap();
57
58            // Only run the replication reader on the worker responsible for it.
59            if !config.responsible_for(STATISTICS) {
60                // Emit 0, to mark this worker as having started up correctly.
61                stats_output.give(
62                    &stats_cap[0],
63                    ProgressStatisticsUpdate::SteadyState {
64                        offset_known: 0,
65                        offset_committed: 0,
66                    },
67                );
68                return Ok(());
69            }
70
71            let connection_config = connection
72                .connection
73                .config(
74                    &config.config.connection_context.secrets_reader,
75                    &config.config,
76                    InTask::Yes,
77                )
78                .await?;
79
80            let mut stats_conn = connection_config
81                .connect(
82                    &format!("timely-{worker_id} MySQL replication statistics"),
83                    &config.config.connection_context.ssh_tunnel_manager,
84                )
85                .await?;
86
87            tokio::pin!(resume_uppers);
88
89            let prev_offset_known = Cell::new(None);
90            let prev_offset_committed = Cell::new(None);
91            let stats_output = RefCell::new(stats_output);
92
93            let mut probe_ticker = probe::Ticker::new(
94                || MYSQL_OFFSET_KNOWN_INTERVAL.get(config.config.config_set()),
95                config.now_fn,
96            );
97            let probe_loop = async {
98                loop {
99                    let probe_ts = probe_ticker.tick().await;
100
101                    let gtid_executed =
102                        query_sys_var(&mut stats_conn, "global.gtid_executed").await?;
103                    // We don't translate this into a definite error like in snapshotting, but we
104                    // will restart the source.
105                    let upstream_frontier =
106                        gtid_set_frontier(&gtid_executed).map_err(TransientError::from)?;
107
108                    let offset_known = aggregate_mysql_frontier(&upstream_frontier);
109                    if let Some(offset_committed) = prev_offset_committed.get() {
110                        stats_output.borrow_mut().give(
111                            &stats_cap[0],
112                            ProgressStatisticsUpdate::SteadyState {
113                                offset_known,
114                                offset_committed,
115                            },
116                        );
117                    }
118                    prev_offset_known.set(Some(offset_known));
119
120                    probe_output.give(
121                        &probe_cap[0],
122                        Probe {
123                            probe_ts,
124                            upstream_frontier,
125                        },
126                    );
127                }
128            };
129            let commit_loop = async {
130                while let Some(committed_frontier) = resume_uppers.next().await {
131                    let offset_committed = aggregate_mysql_frontier(&committed_frontier);
132                    if let Some(offset_known) = prev_offset_known.get() {
133                        stats_output.borrow_mut().give(
134                            &stats_cap[0],
135                            ProgressStatisticsUpdate::SteadyState {
136                                offset_known,
137                                offset_committed,
138                            },
139                        );
140                    }
141                    prev_offset_committed.set(Some(offset_committed));
142                }
143            };
144
145            futures::future::join(probe_loop, commit_loop).await.0
146        })
147    });
148
149    (
150        stats_stream,
151        transient_errors.map(ReplicationError::from),
152        probe_stream,
153        button.press_on_drop(),
154    )
155}
156
157/// Aggregate a mysql frontier into single number representing the
158/// _number of transactions_ it represents.
159fn aggregate_mysql_frontier(frontier: &Antichain<GtidPartition>) -> u64 {
160    let mut progress_stat = 0;
161    for ts in frontier.iter() {
162        if let Some(_uuid) = ts.interval().singleton() {
163            // We assume source id's don't disappear once they appear.
164            let ts = match ts.timestamp() {
165                GtidState::Absent => 0,
166                // Txid's in mysql start at 1, so we subtract 1 from the _frontier_
167                // to get the _number of transactions_.
168                GtidState::Active(id) => id.get().saturating_sub(1),
169            };
170            progress_stat += ts;
171        }
172    }
173    progress_stat
174}