mz_storage/source/mysql/replication/
context.rs1use std::collections::{BTreeMap, BTreeSet};
11use std::pin::Pin;
12
13use mysql_async::BinlogStream;
14use mz_storage_types::errors::DataflowError;
15use timely::dataflow::operators::{Capability, CapabilitySet};
16use timely::progress::Antichain;
17use tracing::trace;
18
19use mz_mysql_util::Config;
20use mz_storage_types::sources::mysql::{GtidPartition, GtidState};
21
22use crate::metrics::source::mysql::MySqlSourceMetrics;
23use crate::source::RawSourceCreationConfig;
24use crate::source::mysql::{
25 MySqlTableName, RewindRequest, SourceOutputInfo, StackedAsyncOutputHandle,
26};
27use crate::source::types::SourceMessage;
28
29pub(super) struct ReplContext<'a> {
32 pub(super) config: &'a RawSourceCreationConfig,
33 pub(super) connection_config: &'a Config,
34 pub(super) stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
35 pub(super) table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
36 pub(super) metrics: &'a MySqlSourceMetrics,
37 pub(super) data_output: &'a mut StackedAsyncOutputHandle<
38 GtidPartition,
39 (usize, Result<SourceMessage, DataflowError>),
40 >,
41 pub(super) data_cap_set: &'a mut CapabilitySet<GtidPartition>,
42 pub(super) rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
44 pub(super) errored_outputs: BTreeSet<usize>,
45}
46
47impl<'a> ReplContext<'a> {
48 pub(super) fn new(
49 config: &'a RawSourceCreationConfig,
50 connection_config: &'a Config,
51 stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
52 table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
53 metrics: &'a MySqlSourceMetrics,
54 data_output: &'a mut StackedAsyncOutputHandle<
55 GtidPartition,
56 (usize, Result<SourceMessage, DataflowError>),
57 >,
58 data_cap_set: &'a mut CapabilitySet<GtidPartition>,
59 rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
60 ) -> Self {
61 Self {
62 config,
63 connection_config,
64 stream,
65 table_info,
66 metrics,
67 data_output,
68 data_cap_set,
69 rewinds,
70 errored_outputs: BTreeSet::new(),
71 }
72 }
73
74 pub(super) fn downgrade_data_cap_set(
77 &mut self,
78 reason: &str,
79 new_upper: Antichain<GtidPartition>,
80 ) {
81 let (id, worker_id) = (self.config.id, self.config.worker_id);
82
83 trace!(%id, "timely-{worker_id} [{reason}] advancing data frontier to {new_upper:?}");
84
85 self.data_cap_set.downgrade(&*new_upper);
86
87 self.metrics.gtid_txids.set(
88 new_upper
89 .iter()
90 .filter_map(|part| match part.timestamp() {
91 GtidState::Absent => None,
92 GtidState::Active(tx_id) => Some(tx_id.get()),
93 })
94 .sum(),
95 );
96
97 self.rewinds.retain(|_, (_, req)| {
98 let res = req.snapshot_upper.iter().any(|ts| new_upper.less_than(ts));
102 if !res {
103 trace!(%id, "timely-{worker_id} removing rewind request {req:?}");
104 }
105 res
106 });
107 }
108}