// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Code related to tracking the frontier of GTID partitions for a MySQL source.

use std::collections::BTreeMap;

use timely::progress::Antichain;
use uuid::Uuid;

use mz_storage_types::sources::mysql::{GtidPartition, GtidState};

use super::super::DefiniteError;

/// Holds the active and future GTID partitions that represent the complete
/// UUID range of all possible GTID source-ids from a MySQL server.
/// The active partitions are all singleton partitions, each representing the
/// timestamp of a single source-id, and the future partitions represent the
/// UUID ranges we have not yet seen, held at timestamp `GtidState::Absent`.
///
/// This state is updated as we receive new GTID updates from the server and
/// is used to construct a full 'frontier' representing the current state of
/// all GTID partitions, which we use to downgrade capabilities for the source.
///
/// We could instead mint capabilities for each individual partition and
/// advance each partition as its own frontier using its own capabilities, but
/// since this is used inside a fallible operator, any error would cause all
/// newly minted capabilities to be dropped by the async runtime and we might
/// lose the ability to send any more data. If we instead use only the main
/// capabilities provided to the operator, they are preserved outside the
/// operator even when it errors, which is why we manage a single frontier and
/// capability set.
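///
/// As a rough sketch (with two hypothetical source UUIDs `a` < `b`), the
/// state might look like:
///
/// ```text
/// active: { a => singleton(a) @ Active(..), b => singleton(b) @ Active(..) }
/// future: [ range(MIN..a) @ Absent, range(a..b) @ Absent, range(b..MAX) @ Absent ]
/// ```
///
/// and `frontier()` is the antichain containing all five of these partitions.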
pub(super) struct GtidReplicationPartitions {
    active: BTreeMap<Uuid, GtidPartition>,
    future: Vec<GtidPartition>,
}
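
/// Splits an existing frontier into active singleton partitions (those whose
/// source UUID is known) and future ranges still held at `GtidState::Absent`.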
impl From<Antichain<GtidPartition>> for GtidReplicationPartitions {
    fn from(frontier: Antichain<GtidPartition>) -> Self {
        let mut active = BTreeMap::new();
        let mut future = Vec::new();
        for part in frontier.iter() {
            if part.timestamp() == &GtidState::Absent {
                future.push(part.clone());
            } else {
                let source_id = part.interval().singleton().unwrap().clone();
                active.insert(source_id, part.clone());
            }
        }
        Self { active, future }
    }
}

impl GtidReplicationPartitions {
    /// Return an `Antichain` for the frontier composed of all the active and
    /// future GTID partitions.
    pub(super) fn frontier(&self) -> Antichain<GtidPartition> {
        Antichain::from_iter(
            self.active
                .values()
                .cloned()
                .chain(self.future.iter().cloned()),
        )
    }

    /// Given a singleton GTID partition, update the timestamp of the existing
    /// active partition with the same UUID, or split the future partitions to
    /// carve this partition's UUID out of the `Absent` ranges and insert it as
    /// a new active partition.
    ///
    /// This is used whenever we receive a GTID update from the server and need
    /// to update the state tracking all active and future GTID partition
    /// timestamps. This call should usually be followed by downgrading
    /// capabilities using the frontier returned by `self.frontier()`.
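    ///
    /// A rough usage sketch (illustrative only; `data_partitions` and
    /// `data_cap_set` are hypothetical names, not the actual call site):
    ///
    /// ```ignore
    /// // `new_part` is the singleton partition for a GTID just read from the binlog.
    /// data_partitions.advance_frontier(new_part)?;
    /// // Downgrade the operator's capability set to the new combined frontier.
    /// data_cap_set.downgrade(data_partitions.frontier().iter());
    /// ```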
    pub(super) fn advance_frontier(
        &mut self,
        new_part: GtidPartition,
    ) -> Result<(), DefiniteError> {
        let source_id = new_part.interval().singleton().unwrap();
        // Check if we have an active partition for the GTID UUID.
        match self.active.get_mut(source_id) {
            Some(active_part) => {
                // Since we start replication at a specific upper, we should
                // only see GTID transaction-ids in a monotonic order for each
                // source, starting at that upper.
                if active_part.timestamp() > new_part.timestamp() {
                    let err = DefiniteError::BinlogGtidMonotonicityViolation(
                        source_id.to_string(),
                        new_part.timestamp().clone(),
                    );
                    return Err(err);
                }
                // Replace this active partition with the new one.
                *active_part = new_part;
            }
            // We've received a GTID for a UUID we don't yet know about.
            None => {
                // Extract the future partition whose range encompasses this UUID.
                // TODO: Replace with Vec::extract_if() once it's stabilized.
                let mut i = 0;
                let mut contained_part = None;
                while i < self.future.len() {
                    if self.future[i].interval().contains(source_id) {
                        contained_part = Some(self.future.remove(i));
                        break;
                    } else {
                        i += 1;
                    }
                }
                let contained_part =
                    contained_part.expect("expected a future partition to contain the UUID");
                // Split the future partition into partitions for before and
                // after this UUID and add them back to the future partitions.
                let (before_range, after_range) = contained_part.split(source_id);
                self.future
                    .extend(before_range.into_iter().chain(after_range));
                // Store the new part in our active partitions.
                self.active.insert(source_id.clone(), new_part);
            }
        };
        Ok(())
    }
}