1use std::collections::BTreeMap;
84use std::rc::Rc;
85use std::time::Duration;
86
87use differential_dataflow::AsCollection;
88use itertools::Itertools as _;
89use mz_expr::EvalError;
90use mz_ore::cast::CastFrom;
91use mz_ore::error::ErrorExt;
92use mz_postgres_util::desc::PostgresTableDesc;
93use mz_postgres_util::{Client, PostgresError, Sql, query_opt, simple_query_opt, sql};
94use mz_repr::{Datum, Diff, GlobalId, Row};
95use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
96use mz_storage_types::sources::casts::StorageScalarExpr;
97use mz_storage_types::sources::postgres::CastType;
98use mz_storage_types::sources::{
99 MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
100};
101use mz_timely_util::builder_async::PressOnDropButton;
102use serde::{Deserialize, Serialize};
103use timely::container::CapacityContainerBuilder;
104use timely::dataflow::operators::Concat;
105use timely::dataflow::operators::core::Partition;
106use timely::dataflow::operators::vec::{Map, ToStream};
107use timely::dataflow::{Scope, StreamVec};
108use timely::progress::Antichain;
109use tokio_postgres::error::SqlState;
110use tokio_postgres::types::PgLsn;
111
112use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
113use crate::source::types::{Probe, SourceRender, StackedCollection};
114use crate::source::{RawSourceCreationConfig, SourceMessage};
115
116mod replication;
117mod snapshot;
118
119impl SourceRender for PostgresSourceConnection {
120 type Time = MzOffset;
121
122 const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;
123
124 fn render<'scope>(
127 self,
128 scope: Scope<'scope, MzOffset>,
129 config: &RawSourceCreationConfig,
130 resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
131 _start_signal: impl std::future::Future<Output = ()> + 'static,
132 ) -> (
133 BTreeMap<
134 GlobalId,
135 StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
136 >,
137 StreamVec<'scope, MzOffset, HealthStatusMessage>,
138 StreamVec<'scope, MzOffset, Probe<MzOffset>>,
139 Vec<PressOnDropButton>,
140 ) {
141 let mut table_info = BTreeMap::new();
143 for (idx, (id, export)) in config.source_exports.iter().enumerate() {
144 let SourceExport {
145 details,
146 storage_metadata: _,
147 data_config: _,
148 } = export;
149 let details = match details {
150 SourceExportDetails::Postgres(details) => details,
151 SourceExportDetails::None => continue,
153 _ => panic!("unexpected source export details: {:?}", details),
154 };
155 let desc = details.table.clone();
156 let casts = details.column_casts.clone();
157 let resume_upper = Antichain::from_iter(
158 config
159 .source_resume_uppers
160 .get(id)
161 .expect("all source exports must be present in source resume uppers")
162 .iter()
163 .map(MzOffset::decode_row),
164 );
165 let output = SourceOutputInfo {
166 desc,
167 projection: None,
168 casts,
169 resume_upper,
170 export_id: id.clone(),
171 };
172 table_info
173 .entry(output.desc.oid)
174 .or_insert_with(BTreeMap::new)
175 .insert(idx, output);
176 }
177
178 let metrics = config.metrics.get_postgres_source_metrics(config.id);
179
180 let (snapshot_updates, rewinds, slot_ready, snapshot_err, snapshot_token) =
181 snapshot::render(
182 scope.clone(),
183 config.clone(),
184 self.clone(),
185 table_info.clone(),
186 metrics.snapshot_metrics.clone(),
187 );
188
189 let (repl_updates, probe_stream, repl_err, repl_token) = replication::render(
190 scope.clone(),
191 config.clone(),
192 self,
193 table_info,
194 rewinds,
195 slot_ready,
196 resume_uppers,
197 metrics,
198 );
199
200 let updates = snapshot_updates.concat(repl_updates);
201 let partition_count = u64::cast_from(config.source_exports.len());
202 let data_streams: Vec<_> = updates
203 .inner
204 .partition::<CapacityContainerBuilder<_>, _, _>(
205 partition_count,
206 |((output, data), time, diff): (
207 (usize, Result<SourceMessage, DataflowError>),
208 MzOffset,
209 Diff,
210 )| {
211 let output = u64::cast_from(output);
212 (output, (data, time, diff))
213 },
214 );
215 let mut data_collections = BTreeMap::new();
216 for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
217 data_collections.insert(*id, data_stream.as_collection());
218 }
219
220 let export_ids = config.source_exports.keys().copied();
221 let health_init = export_ids
222 .map(Some)
223 .chain(std::iter::once(None))
224 .map(|id| HealthStatusMessage {
225 id,
226 namespace: Self::STATUS_NAMESPACE,
227 update: HealthStatusUpdate::Running,
228 })
229 .collect::<Vec<_>>()
230 .to_stream(scope);
231
232 let errs = snapshot_err.concat(repl_err).map(move |err| {
236 let err_string = err.display_with_causes().to_string();
238 let update = HealthStatusUpdate::halting(err_string.clone(), None);
239
240 let namespace = match err {
241 ReplicationError::Transient(err)
242 if matches!(
243 &*err,
244 TransientError::PostgresError(PostgresError::Ssh(_))
245 | TransientError::PostgresError(PostgresError::SshIo(_))
246 ) =>
247 {
248 StatusNamespace::Ssh
249 }
250 _ => Self::STATUS_NAMESPACE,
251 };
252
253 HealthStatusMessage {
254 id: None,
255 namespace: namespace.clone(),
256 update,
257 }
258 });
259
260 let health = health_init.concat(errs);
261
262 (
263 data_collections,
264 health,
265 probe_stream,
266 vec![snapshot_token, repl_token],
267 )
268 }
269}
270
/// Information about a single output (source export) of the PostgreSQL source.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// Description of the table backing this output. `verify_schema` checks it for
    /// compatibility against the description currently reported by the upstream.
    desc: PostgresTableDesc,
    /// Always `None` when the output is created in `render`.
    // NOTE(review): presumably a mapping from upstream column positions to the columns of
    // this output that is filled in later; confirm in the snapshot/replication operators.
    projection: Option<Vec<usize>>,
    /// Cast type and cast expression per column. `cast_row` evaluates the expressions, and
    /// `verify_schema` pairs the entries with `desc.columns`.
    casts: Vec<(CastType, StorageScalarExpr)>,
    /// The resume frontier of this output, decoded from the source's resume uppers.
    resume_upper: Antichain<MzOffset>,
    /// The id of the source export this output belongs to.
    export_id: GlobalId,
}
283
/// An error produced while ingesting from PostgreSQL, either transient or definite.
///
/// The payloads are reference counted, which lets this type derive `Clone` even though
/// `TransientError` itself does not implement it.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    /// Wraps a [`TransientError`]; displayed as the wrapped error.
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// Wraps a [`DefiniteError`]; displayed as the wrapped error.
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
291
/// Errors carried by [`ReplicationError::Transient`].
// NOTE(review): presumably these are retryable conditions that never end up in the data
// collection of a table (there is no conversion to `DataflowError` in this file); confirm.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    /// No row for the slot was found in `pg_replication_slots` (see `fetch_slot_metadata`).
    #[error("replication slot mysteriously missing")]
    MissingReplicationSlot,
    /// The requested LSN is older than what the slot can still provide.
    #[error(
        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
    )]
    OvercompactedReplicationSlot {
        requested_lsn: MzOffset,
        available_lsn: MzOffset,
    },
    #[error("replication slot already exists")]
    ReplicationSlotAlreadyExists,
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error("unexpected replication message")]
    UnknownReplicationMessage,
    #[error("unexpected logical replication message")]
    UnknownLogicalReplicationMessage,
    #[error("received replication event outside of transaction")]
    BareTransactionEvent,
    #[error("lsn mismatch between BEGIN and COMMIT")]
    InvalidTransaction,
    #[error("BEGIN within existing BEGIN stream")]
    NestedTransaction,
    #[error("recoverable errors should crash the process during snapshots")]
    SyntheticError,
    /// An error from the `tokio_postgres` client; convertible with `?`.
    #[error("sql client error")]
    SQLClient(#[from] tokio_postgres::Error),
    /// An error from `mz_postgres_util`; displayed as the wrapped error.
    #[error(transparent)]
    PostgresError(#[from] PostgresError),
    /// Any other error; displayed as the wrapped error.
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
327
/// Errors carried by [`ReplicationError::Definite`].
///
/// Unlike [`TransientError`], this type is cloneable and serializable, and it converts into
/// a `DataflowError` through the `From<DefiniteError>` implementation in this file.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
    SlotCompactedPastResumePoint(MzOffset, MzOffset),
    #[error("table was truncated")]
    TableTruncated,
    /// Returned by `verify_schema` when the table's OID is missing from the upstream info.
    #[error("table was dropped")]
    TableDropped,
    #[error("publication {0:?} does not exist")]
    PublicationDropped(String),
    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
    InvalidReplicationSlot,
    #[error("unexpected number of columns while parsing COPY output")]
    MissingColumn,
    #[error("failed to parse COPY protocol")]
    InvalidCopyInput,
    #[error(
        "unsupported action: database restored from point-in-time backup. Expected timeline ID {expected} but got {actual}"
    )]
    InvalidTimelineId { expected: u64, actual: u64 },
    #[error(
        "unsupported action: upstream physical replica status changed (e.g. a physical replica was promoted to a primary). Expected pg_is_in_recovery()={expected} but got {actual}"
    )]
    InvalidPhysicalReplica { expected: bool, actual: bool },
    #[error(
        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    MissingToast,
    #[error(
        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    DefaultReplicaIdentity,
    /// Returned by `verify_schema`; the payload is the stringified compatibility error.
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    /// Returned by `decode_utf8_text`; the payload is a copy of the offending bytes.
    #[error("invalid UTF8 string: {0:?}")]
    InvalidUTF8(Vec<u8>),
    /// Returned by `cast_row` when evaluating a column cast expression fails.
    #[error("failed to cast raw column: {0}")]
    CastError(#[source] EvalError),
    #[error("unexpected binary data in replication stream")]
    UnexpectedBinaryData,
}
371
372impl From<DefiniteError> for DataflowError {
373 fn from(err: DefiniteError) -> Self {
374 let m = err.to_string().into();
375 DataflowError::SourceError(Box::new(SourceError {
376 error: match &err {
377 DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
378 DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
379 DefiniteError::TableDropped => SourceErrorDetails::Other(m),
380 DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
381 DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
382 DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
383 DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
384 DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
385 DefiniteError::InvalidPhysicalReplica { .. } => {
386 SourceErrorDetails::Initialization(m)
387 }
388 DefiniteError::MissingToast => SourceErrorDetails::Other(m),
389 DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
390 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
391 DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
392 DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
393 DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
394 },
395 }))
396 }
397}
398
399async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
400 let slot = Sql::ident(slot);
401 let query = sql!(
402 "CREATE_REPLICATION_SLOT {} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT",
403 slot.clone()
404 );
405 match simple_query_opt(client, query).await {
406 Ok(_) => Ok(()),
407 Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
409 tracing::trace!(slot = %slot, "replication slot already existed");
410 Ok(())
411 }
412 Err(err) => Err(TransientError::PostgresError(err)),
413 }
414}
415
/// The state of a replication slot, as read from `pg_replication_slots` by
/// `fetch_slot_metadata`.
struct SlotMetadata {
    /// The `active_pid` column: the process ID of the session using the slot, or `None` when
    /// the column is NULL.
    active_pid: Option<i32>,
    /// The `confirmed_flush_lsn` column: the LSN up to which the slot's consumer has
    /// confirmed receiving data.
    confirmed_flush_lsn: MzOffset,
}
425
426async fn fetch_slot_metadata(
428 client: &Client,
429 slot: &str,
430 interval: Duration,
431) -> Result<SlotMetadata, TransientError> {
432 loop {
433 let Some(row) = query_opt(
434 &**client,
435 sql!(
436 "SELECT active_pid, confirmed_flush_lsn \
437 FROM pg_replication_slots WHERE slot_name = $1"
438 ),
439 &[&slot],
440 )
441 .await?
442 else {
443 return Err(TransientError::MissingReplicationSlot);
444 };
445
446 match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
447 Some(lsn) => {
451 return Ok(SlotMetadata {
452 confirmed_flush_lsn: MzOffset::from(lsn),
453 active_pid: row.get("active_pid"),
454 });
455 }
456 None => tokio::time::sleep(interval).await,
460 };
461 }
462}
463
464fn verify_schema(
467 oid: u32,
468 info: &SourceOutputInfo,
469 upstream_info: &BTreeMap<u32, PostgresTableDesc>,
470) -> Result<(), DefiniteError> {
471 let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
472
473 let allow_oids_to_change_by_col_num = info
474 .desc
475 .columns
476 .iter()
477 .zip_eq(info.casts.iter())
478 .flat_map(|(col, (cast_type, _))| match cast_type {
479 CastType::Text => Some(col.col_num),
480 CastType::Natural => None,
481 })
482 .collect();
483
484 match info
485 .desc
486 .determine_compatibility(current_desc, &allow_oids_to_change_by_col_num)
487 {
488 Ok(()) => Ok(()),
489 Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
490 }
491}
492
493fn cast_row(
495 casts: &[(CastType, StorageScalarExpr)],
496 datums: &[Datum<'_>],
497 row: &mut Row,
498) -> Result<(), DefiniteError> {
499 let arena = mz_repr::RowArena::new();
500 let mut packer = row.packer();
501 for (_, column_cast) in casts {
502 let datum = column_cast
503 .eval(datums, &arena)
504 .map_err(DefiniteError::CastError)?;
505 packer.push(datum);
506 }
507 Ok(())
508}
509
510fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
512 match std::str::from_utf8(bytes) {
513 Ok(text) => Ok(Datum::String(text)),
514 Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
515 }
516}