mz_storage_types/sources/
postgres.rs1use mz_expr::MirScalarExpr;
13use mz_postgres_util::desc::PostgresTableDesc;
14use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
15use mz_repr::{CatalogItemId, GlobalId, RelationDesc, ScalarType};
16use proptest::prelude::any;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19use std::sync::LazyLock;
20
21use crate::AlterCompatible;
22use crate::connections::inline::{
23 ConnectionAccess, ConnectionResolver, InlinedConnection, IntoInlineConnection,
24 ReferencedConnection,
25};
26use crate::controller::AlterError;
27use crate::sources::{MzOffset, SourceConnection};
28
29use super::SourceExportDetails;
30
31include!(concat!(
32 env!("OUT_DIR"),
33 "/mz_storage_types.sources.postgres.rs"
34));
35
36#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
37pub struct PostgresSourceConnection<C: ConnectionAccess = InlinedConnection> {
38 pub connection_id: CatalogItemId,
39 pub connection: C::Pg,
40 pub publication: String,
41 pub publication_details: PostgresSourcePublicationDetails,
42}
43
44impl<R: ConnectionResolver> IntoInlineConnection<PostgresSourceConnection, R>
45 for PostgresSourceConnection<ReferencedConnection>
46{
47 fn into_inline_connection(self, r: R) -> PostgresSourceConnection {
48 let PostgresSourceConnection {
49 connection_id,
50 connection,
51 publication,
52 publication_details,
53 } = self;
54
55 PostgresSourceConnection {
56 connection_id,
57 connection: r.resolve_connection(connection).unwrap_pg(),
58 publication,
59 publication_details,
60 }
61 }
62}
63
64pub static PG_PROGRESS_DESC: LazyLock<RelationDesc> = LazyLock::new(|| {
65 RelationDesc::builder()
66 .with_column("lsn", ScalarType::UInt64.nullable(true))
67 .finish()
68});
69
70impl PostgresSourceConnection {
71 pub async fn fetch_write_frontier(
72 self,
73 storage_configuration: &crate::configuration::StorageConfiguration,
74 ) -> Result<timely::progress::Antichain<MzOffset>, anyhow::Error> {
75 use timely::progress::Antichain;
76
77 let config = self
78 .connection
79 .config(
80 &storage_configuration.connection_context.secrets_reader,
81 storage_configuration,
82 mz_ore::future::InTask::No,
83 )
84 .await?;
85 let client = config
86 .connect(
87 "postgres_wal_lsn",
88 &storage_configuration.connection_context.ssh_tunnel_manager,
89 )
90 .await?;
91
92 let lsn = mz_postgres_util::get_current_wal_lsn(&client).await?;
93
94 let current_upper = Antichain::from_elem(MzOffset::from(u64::from(lsn)));
95 Ok(current_upper)
96 }
97}
98
99impl<C: ConnectionAccess> SourceConnection for PostgresSourceConnection<C> {
100 fn name(&self) -> &'static str {
101 "postgres"
102 }
103
104 fn external_reference(&self) -> Option<&str> {
105 None
106 }
107
108 fn default_key_desc(&self) -> RelationDesc {
109 RelationDesc::empty()
110 }
111
112 fn default_value_desc(&self) -> RelationDesc {
113 RelationDesc::empty()
116 }
117
118 fn timestamp_desc(&self) -> RelationDesc {
119 PG_PROGRESS_DESC.clone()
120 }
121
122 fn connection_id(&self) -> Option<CatalogItemId> {
123 Some(self.connection_id)
124 }
125
126 fn primary_export_details(&self) -> SourceExportDetails {
127 SourceExportDetails::None
128 }
129
130 fn supports_read_only(&self) -> bool {
131 false
132 }
133
134 fn prefers_single_replica(&self) -> bool {
135 true
136 }
137}
138
139impl<C: ConnectionAccess> AlterCompatible for PostgresSourceConnection<C> {
140 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
141 if self == other {
142 return Ok(());
143 }
144
145 let PostgresSourceConnection {
146 connection_id,
147 connection,
148 publication,
149 publication_details,
150 } = self;
151
152 let compatibility_checks = [
153 (connection_id == &other.connection_id, "connection_id"),
154 (
155 connection.alter_compatible(id, &other.connection).is_ok(),
156 "connection",
157 ),
158 (publication == &other.publication, "publication"),
159 (
160 publication_details
161 .alter_compatible(id, &other.publication_details)
162 .is_ok(),
163 "publication_details",
164 ),
165 ];
166
167 for (compatible, field) in compatibility_checks {
168 if !compatible {
169 tracing::warn!(
170 "PostgresSourceConnection incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
171 self,
172 other
173 );
174
175 return Err(AlterError { id });
176 }
177 }
178
179 Ok(())
180 }
181}
182
183#[derive(Copy, Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
184pub enum CastType {
185 Natural,
187 Text,
189}
190
191impl RustType<ProtoCastType> for CastType {
192 fn into_proto(&self) -> ProtoCastType {
193 use proto_cast_type::Kind::*;
194 ProtoCastType {
195 kind: Some(match self {
196 CastType::Natural => Natural(()),
197 CastType::Text => Text(()),
198 }),
199 }
200 }
201
202 fn from_proto(proto: ProtoCastType) -> Result<Self, TryFromProtoError> {
203 use proto_cast_type::Kind::*;
204 Ok(match proto.kind {
205 Some(Natural(())) => CastType::Natural,
206 Some(Text(())) => CastType::Text,
207 None => {
208 return Err(TryFromProtoError::missing_field(
209 "ProtoWindowFrameUnits::kind",
210 ));
211 }
212 })
213 }
214}
215
216impl RustType<ProtoPostgresSourceConnection> for PostgresSourceConnection {
217 fn into_proto(&self) -> ProtoPostgresSourceConnection {
218 ProtoPostgresSourceConnection {
219 connection: Some(self.connection.into_proto()),
220 connection_id: Some(self.connection_id.into_proto()),
221 publication: self.publication.clone(),
222 details: Some(self.publication_details.into_proto()),
223 }
224 }
225
226 fn from_proto(proto: ProtoPostgresSourceConnection) -> Result<Self, TryFromProtoError> {
227 Ok(PostgresSourceConnection {
228 connection: proto
229 .connection
230 .into_rust_if_some("ProtoPostgresSourceConnection::connection")?,
231 connection_id: proto
232 .connection_id
233 .into_rust_if_some("ProtoPostgresSourceConnection::connection_id")?,
234 publication: proto.publication,
235 publication_details: proto
236 .details
237 .into_rust_if_some("ProtoPostgresSourceConnection::details")?,
238 })
239 }
240}
241
242#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
244pub struct PostgresSourceExportDetails {
245 #[proptest(
248 strategy = "proptest::collection::vec((any::<CastType>(), any::<MirScalarExpr>()), 0..4)"
249 )]
250 pub column_casts: Vec<(CastType, MirScalarExpr)>,
251 pub table: PostgresTableDesc,
252}
253
254impl AlterCompatible for PostgresSourceExportDetails {
255 fn alter_compatible(&self, _id: GlobalId, _other: &Self) -> Result<(), AlterError> {
256 let Self {
259 column_casts: _,
260 table: _,
261 } = self;
262 Ok(())
263 }
264}
265
266impl RustType<ProtoPostgresSourceExportDetails> for PostgresSourceExportDetails {
267 fn into_proto(&self) -> ProtoPostgresSourceExportDetails {
268 let mut column_casts = Vec::with_capacity(self.column_casts.len());
269
270 for (col_type, cast) in self.column_casts.iter() {
271 column_casts.push(ProtoPostgresColumnCast {
272 cast: Some(cast.into_proto()),
273 cast_type: Some(col_type.into_proto()),
274 });
275 }
276
277 ProtoPostgresSourceExportDetails {
278 table: Some(self.table.into_proto()),
279 column_casts,
280 }
281 }
282
283 fn from_proto(proto: ProtoPostgresSourceExportDetails) -> Result<Self, TryFromProtoError> {
284 let mut column_casts = vec![];
285 for column_cast in proto.column_casts.into_iter() {
286 column_casts.push((
287 column_cast
288 .cast_type
289 .into_rust_if_some("ProtoPostgresColumnCast::cast_type")?,
290 column_cast
291 .cast
292 .into_rust_if_some("ProtoPostgresColumnCast::cast")?,
293 ));
294 }
295
296 Ok(PostgresSourceExportDetails {
297 table: proto
298 .table
299 .into_rust_if_some("ProtoPostgresSourceExportDetails::table")?,
300 column_casts,
301 })
302 }
303}
304
305#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
306pub struct PostgresSourcePublicationDetails {
307 pub slot: String,
308 pub timeline_id: Option<u64>,
312 pub database: String,
313}
314
315impl RustType<ProtoPostgresSourcePublicationDetails> for PostgresSourcePublicationDetails {
316 fn into_proto(&self) -> ProtoPostgresSourcePublicationDetails {
317 ProtoPostgresSourcePublicationDetails {
318 slot: self.slot.clone(),
319 timeline_id: self.timeline_id.clone(),
320 database: self.database.clone(),
321 }
322 }
323
324 fn from_proto(proto: ProtoPostgresSourcePublicationDetails) -> Result<Self, TryFromProtoError> {
325 Ok(PostgresSourcePublicationDetails {
326 slot: proto.slot,
327 timeline_id: proto.timeline_id,
328 database: proto.database,
329 })
330 }
331}
332
333impl AlterCompatible for PostgresSourcePublicationDetails {
334 fn alter_compatible(&self, id: GlobalId, other: &Self) -> Result<(), AlterError> {
335 let PostgresSourcePublicationDetails {
336 slot,
337 timeline_id,
338 database,
339 } = self;
340
341 let compatibility_checks = [
342 (slot == &other.slot, "slot"),
343 (
344 match (timeline_id, &other.timeline_id) {
345 (Some(curr_id), Some(new_id)) => curr_id == new_id,
346 (None, Some(_)) => true,
347 (_, None) => false,
349 },
350 "timeline_id",
351 ),
352 (database == &other.database, "database"),
353 ];
354
355 for (compatible, field) in compatibility_checks {
356 if !compatible {
357 tracing::warn!(
358 "PostgresSourcePublicationDetails incompatible at {field}:\nself:\n{:#?}\n\nother\n{:#?}",
359 self,
360 other
361 );
362
363 return Err(AlterError { id });
364 }
365 }
366
367 Ok(())
368 }
369}