mz_storage_types/sources/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to postgres sources
11
12use mz_expr::MirScalarExpr;
13use mz_postgres_util::desc::PostgresTableDesc;
14use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
15use mz_repr::{CatalogItemId, GlobalId, RelationDesc, ScalarType};
16use proptest::prelude::any;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19use std::sync::LazyLock;
20
21use crate::AlterCompatible;
22use crate::connections::inline::{
23    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
24    ReferencedConnection,
25};
26use crate::controller::AlterError;
27use crate::sources::{MzOffset, SourceConnection};
28
29use super::SourceExportDetails;
30
31include!(concat!(
32    env!("OUT_DIR"),
33    "/mz_storage_types.sources.postgres.rs"
34));
35
/// Definition of a postgres source connection.
///
/// The type parameter `C` selects the connection representation through
/// [`ConnectionAccess`]; it defaults to [`InlinedConnection`].
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresSourceConnection<C: ConnectionAccess = InlinedConnection> {
    /// Catalog ID associated with `connection`; reported by
    /// `SourceConnection::connection_id`.
    pub connection_id: CatalogItemId,
    /// The postgres connection, in the representation chosen by `C` (`C::Pg`).
    pub connection: C::Pg,
    /// Name of the publication.
    // NOTE(review): presumably the upstream PostgreSQL publication this source
    // replicates from -- confirm against the planner.
    pub publication: String,
    /// Additional details (slot, timeline ID, database) stored alongside the
    /// publication.
    pub publication_details: PostgresSourcePublicationDetails,
}
43
44impl<R: ConnectionResolver> IntoInlineConnection<PostgresSourceConnection, R>
45    for PostgresSourceConnection<ReferencedConnection>
46{
47    fn into_inline_connection(self, r: R) -> PostgresSourceConnection {
48        let PostgresSourceConnection {
49            connection_id,
50            connection,
51            publication,
52            publication_details,
53        } = self;
54
55        PostgresSourceConnection {
56            connection_id,
57            connection: r.resolve_connection(connection).unwrap_pg(),
58            publication,
59            publication_details,
60        }
61    }
62}
63
64pub static PG_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
65    RelationDesc::builder()
66        .with_column("lsn", ScalarType::UInt64.nullable(true))
67        .finish()
68});
69
70impl PostgresSourceConnection {
71    pub async fn fetch_write_frontier(
72        self,
73        storage_configuration: &crate::configuration::StorageConfiguration,
74    ) -> Result<timely::progress::Antichain<MzOffset>, anyhow::Error> {
75        use timely::progress::Antichain;
76
77        let config = self
78            .connection
79            .config(
80                &storage_configuration.connection_context.secrets_reader,
81                storage_configuration,
82                mz_ore::future::InTask::No,
83            )
84            .await?;
85        let client = config
86            .connect(
87                "postgres_wal_lsn",
88                &storage_configuration.connection_context.ssh_tunnel_manager,
89            )
90            .await?;
91
92        let lsn = mz_postgres_util::get_current_wal_lsn(&client).await?;
93
94        let current_upper = Antichain::from_elem(MzOffset::from(u64::from(lsn)));
95        Ok(current_upper)
96    }
97}
98
99impl<C: ConnectionAccess> SourceConnection for PostgresSourceConnection<C> {
100    fn name(&self) -> &'static str {
101        "postgres"
102    }
103
104    fn external_reference(&self) -> Option<&str> {
105        None
106    }
107
108    fn default_key_desc(&self) -> RelationDesc {
109        RelationDesc::empty()
110    }
111
112    fn default_value_desc(&self) -> RelationDesc {
113        // The postgres source only outputs data to its subsources. The catalog object
114        // representing the source itself is just an empty relation with no columns
115        RelationDesc::empty()
116    }
117
118    fn timestamp_desc(&self) -> RelationDesc {
119        PG_PROGRESS_DESC.clone()
120    }
121
122    fn connection_id(&self) -> Option<CatalogItemId> {
123        Some(self.connection_id)
124    }
125
126    fn primary_export_details(&self) -> SourceExportDetails {
127        SourceExportDetails::None
128    }
129
130    fn supports_read_only(&self) -> bool {
131        false
132    }
133
134    fn prefers_single_replica(&self) -> bool {
135        true
136    }
137}
138
139impl<C: ConnectionAccess> AlterCompatible for PostgresSourceConnection<C> {
140    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
141        if self == other {
142            return Ok(());
143        }
144
145        let PostgresSourceConnection {
146            connection_id,
147            connection,
148            publication,
149            publication_details,
150        } = self;
151
152        let compatibility_checks = [
153            (connection_id == &other.connection_id, "connection_id"),
154            (
155                connection.alter_compatible(id, &other.connection).is_ok(),
156                "connection",
157            ),
158            (publication == &other.publication, "publication"),
159            (
160                publication_details
161                    .alter_compatible(id, &other.publication_details)
162                    .is_ok(),
163                "publication_details",
164            ),
165        ];
166
167        for (compatible, field) in compatibility_checks {
168            if !compatible {
169                tracing::warn!(
170                    "PostgresSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
171                    self,
172                    other
173                );
174
175                return Err(AlterError { id });
176            }
177        }
178
179        Ok(())
180    }
181}
182
/// Describes how a column cast of a postgres source export was produced.
#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub enum CastType {
    /// This cast corresponds to its type in the upstream system.
    Natural,
    /// This cast was generated with the `TEXT COLUMNS` option.
    Text,
}
190
191impl RustType<ProtoCastType> for CastType {
192    fn into_proto(&self) -> ProtoCastType {
193        use proto_cast_type::Kind::*;
194        ProtoCastType {
195            kind: Some(match self {
196                CastType::Natural => Natural(()),
197                CastType::Text => Text(()),
198            }),
199        }
200    }
201
202    fn from_proto(proto: ProtoCastType) -> Result<Self, TryFromProtoError> {
203        use proto_cast_type::Kind::*;
204        Ok(match proto.kind {
205            Some(Natural(())) => CastType::Natural,
206            Some(Text(())) => CastType::Text,
207            None => {
208                return Err(TryFromProtoError::missing_field(
209                    "ProtoWindowFrameUnits::kind",
210                ));
211            }
212        })
213    }
214}
215
216impl RustType<ProtoPostgresSourceConnection> for PostgresSourceConnection {
217    fn into_proto(&self) -> ProtoPostgresSourceConnection {
218        ProtoPostgresSourceConnection {
219            connection: Some(self.connection.into_proto()),
220            connection_id: Some(self.connection_id.into_proto()),
221            publication: self.publication.clone(),
222            details: Some(self.publication_details.into_proto()),
223        }
224    }
225
226    fn from_proto(proto: ProtoPostgresSourceConnection) -> Result<Self, TryFromProtoError> {
227        Ok(PostgresSourceConnection {
228            connection: proto
229                .connection
230                .into_rust_if_some("ProtoPostgresSourceConnection::connection")?,
231            connection_id: proto
232                .connection_id
233                .into_rust_if_some("ProtoPostgresSourceConnection::connection_id")?,
234            publication: proto.publication,
235            publication_details: proto
236                .details
237                .into_rust_if_some("ProtoPostgresSourceConnection::details")?,
238        })
239    }
240}
241
/// The details of a source export from a postgres source.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresSourceExportDetails {
    /// The cast expressions to convert the incoming string encoded rows to
    /// their target types. Each entry pairs the expression with the
    /// [`CastType`] describing how the cast was produced.
    // The proptest strategy below generates between 0 and 3 casts.
    #[proptest(
        strategy = "proptest::collection::vec((any::<CastType>(), any::<MirScalarExpr>()), 0..4)"
    )]
    pub column_casts: Vec<(CastType, MirScalarExpr)>,
    /// Description of the table this export refers to.
    // NOTE(review): presumably the upstream table as it looked when the
    // export was created -- confirm against the purification code.
    pub table: PostgresTableDesc,
}
253
254impl AlterCompatible for PostgresSourceExportDetails {
255    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
256        // compatibility checks are performed against the upstream table in the source
257        // render operators instead
258        let Self {
259            column_casts: _,
260            table: _,
261        } = self;
262        Ok(())
263    }
264}
265
266impl RustType<ProtoPostgresSourceExportDetails> for PostgresSourceExportDetails {
267    fn into_proto(&self) -> ProtoPostgresSourceExportDetails {
268        let mut column_casts = Vec::with_capacity(self.column_casts.len());
269
270        for (col_type, cast) in self.column_casts.iter() {
271            column_casts.push(ProtoPostgresColumnCast {
272                cast: Some(cast.into_proto()),
273                cast_type: Some(col_type.into_proto()),
274            });
275        }
276
277        ProtoPostgresSourceExportDetails {
278            table: Some(self.table.into_proto()),
279            column_casts,
280        }
281    }
282
283    fn from_proto(proto: ProtoPostgresSourceExportDetails) -> Result<Self, TryFromProtoError> {
284        let mut column_casts = vec![];
285        for column_cast in proto.column_casts.into_iter() {
286            column_casts.push((
287                column_cast
288                    .cast_type
289                    .into_rust_if_some("ProtoPostgresColumnCast::cast_type")?,
290                column_cast
291                    .cast
292                    .into_rust_if_some("ProtoPostgresColumnCast::cast")?,
293            ));
294        }
295
296        Ok(PostgresSourceExportDetails {
297            table: proto
298                .table
299                .into_rust_if_some("ProtoPostgresSourceExportDetails::table")?,
300            column_casts,
301        })
302    }
303}
304
/// Details stored alongside the publication of a postgres source.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresSourcePublicationDetails {
    /// Name of the slot.
    // NOTE(review): presumably the upstream replication slot -- confirm.
    pub slot: String,
    /// The active timeline_id when this source was created.
    ///
    /// The `None` value indicates an unknown timeline, to account for sources
    /// that existed prior to this field being introduced.
    pub timeline_id: Option<u64>,
    /// Name of the database.
    // NOTE(review): presumably the upstream database the publication lives
    // in -- confirm.
    pub database: String,
}
314
315impl RustType<ProtoPostgresSourcePublicationDetails> for PostgresSourcePublicationDetails {
316    fn into_proto(&self) -> ProtoPostgresSourcePublicationDetails {
317        ProtoPostgresSourcePublicationDetails {
318            slot: self.slot.clone(),
319            timeline_id: self.timeline_id.clone(),
320            database: self.database.clone(),
321        }
322    }
323
324    fn from_proto(proto: ProtoPostgresSourcePublicationDetails) -> Result<Self, TryFromProtoError> {
325        Ok(PostgresSourcePublicationDetails {
326            slot: proto.slot,
327            timeline_id: proto.timeline_id,
328            database: proto.database,
329        })
330    }
331}
332
333impl AlterCompatible for PostgresSourcePublicationDetails {
334    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
335        let PostgresSourcePublicationDetails {
336            slot,
337            timeline_id,
338            database,
339        } = self;
340
341        let compatibility_checks = [
342            (slot == &other.slot, "slot"),
343            (
344                match (timeline_id, &other.timeline_id) {
345                    (Some(curr_id), Some(new_id)) => curr_id == new_id,
346                    (None, Some(_)) => true,
347                    // New values must always have timeline ID
348                    (_, None) => false,
349                },
350                "timeline_id",
351            ),
352            (database == &other.database, "database"),
353        ];
354
355        for (compatible, field) in compatibility_checks {
356            if !compatible {
357                tracing::warn!(
358                    "PostgresSourcePublicationDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
359                    self,
360                    other
361                );
362
363                return Err(AlterError { id });
364            }
365        }
366
367        Ok(())
368    }
369}