mz_storage/source/mysql/replication/
partitions.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code related to tracking the frontier of GTID partitions for a MySQL source.
11
12use std::collections::BTreeMap;
13
14use timely::progress::Antichain;
15use uuid::Uuid;
16
17use mz_storage_types::sources::mysql::{GtidPartition, GtidState};
18
19use super::super::DefiniteError;
20
21/// Holds the active and future GTID partitions that represent the complete
22/// UUID range of all possible GTID source-ids from a MySQL server.
23/// The active partitions are all singleton partitions representing a single
24/// source-id timestamp, and the future partitions represent the missing
25/// UUID ranges that we have not yet seen and are held at timestamp GtidState::Absent.
26///
27/// This is used to keep track of all partitions and is updated as we receive
28/// new GTID updates from the server, and is used to create a full 'frontier'
29/// representing the current state of all GTID partitions that we can use
30/// to downgrade capabilities for the source.
31///
32/// We could instead mint capabilities for each individual partition
33/// and advance each partition as a frontier separately using its own capabilities,
34/// but since this is used inside a fallible operator if we ever hit any errors
35/// then all newly minted capabilities would be dropped by the async runtime
36/// and we might lose the ability to send any more data.
37/// However if we just use the main capabilities provided to the operator, the
38/// capabilities externally will be preserved even in the case of an error in
39/// the operator, which is why we just manage a single frontier and capability set.
40pub(super) struct GtidReplicationPartitions {
41    active: BTreeMap<Uuid, GtidPartition>,
42    future: Vec<GtidPartition>,
43}
44
45impl From<Antichain<GtidPartition>> for GtidReplicationPartitions {
46    fn from(frontier: Antichain<GtidPartition>) -> Self {
47        let mut active = BTreeMap::new();
48        let mut future = Vec::new();
49        for part in frontier.iter() {
50            if part.timestamp() == &GtidState::Absent {
51                future.push(part.clone());
52            } else {
53                let source_id = part.interval().singleton().unwrap().clone();
54                active.insert(source_id, part.clone());
55            }
56        }
57        Self { active, future }
58    }
59}
60
61impl GtidReplicationPartitions {
62    /// Return an Antichain for the frontier composed of all the
63    /// active and future GTID partitions.
64    pub(super) fn frontier(&self) -> Antichain<GtidPartition> {
65        Antichain::from_iter(
66            self.active
67                .values()
68                .cloned()
69                .chain(self.future.iter().cloned()),
70        )
71    }
72
73    /// Given a singleton GTID partition, update the timestamp of the existing
74    /// active partition with the same UUID
75    /// or split the future partitions to remove this new partition and then
76    /// insert the new 'active' partition.
77    ///
78    /// This is used whenever we receive a GTID update from the server and
79    /// need to update our state keeping track
80    /// of all the active and future GTID partition timestamps.
81    /// This call should usually be followed up by downgrading capabilities
82    /// using the frontier returned by `self.frontier()`
83    pub(super) fn advance_frontier(
84        &mut self,
85        new_part: GtidPartition,
86    ) -> Result<(), DefiniteError> {
87        let source_id = new_part.interval().singleton().unwrap();
88        // Check if we have an active partition for the GTID UUID
89        match self.active.get_mut(source_id) {
90            Some(active_part) => {
91                // Since we start replication at a specific upper, we
92                // should only see GTID transaction-ids
93                // in a monotonic order for each source, starting at that upper.
94                if active_part.timestamp() > new_part.timestamp() {
95                    let err = DefiniteError::BinlogGtidMonotonicityViolation(
96                        source_id.to_string(),
97                        new_part.timestamp().clone(),
98                    );
99                    return Err(err);
100                }
101
102                // replace this active partition with the new one
103                *active_part = new_part;
104            }
105            // We've received a GTID for a UUID we don't yet know about
106            None => {
107                // Extract the future partition whose range encompasses this UUID
108                // TODO: Replace with Vec::extract_if() once it's stabilized
109                let mut i = 0;
110                let mut contained_part = None;
111                while i < self.future.len() {
112                    if self.future[i].interval().contains(source_id) {
113                        contained_part = Some(self.future.remove(i));
114                        break;
115                    } else {
116                        i += 1;
117                    }
118                }
119                let contained_part =
120                    contained_part.expect("expected a future partition to contain the UUID");
121
122                // Split the future partition into partitions for before and after this UUID
123                // and add back to the future partitions.
124                let (before_range, after_range) = contained_part.split(source_id);
125                self.future
126                    .extend(before_range.into_iter().chain(after_range));
127
128                // Store the new part in our active partitions
129                self.active.insert(source_id.clone(), new_part);
130            }
131        };
132
133        Ok(())
134    }
135}