mz_storage_types/sources/
sql_server.rs1use std::sync::{Arc, LazyLock};
13use std::time::Duration;
14
15use mz_dyncfg::Config;
16use mz_ore::future::InTask;
17use mz_proto::{IntoRustIfSome, RustType};
18use mz_repr::{CatalogItemId, Datum, GlobalId, RelationDesc, Row, ScalarType};
19use mz_sql_server_util::cdc::Lsn;
20use proptest_derive::Arbitrary;
21use serde::{Deserialize, Serialize};
22use timely::progress::Antichain;
23
24use crate::AlterCompatible;
25use crate::connections::inline::{
26 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
27 ReferencedConnection,
28};
29use crate::controller::AlterError;
30use crate::sources::{SourceConnection, SourceExportDetails, SourceTimestamp};
31
32include!(concat!(
33 env!("OUT_DIR"),
34 "/mz_storage_types.sources.sql_server.rs"
35));
36
37pub const SNAPSHOT_MAX_LSN_WAIT: Config<Duration> = Config::new(
38 "sql_server_snapshot_max_lsn_wait",
39 Duration::from_secs(30),
40 "Maximum amount of time we'll wait for SQL Server to report an LSN (in other words for \
41 CDC to be fully enabled) before taking an initial snapshot.",
42);
43
44pub const SNAPSHOT_PROGRESS_REPORT_INTERVAL: Config<Duration> = Config::new(
45 "sql_server_snapshot_progress_report_interval",
46 Duration::from_secs(2),
47 "Interval at which we'll report progress for currently running snapshots.",
48);
49
50pub const CDC_POLL_INTERVAL: Config<Duration> = Config::new(
51 "sql_server_cdc_poll_interval",
52 Duration::from_millis(500),
53 "Interval at which we'll poll the upstream SQL Server instance to discover new changes.",
54);
55
56pub const CDC_CLEANUP_CHANGE_TABLE: Config<bool> = Config::new(
57 "sql_server_cdc_cleanup_change_table",
58 true,
59 "When enabled we'll notify SQL Server that it can cleanup the change tables \
60 as the source makes progress and commits data.",
61);
62
63pub const CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES: Config<u32> = Config::new(
67 "sql_server_cdc_cleanup_change_table_max_deletes",
68 1_000_000,
73 "Maximum number of entries that can be deleted by using a single statement.",
74);
75
76pub const OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
77 "sql_server_offset_known_interval",
78 Duration::from_secs(10),
79 "Interval to fetch `offset_known`, from `sys.fn_cdc_get_max_lsn()`",
80);
81
82pub static SQL_SERVER_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
83 RelationDesc::builder()
84 .with_column("lsn", ScalarType::Bytes.nullable(true))
85 .finish()
86});
87
88#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
90pub struct SqlServerSource<C: ConnectionAccess = InlinedConnection> {
91 pub catalog_id: CatalogItemId,
93 pub connection: C::SqlServer,
95 pub extras: SqlServerSourceExtras,
97}
98
99impl SqlServerSource<InlinedConnection> {
100 pub async fn fetch_write_frontier(
101 self,
102 storage_configuration: &crate::configuration::StorageConfiguration,
103 ) -> Result<Antichain<Lsn>, anyhow::Error> {
104 let config = self
105 .connection
106 .resolve_config(
107 &storage_configuration.connection_context.secrets_reader,
108 storage_configuration,
109 InTask::No,
110 )
111 .await?;
112 let mut client = mz_sql_server_util::Client::connect(config).await?;
113
114 let max_lsn = mz_sql_server_util::inspect::get_max_lsn(&mut client).await?;
115 Ok(Antichain::from_elem(max_lsn))
116 }
117}
118
119impl<R: ConnectionResolver> IntoInlineConnection<SqlServerSource, R>
120 for SqlServerSource<ReferencedConnection>
121{
122 fn into_inline_connection(self, r: R) -> SqlServerSource {
123 let SqlServerSource {
124 catalog_id,
125 connection,
126 extras,
127 } = self;
128
129 SqlServerSource {
130 catalog_id,
131 connection: r.resolve_connection(connection).unwrap_sql_server(),
132 extras,
133 }
134 }
135}
136
137impl<C: ConnectionAccess> SourceConnection for SqlServerSource<C> {
138 fn name(&self) -> &'static str {
139 "sql-server"
140 }
141
142 fn external_reference(&self) -> Option<&str> {
143 None
144 }
145
146 fn default_key_desc(&self) -> RelationDesc {
147 RelationDesc::empty()
148 }
149
150 fn default_value_desc(&self) -> RelationDesc {
151 RelationDesc::empty()
154 }
155
156 fn timestamp_desc(&self) -> RelationDesc {
157 SQL_SERVER_PROGRESS_DESC.clone()
158 }
159
160 fn connection_id(&self) -> Option<CatalogItemId> {
161 Some(self.catalog_id)
162 }
163
164 fn primary_export_details(&self) -> super::SourceExportDetails {
165 SourceExportDetails::None
166 }
167
168 fn supports_read_only(&self) -> bool {
169 false
170 }
171
172 fn prefers_single_replica(&self) -> bool {
173 true
174 }
175}
176
177impl<C: ConnectionAccess> AlterCompatible for SqlServerSource<C> {
178 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
179 if self == other {
180 return Ok(());
181 }
182
183 let SqlServerSource {
184 catalog_id,
185 connection,
186 extras,
187 } = self;
188
189 let compatibility_checks = [
190 (catalog_id == &other.catalog_id, "catalog_id"),
191 (
192 connection.alter_compatible(id, &other.connection).is_ok(),
193 "connection",
194 ),
195 (extras.alter_compatible(id, &other.extras).is_ok(), "extras"),
196 ];
197
198 for (compatible, field) in compatibility_checks {
199 if !compatible {
200 tracing::warn!(
201 "SqlServerSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
202 self,
203 other
204 );
205
206 return Err(AlterError { id });
207 }
208 }
209
210 Ok(())
211 }
212}
213
214impl RustType<ProtoSqlServerSource> for SqlServerSource {
215 fn into_proto(&self) -> ProtoSqlServerSource {
216 ProtoSqlServerSource {
217 catalog_id: Some(self.catalog_id.into_proto()),
218 connection: Some(self.connection.into_proto()),
219 extras: Some(self.extras.into_proto()),
220 }
221 }
222
223 fn from_proto(proto: ProtoSqlServerSource) -> Result<Self, mz_proto::TryFromProtoError> {
224 Ok(SqlServerSource {
225 catalog_id: proto
226 .catalog_id
227 .into_rust_if_some("ProtoSqlServerSource::catalog_id")?,
228 connection: proto
229 .connection
230 .into_rust_if_some("ProtoSqlServerSource::connection")?,
231 extras: proto
232 .extras
233 .into_rust_if_some("ProtoSqlServerSource::extras")?,
234 })
235 }
236}
237
238#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
248pub struct SqlServerSourceExtras {}
249
250impl AlterCompatible for SqlServerSourceExtras {
251 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
252 Ok(())
253 }
254}
255
256impl RustType<ProtoSqlServerSourceExtras> for SqlServerSourceExtras {
257 fn into_proto(&self) -> ProtoSqlServerSourceExtras {
258 ProtoSqlServerSourceExtras {}
259 }
260
261 fn from_proto(_proto: ProtoSqlServerSourceExtras) -> Result<Self, mz_proto::TryFromProtoError> {
262 Ok(SqlServerSourceExtras {})
263 }
264}
265
266#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
268pub struct SqlServerSourceExportDetails {
269 pub capture_instance: Arc<str>,
271 pub table: mz_sql_server_util::desc::SqlServerTableDesc,
273 pub text_columns: Vec<String>,
275 pub exclude_columns: Vec<String>,
277}
278
279impl RustType<ProtoSqlServerSourceExportDetails> for SqlServerSourceExportDetails {
280 fn into_proto(&self) -> ProtoSqlServerSourceExportDetails {
281 ProtoSqlServerSourceExportDetails {
282 capture_instance: self.capture_instance.to_string(),
283 table: Some(self.table.into_proto()),
284 text_columns: self.text_columns.clone(),
285 exclude_columns: self.exclude_columns.clone(),
286 }
287 }
288
289 fn from_proto(
290 proto: ProtoSqlServerSourceExportDetails,
291 ) -> Result<Self, mz_proto::TryFromProtoError> {
292 Ok(SqlServerSourceExportDetails {
293 capture_instance: proto.capture_instance.into(),
294 table: proto
295 .table
296 .into_rust_if_some("ProtoSqlServerSourceExportDetails::table")?,
297 text_columns: proto.text_columns,
298 exclude_columns: proto.exclude_columns,
299 })
300 }
301}
302
303impl SourceTimestamp for Lsn {
304 fn encode_row(&self) -> mz_repr::Row {
305 Row::pack_slice(&[Datum::Bytes(&self.as_bytes())])
306 }
307
308 fn decode_row(row: &mz_repr::Row) -> Self {
309 let mut datums = row.iter();
310 match (datums.next(), datums.next()) {
311 (Some(Datum::Bytes(bytes)), None) => {
312 let lsn: [u8; 10] = bytes.try_into().expect("invalid LSN, wrong length");
313 Lsn::try_from_bytes(&lsn).expect("invalid LSN")
314 }
315 _ => panic!("invalid row {row:?}"),
316 }
317 }
318}