mz_storage/source/mysql/replication/context.rs
use std::collections::{BTreeMap, BTreeSet};
use std::pin::Pin;

use mysql_async::BinlogStream;
use mz_storage_types::errors::DataflowError;
use timely::dataflow::operators::{Capability, CapabilitySet};
use timely::progress::Antichain;
use tracing::trace;

use mz_mysql_util::Config;
use mz_storage_types::sources::mysql::{GtidPartition, GtidState};

use crate::metrics::source::mysql::MySqlSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::mysql::{
    MySqlTableName, RewindRequest, SourceOutputInfo, StackedAsyncOutputHandle,
};
use crate::source::types::SourceMessage;

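/// Shared context for the replication phase of the MySQL source dataflow:
/// the binlog stream being consumed, per-table output metadata, the data
/// output handle, the capability sets that govern the data and progress
/// frontiers, and any outstanding snapshot rewind requests.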
pub(super) struct ReplContext<'a> {
    pub(super) config: &'a RawSourceCreationConfig,
    pub(super) connection_config: &'a Config,
    pub(super) stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
    pub(super) table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
    pub(super) metrics: &'a MySqlSourceMetrics,
    pub(super) data_output: &'a mut StackedAsyncOutputHandle<
        GtidPartition,
        (usize, Result<SourceMessage, DataflowError>),
    >,
    pub(super) data_cap_set: &'a mut CapabilitySet<GtidPartition>,
    pub(super) upper_cap_set: &'a mut CapabilitySet<GtidPartition>,
    pub(super) rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
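    /// Outputs that have encountered a definite error and should not receive
    /// any further updates.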
    pub(super) errored_outputs: BTreeSet<usize>,
}

impl<'a> ReplContext<'a> {
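    /// Bundles the borrowed operator state into a `ReplContext`, starting
    /// with an empty set of errored outputs.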
    pub(super) fn new(
        config: &'a RawSourceCreationConfig,
        connection_config: &'a Config,
        stream: Pin<&'a mut futures::stream::Peekable<BinlogStream>>,
        table_info: &'a BTreeMap<MySqlTableName, Vec<SourceOutputInfo>>,
        metrics: &'a MySqlSourceMetrics,
        data_output: &'a mut StackedAsyncOutputHandle<
            GtidPartition,
            (usize, Result<SourceMessage, DataflowError>),
        >,
        data_cap_set: &'a mut CapabilitySet<GtidPartition>,
        upper_cap_set: &'a mut CapabilitySet<GtidPartition>,
        rewinds: BTreeMap<usize, (Capability<GtidPartition>, RewindRequest)>,
    ) -> Self {
        Self {
            config,
            connection_config,
            stream,
            table_info,
            metrics,
            data_output,
            data_cap_set,
            upper_cap_set,
            rewinds,
            errored_outputs: BTreeSet::new(),
        }
    }

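    /// Downgrades the data capability set to `new_upper`, records the new
    /// frontier in the source metrics, and retires any rewind requests that
    /// the frontier has advanced past.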
    pub(super) fn downgrade_data_cap_set(
        &mut self,
        reason: &str,
        new_upper: Antichain<GtidPartition>,
    ) {
        let (id, worker_id) = (self.config.id, self.config.worker_id);

        trace!(%id, "timely-{worker_id} [{reason}] advancing data frontier to {new_upper:?}");

        self.data_cap_set.downgrade(&*new_upper);

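        // Report the sum of the active transaction ids across all GTID
        // partitions in the new frontier.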
        self.metrics.gtid_txids.set(
            new_upper
                .iter()
                .filter_map(|part| match part.timestamp() {
                    GtidState::Absent => None,
                    GtidState::Active(tx_id) => Some(tx_id.get()),
                })
                .sum(),
        );

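        // A rewind request is only needed while the data frontier has not yet
        // advanced past its snapshot upper; once it has, drop the request.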
        self.rewinds.retain(|_, (_, req)| {
            let res = req.snapshot_upper.iter().any(|ts| new_upper.less_than(ts));
            if !res {
                trace!(%id, "timely-{worker_id} removing rewind request {req:?}");
            }
            res
        });
    }

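    /// Downgrades the progress (upper) capability set to `new_upper`.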
    pub(super) fn downgrade_progress_cap_set(
        &mut self,
        reason: &str,
        new_upper: Antichain<GtidPartition>,
    ) {
        let (id, worker_id) = (self.config.id, self.config.worker_id);
        trace!(%id, "timely-{worker_id} [{reason}] advancing progress frontier to {new_upper:?}");
        self.upper_cap_set.downgrade(&*new_upper);
    }
}