mz_storage/source/mysql/replication/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::pin::Pin;
12
13use mysql_async::BinlogStream;
14use mz_storage_types::errors::DataflowError;
15use timely::dataflow::operators::{Capability, CapabilitySet};
16use timely::progress::Antichain;
17use tracing::trace;
18
19use mz_mysql_util::Config;
20use mz_storage_types::sources::mysql::{GtidPartition, GtidState};
21
22use crate::metrics::source::mysql::MySqlSourceMetrics;
23use crate::source::RawSourceCreationConfig;
24use crate::source::mysql::{
25    MySqlTableName, RewindRequest, SourceOutputInfo, StackedAsyncOutputHandle,
26};
27use crate::source::types::SourceMessage;
28
29/// A container to hold various context information for the replication process, used when
30/// processing events from the binlog stream.
31pub(super) struct ReplContext<'a> {
32    pub(super) config: &'a RawSourceCreationConfig,
33    pub(super) connection_config: &'a Config,
34    pub(super) stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
35    pub(super) table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
36    pub(super) metrics: &'a MySqlSourceMetrics,
37    pub(super) data_output: &'a mut StackedAsyncOutputHandle<
38        GtidPartition,
39        (usize, Result<SourceMessage, DataflowError>),
40    >,
41    pub(super) data_cap_set: &'a mut CapabilitySet<GtidPartition>,
42    pub(super) upper_cap_set: &'a mut CapabilitySet<GtidPartition>,
43    // Owned values:
44    pub(super) rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
45    pub(super) errored_outputs: BTreeSet<usize>,
46}
47
48impl<'a> ReplContext<'a> {
49    pub(super) fn new(
50        config: &'a RawSourceCreationConfig,
51        connection_config: &'a Config,
52        stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
53        table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
54        metrics: &'a MySqlSourceMetrics,
55        data_output: &'a mut StackedAsyncOutputHandle<
56            GtidPartition,
57            (usize, Result<SourceMessage, DataflowError>),
58        >,
59        data_cap_set: &'a mut CapabilitySet<GtidPartition>,
60        upper_cap_set: &'a mut CapabilitySet<GtidPartition>,
61        rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
62    ) -> Self {
63        Self {
64            config,
65            connection_config,
66            stream,
67            table_info,
68            metrics,
69            data_output,
70            data_cap_set,
71            upper_cap_set,
72            rewinds,
73            errored_outputs: BTreeSet::new(),
74        }
75    }
76
77    /// Advances the frontier of the data capability set to `new_upper`,
78    /// and drops any existing rewind requests that are no longer applicable.
79    pub(super) fn downgrade_data_cap_set(
80        &mut self,
81        reason: &str,
82        new_upper: Antichain<GtidPartition>,
83    ) {
84        let (id, worker_id) = (self.config.id, self.config.worker_id);
85
86        trace!(%id, "timely-{worker_id} [{reason}] advancing data frontier to {new_upper:?}");
87
88        self.data_cap_set.downgrade(&*new_upper);
89
90        self.metrics.gtid_txids.set(
91            new_upper
92                .iter()
93                .filter_map(|part| match part.timestamp() {
94                    GtidState::Absent => None,
95                    GtidState::Active(tx_id) => Some(tx_id.get()),
96                })
97                .sum(),
98        );
99
100        self.rewinds.retain(|_, (_, req)| {
101            // We need to retain the rewind requests whose snapshot_upper
102            // has at least one timestamp such that new_upper is less than
103            // that timestamp
104            let res = req.snapshot_upper.iter().any(|ts| new_upper.less_than(ts));
105            if !res {
106                trace!(%id, "timely-{worker_id} removing rewind request {req:?}");
107            }
108            res
109        });
110    }
111
112    /// Advances the frontier of the upper capability set to `new_upper`,
113    pub(super) fn downgrade_progress_cap_set(
114        &mut self,
115        reason: &str,
116        new_upper: Antichain<GtidPartition>,
117    ) {
118        let (id, worker_id) = (self.config.id, self.config.worker_id);
119        trace!(%id, "timely-{worker_id} [{reason}] advancing progress frontier to {new_upper:?}");
120        self.upper_cap_set.downgrade(&*new_upper);
121    }
122}