1use std::collections::BTreeMap;
83use std::convert::Infallible;
84use std::rc::Rc;
85use std::time::Duration;
86
87use differential_dataflow::AsCollection;
88use itertools::Itertools as _;
89use mz_expr::{EvalError, MirScalarExpr};
90use mz_ore::cast::CastFrom;
91use mz_ore::error::ErrorExt;
92use mz_postgres_util::desc::PostgresTableDesc;
93use mz_postgres_util::{Client, PostgresError, simple_query_opt};
94use mz_repr::{Datum, Diff, GlobalId, Row};
95use mz_sql_parser::ast::Ident;
96use mz_sql_parser::ast::display::AstDisplay;
97use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
98use mz_storage_types::sources::postgres::CastType;
99use mz_storage_types::sources::{
100    MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
101};
102use mz_timely_util::builder_async::PressOnDropButton;
103use serde::{Deserialize, Serialize};
104use timely::container::CapacityContainerBuilder;
105use timely::dataflow::operators::core::Partition;
106use timely::dataflow::operators::{Concat, Map, ToStream};
107use timely::dataflow::{Scope, Stream};
108use timely::progress::Antichain;
109use tokio_postgres::error::SqlState;
110use tokio_postgres::types::PgLsn;
111
112use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
113use crate::source::types::{Probe, SourceRender, StackedCollection};
114use crate::source::{RawSourceCreationConfig, SourceMessage};
115
116mod replication;
117mod snapshot;
118
/// Ingestion dataflow for PostgreSQL sources.
///
/// The actual work is done by the sub-dataflows rendered by the `snapshot` and `replication`
/// modules. This impl only wires them together, splits their combined output into one
/// collection per source export, and turns their errors into health status updates.
impl SourceRender for PostgresSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;

    /// Renders the source into `scope`.
    ///
    /// Returns, in order:
    /// - one data collection per entry of `config.source_exports`, keyed by export id;
    /// - the `uppers` stream from the replication side;
    /// - the health status stream;
    /// - the optional probe stream from the replication side;
    /// - the `PressOnDropButton` tokens of the snapshot and replication sides.
    ///
    /// # Panics
    ///
    /// Panics if an export carries source export details that are neither PostgreSQL details
    /// nor `None`. Also panics if an export has no entry in `config.source_resume_uppers`.
    fn render<G: Scope<Timestamp = MzOffset>>(
        self,
        scope: &mut G,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
        Stream<G, Infallible>,
        Stream<G, HealthStatusMessage>,
        Option<Stream<G, Probe<MzOffset>>>,
        Vec<PressOnDropButton>,
    ) {
        // Upstream table OID -> (export index -> output info). The nested map allows several
        // exports to ingest the same upstream table.
        let mut table_info = BTreeMap::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::Postgres(details) => details,
                // Exports without details have no upstream table. They are skipped here but
                // still occupy an index, and therefore an output partition, below.
                SourceExportDetails::None => continue,
                // Any other kind of details is an invariant violation for a Postgres source.
                _ => panic!("unexpected source export details: {:?}", details),
            };
            let desc = details.table.clone();
            let casts = details.column_casts.clone();
            // Decode this export's resume upper with `MzOffset::decode_row`.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("all source exports must be present in source resume uppers")
                    .iter()
                    .map(MzOffset::decode_row),
            );
            let output = SourceOutputInfo {
                desc,
                projection: None,
                casts,
                resume_upper,
                export_id: id.clone(),
            };
            table_info
                .entry(output.desc.oid)
                .or_insert_with(BTreeMap::new)
                .insert(idx, output);
        }

        let metrics = config.metrics.get_postgres_source_metrics(config.id);

        // The snapshot side is rendered first: the replication side consumes its `rewinds`
        // and `slot_ready` streams.
        let (snapshot_updates, rewinds, slot_ready, snapshot_err, snapshot_token) =
            snapshot::render(
                scope.clone(),
                config.clone(),
                self.clone(),
                table_info.clone(),
                metrics.snapshot_metrics.clone(),
            );

        let (repl_updates, uppers, probe_stream, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self,
            table_info,
            &rewinds,
            &slot_ready,
            resume_uppers,
            metrics,
        );

        // Route each update to the partition named by its export index. The indices come from
        // `enumerate()` over `config.source_exports`. Zipping the partitions with
        // `config.source_exports.keys()` below therefore maps each partition back to its
        // export id.
        let updates = snapshot_updates.concat(&repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    MzOffset,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        // Initially report `Running` for every export and for the source as a whole
        // (`id: None`).
        let export_ids = config.source_exports.keys().copied();
        let health_init = export_ids
            .map(Some)
            .chain(std::iter::once(None))
            .map(|id| HealthStatusMessage {
                id,
                namespace: Self::STATUS_NAMESPACE,
                update: HealthStatusUpdate::Running,
            })
            .collect::<Vec<_>>()
            .to_stream(scope);

        // Every snapshot or replication error becomes a halting status for the whole source.
        // Errors from the SSH tunnel are reported under `StatusNamespace::Ssh` rather than the
        // Postgres namespace.
        let errs = snapshot_err.concat(&repl_err).map(move |err| {
            let err_string = err.display_with_causes().to_string();
            let update = HealthStatusUpdate::halting(err_string.clone(), None);

            let namespace = match err {
                ReplicationError::Transient(err)
                    if matches!(
                        &*err,
                        TransientError::PostgresError(PostgresError::Ssh(_))
                            | TransientError::PostgresError(PostgresError::SshIo(_))
                    ) =>
                {
                    StatusNamespace::Ssh
                }
                _ => Self::STATUS_NAMESPACE,
            };

            HealthStatusMessage {
                id: None,
                namespace: namespace.clone(),
                update,
            }
        });

        let health = health_init.concat(&errs);

        (
            data_collections,
            uppers,
            health,
            probe_stream,
            vec![snapshot_token, repl_token],
        )
    }
}
269
/// Information about one source export that ingests a particular upstream table.
///
/// `render` collects these per upstream table OID and passes them to both the snapshot and
/// the replication sides of the dataflow.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// Description of the upstream table, taken from the export's PostgreSQL details
    /// (`details.table`). `verify_schema` compares it with the current upstream description.
    desc: PostgresTableDesc,
    /// NOTE(review): presumably an optional projection over the upstream columns for this
    /// output. `render` always sets it to `None`; confirm its use in the snapshot and
    /// replication code.
    projection: Option<Vec<usize>>,
    /// One `(cast type, cast expression)` pair per column of `desc`. `verify_schema` pairs them
    /// with `zip_eq`, so the lengths must match. Same shape as the `casts` argument of `cast_row`.
    casts: Vec<(CastType, MirScalarExpr)>,
    /// This export's resume upper, decoded from `config.source_resume_uppers`.
    resume_upper: Antichain<MzOffset>,
    /// Id of the source export this output feeds.
    export_id: GlobalId,
}
282
/// An error reported by the snapshot or replication side of the PostgreSQL source.
///
/// `TransientError` is not `Clone`. Wrapping both payloads in an [`Rc`] lets this enum derive
/// `Clone` cheaply.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    /// A [`TransientError`]; its message is displayed as-is.
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    /// A [`DefiniteError`]; its message is displayed as-is.
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
290
/// Errors the PostgreSQL source classifies as transient.
///
/// NOTE(review): by contrast with [`DefiniteError`], these are presumably expected to clear up
/// when the source restarts. Confirm against the snapshot and replication code.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    /// The replication slot was not found in `pg_replication_slots`.
    #[error("replication slot mysteriously missing")]
    MissingReplicationSlot,
    /// The slot can no longer serve `requested_lsn`; the earliest LSN it still has is
    /// `available_lsn`.
    #[error(
        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
    )]
    OvercompactedReplicationSlot {
        requested_lsn: MzOffset,
        available_lsn: MzOffset,
    },
    /// An attempt to create the replication slot found it already present.
    #[error("replication slot already exists")]
    ReplicationSlotAlreadyExists,
    /// The replication stream ended before it was expected to.
    #[error("stream ended prematurely")]
    ReplicationEOF,
    /// A replication protocol message of an unexpected kind was received.
    #[error("unexpected replication message")]
    UnknownReplicationMessage,
    /// A logical replication (pgoutput) message of an unexpected kind was received.
    #[error("unexpected logical replication message")]
    UnknownLogicalReplicationMessage,
    /// A data event arrived without an enclosing transaction.
    #[error("received replication event outside of transaction")]
    BareTransactionEvent,
    /// The LSNs recorded at BEGIN and COMMIT of a transaction disagree.
    #[error("lsn mismatch between BEGIN and COMMIT")]
    InvalidTransaction,
    /// A BEGIN was received while a transaction was already open.
    #[error("BEGIN within existing BEGIN stream")]
    NestedTransaction,
    /// Artificial error; see its message for the intended use during snapshots.
    #[error("recoverable errors should crash the process during snapshots")]
    SyntheticError,
    /// Error from the `tokio_postgres` client. The display text is fixed, but `#[from]` makes
    /// the wrapped error the `source()`, so it appears in the cause chain.
    #[error("sql client error")]
    SQLClient(#[from] tokio_postgres::Error),
    /// Error from `mz_postgres_util`, including SSH tunnel failures (see `render`).
    #[error(transparent)]
    PostgresError(#[from] PostgresError),
    /// Catch-all for ad-hoc errors.
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
326
/// Errors the PostgreSQL source classifies as definite.
///
/// Unlike [`TransientError`], this type is `Clone` and (de)serializable. It converts into a
/// [`DataflowError`] through `From<DefiniteError> for DataflowError`.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    /// The slot no longer reaches back to the needed point. Fields: the snapshot consistent
    /// point, then the resume LSN.
    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
    SlotCompactedPastResumePoint(MzOffset, MzOffset),
    /// The upstream table was truncated.
    #[error("table was truncated")]
    TableTruncated,
    /// The upstream table no longer exists. `verify_schema` returns this when the table's OID
    /// is missing from the upstream descriptions.
    #[error("table was dropped")]
    TableDropped,
    /// The named publication does not exist.
    #[error("publication {0:?} does not exist")]
    PublicationDropped(String),
    /// The replication slot was invalidated for exceeding its maximum reserved size.
    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
    InvalidReplicationSlot,
    /// A COPY row had an unexpected number of columns.
    #[error("unexpected number of columns while parsing COPY output")]
    MissingColumn,
    /// COPY output could not be parsed.
    #[error("failed to parse COPY protocol")]
    InvalidCopyInput,
    /// The upstream timeline ID differs from the expected one.
    #[error(
        "unsupported action: database restored from point-in-time backup. Expected timeline ID {expected} but got {actual}"
    )]
    InvalidTimelineId { expected: u64, actual: u64 },
    /// An old row lacked a TOASTed value; see the message for the likely cause.
    #[error(
        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    MissingToast,
    /// The old row was not included in the replication stream; see the message.
    #[error(
        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    DefaultReplicaIdentity,
    /// The upstream schema is incompatible. `verify_schema` stores the compatibility error's
    /// text here.
    #[error("incompatible schema change: {0}")]
    IncompatibleSchema(String),
    /// Bytes that are not valid UTF-8; produced by `decode_utf8_text`, carrying the raw bytes.
    #[error("invalid UTF8 string: {0:?}")]
    InvalidUTF8(Vec<u8>),
    /// A column cast expression failed; produced by `cast_row`.
    #[error("failed to cast raw column: {0}")]
    CastError(#[source] EvalError),
    /// Binary data appeared in the replication stream.
    #[error("unexpected binary data in replication stream")]
    UnexpectedBinaryData,
}
366
367impl From<DefiniteError> for DataflowError {
368    fn from(err: DefiniteError) -> Self {
369        let m = err.to_string().into();
370        DataflowError::SourceError(Box::new(SourceError {
371            error: match &err {
372                DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
373                DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
374                DefiniteError::TableDropped => SourceErrorDetails::Other(m),
375                DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
376                DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
377                DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
378                DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
379                DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
380                DefiniteError::MissingToast => SourceErrorDetails::Other(m),
381                DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
382                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
383                DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
384                DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
385                DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
386            },
387        }))
388    }
389}
390
391async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
392    let slot = Ident::new_unchecked(slot).to_ast_string_simple();
394    let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
395    match simple_query_opt(client, &query).await {
396        Ok(_) => Ok(()),
397        Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
399            tracing::trace!("replication slot {slot} already existed");
400            Ok(())
401        }
402        Err(err) => Err(TransientError::PostgresError(err)),
403    }
404}
405
/// Metadata about a replication slot, read from `pg_replication_slots` by
/// `fetch_slot_metadata`.
struct SlotMetadata {
    /// The slot's `active_pid` column. Per the PostgreSQL documentation, this is the PID of the
    /// session currently using the slot, or `None` (NULL) if the slot is not in use.
    active_pid: Option<i32>,
    /// The slot's `confirmed_flush_lsn` column. `fetch_slot_metadata` only builds this struct
    /// once the column is non-NULL, so the value is always present.
    confirmed_flush_lsn: MzOffset,
}
415
416async fn fetch_slot_metadata(
418    client: &Client,
419    slot: &str,
420    interval: Duration,
421) -> Result<SlotMetadata, TransientError> {
422    loop {
423        let query = "SELECT active_pid, confirmed_flush_lsn
424                FROM pg_replication_slots WHERE slot_name = $1";
425        let Some(row) = client.query_opt(query, &[&slot]).await? else {
426            return Err(TransientError::MissingReplicationSlot);
427        };
428
429        match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
430            Some(lsn) => {
434                return Ok(SlotMetadata {
435                    confirmed_flush_lsn: MzOffset::from(lsn),
436                    active_pid: row.get("active_pid"),
437                });
438            }
439            None => tokio::time::sleep(interval).await,
443        };
444    }
445}
446
447async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
449    let query = "SELECT pg_current_wal_lsn()";
450    let row = simple_query_opt(client, query).await?;
451
452    match row.and_then(|row| {
453        row.get("pg_current_wal_lsn")
454            .map(|lsn| lsn.parse::<PgLsn>().unwrap())
455    }) {
456        Some(lsn) => Ok(MzOffset::from(lsn)),
461        None => Err(TransientError::Generic(anyhow::anyhow!(
462            "pg_current_wal_lsn() mysteriously has no value"
463        ))),
464    }
465}
466
467fn verify_schema(
470    oid: u32,
471    info: &SourceOutputInfo,
472    upstream_info: &BTreeMap<u32, PostgresTableDesc>,
473) -> Result<(), DefiniteError> {
474    let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
475
476    let allow_oids_to_change_by_col_num = info
477        .desc
478        .columns
479        .iter()
480        .zip_eq(info.casts.iter())
481        .flat_map(|(col, (cast_type, _))| match cast_type {
482            CastType::Text => Some(col.col_num),
483            CastType::Natural => None,
484        })
485        .collect();
486
487    match info
488        .desc
489        .determine_compatibility(current_desc, &allow_oids_to_change_by_col_num)
490    {
491        Ok(()) => Ok(()),
492        Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
493    }
494}
495
496fn cast_row(
498    casts: &[(CastType, MirScalarExpr)],
499    datums: &[Datum<'_>],
500    row: &mut Row,
501) -> Result<(), DefiniteError> {
502    let arena = mz_repr::RowArena::new();
503    let mut packer = row.packer();
504    for (_, column_cast) in casts {
505        let datum = column_cast
506            .eval(datums, &arena)
507            .map_err(DefiniteError::CastError)?;
508        packer.push(datum);
509    }
510    Ok(())
511}
512
513fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
515    match std::str::from_utf8(bytes) {
516        Ok(text) => Ok(Datum::String(text)),
517        Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
518    }
519}