Skip to main content

mz_storage_types/sources/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Types related to postgres sources
11
12use mz_postgres_util::desc::PostgresTableDesc;
13use mz_proto::{RustType, TryFromProtoError};
14use mz_repr::{CatalogItemId, GlobalId, RelationDesc, SqlScalarType};
15use serde::{Deserialize, Serialize};
16use std::sync::LazyLock;
17
18use crate::AlterCompatible;
19use crate::connections::inline::{
20    ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
21    ReferencedConnection,
22};
23use crate::controller::AlterError;
24use crate::sources::{MzOffset, SourceConnection};
25
26include!(concat!(
27    env!("OUT_DIR"),
28    "/mz_storage_types.sources.postgres.rs"
29));
30
31#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
32pub struct PostgresSourceConnection<C: ConnectionAccess = InlinedConnection> {
33    pub connection_id: CatalogItemId,
34    pub connection: C::Pg,
35    pub publication: String,
36    pub publication_details: PostgresSourcePublicationDetails,
37}
38
39impl<R: ConnectionResolver> IntoInlineConnection<PostgresSourceConnection, R>
40    for PostgresSourceConnection<ReferencedConnection>
41{
42    fn into_inline_connection(self, r: R) -> PostgresSourceConnection {
43        let PostgresSourceConnection {
44            connection_id,
45            connection,
46            publication,
47            publication_details,
48        } = self;
49
50        PostgresSourceConnection {
51            connection_id,
52            connection: r.resolve_connection(connection).unwrap_pg(),
53            publication,
54            publication_details,
55        }
56    }
57}
58
59pub static PG_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
60    RelationDesc::builder()
61        .with_column("lsn", SqlScalarType::UInt64.nullable(true))
62        .finish()
63});
64
65impl PostgresSourceConnection {
66    pub async fn fetch_write_frontier(
67        self,
68        storage_configuration: &crate::configuration::StorageConfiguration,
69    ) -> Result<timely::progress::Antichain<MzOffset>, anyhow::Error> {
70        use timely::progress::Antichain;
71
72        let config = self
73            .connection
74            .config(
75                &storage_configuration.connection_context.secrets_reader,
76                storage_configuration,
77                mz_ore::future::InTask::No,
78            )
79            .await?;
80        let client = config
81            .connect(
82                "postgres_wal_lsn",
83                &storage_configuration.connection_context.ssh_tunnel_manager,
84            )
85            .await?;
86
87        let lsn = mz_postgres_util::get_current_wal_lsn(&client).await?;
88
89        let current_upper = Antichain::from_elem(MzOffset::from(u64::from(lsn)));
90        Ok(current_upper)
91    }
92}
93
94impl<C: ConnectionAccess> SourceConnection for PostgresSourceConnection<C> {
95    fn name(&self) -> &'static str {
96        "postgres"
97    }
98
99    fn external_reference(&self) -> Option<&str> {
100        None
101    }
102
103    fn default_key_desc(&self) -> RelationDesc {
104        RelationDesc::empty()
105    }
106
107    fn default_value_desc(&self) -> RelationDesc {
108        // The postgres source only outputs data to its subsources. The catalog object
109        // representing the source itself is just an empty relation with no columns
110        RelationDesc::empty()
111    }
112
113    fn timestamp_desc(&self) -> RelationDesc {
114        PG_PROGRESS_DESC.clone()
115    }
116
117    fn connection_id(&self) -> Option<CatalogItemId> {
118        Some(self.connection_id)
119    }
120
121    fn supports_read_only(&self) -> bool {
122        false
123    }
124
125    fn prefers_single_replica(&self) -> bool {
126        true
127    }
128}
129
130impl<C: ConnectionAccess> AlterCompatible for PostgresSourceConnection<C> {
131    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
132        if self == other {
133            return Ok(());
134        }
135
136        let PostgresSourceConnection {
137            connection_id,
138            connection,
139            publication,
140            publication_details,
141        } = self;
142
143        let compatibility_checks = [
144            (connection_id == &other.connection_id, "connection_id"),
145            (
146                connection.alter_compatible(id, &other.connection).is_ok(),
147                "connection",
148            ),
149            (publication == &other.publication, "publication"),
150            (
151                publication_details
152                    .alter_compatible(id, &other.publication_details)
153                    .is_ok(),
154                "publication_details",
155            ),
156        ];
157
158        for (compatible, field) in compatibility_checks {
159            if !compatible {
160                tracing::warn!(
161                    "PostgresSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
162                    self,
163                    other
164                );
165
166                return Err(AlterError { id });
167            }
168        }
169
170        Ok(())
171    }
172}
173
174#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
175pub enum CastType {
176    /// This cast is corresponds to its type in the upstream system.
177    Natural,
178    /// This cast was generated with the `TEXT COLUMNS` option.
179    Text,
180}
181
182/// The details of a source export from a postgres source.
183#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
184pub struct PostgresSourceExportDetails {
185    /// The cast expressions to convert the incoming string encoded rows to
186    /// their target types
187    pub column_casts: Vec<(CastType, crate::sources::casts::StorageScalarExpr)>,
188    pub table: PostgresTableDesc,
189}
190
191impl AlterCompatible for PostgresSourceExportDetails {
192    fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
193        // compatibility checks are performed against the upstream table in the source
194        // render operators instead
195        let Self {
196            column_casts: _,
197            table: _,
198        } = self;
199        Ok(())
200    }
201}
202
203#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
204pub struct PostgresSourcePublicationDetails {
205    pub slot: String,
206    /// The active timeline_id when this source was created
207    /// The None value indicates an unknown timeline, to account for sources that existed
208    /// prior to this field being introduced
209    pub timeline_id: Option<u64>,
210    pub database: String,
211}
212
213impl RustType<ProtoPostgresSourcePublicationDetails> for PostgresSourcePublicationDetails {
214    fn into_proto(&self) -> ProtoPostgresSourcePublicationDetails {
215        ProtoPostgresSourcePublicationDetails {
216            slot: self.slot.clone(),
217            timeline_id: self.timeline_id.clone(),
218            database: self.database.clone(),
219        }
220    }
221
222    fn from_proto(proto: ProtoPostgresSourcePublicationDetails) -> Result<Self, TryFromProtoError> {
223        Ok(PostgresSourcePublicationDetails {
224            slot: proto.slot,
225            timeline_id: proto.timeline_id,
226            database: proto.database,
227        })
228    }
229}
230
231impl AlterCompatible for PostgresSourcePublicationDetails {
232    fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
233        let PostgresSourcePublicationDetails {
234            slot,
235            timeline_id,
236            database,
237        } = self;
238
239        let compatibility_checks = [
240            (slot == &other.slot, "slot"),
241            (
242                match (timeline_id, &other.timeline_id) {
243                    (Some(curr_id), Some(new_id)) => curr_id == new_id,
244                    (None, Some(_)) => true,
245                    // New values must always have timeline ID
246                    (_, None) => false,
247                },
248                "timeline_id",
249            ),
250            (database == &other.database, "database"),
251        ];
252
253        for (compatible, field) in compatibility_checks {
254            if !compatible {
255                tracing::warn!(
256                    "PostgresSourcePublicationDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
257                    self,
258                    other
259                );
260
261                return Err(AlterError { id });
262            }
263        }
264
265        Ok(())
266    }
267}