mz_storage_types/sources/
postgres.rs1use mz_postgres_util::desc::PostgresTableDesc;
13use mz_proto::{RustType, TryFromProtoError};
14use mz_repr::{CatalogItemId, GlobalId, RelationDesc, SqlScalarType};
15use serde::{Deserialize, Serialize};
16use std::sync::LazyLock;
17
18use crate::AlterCompatible;
19use crate::connections::inline::{
20 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
21 ReferencedConnection,
22};
23use crate::controller::AlterError;
24use crate::sources::{MzOffset, SourceConnection};
25
26include!(concat!(
27 env!("OUT_DIR"),
28 "/mz_storage_types.sources.postgres.rs"
29));
30
31#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
32pub struct PostgresSourceConnection<C: ConnectionAccess = InlinedConnection> {
33 pub connection_id: CatalogItemId,
34 pub connection: C::Pg,
35 pub publication: String,
36 pub publication_details: PostgresSourcePublicationDetails,
37}
38
39impl<R: ConnectionResolver> IntoInlineConnection<PostgresSourceConnection, R>
40 for PostgresSourceConnection<ReferencedConnection>
41{
42 fn into_inline_connection(self, r: R) -> PostgresSourceConnection {
43 let PostgresSourceConnection {
44 connection_id,
45 connection,
46 publication,
47 publication_details,
48 } = self;
49
50 PostgresSourceConnection {
51 connection_id,
52 connection: r.resolve_connection(connection).unwrap_pg(),
53 publication,
54 publication_details,
55 }
56 }
57}
58
59pub static PG_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
60 RelationDesc::builder()
61 .with_column("lsn", SqlScalarType::UInt64.nullable(true))
62 .finish()
63});
64
65impl PostgresSourceConnection {
66 pub async fn fetch_write_frontier(
67 self,
68 storage_configuration: &crate::configuration::StorageConfiguration,
69 ) -> Result<timely::progress::Antichain<MzOffset>, anyhow::Error> {
70 use timely::progress::Antichain;
71
72 let config = self
73 .connection
74 .config(
75 &storage_configuration.connection_context.secrets_reader,
76 storage_configuration,
77 mz_ore::future::InTask::No,
78 )
79 .await?;
80 let client = config
81 .connect(
82 "postgres_wal_lsn",
83 &storage_configuration.connection_context.ssh_tunnel_manager,
84 )
85 .await?;
86
87 let lsn = mz_postgres_util::get_current_wal_lsn(&client).await?;
88
89 let current_upper = Antichain::from_elem(MzOffset::from(u64::from(lsn)));
90 Ok(current_upper)
91 }
92}
93
94impl<C: ConnectionAccess> SourceConnection for PostgresSourceConnection<C> {
95 fn name(&self) -> &'static str {
96 "postgres"
97 }
98
99 fn external_reference(&self) -> Option<&str> {
100 None
101 }
102
103 fn default_key_desc(&self) -> RelationDesc {
104 RelationDesc::empty()
105 }
106
107 fn default_value_desc(&self) -> RelationDesc {
108 RelationDesc::empty()
111 }
112
113 fn timestamp_desc(&self) -> RelationDesc {
114 PG_PROGRESS_DESC.clone()
115 }
116
117 fn connection_id(&self) -> Option<CatalogItemId> {
118 Some(self.connection_id)
119 }
120
121 fn supports_read_only(&self) -> bool {
122 false
123 }
124
125 fn prefers_single_replica(&self) -> bool {
126 true
127 }
128}
129
130impl<C: ConnectionAccess> AlterCompatible for PostgresSourceConnection<C> {
131 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
132 if self == other {
133 return Ok(());
134 }
135
136 let PostgresSourceConnection {
137 connection_id,
138 connection,
139 publication,
140 publication_details,
141 } = self;
142
143 let compatibility_checks = [
144 (connection_id == &other.connection_id, "connection_id"),
145 (
146 connection.alter_compatible(id, &other.connection).is_ok(),
147 "connection",
148 ),
149 (publication == &other.publication, "publication"),
150 (
151 publication_details
152 .alter_compatible(id, &other.publication_details)
153 .is_ok(),
154 "publication_details",
155 ),
156 ];
157
158 for (compatible, field) in compatibility_checks {
159 if !compatible {
160 tracing::warn!(
161 "PostgresSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
162 self,
163 other
164 );
165
166 return Err(AlterError { id });
167 }
168 }
169
170 Ok(())
171 }
172}
173
174#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
175pub enum CastType {
176 Natural,
178 Text,
180}
181
182#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
184pub struct PostgresSourceExportDetails {
185 pub column_casts: Vec<(CastType, crate::sources::casts::StorageScalarExpr)>,
188 pub table: PostgresTableDesc,
189}
190
191impl AlterCompatible for PostgresSourceExportDetails {
192 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
193 let Self {
196 column_casts: _,
197 table: _,
198 } = self;
199 Ok(())
200 }
201}
202
203#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize)]
204pub struct PostgresSourcePublicationDetails {
205 pub slot: String,
206 pub timeline_id: Option<u64>,
210 pub database: String,
211}
212
213impl RustType<ProtoPostgresSourcePublicationDetails> for PostgresSourcePublicationDetails {
214 fn into_proto(&self) -> ProtoPostgresSourcePublicationDetails {
215 ProtoPostgresSourcePublicationDetails {
216 slot: self.slot.clone(),
217 timeline_id: self.timeline_id.clone(),
218 database: self.database.clone(),
219 }
220 }
221
222 fn from_proto(proto: ProtoPostgresSourcePublicationDetails) -> Result<Self, TryFromProtoError> {
223 Ok(PostgresSourcePublicationDetails {
224 slot: proto.slot,
225 timeline_id: proto.timeline_id,
226 database: proto.database,
227 })
228 }
229}
230
231impl AlterCompatible for PostgresSourcePublicationDetails {
232 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
233 let PostgresSourcePublicationDetails {
234 slot,
235 timeline_id,
236 database,
237 } = self;
238
239 let compatibility_checks = [
240 (slot == &other.slot, "slot"),
241 (
242 match (timeline_id, &other.timeline_id) {
243 (Some(curr_id), Some(new_id)) => curr_id == new_id,
244 (None, Some(_)) => true,
245 (_, None) => false,
247 },
248 "timeline_id",
249 ),
250 (database == &other.database, "database"),
251 ];
252
253 for (compatible, field) in compatibility_checks {
254 if !compatible {
255 tracing::warn!(
256 "PostgresSourcePublicationDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
257 self,
258 other
259 );
260
261 return Err(AlterError { id });
262 }
263 }
264
265 Ok(())
266 }
267}