mz_storage_types/sources/
sql_server.rs1use std::sync::{Arc, LazyLock};
13use std::time::Duration;
14
15use mz_dyncfg::Config;
16use mz_ore::future::InTask;
17use mz_proto::RustType;
18use mz_repr::{CatalogItemId, Datum, GlobalId, RelationDesc, Row, SqlScalarType};
19use mz_sql_server_util::cdc::Lsn;
20use serde::{Deserialize, Serialize};
21use timely::progress::Antichain;
22
23use crate::AlterCompatible;
24use crate::connections::inline::{
25 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
26 ReferencedConnection,
27};
28use crate::controller::AlterError;
29use crate::sources::{SourceConnection, SourceTimestamp};
30
31include!(concat!(
32 env!("OUT_DIR"),
33 "/mz_storage_types.sources.sql_server.rs"
34));
35
36pub const MAX_LSN_WAIT: Config<Duration> = Config::new(
37 "sql_server_max_lsn_wait",
38 Duration::from_secs(30),
39 "Maximum amount of time we'll wait for SQL Server to report an LSN (in other words for \
40 CDC to be fully enabled)",
41);
42
43pub const SNAPSHOT_PROGRESS_REPORT_INTERVAL: Config<Duration> = Config::new(
44 "sql_server_snapshot_progress_report_interval",
45 Duration::from_secs(2),
46 "Interval at which we'll report progress for currently running snapshots.",
47);
48
49pub const CDC_POLL_INTERVAL: Config<Duration> = Config::new(
50 "sql_server_cdc_poll_interval",
51 Duration::from_millis(500),
52 "Interval at which we'll poll the upstream SQL Server instance to discover new changes.",
53);
54
55pub const CDC_CLEANUP_CHANGE_TABLE: Config<bool> = Config::new(
56 "sql_server_cdc_cleanup_change_table",
57 false,
58 "When enabled we'll notify SQL Server that it can cleanup the change tables \
59 as the source makes progress and commits data.",
60);
61
62pub const CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES: Config<u32> = Config::new(
66 "sql_server_cdc_cleanup_change_table_max_deletes",
67 1_000_000,
72 "Maximum number of entries that can be deleted by using a single statement.",
73);
74
75pub const OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
76 "sql_server_offset_known_interval",
77 Duration::from_secs(1),
78 "Interval to fetch `offset_known`, from `sys.fn_cdc_get_max_lsn()`",
79);
80
81pub static SQL_SERVER_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
82 RelationDesc::builder()
83 .with_column("lsn", SqlScalarType::Bytes.nullable(true))
84 .finish()
85});
86
/// Connection-level configuration of a SQL Server source.
///
/// `C` controls how `connection` is represented: with `ReferencedConnection` it is a reference
/// that `IntoInlineConnection::into_inline_connection` resolves into the
/// `InlinedConnection` (default) form.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct SqlServerSourceConnection<C: ConnectionAccess = InlinedConnection> {
    /// Catalog ID of the connection this source uses; reported by
    /// `SourceConnection::connection_id`.
    pub connection_id: CatalogItemId,
    /// The SQL Server connection, in the representation selected by `C`.
    pub connection: C::SqlServer,
    /// SQL Server-specific extras; any change here makes an `ALTER` incompatible.
    pub extras: SqlServerSourceExtras,
}
97
98impl SqlServerSourceConnection<InlinedConnection> {
99 pub async fn fetch_write_frontier(
100 self,
101 storage_configuration: &crate::configuration::StorageConfiguration,
102 ) -> Result<Antichain<Lsn>, anyhow::Error> {
103 let config = self
104 .connection
105 .resolve_config(
106 &storage_configuration.connection_context.secrets_reader,
107 storage_configuration,
108 InTask::No,
109 )
110 .await?;
111 let mut client = mz_sql_server_util::Client::connect(config).await?;
112
113 let max_lsn = mz_sql_server_util::inspect::get_max_lsn(&mut client).await?;
114 Ok(Antichain::from_elem(max_lsn))
115 }
116}
117
118impl<R: ConnectionResolver> IntoInlineConnection<SqlServerSourceConnection, R>
119 for SqlServerSourceConnection<ReferencedConnection>
120{
121 fn into_inline_connection(self, r: R) -> SqlServerSourceConnection {
122 let SqlServerSourceConnection {
123 connection_id,
124 connection,
125 extras,
126 } = self;
127
128 SqlServerSourceConnection {
129 connection_id,
130 connection: r.resolve_connection(connection).unwrap_sql_server(),
131 extras,
132 }
133 }
134}
135
136impl<C: ConnectionAccess> SourceConnection for SqlServerSourceConnection<C> {
137 fn name(&self) -> &'static str {
138 "sql-server"
139 }
140
141 fn external_reference(&self) -> Option<&str> {
142 None
143 }
144
145 fn default_key_desc(&self) -> RelationDesc {
146 RelationDesc::empty()
147 }
148
149 fn default_value_desc(&self) -> RelationDesc {
150 RelationDesc::empty()
153 }
154
155 fn timestamp_desc(&self) -> RelationDesc {
156 SQL_SERVER_PROGRESS_DESC.clone()
157 }
158
159 fn connection_id(&self) -> Option<CatalogItemId> {
160 Some(self.connection_id)
161 }
162
163 fn supports_read_only(&self) -> bool {
164 false
165 }
166
167 fn prefers_single_replica(&self) -> bool {
168 true
169 }
170}
171
172impl<C: ConnectionAccess> AlterCompatible for SqlServerSourceConnection<C> {
173 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
174 if self == other {
175 return Ok(());
176 }
177
178 let SqlServerSourceConnection {
179 connection_id,
180 connection,
181 extras,
182 } = self;
183
184 let compatibility_checks = [
185 (connection_id == &other.connection_id, "connection_id"),
186 (
187 connection.alter_compatible(id, &other.connection).is_ok(),
188 "connection",
189 ),
190 (extras.alter_compatible(id, &other.extras).is_ok(), "extras"),
191 ];
192
193 for (compatible, field) in compatibility_checks {
194 if !compatible {
195 tracing::warn!(
196 "SqlServerSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
197 self,
198 other
199 );
200
201 return Err(AlterError { id });
202 }
203 }
204
205 Ok(())
206 }
207}
208
/// SQL Server-specific extra state attached to a `SqlServerSourceConnection`.
///
/// Round-trips through `ProtoSqlServerSourceExtras`. Any change to it is rejected by
/// `AlterCompatible`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct SqlServerSourceExtras {
    /// Optional restore-history identifier.
    ///
    /// NOTE(review): presumably taken from SQL Server's restore history so that a restore of
    /// the upstream database can be detected — confirm against the code that populates it.
    pub restore_history_id: Option<i32>,
}
225
226impl AlterCompatible for SqlServerSourceExtras {
227 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
228 if self.restore_history_id != other.restore_history_id {
229 tracing::warn!(?self, ?other, "SqlServerSourceExtras incompatible");
230 return Err(AlterError { id });
231 }
232 Ok(())
233 }
234}
235
236impl RustType<ProtoSqlServerSourceExtras> for SqlServerSourceExtras {
237 fn into_proto(&self) -> ProtoSqlServerSourceExtras {
238 ProtoSqlServerSourceExtras {
239 restore_history_id: self.restore_history_id.clone(),
240 }
241 }
242
243 fn from_proto(proto: ProtoSqlServerSourceExtras) -> Result<Self, mz_proto::TryFromProtoError> {
244 Ok(SqlServerSourceExtras {
245 restore_history_id: proto.restore_history_id,
246 })
247 }
248}
249
/// Per-export (per upstream table) details of a SQL Server source.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct SqlServerSourceExportDetails {
    /// Name of the CDC capture instance; presumably the one tracking `table` — confirm.
    pub capture_instance: Arc<str>,
    /// Description of the upstream table.
    pub table: mz_sql_server_util::desc::SqlServerTableDesc,
    /// Column names; presumably columns to be decoded as text — confirm with the planner.
    pub text_columns: Vec<String>,
    /// Column names; presumably columns excluded from ingestion — confirm with the planner.
    pub exclude_columns: Vec<String>,
    /// An LSN recorded for this export; presumably where its initial snapshot was taken —
    /// confirm.
    pub initial_lsn: mz_sql_server_util::cdc::Lsn,
}
266
267impl SourceTimestamp for Lsn {
268 fn encode_row(&self) -> mz_repr::Row {
269 Row::pack_slice(&[Datum::Bytes(&self.as_bytes())])
270 }
271
272 fn decode_row(row: &mz_repr::Row) -> Self {
273 let mut datums = row.iter();
274 match (datums.next(), datums.next()) {
275 (Some(Datum::Bytes(bytes)), None) => {
276 let lsn: [u8; 10] = bytes.try_into().expect("invalid LSN, wrong length");
277 Lsn::try_from_bytes(&lsn).expect("invalid LSN")
278 }
279 _ => panic!("invalid row {row:?}"),
280 }
281 }
282}