mz_storage_types/sources/sql_server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to SQL Server sources
11
12use std::sync::{Arc, LazyLock};
13use std::time::Duration;
14
15use mz_dyncfg::Config;
16use mz_ore::future::InTask;
17use mz_proto::{IntoRustIfSome, RustType};
18use mz_repr::{CatalogItemId, Datum, GlobalId, RelationDesc, Row, ScalarType};
19use mz_sql_server_util::cdc::Lsn;
20use proptest_derive::Arbitrary;
21use serde::{Deserialize, Serialize};
22use timely::progress::Antichain;
23
24use crate::AlterCompatible;
25use crate::connections::inline::{
26    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
27    ReferencedConnection,
28};
29use crate::controller::AlterError;
30use crate::sources::{SourceConnection, SourceExportDetails, SourceTimestamp};
31
32include!(concat!(
33    env!("OUT_DIR"),
34    "/mz_storage_types.sources.sql_server.rs"
35));
36
37pub const SNAPSHOT_MAX_LSN_WAIT: Config<Duration> = Config::new(
38    "sql_server_snapshot_max_lsn_wait",
39    Duration::from_secs(30),
40    "Maximum amount of time we'll wait for SQL Server to report an LSN (in other words for \
41    CDC to be fully enabled) before taking an initial snapshot.",
42);
43
44pub const SNAPSHOT_PROGRESS_REPORT_INTERVAL: Config<Duration> = Config::new(
45    "sql_server_snapshot_progress_report_interval",
46    Duration::from_secs(2),
47    "Interval at which we'll report progress for currently running snapshots.",
48);
49
50pub const CDC_POLL_INTERVAL: Config<Duration> = Config::new(
51    "sql_server_cdc_poll_interval",
52    Duration::from_millis(500),
53    "Interval at which we'll poll the upstream SQL Server instance to discover new changes.",
54);
55
56pub const CDC_CLEANUP_CHANGE_TABLE: Config<bool> = Config::new(
57    "sql_server_cdc_cleanup_change_table",
58    true,
59    "When enabled we'll notify SQL Server that it can cleanup the change tables \
60    as the source makes progress and commits data.",
61);
62
63/// Maximum number of deletes that we'll make from a single SQL Server change table.
64///
65/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-cleanup-change-table-transact-sql?view=sql-server-ver16>.
66pub const CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES: Config<u32> = Config::new(
67    "sql_server_cdc_cleanup_change_table_max_deletes",
68    // The default in SQL Server is 5,000 but until we change the cleanup
69    // function to call it iteratively we set a large value here.
70    //
71    // TODO(sql_server2): Call the cleanup function iteratively.
72    1_000_000,
73    "Maximum number of entries that can be deleted by using a single statement.",
74);
75
76pub const OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
77    "sql_server_offset_known_interval",
78    Duration::from_secs(10),
79    "Interval to fetch `offset_known`, from `sys.fn_cdc_get_max_lsn()`",
80);
81
82pub static SQL_SERVER_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
83    RelationDesc::builder()
84        .with_column("lsn", ScalarType::Bytes.nullable(true))
85        .finish()
86});
87
88/// Details about how to create a Materialize Source that reads from Microsoft SQL Server.
89#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
90pub struct SqlServerSource<C: ConnectionAccess = InlinedConnection> {
91    /// ID of this SQL `SOURCE` object in the Catalog.
92    pub catalog_id: CatalogItemId,
93    /// Configuration for connecting to SQL Server.
94    pub connection: C::SqlServer,
95    /// SQL Server specific information that is relevant to creating a source.
96    pub extras: SqlServerSourceExtras,
97}
98
99impl SqlServerSource<InlinedConnection> {
100    pub async fn fetch_write_frontier(
101        self,
102        storage_configuration: &crate::configuration::StorageConfiguration,
103    ) -> Result<Antichain<Lsn>, anyhow::Error> {
104        let config = self
105            .connection
106            .resolve_config(
107                &storage_configuration.connection_context.secrets_reader,
108                storage_configuration,
109                InTask::No,
110            )
111            .await?;
112        let mut client = mz_sql_server_util::Client::connect(config).await?;
113
114        let max_lsn = mz_sql_server_util::inspect::get_max_lsn(&mut client).await?;
115        Ok(Antichain::from_elem(max_lsn))
116    }
117}
118
119impl<R: ConnectionResolver> IntoInlineConnection<SqlServerSource, R>
120    for SqlServerSource<ReferencedConnection>
121{
122    fn into_inline_connection(self, r: R) -> SqlServerSource {
123        let SqlServerSource {
124            catalog_id,
125            connection,
126            extras,
127        } = self;
128
129        SqlServerSource {
130            catalog_id,
131            connection: r.resolve_connection(connection).unwrap_sql_server(),
132            extras,
133        }
134    }
135}
136
137impl<C: ConnectionAccess> SourceConnection for SqlServerSource<C> {
138    fn name(&self) -> &'static str {
139        "sql-server"
140    }
141
142    fn external_reference(&self) -> Option<&str> {
143        None
144    }
145
146    fn default_key_desc(&self) -> RelationDesc {
147        RelationDesc::empty()
148    }
149
150    fn default_value_desc(&self) -> RelationDesc {
151        // The SQL Server source only outputs data to its subsources. The catalog object
152        // representing the source itself is just an empty relation with no columns
153        RelationDesc::empty()
154    }
155
156    fn timestamp_desc(&self) -> RelationDesc {
157        SQL_SERVER_PROGRESS_DESC.clone()
158    }
159
160    fn connection_id(&self) -> Option<CatalogItemId> {
161        Some(self.catalog_id)
162    }
163
164    fn primary_export_details(&self) -> super::SourceExportDetails {
165        SourceExportDetails::None
166    }
167
168    fn supports_read_only(&self) -> bool {
169        false
170    }
171
172    fn prefers_single_replica(&self) -> bool {
173        true
174    }
175}
176
177impl<C: ConnectionAccess> AlterCompatible for SqlServerSource<C> {
178    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
179        if self == other {
180            return Ok(());
181        }
182
183        let SqlServerSource {
184            catalog_id,
185            connection,
186            extras,
187        } = self;
188
189        let compatibility_checks = [
190            (catalog_id == &other.catalog_id, "catalog_id"),
191            (
192                connection.alter_compatible(id, &other.connection).is_ok(),
193                "connection",
194            ),
195            (extras.alter_compatible(id, &other.extras).is_ok(), "extras"),
196        ];
197
198        for (compatible, field) in compatibility_checks {
199            if !compatible {
200                tracing::warn!(
201                    "SqlServerSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
202                    self,
203                    other
204                );
205
206                return Err(AlterError { id });
207            }
208        }
209
210        Ok(())
211    }
212}
213
214impl RustType<ProtoSqlServerSource> for SqlServerSource {
215    fn into_proto(&self) -> ProtoSqlServerSource {
216        ProtoSqlServerSource {
217            catalog_id: Some(self.catalog_id.into_proto()),
218            connection: Some(self.connection.into_proto()),
219            extras: Some(self.extras.into_proto()),
220        }
221    }
222
223    fn from_proto(proto: ProtoSqlServerSource) -> Result<Self, mz_proto::TryFromProtoError> {
224        Ok(SqlServerSource {
225            catalog_id: proto
226                .catalog_id
227                .into_rust_if_some("ProtoSqlServerSource::catalog_id")?,
228            connection: proto
229                .connection
230                .into_rust_if_some("ProtoSqlServerSource::connection")?,
231            extras: proto
232                .extras
233                .into_rust_if_some("ProtoSqlServerSource::extras")?,
234        })
235    }
236}
237
238/// Extra information that is pertinent to creating a SQL Server specific
239/// Materialize source.
240///
241/// The information in this struct is durably recorded by serializing it as an
242/// option in the `CREATE SOURCE` SQL statement, thus backward compatibility is
243/// important!
244///
245/// It's currently unused but we keep the struct around to maintain conformity
246/// with other sources.
247#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
248pub struct SqlServerSourceExtras {}
249
250impl AlterCompatible for SqlServerSourceExtras {
251    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
252        Ok(())
253    }
254}
255
256impl RustType<ProtoSqlServerSourceExtras> for SqlServerSourceExtras {
257    fn into_proto(&self) -> ProtoSqlServerSourceExtras {
258        ProtoSqlServerSourceExtras {}
259    }
260
261    fn from_proto(_proto: ProtoSqlServerSourceExtras) -> Result<Self, mz_proto::TryFromProtoError> {
262        Ok(SqlServerSourceExtras {})
263    }
264}
265
266/// Specifies the details of a SQL Server source export.
267#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
268pub struct SqlServerSourceExportDetails {
269    /// Name of the SQL Server capture instance we replicate changes from.
270    pub capture_instance: Arc<str>,
271    /// Description of the upstream table and how it maps to Materialize.
272    pub table: mz_sql_server_util::desc::SqlServerTableDesc,
273    /// Column names that we want to parse as text.
274    pub text_columns: Vec<String>,
275    /// Columns from the upstream source that should be excluded.
276    pub exclude_columns: Vec<String>,
277}
278
279impl RustType<ProtoSqlServerSourceExportDetails> for SqlServerSourceExportDetails {
280    fn into_proto(&self) -> ProtoSqlServerSourceExportDetails {
281        ProtoSqlServerSourceExportDetails {
282            capture_instance: self.capture_instance.to_string(),
283            table: Some(self.table.into_proto()),
284            text_columns: self.text_columns.clone(),
285            exclude_columns: self.exclude_columns.clone(),
286        }
287    }
288
289    fn from_proto(
290        proto: ProtoSqlServerSourceExportDetails,
291    ) -> Result<Self, mz_proto::TryFromProtoError> {
292        Ok(SqlServerSourceExportDetails {
293            capture_instance: proto.capture_instance.into(),
294            table: proto
295                .table
296                .into_rust_if_some("ProtoSqlServerSourceExportDetails::table")?,
297            text_columns: proto.text_columns,
298            exclude_columns: proto.exclude_columns,
299        })
300    }
301}
302
303impl SourceTimestamp for Lsn {
304    fn encode_row(&self) -> mz_repr::Row {
305        Row::pack_slice(&[Datum::Bytes(&self.as_bytes())])
306    }
307
308    fn decode_row(row: &mz_repr::Row) -> Self {
309        let mut datums = row.iter();
310        match (datums.next(), datums.next()) {
311            (Some(Datum::Bytes(bytes)), None) => {
312                let lsn: [u8; 10] = bytes.try_into().expect("invalid LSN, wrong length");
313                Lsn::try_from_bytes(&lsn).expect("invalid LSN")
314            }
315            _ => panic!("invalid row {row:?}"),
316        }
317    }
318}