Skip to main content

mz_storage/source/mysql/replication/
context.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::{BTreeMap, BTreeSet};
11use std::pin::Pin;
12
13use mysql_async::BinlogStream;
14use mz_storage_types::errors::DataflowError;
15use timely::dataflow::operators::{Capability, CapabilitySet};
16use timely::progress::Antichain;
17use tracing::trace;
18
19use mz_mysql_util::Config;
20use mz_storage_types::sources::mysql::{GtidPartition, GtidState};
21
22use crate::metrics::source::mysql::MySqlSourceMetrics;
23use crate::source::RawSourceCreationConfig;
24use crate::source::mysql::{
25    MySqlTableName, RewindRequest, SourceOutputInfo, StackedAsyncOutputHandle,
26};
27use crate::source::types::SourceMessage;
28
29/// A container to hold various context information for the replication process, used when
30/// processing events from the binlog stream.
31pub(super) struct ReplContext<'a> {
32    pub(super) config: &'a RawSourceCreationConfig,
33    pub(super) connection_config: &'a Config,
34    pub(super) stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
35    pub(super) table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
36    pub(super) metrics: &'a MySqlSourceMetrics,
37    pub(super) data_output: &'a mut StackedAsyncOutputHandle<
38        GtidPartition,
39        (usize, Result<SourceMessage, DataflowError>),
40    >,
41    pub(super) data_cap_set: &'a mut CapabilitySet<GtidPartition>,
42    // Owned values:
43    pub(super) rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
44    pub(super) errored_outputs: BTreeSet<usize>,
45}
46
47impl<'a> ReplContext<'a> {
48    pub(super) fn new(
49        config: &'a RawSourceCreationConfig,
50        connection_config: &'a Config,
51        stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
52        table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
53        metrics: &'a MySqlSourceMetrics,
54        data_output: &'a mut StackedAsyncOutputHandle<
55            GtidPartition,
56            (usize, Result<SourceMessage, DataflowError>),
57        >,
58        data_cap_set: &'a mut CapabilitySet<GtidPartition>,
59        rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
60    ) -> Self {
61        Self {
62            config,
63            connection_config,
64            stream,
65            table_info,
66            metrics,
67            data_output,
68            data_cap_set,
69            rewinds,
70            errored_outputs: BTreeSet::new(),
71        }
72    }
73
74    /// Advances the frontier of the data capability set to `new_upper`,
75    /// and drops any existing rewind requests that are no longer applicable.
76    pub(super) fn downgrade_data_cap_set(
77        &mut self,
78        reason: &str,
79        new_upper: Antichain<GtidPartition>,
80    ) {
81        let (id, worker_id) = (self.config.id, self.config.worker_id);
82
83        trace!(%id, "timely-{worker_id} [{reason}] advancing data frontier to {new_upper:?}");
84
85        self.data_cap_set.downgrade(&*new_upper);
86
87        self.metrics.gtid_txids.set(
88            new_upper
89                .iter()
90                .filter_map(|part| match part.timestamp() {
91                    GtidState::Absent => None,
92                    GtidState::Active(tx_id) => Some(tx_id.get()),
93                })
94                .sum(),
95        );
96
97        self.rewinds.retain(|_, (_, req)| {
98            // We need to retain the rewind requests whose snapshot_upper
99            // has at least one timestamp such that new_upper is less than
100            // that timestamp
101            let res = req.snapshot_upper.iter().any(|ts| new_upper.less_than(ts));
102            if !res {
103                trace!(%id, "timely-{worker_id} removing rewind request {req:?}");
104            }
105            res
106        });
107    }
108}