Skip to main content

mz_storage_types/sources/
sql_server.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to SQL Server sources
11
12use std::sync::{Arc, LazyLock};
13use std::time::Duration;
14
15use mz_dyncfg::Config;
16use mz_ore::future::InTask;
17use mz_proto::RustType;
18use mz_repr::{CatalogItemId, Datum, GlobalId, RelationDesc, Row, SqlScalarType};
19use mz_sql_server_util::cdc::Lsn;
20use serde::{Deserialize, Serialize};
21use timely::progress::Antichain;
22
23use crate::AlterCompatible;
24use crate::connections::inline::{
25    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
26    ReferencedConnection,
27};
28use crate::controller::AlterError;
29use crate::sources::{SourceConnection, SourceTimestamp};
30
31include!(concat!(
32    env!("OUT_DIR"),
33    "/mz_storage_types.sources.sql_server.rs"
34));
35
/// Dynamic configuration `sql_server_max_lsn_wait`: the maximum amount of time
/// to wait for SQL Server to report an LSN. Defaults to 30 seconds.
pub const MAX_LSN_WAIT: Config<Duration> = Config::new(
    "sql_server_max_lsn_wait",
    Duration::from_secs(30),
    "Maximum amount of time we'll wait for SQL Server to report an LSN (in other words for \
    CDC to be fully enabled)",
);
42
/// Dynamic configuration `sql_server_snapshot_progress_report_interval`: how
/// often progress is reported for running snapshots. Defaults to 2 seconds.
pub const SNAPSHOT_PROGRESS_REPORT_INTERVAL: Config<Duration> = Config::new(
    "sql_server_snapshot_progress_report_interval",
    Duration::from_secs(2),
    "Interval at which we'll report progress for currently running snapshots.",
);
48
/// Dynamic configuration `sql_server_cdc_poll_interval`: how often the upstream
/// SQL Server instance is polled for new changes. Defaults to 500 milliseconds.
pub const CDC_POLL_INTERVAL: Config<Duration> = Config::new(
    "sql_server_cdc_poll_interval",
    Duration::from_millis(500),
    "Interval at which we'll poll the upstream SQL Server instance to discover new changes.",
);
54
/// Dynamic configuration `sql_server_cdc_cleanup_change_table`: whether SQL
/// Server is told it may clean up its change tables as the source makes
/// progress. Defaults to `false` (disabled).
pub const CDC_CLEANUP_CHANGE_TABLE: Config<bool> = Config::new(
    "sql_server_cdc_cleanup_change_table",
    false,
    "When enabled we'll notify SQL Server that it can cleanup the change tables \
    as the source makes progress and commits data.",
);
61
/// Maximum number of deletes that we'll make from a single SQL Server change table.
///
/// Exposed as the dynamic configuration
/// `sql_server_cdc_cleanup_change_table_max_deletes`, defaulting to 1,000,000.
///
/// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-stored-procedures/sys-sp-cdc-cleanup-change-table-transact-sql?view=sql-server-ver16>.
pub const CDC_CLEANUP_CHANGE_TABLE_MAX_DELETES: Config<u32> = Config::new(
    "sql_server_cdc_cleanup_change_table_max_deletes",
    // The default in SQL Server is 5,000 but until we change the cleanup
    // function to call it iteratively we set a large value here.
    //
    // TODO(sql_server2): Call the cleanup function iteratively.
    1_000_000,
    "Maximum number of entries that can be deleted by using a single statement.",
);
74
/// Dynamic configuration `sql_server_offset_known_interval`: how often
/// `offset_known` is fetched from `sys.fn_cdc_get_max_lsn()`. Defaults to
/// 1 second.
pub const OFFSET_KNOWN_INTERVAL: Config<Duration> = Config::new(
    "sql_server_offset_known_interval",
    Duration::from_secs(1),
    "Interval to fetch `offset_known`, from `sys.fn_cdc_get_max_lsn()`",
);
80
81pub static SQL_SERVER_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
82    RelationDesc::builder()
83        .with_column("lsn", SqlScalarType::Bytes.nullable(true))
84        .finish()
85});
86
/// Details about how to create a Materialize Source that reads from Microsoft SQL Server.
///
/// Generic over `C`, the [`ConnectionAccess`] implementation that determines
/// the concrete type of the stored connection (`C::SqlServer`). `C` defaults to
/// [`InlinedConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct SqlServerSourceConnection<C: ConnectionAccess = InlinedConnection> {
    /// The ID of the Connection object this source is using.
    pub connection_id: CatalogItemId,
    /// Configuration for connecting to SQL Server.
    pub connection: C::SqlServer,
    /// SQL Server specific information that is relevant to creating a source.
    pub extras: SqlServerSourceExtras,
}
97
98impl SqlServerSourceConnection<InlinedConnection> {
99    pub async fn fetch_write_frontier(
100        self,
101        storage_configuration: &crate::configuration::StorageConfiguration,
102    ) -> Result<Antichain<Lsn>, anyhow::Error> {
103        let config = self
104            .connection
105            .resolve_config(
106                &storage_configuration.connection_context.secrets_reader,
107                storage_configuration,
108                InTask::No,
109            )
110            .await?;
111        let mut client = mz_sql_server_util::Client::connect(config).await?;
112
113        let max_lsn = mz_sql_server_util::inspect::get_max_lsn(&mut client).await?;
114        Ok(Antichain::from_elem(max_lsn))
115    }
116}
117
118impl<R: ConnectionResolver> IntoInlineConnection<SqlServerSourceConnection, R>
119    for SqlServerSourceConnection<ReferencedConnection>
120{
121    fn into_inline_connection(self, r: R) -> SqlServerSourceConnection {
122        let SqlServerSourceConnection {
123            connection_id,
124            connection,
125            extras,
126        } = self;
127
128        SqlServerSourceConnection {
129            connection_id,
130            connection: r.resolve_connection(connection).unwrap_sql_server(),
131            extras,
132        }
133    }
134}
135
136impl<C: ConnectionAccess> SourceConnection for SqlServerSourceConnection<C> {
137    fn name(&self) -> &'static str {
138        "sql-server"
139    }
140
141    fn external_reference(&self) -> Option<&str> {
142        None
143    }
144
145    fn default_key_desc(&self) -> RelationDesc {
146        RelationDesc::empty()
147    }
148
149    fn default_value_desc(&self) -> RelationDesc {
150        // The SQL Server source only outputs data to its subsources. The catalog object
151        // representing the source itself is just an empty relation with no columns
152        RelationDesc::empty()
153    }
154
155    fn timestamp_desc(&self) -> RelationDesc {
156        SQL_SERVER_PROGRESS_DESC.clone()
157    }
158
159    fn connection_id(&self) -> Option<CatalogItemId> {
160        Some(self.connection_id)
161    }
162
163    fn supports_read_only(&self) -> bool {
164        false
165    }
166
167    fn prefers_single_replica(&self) -> bool {
168        true
169    }
170}
171
172impl<C: ConnectionAccess> AlterCompatible for SqlServerSourceConnection<C> {
173    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
174        if self == other {
175            return Ok(());
176        }
177
178        let SqlServerSourceConnection {
179            connection_id,
180            connection,
181            extras,
182        } = self;
183
184        let compatibility_checks = [
185            (connection_id == &other.connection_id, "connection_id"),
186            (
187                connection.alter_compatible(id, &other.connection).is_ok(),
188                "connection",
189            ),
190            (extras.alter_compatible(id, &other.extras).is_ok(), "extras"),
191        ];
192
193        for (compatible, field) in compatibility_checks {
194            if !compatible {
195                tracing::warn!(
196                    "SqlServerSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
197                    self,
198                    other
199                );
200
201                return Err(AlterError { id });
202            }
203        }
204
205        Ok(())
206    }
207}
208
/// Extra information that is pertinent to creating a SQL Server specific
/// Materialize source.
///
/// The information in this struct is durably recorded by serializing it as an
/// option in the `CREATE SOURCE` SQL statement, thus backward compatibility is
/// important!
///
/// Two values are only considered alter-compatible when their
/// `restore_history_id` fields are equal.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct SqlServerSourceExtras {
    /// The most recent `restore_history_id` field from `msdb.dbo.restorehistory`. A change in
    /// this value indicates the upstream SQL Server has been restored.
    /// See: <https://learn.microsoft.com/en-us/sql/relational-databases/system-tables/restorehistory-transact-sql?view=sql-server-ver17>
    pub restore_history_id: Option<i32>,
}
225
226impl AlterCompatible for SqlServerSourceExtras {
227    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
228        if self.restore_history_id != other.restore_history_id {
229            tracing::warn!(?self, ?other, "SqlServerSourceExtras incompatible");
230            return Err(AlterError { id });
231        }
232        Ok(())
233    }
234}
235
236impl RustType<ProtoSqlServerSourceExtras> for SqlServerSourceExtras {
237    fn into_proto(&self) -> ProtoSqlServerSourceExtras {
238        ProtoSqlServerSourceExtras {
239            restore_history_id: self.restore_history_id.clone(),
240        }
241    }
242
243    fn from_proto(proto: ProtoSqlServerSourceExtras) -> Result<Self, mz_proto::TryFromProtoError> {
244        Ok(SqlServerSourceExtras {
245            restore_history_id: proto.restore_history_id,
246        })
247    }
248}
249
/// Specifies the details of a SQL Server source export.
///
/// Each export names the capture instance it reads from, describes the
/// upstream table, lists the columns to decode as text or to exclude, and
/// records the LSN at which the export starts.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
pub struct SqlServerSourceExportDetails {
    /// Name of the SQL Server capture instance we replicate changes from.
    pub capture_instance: Arc<str>,
    /// Description of the upstream table and how it maps to Materialize.
    pub table: mz_sql_server_util::desc::SqlServerTableDesc,
    /// Column names that we want to parse as text.
    pub text_columns: Vec<String>,
    /// Columns from the upstream source that should be excluded.
    pub exclude_columns: Vec<String>,
    /// The initial 'LSN' for this export.
    ///
    /// This is used as a consistent snapshot point for this export to ensure
    /// correctness in the case of multiple replicas.
    pub initial_lsn: mz_sql_server_util::cdc::Lsn,
}
266
267impl SourceTimestamp for Lsn {
268    fn encode_row(&self) -> mz_repr::Row {
269        Row::pack_slice(&[Datum::Bytes(&self.as_bytes())])
270    }
271
272    fn decode_row(row: &mz_repr::Row) -> Self {
273        let mut datums = row.iter();
274        match (datums.next(), datums.next()) {
275            (Some(Datum::Bytes(bytes)), None) => {
276                let lsn: [u8; 10] = bytes.try_into().expect("invalid LSN, wrong length");
277                Lsn::try_from_bytes(&lsn).expect("invalid LSN")
278            }
279            _ => panic!("invalid row {row:?}"),
280        }
281    }
282}