1use std::collections::BTreeMap;
84use std::convert::Infallible;
85use std::rc::Rc;
86use std::time::Duration;
87
88use differential_dataflow::AsCollection;
89use itertools::Itertools as _;
90use mz_expr::{EvalError, MirScalarExpr};
91use mz_ore::cast::CastFrom;
92use mz_ore::error::ErrorExt;
93use mz_postgres_util::desc::PostgresTableDesc;
94use mz_postgres_util::{Client, PostgresError, simple_query_opt};
95use mz_repr::{Datum, Diff, GlobalId, Row};
96use mz_sql_parser::ast::Ident;
97use mz_sql_parser::ast::display::AstDisplay;
98use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
99use mz_storage_types::sources::postgres::CastType;
100use mz_storage_types::sources::{
101 MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
102};
103use mz_timely_util::builder_async::PressOnDropButton;
104use serde::{Deserialize, Serialize};
105use timely::container::CapacityContainerBuilder;
106use timely::dataflow::operators::core::Partition;
107use timely::dataflow::operators::{Concat, Map, ToStream};
108use timely::dataflow::{Scope, Stream};
109use timely::progress::Antichain;
110use tokio_postgres::error::SqlState;
111use tokio_postgres::types::PgLsn;
112
113use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
114use crate::source::types::{Probe, SourceRender, StackedCollection};
115use crate::source::{RawSourceCreationConfig, SourceMessage};
116
117mod replication;
118mod snapshot;
119
impl SourceRender for PostgresSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;

    /// Renders the dataflow for a Postgres source: a snapshot operator that
    /// copies the initial table contents and a replication operator that
    /// follows the replication slot from the snapshot point onward.
    ///
    /// Returns the per-export data collections, the upper frontier stream,
    /// the health status stream, an optional probe stream, and the drop
    /// tokens that keep the two operators alive.
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<MzOffset>>>,
        Vec<PressOnDropButton>,
    ) {
        // Gather the output description of each export, keyed by upstream
        // table OID and then by the export's index in `source_exports`, so
        // the snapshot and replication operators know which tables to decode
        // and where each output resumes.
        let mut table_info = BTreeMap::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::Postgres(details) => details,
                // Exports with no Postgres-specific details produce no data
                // output from this source; skip them.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", details),
            };
            let desc = details.table.clone();
            let casts = details.column_casts.clone();
            // The frontier this export resumes from; the config is expected
            // to contain an entry for every export.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("all source exports must be present in source resume uppers")
                    .iter()
                    .map(MzOffset::decode_row),
            );
            let output = SourceOutputInfo {
                desc,
                projection: None,
                casts,
                resume_upper,
                export_id: id.clone(),
            };
            table_info
                .entry(output.desc.oid)
                .or_insert_with(BTreeMap::new)
                .insert(idx, output);
        }

        let metrics = config.metrics.get_postgres_source_metrics(config.id);

        // Render the snapshot operator. Besides the snapshot updates it
        // produces the `rewinds` and `slot_ready` streams that the
        // replication operator consumes below.
        let (snapshot_updates, rewinds, slot_ready, snapshot_err, snapshot_token) =
            snapshot::render(
                scope.clone(),
                config.clone(),
                self.clone(),
                table_info.clone(),
                metrics.snapshot_metrics.clone(),
            );

        // Render the replication operator, wired to the snapshot operator's
        // rewind and slot-ready signals.
        let (repl_updates, uppers, probe_stream, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self,
            table_info,
            &rewinds,
            &slot_ready,
            resume_uppers,
            metrics,
        );

        // Combine both update streams and split them into one stream per
        // export, routed by the output index tagged on each update.
        let updates = snapshot_updates.concat(&repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    MzOffset,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        // Emit an initial `Running` status for each export and one for the
        // source as a whole (the `None` id).
        let export_ids = config.source_exports.keys().copied();
        let health_init = export_ids
            .map(Some)
            .chain(std::iter::once(None))
            .map(|id| HealthStatusMessage {
                id,
                namespace: Self::STATUS_NAMESPACE,
                update: HealthStatusUpdate::Running,
            })
            .collect::<Vec<_>>()
            .to_stream(scope);

        // Translate operator errors into halting health updates, attributing
        // SSH-related transient errors to the SSH status namespace.
        let errs = snapshot_err.concat(&repl_err).map(move |err| {
            let err_string = err.display_with_causes().to_string();
            let update = HealthStatusUpdate::halting(err_string.clone(), None);

            let namespace = match err {
                ReplicationError::Transient(err)
                    if matches!(
                        &*err,
                        TransientError::PostgresError(PostgresError::Ssh(_))
                            | TransientError::PostgresError(PostgresError::SshIo(_))
                    ) =>
                {
                    StatusNamespace::Ssh
                }
                _ => Self::STATUS_NAMESPACE,
            };

            HealthStatusMessage {
                id: None,
                namespace: namespace.clone(),
                update,
            }
        });

        let health = health_init.concat(&errs);

        (
            data_collections,
            uppers,
            health,
            probe_stream,
            vec![snapshot_token, repl_token],
        )
    }
}
270
/// Per-export information that the snapshot and replication operators need to
/// produce one output of this Postgres source.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// Description of the upstream table backing this output.
    desc: PostgresTableDesc,
    /// Optional projection of upstream columns. NOTE(review): always `None`
    /// in the rendering path visible in this file — confirm where (if
    /// anywhere) it is populated.
    projection: Option<Vec<usize>>,
    /// Per-column cast expressions used to convert decoded upstream values
    /// into this output's row format, tagged with their `CastType`.
    casts: Vec<(CastType, MirScalarExpr)>,
    /// The frontier from which this output must resume.
    resume_upper: Antichain<MzOffset>,
    /// The `GlobalId` of the source export this output feeds.
    export_id: GlobalId,
}
283
/// The errors the snapshot and replication operators can emit, split by
/// whether retrying the operation could resolve them. Variants are wrapped in
/// `Rc` so the error can be cheaply cloned into multiple dataflow streams.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    /// An error that may go away if the operation is retried.
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// An error that will deterministically recur on retry.
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
291
/// Errors of a transient nature: conditions that a retry (e.g. reconnecting
/// to the upstream database) may resolve. Each variant's user-facing message
/// is carried by its `#[error]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("replication slot mysteriously missing")]
    MissingReplicationSlot,
    #[error(
        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
    )]
    OvercompactedReplicationSlot {
        requested_lsn: MzOffset,
        available_lsn: MzOffset,
    },
    #[error("replication slot already exists")]
    ReplicationSlotAlreadyExists,
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error("unexpected replication message")]
    UnknownReplicationMessage,
    #[error("unexpected logical replication message")]
    UnknownLogicalReplicationMessage,
    #[error("received replication event outside of transaction")]
    BareTransactionEvent,
    #[error("lsn mismatch between BEGIN and COMMIT")]
    InvalidTransaction,
    #[error("BEGIN within existing BEGIN stream")]
    NestedTransaction,
    #[error("recoverable errors should crash the process during snapshots")]
    SyntheticError,
    #[error("sql client error")]
    SQLClient(#[from] tokio_postgres::Error),
    #[error(transparent)]
    PostgresError(#[from] PostgresError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
327
/// Errors of a definite nature: retrying deterministically reproduces them.
/// They are converted into `DataflowError`s (see the `From` impl below) and
/// surfaced through the source's error stream. Each variant's user-facing
/// message is carried by its `#[error]` attribute.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
    SlotCompactedPastResumePoint(MzOffset, MzOffset),
    #[error("table was truncated")]
    TableTruncated,
    #[error("table was dropped")]
    TableDropped,
    #[error("publication {0:?} does not exist")]
    PublicationDropped(String),
    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
    InvalidReplicationSlot,
    #[error("unexpected number of columns while parsing COPY output")]
    MissingColumn,
    #[error("failed to parse COPY protocol")]
    InvalidCopyInput,
    #[error(
        "unsupported action: database restored from point-in-time backup. Expected timeline ID {expected} but got {actual}"
    )]
    InvalidTimelineId { expected: u64, actual: u64 },
    #[error(
        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    MissingToast,
    #[error(
        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    DefaultReplicaIdentity,
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("invalid UTF8 string: {0:?}")]
    InvalidUTF8(Vec<u8>),
    #[error("failed to cast raw column: {0}")]
    CastError(#[source] EvalError),
    #[error("unexpected binary data in replication stream")]
    UnexpectedBinaryData,
}
367
368impl From<DefiniteError> for DataflowError {
369 fn from(err: DefiniteError) -> Self {
370 let m = err.to_string().into();
371 DataflowError::SourceError(Box::new(SourceError {
372 error: match &err {
373 DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
374 DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
375 DefiniteError::TableDropped => SourceErrorDetails::Other(m),
376 DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
377 DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
378 DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
379 DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
380 DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
381 DefiniteError::MissingToast => SourceErrorDetails::Other(m),
382 DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
383 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
384 DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
385 DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
386 DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
387 },
388 }))
389 }
390}
391
392async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
393 let slot = Ident::new_unchecked(slot).to_ast_string_simple();
395 let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
396 match simple_query_opt(client, &query).await {
397 Ok(_) => Ok(()),
398 Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
400 tracing::trace!("replication slot {slot} already existed");
401 Ok(())
402 }
403 Err(err) => Err(TransientError::PostgresError(err)),
404 }
405}
406
/// Metadata about a replication slot, as reported by the upstream
/// `pg_replication_slots` catalog view.
struct SlotMetadata {
    /// The PID of the connection currently using this slot, if any.
    active_pid: Option<i32>,
    /// The slot's `confirmed_flush_lsn`: the LSN up to which the slot's
    /// consumer has confirmed receipt of data.
    confirmed_flush_lsn: MzOffset,
}
416
417async fn fetch_slot_metadata(
419 client: &Client,
420 slot: &str,
421 interval: Duration,
422) -> Result<SlotMetadata, TransientError> {
423 loop {
424 let query = "SELECT active_pid, confirmed_flush_lsn
425 FROM pg_replication_slots WHERE slot_name = $1";
426 let Some(row) = client.query_opt(query, &[&slot]).await? else {
427 return Err(TransientError::MissingReplicationSlot);
428 };
429
430 match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
431 Some(lsn) => {
435 return Ok(SlotMetadata {
436 confirmed_flush_lsn: MzOffset::from(lsn),
437 active_pid: row.get("active_pid"),
438 });
439 }
440 None => tokio::time::sleep(interval).await,
444 };
445 }
446}
447
448async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
450 let query = "SELECT pg_current_wal_lsn()";
451 let row = simple_query_opt(client, query).await?;
452
453 match row.and_then(|row| {
454 row.get("pg_current_wal_lsn")
455 .map(|lsn| lsn.parse::<PgLsn>().unwrap())
456 }) {
457 Some(lsn) => Ok(MzOffset::from(lsn)),
462 None => Err(TransientError::Generic(anyhow::anyhow!(
463 "pg_current_wal_lsn() mysteriously has no value"
464 ))),
465 }
466}
467
468fn verify_schema(
471 oid: u32,
472 info: &SourceOutputInfo,
473 upstream_info: &BTreeMap<u32, PostgresTableDesc>,
474) -> Result<(), DefiniteError> {
475 let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
476
477 let allow_oids_to_change_by_col_num = info
478 .desc
479 .columns
480 .iter()
481 .zip_eq(info.casts.iter())
482 .flat_map(|(col, (cast_type, _))| match cast_type {
483 CastType::Text => Some(col.col_num),
484 CastType::Natural => None,
485 })
486 .collect();
487
488 match info
489 .desc
490 .determine_compatibility(current_desc, &allow_oids_to_change_by_col_num)
491 {
492 Ok(()) => Ok(()),
493 Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
494 }
495}
496
497fn cast_row(
499 casts: &[(CastType, MirScalarExpr)],
500 datums: &[Datum<'_>],
501 row: &mut Row,
502) -> Result<(), DefiniteError> {
503 let arena = mz_repr::RowArena::new();
504 let mut packer = row.packer();
505 for (_, column_cast) in casts {
506 let datum = column_cast
507 .eval(datums, &arena)
508 .map_err(DefiniteError::CastError)?;
509 packer.push(datum);
510 }
511 Ok(())
512}
513
514fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
516 match std::str::from_utf8(bytes) {
517 Ok(text) => Ok(Datum::String(text)),
518 Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
519 }
520}