1use std::collections::BTreeMap;
83use std::convert::Infallible;
84use std::rc::Rc;
85use std::time::Duration;
86
87use differential_dataflow::AsCollection;
88use itertools::Itertools as _;
89use mz_expr::{EvalError, MirScalarExpr};
90use mz_ore::cast::CastFrom;
91use mz_ore::error::ErrorExt;
92use mz_postgres_util::desc::PostgresTableDesc;
93use mz_postgres_util::{Client, PostgresError, simple_query_opt};
94use mz_repr::{Datum, Diff, GlobalId, Row};
95use mz_sql_parser::ast::Ident;
96use mz_sql_parser::ast::display::AstDisplay;
97use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
98use mz_storage_types::sources::postgres::CastType;
99use mz_storage_types::sources::{
100 MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
101};
102use mz_timely_util::builder_async::PressOnDropButton;
103use serde::{Deserialize, Serialize};
104use timely::container::CapacityContainerBuilder;
105use timely::dataflow::operators::core::Partition;
106use timely::dataflow::operators::{Concat, Map, ToStream};
107use timely::dataflow::{Scope, Stream};
108use timely::progress::Antichain;
109use tokio_postgres::error::SqlState;
110use tokio_postgres::types::PgLsn;
111
112use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
113use crate::source::types::{Probe, ProgressStatisticsUpdate, SourceRender, StackedCollection};
114use crate::source::{RawSourceCreationConfig, SourceMessage};
115
116mod replication;
117mod snapshot;
118
/// Renders a Postgres source as a snapshot operator plus a replication operator whose outputs
/// are merged and then split back out per source export.
impl SourceRender for PostgresSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;

    /// Renders the snapshot and replication operators for this source and splits their
    /// combined update stream into one collection per source export.
    ///
    /// Returns, in order:
    /// - the data collection of each source export, keyed by the export's `GlobalId`;
    /// - the `uppers` stream produced by `replication::render`;
    /// - the health stream: one initial `Running` message followed by a halting message for
    ///   every error emitted by the snapshot or replication operator;
    /// - the replication progress statistics concatenated with the snapshot statistics;
    /// - the optional probe stream produced by `replication::render`;
    /// - the `PressOnDropButton` tokens returned by the snapshot and replication renderers.
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Stream<G, ProgressStatisticsUpdate>,
        Option<Stream<G, Probe<MzOffset>>>,
        Vec<PressOnDropButton>,
    ) {
        // Upstream table OID -> (export index -> output info). The inner map lets several
        // exports share one upstream table.
        let mut table_info = BTreeMap::new();
        // `idx` is the export's position in `config.source_exports`. The same ordering is used
        // below (`partition_count` and `zip_eq` over `keys()`) to route updates back to exports.
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::Postgres(details) => details,
                // No table info for this export, but it still occupies a partition slot below
                // because `partition_count` is the total number of exports.
                SourceExportDetails::None => continue,
                // Any other export kind is not expected on a Postgres source.
                _ => panic!("unexpected source export details: {:?}", details),
            };
            let desc = details.table.clone();
            let casts = details.column_casts.clone();
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("all source exports must be present in source resume uppers")
                    .iter()
                    .map(MzOffset::decode_row),
            );
            let output = SourceOutputInfo {
                desc,
                casts,
                resume_upper,
            };
            table_info
                .entry(output.desc.oid)
                .or_insert_with(BTreeMap::new)
                .insert(idx, output);
        }

        let metrics = config.metrics.get_postgres_source_metrics(config.id);

        let (snapshot_updates, rewinds, slot_ready, snapshot_stats, snapshot_err, snapshot_token) =
            snapshot::render(
                scope.clone(),
                config.clone(),
                self.clone(),
                table_info.clone(),
                metrics.snapshot_metrics.clone(),
            );

        // The replication operator consumes the snapshot operator's `rewinds` and `slot_ready`
        // streams.
        let (repl_updates, uppers, stats_stream, probe_stream, repl_err, repl_token) =
            replication::render(
                scope.clone(),
                config.clone(),
                self,
                table_info,
                &rewinds,
                &slot_ready,
                resume_uppers,
                metrics,
            );

        let stats_stream = stats_stream.concat(&snapshot_stats);

        let updates = snapshot_updates.concat(&repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        // Each update is tagged with a `usize` output index, presumably the export index `idx`
        // stored in `table_info`. That index is used directly as the partition number.
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    MzOffset,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        // Pair partitions with export ids in the same order they were enumerated above.
        // `zip_eq` panics if the counts ever disagree.
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        let init = std::iter::once(HealthStatusMessage {
            id: None,
            namespace: Self::STATUS_NAMESPACE,
            update: HealthStatusUpdate::Running,
        })
        .to_stream(scope);

        // Every snapshot/replication error becomes a halting health update. SSH tunnel failures
        // are reported under the SSH namespace; everything else uses the Postgres namespace.
        let errs = snapshot_err.concat(&repl_err).map(move |err| {
            let err_string = err.display_with_causes().to_string();
            let update = HealthStatusUpdate::halting(err_string.clone(), None);

            let namespace = match err {
                ReplicationError::Transient(err)
                    if matches!(
                        &*err,
                        TransientError::PostgresError(PostgresError::Ssh(_))
                            | TransientError::PostgresError(PostgresError::SshIo(_))
                    ) =>
                {
                    StatusNamespace::Ssh
                }
                _ => Self::STATUS_NAMESPACE,
            };

            HealthStatusMessage {
                id: None,
                namespace: namespace.clone(),
                update,
            }
        });

        let health = init.concat(&errs);

        (
            data_collections,
            uppers,
            health,
            stats_stream,
            probe_stream,
            vec![snapshot_token, repl_token],
        )
    }
}
267
/// Per-export information about an upstream Postgres table.
///
/// Built in `render` from each export's `SourceExportDetails::Postgres` details and the
/// export's entry in `config.source_resume_uppers`.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// The table description recorded for this export (`details.table`).
    desc: PostgresTableDesc,
    /// The export's column casts (`details.column_casts`), in the
    /// `(CastType, MirScalarExpr)` shape accepted by `verify_schema` and `cast_row`.
    casts: Vec<(CastType, MirScalarExpr)>,
    /// The export's resume upper, decoded from its rows in `config.source_resume_uppers`.
    resume_upper: Antichain<MzOffset>,
}
274
/// An error produced while snapshotting or replicating a Postgres source.
///
/// Both variants hold their payload behind `Rc`, which lets this enum derive `Clone` even
/// though [`TransientError`] itself is not `Clone`. `render` reports every
/// `ReplicationError` as a halting health status.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    /// A failure classified as transient; see [`TransientError`].
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// A failure classified as definite; see [`DefiniteError`], which converts into a
    /// `DataflowError`.
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
282
/// Errors classified as transient. Presumably they are expected to go away on retry or
/// restart; TODO confirm against the snapshot/replication callers.
///
/// The `#[from]` conversions let `?` lift `tokio_postgres::Error`, `PostgresError` and
/// `anyhow::Error` into this type.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    /// The replication slot has no row in `pg_replication_slots` (see `fetch_slot_metadata`).
    #[error("replication slot mysteriously missing")]
    MissingReplicationSlot,
    /// The requested LSN is older than the oldest LSN the slot can still provide.
    #[error(
        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
    )]
    OvercompactedReplicationSlot {
        requested_lsn: MzOffset,
        available_lsn: MzOffset,
    },
    /// The replication slot already exists.
    #[error("replication slot already exists")]
    ReplicationSlotAlreadyExists,
    /// The replication stream ended before it was expected to.
    #[error("stream ended prematurely")]
    ReplicationEOF,
    /// A replication protocol message of an unrecognized kind was received.
    #[error("unexpected replication message")]
    UnknownReplicationMessage,
    /// A logical replication message of an unrecognized kind was received.
    #[error("unexpected logical replication message")]
    UnknownLogicalReplicationMessage,
    /// A replication event arrived outside of a BEGIN/COMMIT pair.
    #[error("received replication event outside of transaction")]
    BareTransactionEvent,
    /// The LSNs recorded on BEGIN and COMMIT do not match.
    #[error("lsn mismatch between BEGIN and COMMIT")]
    InvalidTransaction,
    /// A BEGIN arrived while another transaction was still open.
    #[error("BEGIN within existing BEGIN stream")]
    NestedTransaction,
    /// Synthetic error. Per its message, it is tied to recoverable errors during snapshots.
    /// NOTE(review): confirm where it is constructed.
    #[error("recoverable errors should crash the process during snapshots")]
    SyntheticError,
    /// An error from the `tokio_postgres` client.
    #[error("sql client error")]
    SQLClient(#[from] tokio_postgres::Error),
    /// An error from `mz_postgres_util` (including its SSH variants, which `render` routes to
    /// the SSH status namespace).
    #[error(transparent)]
    PostgresError(#[from] PostgresError),
    /// Any other error.
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
318
/// Errors classified as definite for a Postgres source.
///
/// These are serializable and convert into a `DataflowError::SourceError` via `From`.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    /// The slot was compacted past the snapshot point. The fields are, per the message, the
    /// snapshot consistent point and the resume LSN.
    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
    SlotCompactedPastResumePoint(MzOffset, MzOffset),
    /// The upstream table was truncated.
    #[error("table was truncated")]
    TableTruncated,
    /// The upstream table no longer exists (e.g. `verify_schema` could not find its OID).
    #[error("table was dropped")]
    TableDropped,
    /// The named publication does not exist.
    #[error("publication {0:?} does not exist")]
    PublicationDropped(String),
    /// The replication slot was invalidated for exceeding its maximum reserved size.
    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
    InvalidReplicationSlot,
    /// COPY output had an unexpected number of columns.
    #[error("unexpected number of columns while parsing COPY output")]
    MissingColumn,
    /// COPY protocol data could not be parsed.
    #[error("failed to parse COPY protocol")]
    InvalidCopyInput,
    /// The upstream timeline ID differs from the expected one.
    #[error(
        "unsupported action: database restored from point-in-time backup. Expected timeline ID {expected} but got {actual}"
    )]
    InvalidTimelineId { expected: u64, actual: u64 },
    /// A TOASTed value was missing from an old row.
    #[error(
        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    MissingToast,
    /// The old row was missing from the replication stream.
    #[error(
        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    DefaultReplicaIdentity,
    /// The upstream schema is incompatible; the string is the compatibility error produced in
    /// `verify_schema`.
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    /// Text that is not valid UTF-8; holds a copy of the offending bytes.
    #[error("invalid UTF8 string: {0:?}")]
    InvalidUTF8(Vec<u8>),
    /// Evaluating a column cast failed (see `cast_row`).
    #[error("failed to cast raw column: {0}")]
    CastError(#[source] EvalError),
    /// Binary data appeared in the replication stream where it was not expected.
    #[error("unexpected binary data in replication stream")]
    UnexpectedBinaryData,
}
358
359impl From<DefiniteError> for DataflowError {
360 fn from(err: DefiniteError) -> Self {
361 let m = err.to_string().into();
362 DataflowError::SourceError(Box::new(SourceError {
363 error: match &err {
364 DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
365 DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
366 DefiniteError::TableDropped => SourceErrorDetails::Other(m),
367 DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
368 DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
369 DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
370 DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
371 DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
372 DefiniteError::MissingToast => SourceErrorDetails::Other(m),
373 DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
374 DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
375 DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
376 DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
377 DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
378 },
379 }))
380 }
381}
382
383async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
384 let slot = Ident::new_unchecked(slot).to_ast_string_simple();
386 let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
387 match simple_query_opt(client, &query).await {
388 Ok(_) => Ok(()),
389 Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
391 tracing::trace!("replication slot {slot} already existed");
392 Ok(())
393 }
394 Err(err) => Err(TransientError::PostgresError(err)),
395 }
396}
397
/// Metadata about a replication slot, as read from `pg_replication_slots` by
/// `fetch_slot_metadata`.
struct SlotMetadata {
    /// The slot's `active_pid` column; `None` when that column is NULL.
    active_pid: Option<i32>,
    /// The slot's `confirmed_flush_lsn`. `fetch_slot_metadata` only returns once this column
    /// is non-NULL.
    confirmed_flush_lsn: MzOffset,
}
407
408async fn fetch_slot_metadata(
410 client: &Client,
411 slot: &str,
412 interval: Duration,
413) -> Result<SlotMetadata, TransientError> {
414 loop {
415 let query = "SELECT active_pid, confirmed_flush_lsn
416 FROM pg_replication_slots WHERE slot_name = $1";
417 let Some(row) = client.query_opt(query, &[&slot]).await? else {
418 return Err(TransientError::MissingReplicationSlot);
419 };
420
421 match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
422 Some(lsn) => {
426 return Ok(SlotMetadata {
427 confirmed_flush_lsn: MzOffset::from(lsn),
428 active_pid: row.get("active_pid"),
429 });
430 }
431 None => tokio::time::sleep(interval).await,
435 };
436 }
437}
438
439async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
441 let query = "SELECT pg_current_wal_lsn()";
442 let row = simple_query_opt(client, query).await?;
443
444 match row.and_then(|row| {
445 row.get("pg_current_wal_lsn")
446 .map(|lsn| lsn.parse::<PgLsn>().unwrap())
447 }) {
448 Some(lsn) => Ok(MzOffset::from(lsn)),
453 None => Err(TransientError::Generic(anyhow::anyhow!(
454 "pg_current_wal_lsn() mysteriously has no value"
455 ))),
456 }
457}
458
459fn verify_schema(
462 oid: u32,
463 expected_desc: &PostgresTableDesc,
464 upstream_info: &BTreeMap<u32, PostgresTableDesc>,
465 casts: &[(CastType, MirScalarExpr)],
466) -> Result<(), DefiniteError> {
467 let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
468
469 let allow_oids_to_change_by_col_num = expected_desc
470 .columns
471 .iter()
472 .zip_eq(casts.iter())
473 .flat_map(|(col, (cast_type, _))| match cast_type {
474 CastType::Text => Some(col.col_num),
475 CastType::Natural => None,
476 })
477 .collect();
478
479 match expected_desc.determine_compatibility(current_desc, &allow_oids_to_change_by_col_num) {
480 Ok(()) => Ok(()),
481 Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
482 }
483}
484
485fn cast_row(
487 casts: &[(CastType, MirScalarExpr)],
488 datums: &[Datum<'_>],
489 row: &mut Row,
490) -> Result<(), DefiniteError> {
491 let arena = mz_repr::RowArena::new();
492 let mut packer = row.packer();
493 for (_, column_cast) in casts {
494 let datum = column_cast
495 .eval(datums, &arena)
496 .map_err(DefiniteError::CastError)?;
497 packer.push(datum);
498 }
499 Ok(())
500}
501
502fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
504 match std::str::from_utf8(bytes) {
505 Ok(text) => Ok(Datum::String(text)),
506 Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
507 }
508}