mz_storage/source/mysql/replication/partitions.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code related to tracking the frontier of GTID partitions for a MySQL source.

use std::collections::BTreeMap;

use timely::progress::Antichain;
use uuid::Uuid;

use mz_storage_types::sources::mysql::{GtidPartition, GtidState};

use super::super::DefiniteError;

/// Holds the active and future GTID partitions that represent the complete
/// UUID range of all possible GTID source-ids from a MySQL server.
/// The active partitions are all singleton partitions, each representing the
/// current timestamp of a single source-id, and the future partitions
/// represent the UUID ranges we have not yet seen, held at the timestamp
/// `GtidState::Absent`.
///
/// This state is updated as we receive new GTID updates from the server and
/// is used to create a full 'frontier' representing the current state of all
/// GTID partitions, which we can use to downgrade capabilities for the
/// source.
///
/// We could instead mint capabilities for each individual partition and
/// advance each partition as a separate frontier using its own capabilities,
/// but since this is used inside a fallible operator, any error would cause
/// all newly minted capabilities to be dropped by the async runtime and we
/// might lose the ability to send any more data. However, if we only use the
/// main capabilities provided to the operator, they are preserved externally
/// even when the operator errors, which is why we manage a single frontier
/// and capability set.
pub(super) struct GtidReplicationPartitions {
    active: BTreeMap<Uuid, GtidPartition>,
    future: Vec<GtidPartition>,
}
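
// For example (illustrative, not part of the original file): after receiving
// GTID updates for two source-id UUIDs `a` and `b` with `a < b`, the state
// conceptually looks like:
//
//   active: { a => [a, a] @ Active(tx_a), b => [b, b] @ Active(tx_b) }
//   future: [ [MIN, a) @ Absent, (a, b) @ Absent, (b, MAX] @ Absent ]
//
// Together these five partitions cover the entire UUID range, and
// `frontier()` returns them as one antichain.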

impl From<Antichain<GtidPartition>> for GtidReplicationPartitions {
    fn from(frontier: Antichain<GtidPartition>) -> Self {
        let mut active = BTreeMap::new();
        let mut future = Vec::new();
        for part in frontier.iter() {
            if part.timestamp() == &GtidState::Absent {
                // A partition held at `Absent` is a not-yet-seen UUID range.
                future.push(part.clone());
            } else {
                // Any partition with a live timestamp must be a singleton
                // for a single source-id.
                let source_id = part.interval().singleton().unwrap().clone();
                active.insert(source_id, part.clone());
            }
        }
        Self { active, future }
    }
}

impl GtidReplicationPartitions {
    /// Return an Antichain for the frontier composed of all the
    /// active and future GTID partitions.
    pub(super) fn frontier(&self) -> Antichain<GtidPartition> {
        Antichain::from_iter(
            self.active
                .values()
                .cloned()
                .chain(self.future.iter().cloned()),
        )
    }

    /// Given a singleton GTID partition, update the timestamp of the existing
    /// active partition with the same UUID, or split the future partition
    /// whose range contains this UUID and insert the new partition as an
    /// active one.
    ///
    /// This is used whenever we receive a GTID update from the server and
    /// need to update our state tracking all active and future GTID partition
    /// timestamps. This call should usually be followed by downgrading
    /// capabilities using the frontier returned by `self.frontier()`.
    pub(super) fn advance_frontier(
        &mut self,
        new_part: GtidPartition,
    ) -> Result<(), DefiniteError> {
        let source_id = new_part.interval().singleton().unwrap();
        // Check if we have an active partition for the GTID UUID.
        match self.active.get_mut(source_id) {
            Some(active_part) => {
                // Since we start replication at a specific upper, we should
                // only see GTID transaction-ids in monotonic order for each
                // source, starting at that upper.
                if active_part.timestamp() > new_part.timestamp() {
                    let err = DefiniteError::BinlogGtidMonotonicityViolation(
                        source_id.to_string(),
                        new_part.timestamp().clone(),
                    );
                    return Err(err);
                }

                // Replace this active partition with the new one.
                *active_part = new_part;
            }
            // We've received a GTID for a UUID we don't yet know about.
            None => {
                // Extract the future partition whose range encompasses this UUID.
                // TODO: Replace with Vec::extract_if() once it's stabilized.
                let mut i = 0;
                let mut contained_part = None;
                while i < self.future.len() {
                    if self.future[i].interval().contains(source_id) {
                        contained_part = Some(self.future.remove(i));
                        break;
                    } else {
                        i += 1;
                    }
                }
                let contained_part =
                    contained_part.expect("expected a future partition to contain the UUID");

                // Split the future partition into partitions for before and
                // after this UUID and add those back to the future partitions.
                let (before_range, after_range) = contained_part.split(source_id);
                self.future
                    .extend(before_range.into_iter().chain(after_range));

                // Store the new part in our active partitions.
                self.active.insert(source_id.clone(), new_part);
            }
        };

        Ok(())
    }
}
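
// A minimal usage sketch (not part of the original file). It assumes that
// `GtidPartition` is a partitioned timestamp with `new_range` and
// `new_singleton` constructors, that `GtidState::Active` wraps a `NonZeroU64`
// transaction-id, and that `Uuid::max()` is available; treat it as a sketch
// of the intended lifecycle rather than a definitive test.
#[cfg(test)]
mod tests {
    use std::num::NonZeroU64;

    use super::*;

    #[test]
    fn advance_frontier_splits_future_partitions() {
        // Start with a single future partition covering the entire UUID range.
        let full_range = GtidPartition::new_range(Uuid::nil(), Uuid::max(), GtidState::Absent);
        let mut partitions = GtidReplicationPartitions::from(Antichain::from_elem(full_range));

        // Advancing with a singleton for a previously unseen UUID splits the
        // future partition and records the singleton as active.
        let source_id = Uuid::from_u128(42);
        let update = GtidPartition::new_singleton(
            source_id,
            GtidState::Active(NonZeroU64::new(1).unwrap()),
        );
        partitions.advance_frontier(update).unwrap();

        // The frontier now holds the active singleton plus the `Absent`
        // ranges on either side of `source_id`.
        assert_eq!(partitions.frontier().elements().len(), 3);
    }
}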