1use std::collections::BTreeMap;
84use std::rc::Rc;
85use std::time::Duration;
86
87use differential_dataflow::AsCollection;
88use itertools::Itertools as _;
89use mz_expr::{EvalError, MirScalarExpr};
90use mz_ore::cast::CastFrom;
91use mz_ore::error::ErrorExt;
92use mz_postgres_util::desc::PostgresTableDesc;
93use mz_postgres_util::{Client, PostgresError, simple_query_opt};
94use mz_repr::{Datum, Diff, GlobalId, Row};
95use mz_sql_parser::ast::Ident;
96use mz_sql_parser::ast::display::AstDisplay;
97use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
98use mz_storage_types::sources::postgres::CastType;
99use mz_storage_types::sources::{
100 MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
101};
102use mz_timely_util::builder_async::PressOnDropButton;
103use serde::{Deserialize, Serialize};
104use timely::container::CapacityContainerBuilder;
105use timely::dataflow::operators::Concat;
106use timely::dataflow::operators::core::Partition;
107use timely::dataflow::operators::vec::{Map, ToStream};
108use timely::dataflow::{Scope, StreamVec};
109use timely::progress::Antichain;
110use tokio_postgres::error::SqlState;
111use tokio_postgres::types::PgLsn;
112
113use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
114use crate::source::types::{Probe, SourceRender, StackedCollection};
115use crate::source::{RawSourceCreationConfig, SourceMessage};
116
117mod replication;
118mod snapshot;
119
120impl SourceRender for PostgresSourceConnection {
121 type Time = MzOffset;
122
123 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;
124
125 fn render<G: Scope<Timestamp = MzOffset>>(
128 self,
129 scope: &mut G,
130 config: &RawSourceCreationConfig,
131 resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
132 _start_signal: impl std::future::Future<Output = ()> + 'static,
133 ) -> (
134 BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
135 StreamVec<G, HealthStatusMessage>,
136 StreamVec<G, Probe<MzOffset>>,
137 Vec<PressOnDropButton>,
138 ) {
139 let mut table_info = BTreeMap::new();
141 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
142 let SourceExport {
143 details,
144 storage_metadata: _,
145 data_config: _,
146 } = export;
147 let details = match details {
148 SourceExportDetails::Postgres(details) => details,
149 SourceExportDetails::None => continue,
151 _ => panic!("unexpected source export details: {:?}", details),
152 };
153 let desc = details.table.clone();
154 let casts = details.column_casts.clone();
155 let resume_upper = Antichain::from_iter(
156 config
157 .source_resume_uppers
158 .get(id)
159 .expect("all source exports must be present in source resume uppers")
160 .iter()
161 .map(MzOffset::decode_row),
162 );
163 let output = SourceOutputInfo {
164 desc,
165 projection: None,
166 casts,
167 resume_upper,
168 export_id: id.clone(),
169 };
170 table_info
171 .entry(output.desc.oid)
172 .or_insert_with(BTreeMap::new)
173 .insert(idx, output);
174 }
175
176 let metrics = config.metrics.get_postgres_source_metrics(config.id);
177
178 let (snapshot_updates, rewinds, slot_ready, snapshot_err, snapshot_token) =
179 snapshot::render(
180 scope.clone(),
181 config.clone(),
182 self.clone(),
183 table_info.clone(),
184 metrics.snapshot_metrics.clone(),
185 );
186
187 let (repl_updates, probe_stream, repl_err, repl_token) = replication::render(
188 scope.clone(),
189 config.clone(),
190 self,
191 table_info,
192 rewinds,
193 slot_ready,
194 resume_uppers,
195 metrics,
196 );
197
198 let updates = snapshot_updates.concat(repl_updates);
199 let partition_count = u64::cast_from(config.source_exports.len());
200 let data_streams: Vec<_> = updates
201 .inner
202 .partition::<CapacityContainerBuilder<_>, _, _>(
203 partition_count,
204 |((output, data), time, diff): &(
205 (usize, Result<SourceMessage, DataflowError>),
206 MzOffset,
207 Diff,
208 )| {
209 let output = u64::cast_from(*output);
210 (output, (data.clone(), time.clone(), diff.clone()))
211 },
212 );
213 let mut data_collections = BTreeMap::new();
214 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
215 data_collections.insert(*id, data_stream.as_collection());
216 }
217
218 let export_ids = config.source_exports.keys().copied();
219 let health_init = export_ids
220 .map(Some)
221 .chain(std::iter::once(None))
222 .map(|id| HealthStatusMessage {
223 id,
224 namespace: Self::STATUS_NAMESPACE,
225 update: HealthStatusUpdate::Running,
226 })
227 .collect::<Vec<_>>()
228 .to_stream(scope);
229
230 let errs = snapshot_err.concat(repl_err).map(move |err| {
234 let err_string = err.display_with_causes().to_string();
236 let update = HealthStatusUpdate::halting(err_string.clone(), None);
237
238 let namespace = match err {
239 ReplicationError::Transient(err)
240 if matches!(
241 &*err,
242 TransientError::PostgresError(PostgresError::Ssh(_))
243 | TransientError::PostgresError(PostgresError::SshIo(_))
244 ) =>
245 {
246 StatusNamespace::Ssh
247 }
248 _ => Self::STATUS_NAMESPACE,
249 };
250
251 HealthStatusMessage {
252 id: None,
253 namespace: namespace.clone(),
254 update,
255 }
256 });
257
258 let health = health_init.concat(errs);
259
260 (
261 data_collections,
262 health,
263 probe_stream,
264 vec![snapshot_token, repl_token],
265 )
266 }
267}
268
/// Per-export information needed to render one output of a Postgres source.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    // Description of the upstream table this output ingests from.
    desc: PostgresTableDesc,
    // NOTE(review): always `None` where constructed in this file — presumably
    // a column projection applied downstream; confirm in snapshot/replication.
    projection: Option<Vec<usize>>,
    // Cast applied to each raw upstream column, in column order (zipped
    // one-to-one with `desc.columns`, see `verify_schema`).
    casts: Vec<(CastType, MirScalarExpr)>,
    // Frontier up to which this export has already committed data.
    resume_upper: Antichain<MzOffset>,
    // The `GlobalId` of the source export this output feeds.
    export_id: GlobalId,
}
281
/// An error in the replication dataflow, partitioned by whether a retry could
/// resolve it (transient) or not (definite).
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    /// A potentially retryable error; `Rc`-wrapped so this enum stays `Clone`.
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// A permanent error; `Rc`-wrapped so this enum stays `Clone`.
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
289
/// Errors of a transient nature: conditions that may resolve themselves if
/// the operation is retried or the dataflow is restarted.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("replication slot mysteriously missing")]
    MissingReplicationSlot,
    #[error(
        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
    )]
    OvercompactedReplicationSlot {
        requested_lsn: MzOffset,
        available_lsn: MzOffset,
    },
    #[error("replication slot already exists")]
    ReplicationSlotAlreadyExists,
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error("unexpected replication message")]
    UnknownReplicationMessage,
    #[error("unexpected logical replication message")]
    UnknownLogicalReplicationMessage,
    #[error("received replication event outside of transaction")]
    BareTransactionEvent,
    #[error("lsn mismatch between BEGIN and COMMIT")]
    InvalidTransaction,
    #[error("BEGIN within existing BEGIN stream")]
    NestedTransaction,
    // NOTE(review): name suggests this variant is injected deliberately
    // (e.g. in tests) to exercise the snapshot error path — confirm callers.
    #[error("recoverable errors should crash the process during snapshots")]
    SyntheticError,
    #[error("sql client error")]
    SQLClient(#[from] tokio_postgres::Error),
    #[error(transparent)]
    PostgresError(#[from] PostgresError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
325
/// Definite errors: retrying cannot resolve them. They are converted into
/// `DataflowError`s (see the `From` impl below) and surfaced to the user.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
    SlotCompactedPastResumePoint(MzOffset, MzOffset),
    #[error("table was truncated")]
    TableTruncated,
    #[error("table was dropped")]
    TableDropped,
    #[error("publication {0:?} does not exist")]
    PublicationDropped(String),
    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
    InvalidReplicationSlot,
    #[error("unexpected number of columns while parsing COPY output")]
    MissingColumn,
    #[error("failed to parse COPY protocol")]
    InvalidCopyInput,
    #[error(
        "unsupported action: database restored from point-in-time backup. Expected timeline ID {expected} but got {actual}"
    )]
    InvalidTimelineId { expected: u64, actual: u64 },
    #[error(
        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    MissingToast,
    #[error(
        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    DefaultReplicaIdentity,
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("invalid UTF8 string: {0:?}")]
    InvalidUTF8(Vec<u8>),
    #[error("failed to cast raw column: {0}")]
    CastError(#[source] EvalError),
    #[error("unexpected binary data in replication stream")]
    UnexpectedBinaryData,
}
365
366impl From<DefiniteError> for DataflowError {
367 fn from(err: DefiniteError) -> Self {
368 let m = err.to_string().into();
369 DataflowError::SourceError(Box::new(SourceError {
370 error: match &err {
371 DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
372 DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
373 DefiniteError::TableDropped => SourceErrorDetails::Other(m),
374 DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
375 DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
376 DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
377 DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
378 DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
379 DefiniteError::MissingToast => SourceErrorDetails::Other(m),
380 DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
381 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
382 DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
383 DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
384 DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
385 },
386 }))
387 }
388}
389
390async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
391 let slot = Ident::new_unchecked(slot).to_ast_string_simple();
393 let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
394 match simple_query_opt(client, &query).await {
395 Ok(_) => Ok(()),
396 Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
398 tracing::trace!("replication slot {slot} already existed");
399 Ok(())
400 }
401 Err(err) => Err(TransientError::PostgresError(err)),
402 }
403}
404
/// Metadata about a replication slot, read from `pg_replication_slots`.
struct SlotMetadata {
    // The slot's `active_pid` column: the backend PID currently using the
    // slot, or `None` if it is inactive.
    active_pid: Option<i32>,
    // The slot's `confirmed_flush_lsn` column; `fetch_slot_metadata` polls
    // until this is non-NULL, so it is always present here.
    confirmed_flush_lsn: MzOffset,
}
414
415async fn fetch_slot_metadata(
417 client: &Client,
418 slot: &str,
419 interval: Duration,
420) -> Result<SlotMetadata, TransientError> {
421 loop {
422 let query = "SELECT active_pid, confirmed_flush_lsn
423 FROM pg_replication_slots WHERE slot_name = $1";
424 let Some(row) = client.query_opt(query, &[&slot]).await? else {
425 return Err(TransientError::MissingReplicationSlot);
426 };
427
428 match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
429 Some(lsn) => {
433 return Ok(SlotMetadata {
434 confirmed_flush_lsn: MzOffset::from(lsn),
435 active_pid: row.get("active_pid"),
436 });
437 }
438 None => tokio::time::sleep(interval).await,
442 };
443 }
444}
445
446async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
448 let query = "SELECT pg_current_wal_lsn()";
449 let row = simple_query_opt(client, query).await?;
450
451 match row.and_then(|row| {
452 row.get("pg_current_wal_lsn")
453 .map(|lsn| lsn.parse::<PgLsn>().unwrap())
454 }) {
455 Some(lsn) => Ok(MzOffset::from(lsn)),
460 None => Err(TransientError::Generic(anyhow::anyhow!(
461 "pg_current_wal_lsn() mysteriously has no value"
462 ))),
463 }
464}
465
466fn verify_schema(
469 oid: u32,
470 info: &SourceOutputInfo,
471 upstream_info: &BTreeMap<u32, PostgresTableDesc>,
472) -> Result<(), DefiniteError> {
473 let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
474
475 let allow_oids_to_change_by_col_num = info
476 .desc
477 .columns
478 .iter()
479 .zip_eq(info.casts.iter())
480 .flat_map(|(col, (cast_type, _))| match cast_type {
481 CastType::Text => Some(col.col_num),
482 CastType::Natural => None,
483 })
484 .collect();
485
486 match info
487 .desc
488 .determine_compatibility(current_desc, &allow_oids_to_change_by_col_num)
489 {
490 Ok(()) => Ok(()),
491 Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
492 }
493}
494
495fn cast_row(
497 casts: &[(CastType, MirScalarExpr)],
498 datums: &[Datum<'_>],
499 row: &mut Row,
500) -> Result<(), DefiniteError> {
501 let arena = mz_repr::RowArena::new();
502 let mut packer = row.packer();
503 for (_, column_cast) in casts {
504 let datum = column_cast
505 .eval(datums, &arena)
506 .map_err(DefiniteError::CastError)?;
507 packer.push(datum);
508 }
509 Ok(())
510}
511
512fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
514 match std::str::from_utf8(bytes) {
515 Ok(text) => Ok(Datum::String(text)),
516 Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
517 }
518}