1use std::collections::BTreeMap;
84use std::rc::Rc;
85use std::time::Duration;
86
87use differential_dataflow::AsCollection;
88use itertools::Itertools as _;
89use mz_expr::EvalError;
90use mz_ore::cast::CastFrom;
91use mz_ore::error::ErrorExt;
92use mz_postgres_util::desc::PostgresTableDesc;
93use mz_postgres_util::{Client, PostgresError, simple_query_opt};
94use mz_repr::{Datum, Diff, GlobalId, Row};
95use mz_sql_parser::ast::Ident;
96use mz_sql_parser::ast::display::AstDisplay;
97use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
98use mz_storage_types::sources::casts::StorageScalarExpr;
99use mz_storage_types::sources::postgres::CastType;
100use mz_storage_types::sources::{
101 MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
102};
103use mz_timely_util::builder_async::PressOnDropButton;
104use serde::{Deserialize, Serialize};
105use timely::container::CapacityContainerBuilder;
106use timely::dataflow::operators::Concat;
107use timely::dataflow::operators::core::Partition;
108use timely::dataflow::operators::vec::{Map, ToStream};
109use timely::dataflow::{Scope, StreamVec};
110use timely::progress::Antichain;
111use tokio_postgres::error::SqlState;
112use tokio_postgres::types::PgLsn;
113
114use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
115use crate::source::types::{Probe, SourceRender, StackedCollection};
116use crate::source::{RawSourceCreationConfig, SourceMessage};
117
118mod replication;
119mod snapshot;
120
impl SourceRender for PostgresSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;

    /// Renders the dataflow for a Postgres source: a snapshot operator and a
    /// replication operator whose outputs are concatenated, partitioned by
    /// source export, and returned together with health and probe streams and
    /// the tokens that keep the operators alive.
    fn render<'scope>(
        self,
        scope: Scope<'scope, MzOffset>,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<
            GlobalId,
            StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
        >,
        StreamVec<'scope, MzOffset, HealthStatusMessage>,
        StreamVec<'scope, MzOffset, Probe<MzOffset>>,
        Vec<PressOnDropButton>,
    ) {
        // Group the source exports by upstream table OID. Each entry maps the
        // export's position in `config.source_exports` to the information the
        // snapshot/replication operators need to produce data for it.
        let mut table_info = BTreeMap::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::Postgres(details) => details,
                // An export that doesn't need any data output (e.g. the
                // primary source relation) contributes no table info.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", details),
            };
            let desc = details.table.clone();
            let casts = details.column_casts.clone();
            // The frontier up to which this export has already been ingested,
            // decoded from the durable resume-upper rows.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("all source exports must be present in source resume uppers")
                    .iter()
                    .map(MzOffset::decode_row),
            );
            let output = SourceOutputInfo {
                desc,
                projection: None,
                casts,
                resume_upper,
                export_id: id.clone(),
            };
            table_info
                .entry(output.desc.oid)
                .or_insert_with(BTreeMap::new)
                .insert(idx, output);
        }

        let metrics = config.metrics.get_postgres_source_metrics(config.id);

        // The snapshot operator produces the initial table contents plus
        // rewind requests and the replication-slot-ready signal that the
        // replication operator consumes below.
        let (snapshot_updates, rewinds, slot_ready, snapshot_err, snapshot_token) =
            snapshot::render(
                scope.clone(),
                config.clone(),
                self.clone(),
                table_info.clone(),
                metrics.snapshot_metrics.clone(),
            );

        let (repl_updates, probe_stream, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self,
            table_info,
            rewinds,
            slot_ready,
            resume_uppers,
            metrics,
        );

        // Route each update to the partition matching its output index so
        // every export gets its own stream. The `zip_eq` below asserts that
        // the partition count matches the number of exports exactly.
        let updates = snapshot_updates.concat(repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    MzOffset,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        // Emit an initial `Running` status for every export plus one
        // source-level status (`id: None`).
        let export_ids = config.source_exports.keys().copied();
        let health_init = export_ids
            .map(Some)
            .chain(std::iter::once(None))
            .map(|id| HealthStatusMessage {
                id,
                namespace: Self::STATUS_NAMESPACE,
                update: HealthStatusUpdate::Running,
            })
            .collect::<Vec<_>>()
            .to_stream(scope);

        // Turn operator errors into halting health statuses. SSH-related
        // transient errors are reported under the SSH namespace so they are
        // attributed to the tunnel rather than to Postgres itself.
        let errs = snapshot_err.concat(repl_err).map(move |err| {
            let err_string = err.display_with_causes().to_string();
            let update = HealthStatusUpdate::halting(err_string.clone(), None);

            let namespace = match err {
                ReplicationError::Transient(err)
                    if matches!(
                        &*err,
                        TransientError::PostgresError(PostgresError::Ssh(_))
                            | TransientError::PostgresError(PostgresError::SshIo(_))
                    ) =>
                {
                    StatusNamespace::Ssh
                }
                _ => Self::STATUS_NAMESPACE,
            };

            // Errors are reported at the source level (`id: None`), not per
            // export.
            HealthStatusMessage {
                id: None,
                namespace: namespace.clone(),
                update,
            }
        });

        let health = health_init.concat(errs);

        (
            data_collections,
            health,
            probe_stream,
            vec![snapshot_token, repl_token],
        )
    }
}
272
/// Per-export information the snapshot and replication operators need to
/// produce data for a single Postgres table output.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// The description of the upstream table backing this output, captured
    /// from the export details.
    desc: PostgresTableDesc,
    /// An optional projection of upstream columns. Always `None` as
    /// constructed in `render` — presumably populated elsewhere or reserved
    /// for future use; TODO(review): confirm.
    projection: Option<Vec<usize>>,
    /// The cast expressions applied to raw column values; `cast_row` pushes
    /// one output datum per entry.
    casts: Vec<(CastType, StorageScalarExpr)>,
    /// The frontier up to which this export has already been ingested.
    resume_upper: Antichain<MzOffset>,
    /// The `GlobalId` of the source export this output belongs to.
    export_id: GlobalId,
}
285
/// An error raised while rendering the source, classified by whether a retry
/// may succeed. Inner errors are `Rc`-wrapped so the enum stays cheaply
/// cloneable for fan-out into multiple dataflow streams.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    /// An error that may resolve on retry (e.g. connectivity issues).
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// An error that is permanent and is surfaced as definite output (see the
    /// `From<DefiniteError> for DataflowError` impl below).
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
293
/// Errors that are expected to go away on retry; callers restart the affected
/// operator rather than emitting definite output (contrast [`DefiniteError`]).
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("replication slot mysteriously missing")]
    MissingReplicationSlot,
    #[error(
        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
    )]
    OvercompactedReplicationSlot {
        requested_lsn: MzOffset,
        available_lsn: MzOffset,
    },
    #[error("replication slot already exists")]
    ReplicationSlotAlreadyExists,
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error("unexpected replication message")]
    UnknownReplicationMessage,
    #[error("unexpected logical replication message")]
    UnknownLogicalReplicationMessage,
    #[error("received replication event outside of transaction")]
    BareTransactionEvent,
    #[error("lsn mismatch between BEGIN and COMMIT")]
    InvalidTransaction,
    #[error("BEGIN within existing BEGIN stream")]
    NestedTransaction,
    #[error("recoverable errors should crash the process during snapshots")]
    SyntheticError,
    #[error("sql client error")]
    SQLClient(#[from] tokio_postgres::Error),
    // Transparent wrappers: the underlying error's message is surfaced as-is.
    #[error(transparent)]
    PostgresError(#[from] PostgresError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
329
/// Errors that retrying cannot fix: they are converted into `DataflowError`s
/// (see the `From` impl below) and emitted as definite source output.
/// `Serialize`/`Deserialize` are derived so these errors can cross dataflow
/// worker boundaries.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
    SlotCompactedPastResumePoint(MzOffset, MzOffset),
    #[error("table was truncated")]
    TableTruncated,
    #[error("table was dropped")]
    TableDropped,
    #[error("publication {0:?} does not exist")]
    PublicationDropped(String),
    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
    InvalidReplicationSlot,
    #[error("unexpected number of columns while parsing COPY output")]
    MissingColumn,
    #[error("failed to parse COPY protocol")]
    InvalidCopyInput,
    #[error(
        "unsupported action: database restored from point-in-time backup. Expected timeline ID {expected} but got {actual}"
    )]
    InvalidTimelineId { expected: u64, actual: u64 },
    #[error(
        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    MissingToast,
    #[error(
        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    DefaultReplicaIdentity,
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    #[error("invalid UTF8 string: {0:?}")]
    InvalidUTF8(Vec<u8>),
    #[error("failed to cast raw column: {0}")]
    CastError(#[source] EvalError),
    #[error("unexpected binary data in replication stream")]
    UnexpectedBinaryData,
}
369
370impl From<DefiniteError> for DataflowError {
371 fn from(err: DefiniteError) -> Self {
372 let m = err.to_string().into();
373 DataflowError::SourceError(Box::new(SourceError {
374 error: match &err {
375 DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
376 DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
377 DefiniteError::TableDropped => SourceErrorDetails::Other(m),
378 DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
379 DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
380 DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
381 DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
382 DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
383 DefiniteError::MissingToast => SourceErrorDetails::Other(m),
384 DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
385 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
386 DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
387 DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
388 DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
389 },
390 }))
391 }
392}
393
394async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
395 let slot = Ident::new_unchecked(slot).to_ast_string_simple();
397 let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
398 match simple_query_opt(client, &query).await {
399 Ok(_) => Ok(()),
400 Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
402 tracing::trace!("replication slot {slot} already existed");
403 Ok(())
404 }
405 Err(err) => Err(TransientError::PostgresError(err)),
406 }
407}
408
/// The subset of a `pg_replication_slots` row fetched by
/// `fetch_slot_metadata`.
struct SlotMetadata {
    /// The `active_pid` column: the PID of the backend currently using the
    /// slot, if any.
    active_pid: Option<i32>,
    /// The `confirmed_flush_lsn` column: the LSN up to which the slot's
    /// consumer has confirmed receiving data (per PostgreSQL's docs).
    confirmed_flush_lsn: MzOffset,
}
418
419async fn fetch_slot_metadata(
421 client: &Client,
422 slot: &str,
423 interval: Duration,
424) -> Result<SlotMetadata, TransientError> {
425 loop {
426 let query = "SELECT active_pid, confirmed_flush_lsn
427 FROM pg_replication_slots WHERE slot_name = $1";
428 let Some(row) = client.query_opt(query, &[&slot]).await? else {
429 return Err(TransientError::MissingReplicationSlot);
430 };
431
432 match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
433 Some(lsn) => {
437 return Ok(SlotMetadata {
438 confirmed_flush_lsn: MzOffset::from(lsn),
439 active_pid: row.get("active_pid"),
440 });
441 }
442 None => tokio::time::sleep(interval).await,
446 };
447 }
448}
449
450async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
452 let query = "SELECT pg_current_wal_lsn()";
453 let row = simple_query_opt(client, query).await?;
454
455 match row.and_then(|row| {
456 row.get("pg_current_wal_lsn")
457 .map(|lsn| lsn.parse::<PgLsn>().unwrap())
458 }) {
459 Some(lsn) => Ok(MzOffset::from(lsn)),
464 None => Err(TransientError::Generic(anyhow::anyhow!(
465 "pg_current_wal_lsn() mysteriously has no value"
466 ))),
467 }
468}
469
470fn verify_schema(
473 oid: u32,
474 info: &SourceOutputInfo,
475 upstream_info: &BTreeMap<u32, PostgresTableDesc>,
476) -> Result<(), DefiniteError> {
477 let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
478
479 let allow_oids_to_change_by_col_num = info
480 .desc
481 .columns
482 .iter()
483 .zip_eq(info.casts.iter())
484 .flat_map(|(col, (cast_type, _))| match cast_type {
485 CastType::Text => Some(col.col_num),
486 CastType::Natural => None,
487 })
488 .collect();
489
490 match info
491 .desc
492 .determine_compatibility(current_desc, &allow_oids_to_change_by_col_num)
493 {
494 Ok(()) => Ok(()),
495 Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
496 }
497}
498
499fn cast_row(
501 casts: &[(CastType, StorageScalarExpr)],
502 datums: &[Datum<'_>],
503 row: &mut Row,
504) -> Result<(), DefiniteError> {
505 let arena = mz_repr::RowArena::new();
506 let mut packer = row.packer();
507 for (_, column_cast) in casts {
508 let datum = column_cast
509 .eval(datums, &arena)
510 .map_err(DefiniteError::CastError)?;
511 packer.push(datum);
512 }
513 Ok(())
514}
515
516fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
518 match std::str::from_utf8(bytes) {
519 Ok(text) => Ok(Datum::String(text)),
520 Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
521 }
522}