// mz_storage/source/postgres.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the ingestion dataflow of a [`PostgresSourceConnection`]. The dataflow consists
11//! of multiple operators in order to take advantage of all the available workers.
12//!
13//! # Snapshot
14//!
15//! One part of the dataflow deals with snapshotting the tables involved in the ingestion. Each
16//! table is partitioned across all workers using PostgreSQL's `ctid` column to identify row
17//! ranges. Each worker fetches its assigned range using a `COPY` query with ctid filtering,
18//! enabling parallel snapshotting of large tables.
19//!
20//! For all tables that ended up being snapshotted the snapshot reader also emits a rewind request
21//! to the replication reader which will ensure that the requested portion of the replication
22//! stream is subtracted from the snapshot.
23//!
24//! See the [snapshot] module for more information on the snapshot strategy.
25//!
26//! # Replication
27//!
28//! The other part of the dataflow deals with reading the logical replication slot, which must
29//! happen from a single worker. The minimum amount of processing is performed from that worker
30//! and the data is then distributed among all workers for decoding.
31//!
32//! See the [replication] module for more information on the replication strategy.
33//!
34//! # Error handling
35//!
36//! There are two kinds of errors that can happen during ingestion that are represented as two
37//! separate error types:
38//!
39//! [`DefiniteError`]s are errors that happen during processing of a specific
40//! collection record at a specific LSN. These are the only errors that can ever end up in the
41//! error collection of a subsource.
42//!
43//! Transient errors are any errors that can happen for reasons that are unrelated to the data
44//! itself. This could be authentication failures, connection failures, etc. The only operators
45//! that can emit such errors are the `TableReader` and the `ReplicationReader` operators, which
46//! are the ones that talk to the external world. Both of these operators are built with the
47//! `AsyncOperatorBuilder::build_fallible` method which allows transient errors to be propagated
48//! upwards with the standard `?` operator without risking downgrading the capability and producing
49//! bogus frontiers.
50//!
51//! The error streams from both of those operators are published to the source status and also
52//! trigger a restart of the dataflow.
53//!
54//! ```text
55//!    ┏━━━━━━━━━━━━━━┓
56//!    ┃    table     ┃
57//!    ┃    reader    ┃
58//!    ┗━┯━━━━━━━━━━┯━┛
59//!      │          │rewind
60//!      │          │requests
61//!      │          ╰────╮
62//!      │             ┏━v━━━━━━━━━━━┓
63//!      │             ┃ replication ┃
64//!      │             ┃   reader    ┃
65//!      │             ┗━┯━━━━━━━━━┯━┛
66//!  COPY│           slot│         │
67//!  data│           data│         │
68//! ┏━━━━v━━━━━┓ ┏━━━━━━━v━━━━━┓   │
69//! ┃  COPY    ┃ ┃ replication ┃   │
70//! ┃ decoder  ┃ ┃   decoder   ┃   │
71//! ┗━━━━┯━━━━━┛ ┗━━━━━┯━━━━━━━┛   │
72//!      │snapshot     │replication│
73//!      │updates      │updates    │
74//!      ╰────╮    ╭───╯           │
75//!          ╭┴────┴╮              │
76//!          │concat│              │
77//!          ╰──┬───╯              │
78//!             │ data             │progress
79//!             │ output           │output
80//!             v                  v
81//! ```
82
83use std::collections::BTreeMap;
84use std::rc::Rc;
85use std::time::Duration;
86
87use differential_dataflow::AsCollection;
88use itertools::Itertools as _;
89use mz_expr::EvalError;
90use mz_ore::cast::CastFrom;
91use mz_ore::error::ErrorExt;
92use mz_postgres_util::desc::PostgresTableDesc;
93use mz_postgres_util::{Client, PostgresError, simple_query_opt};
94use mz_repr::{Datum, Diff, GlobalId, Row};
95use mz_sql_parser::ast::Ident;
96use mz_sql_parser::ast::display::AstDisplay;
97use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
98use mz_storage_types::sources::casts::StorageScalarExpr;
99use mz_storage_types::sources::postgres::CastType;
100use mz_storage_types::sources::{
101    MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
102};
103use mz_timely_util::builder_async::PressOnDropButton;
104use serde::{Deserialize, Serialize};
105use timely::container::CapacityContainerBuilder;
106use timely::dataflow::operators::Concat;
107use timely::dataflow::operators::core::Partition;
108use timely::dataflow::operators::vec::{Map, ToStream};
109use timely::dataflow::{Scope, StreamVec};
110use timely::progress::Antichain;
111use tokio_postgres::error::SqlState;
112use tokio_postgres::types::PgLsn;
113
114use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
115use crate::source::types::{Probe, SourceRender, StackedCollection};
116use crate::source::{RawSourceCreationConfig, SourceMessage};
117
118mod replication;
119mod snapshot;
120
impl SourceRender for PostgresSourceConnection {
    type Time = MzOffset;

    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;

    /// Render the ingestion dataflow. This function only connects things together and contains no
    /// actual processing logic.
    ///
    /// Returns, in order: one data collection per source export keyed by its `GlobalId`, the
    /// health status stream, the probe stream produced by the replication reader, and the tokens
    /// that keep the snapshot and replication operators alive.
    fn render<'scope>(
        self,
        scope: Scope<'scope, MzOffset>,
        config: &RawSourceCreationConfig,
        resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
        _start_signal: impl std::future::Future<Output = ()> + 'static,
    ) -> (
        BTreeMap<
            GlobalId,
            StackedCollection<'scope, MzOffset, Result<SourceMessage, DataflowError>>,
        >,
        StreamVec<'scope, MzOffset, HealthStatusMessage>,
        StreamVec<'scope, MzOffset, Probe<MzOffset>>,
        Vec<PressOnDropButton>,
    ) {
        // Collect the source outputs that we will be exporting into a per-table map, keyed by the
        // upstream table OID. The inner map is keyed by the export's position in
        // `config.source_exports`, which is also how updates are routed back out below.
        let mut table_info = BTreeMap::new();
        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
            let SourceExport {
                details,
                storage_metadata: _,
                data_config: _,
            } = export;
            let details = match details {
                SourceExportDetails::Postgres(details) => details,
                // This is an export that doesn't need any data output to it.
                SourceExportDetails::None => continue,
                _ => panic!("unexpected source export details: {:?}", details),
            };
            let desc = details.table.clone();
            let casts = details.column_casts.clone();
            // The frontier up to which this export has already been ingested, decoded from the
            // durably recorded resume uppers.
            let resume_upper = Antichain::from_iter(
                config
                    .source_resume_uppers
                    .get(id)
                    .expect("all source exports must be present in source resume uppers")
                    .iter()
                    .map(MzOffset::decode_row),
            );
            let output = SourceOutputInfo {
                desc,
                // No upstream schema observed yet; see `SourceOutputInfo::projection`.
                projection: None,
                casts,
                resume_upper,
                export_id: id.clone(),
            };
            table_info
                .entry(output.desc.oid)
                .or_insert_with(BTreeMap::new)
                .insert(idx, output);
        }

        let metrics = config.metrics.get_postgres_source_metrics(config.id);

        // Render the snapshot side of the dataflow (see the diagram in the module docs). Its
        // rewind requests feed into the replication operator rendered below.
        let (snapshot_updates, rewinds, slot_ready, snapshot_err, snapshot_token) =
            snapshot::render(
                scope.clone(),
                config.clone(),
                self.clone(),
                table_info.clone(),
                metrics.snapshot_metrics.clone(),
            );

        // Render the replication side, which consumes the rewind requests and the `slot_ready`
        // signal from the snapshot operators.
        let (repl_updates, probe_stream, repl_err, repl_token) = replication::render(
            scope.clone(),
            config.clone(),
            self,
            table_info,
            rewinds,
            slot_ready,
            resume_uppers,
            metrics,
        );

        // Merge snapshot and replication updates and split the combined stream back out into one
        // stream per source export, routed by the output index carried in the data.
        let updates = snapshot_updates.concat(repl_updates);
        let partition_count = u64::cast_from(config.source_exports.len());
        let data_streams: Vec<_> = updates
            .inner
            .partition::<CapacityContainerBuilder<_>, _, _>(
                partition_count,
                |((output, data), time, diff): &(
                    (usize, Result<SourceMessage, DataflowError>),
                    MzOffset,
                    Diff,
                )| {
                    let output = u64::cast_from(*output);
                    (output, (data.clone(), time.clone(), diff.clone()))
                },
            );
        let mut data_collections = BTreeMap::new();
        // `partition` preserves partition order, so zipping with the export ids in iteration
        // order reassociates each stream with its export.
        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
            data_collections.insert(*id, data_stream.as_collection());
        }

        // Emit an initial `Running` status for every export, plus one (the `None` id) for the
        // source as a whole.
        let export_ids = config.source_exports.keys().copied();
        let health_init = export_ids
            .map(Some)
            .chain(std::iter::once(None))
            .map(|id| HealthStatusMessage {
                id,
                namespace: Self::STATUS_NAMESPACE,
                update: HealthStatusUpdate::Running,
            })
            .collect::<Vec<_>>()
            .to_stream(scope);

        // N.B. Note that we don't check ssh tunnel statuses here. We could, but immediately on
        // restart we are going to set the status to an ssh error correctly, so we don't do this
        // extra work.
        let errs = snapshot_err.concat(repl_err).map(move |err| {
            // This update will cause the dataflow to restart
            let err_string = err.display_with_causes().to_string();
            let update = HealthStatusUpdate::halting(err_string.clone(), None);

            // Report SSH failures under the SSH namespace so tunnel problems are
            // distinguishable from Postgres problems in the source status.
            let namespace = match err {
                ReplicationError::Transient(err)
                    if matches!(
                        &*err,
                        TransientError::PostgresError(PostgresError::Ssh(_))
                            | TransientError::PostgresError(PostgresError::SshIo(_))
                    ) =>
                {
                    StatusNamespace::Ssh
                }
                _ => Self::STATUS_NAMESPACE,
            };

            HealthStatusMessage {
                id: None,
                namespace: namespace.clone(),
                update,
            }
        });

        let health = health_init.concat(errs);

        (
            data_collections,
            health,
            probe_stream,
            vec![snapshot_token, repl_token],
        )
    }
}
272
/// Per-output information describing how data from one upstream table feeds one source export.
#[derive(Clone, Debug)]
struct SourceOutputInfo {
    /// The expected upstream schema of this output.
    desc: PostgresTableDesc,
    /// A projection of the upstream columns into the columns expected by this output. This field
    /// is recalculated every time we observe an upstream schema change. On dataflow initialization
    /// this field is None since we haven't yet observed any schemas.
    projection: Option<Vec<usize>>,
    /// For each output column, the kind of cast and the expression used to cast the decoded
    /// column value into the output's type (see `cast_row`).
    casts: Vec<(CastType, StorageScalarExpr)>,
    /// The frontier up to which this output has already been ingested, as decoded from the
    /// source resume uppers at rendering time.
    resume_upper: Antichain<MzOffset>,
    /// The `GlobalId` of the source export this output feeds.
    export_id: GlobalId,
}
285
/// The union of the two error classes that can occur during ingestion (see the module docs).
///
/// The inner errors are wrapped in `Rc` so this enum can derive `Clone` even though the
/// underlying error types (e.g. `tokio_postgres::Error`) are not themselves `Clone`.
#[derive(Clone, Debug, thiserror::Error)]
pub enum ReplicationError {
    #[error(transparent)]
    Transient(#[from] Rc<TransientError>),
    #[error(transparent)]
    Definite(#[from] Rc<DefiniteError>),
}
293
/// A transient error that never ends up in the collection of a specific table.
///
/// Per the module docs, transient errors are published to the source status and trigger a
/// restart of the dataflow rather than being written to an error collection.
#[derive(Debug, thiserror::Error)]
pub enum TransientError {
    #[error("replication slot mysteriously missing")]
    MissingReplicationSlot,
    #[error(
        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
    )]
    OvercompactedReplicationSlot {
        requested_lsn: MzOffset,
        available_lsn: MzOffset,
    },
    #[error("replication slot already exists")]
    ReplicationSlotAlreadyExists,
    #[error("stream ended prematurely")]
    ReplicationEOF,
    #[error("unexpected replication message")]
    UnknownReplicationMessage,
    #[error("unexpected logical replication message")]
    UnknownLogicalReplicationMessage,
    #[error("received replication event outside of transaction")]
    BareTransactionEvent,
    #[error("lsn mismatch between BEGIN and COMMIT")]
    InvalidTransaction,
    #[error("BEGIN within existing BEGIN stream")]
    NestedTransaction,
    // NOTE(review): presumably an injected error used to exercise the restart path in tests —
    // confirm against the snapshot module.
    #[error("recoverable errors should crash the process during snapshots")]
    SyntheticError,
    #[error("sql client error")]
    SQLClient(#[from] tokio_postgres::Error),
    #[error(transparent)]
    PostgresError(#[from] PostgresError),
    #[error(transparent)]
    Generic(#[from] anyhow::Error),
}
329
/// A definite error that always ends up in the collection of a specific table.
///
/// These derive `Serialize`/`Deserialize` (unlike [`TransientError`]) so they can be durably
/// recorded in error collections.
#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
pub enum DefiniteError {
    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
    SlotCompactedPastResumePoint(MzOffset, MzOffset),
    #[error("table was truncated")]
    TableTruncated,
    #[error("table was dropped")]
    TableDropped,
    #[error("publication {0:?} does not exist")]
    PublicationDropped(String),
    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
    InvalidReplicationSlot,
    #[error("unexpected number of columns while parsing COPY output")]
    MissingColumn,
    #[error("failed to parse COPY protocol")]
    InvalidCopyInput,
    #[error(
        "unsupported action: database restored from point-in-time backup. Expected timeline ID {expected} but got {actual}"
    )]
    InvalidTimelineId { expected: u64, actual: u64 },
    #[error(
        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    MissingToast,
    #[error(
        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
    )]
    DefaultReplicaIdentity,
    #[error("incompatible schema change: {0}")]
    // TODO: proper error variants for all the expected schema violations
    IncompatibleSchema(String),
    #[error("invalid UTF8 string: {0:?}")]
    InvalidUTF8(Vec<u8>),
    #[error("failed to cast raw column: {0}")]
    CastError(#[source] EvalError),
    #[error("unexpected binary data in replication stream")]
    UnexpectedBinaryData,
}
369
370impl From<DefiniteError> for DataflowError {
371    fn from(err: DefiniteError) -> Self {
372        let m = err.to_string().into();
373        DataflowError::SourceError(Box::new(SourceError {
374            error: match &err {
375                DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
376                DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
377                DefiniteError::TableDropped => SourceErrorDetails::Other(m),
378                DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
379                DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
380                DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
381                DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
382                DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
383                DefiniteError::MissingToast => SourceErrorDetails::Other(m),
384                DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
385                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
386                DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
387                DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
388                DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
389            },
390        }))
391    }
392}
393
394async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
395    // Note: Using unchecked here is okay because we're using it in a SQL query.
396    let slot = Ident::new_unchecked(slot).to_ast_string_simple();
397    let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
398    match simple_query_opt(client, &query).await {
399        Ok(_) => Ok(()),
400        // If the slot already exists that's still ok
401        Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
402            tracing::trace!("replication slot {slot} already existed");
403            Ok(())
404        }
405        Err(err) => Err(TransientError::PostgresError(err)),
406    }
407}
408
/// The state of a replication slot, as reported by the `pg_replication_slots` system view
/// (see [`fetch_slot_metadata`]).
struct SlotMetadata {
    /// The process ID of the session using this slot if the slot is currently actively being used.
    /// None if inactive.
    active_pid: Option<i32>,
    /// The address (LSN) up to which the logical slot's consumer has confirmed receiving data.
    /// Data corresponding to the transactions committed before this LSN is not available anymore.
    /// This is the minimum LSN at which the slot can safely resume.
    confirmed_flush_lsn: MzOffset,
}
418
/// Fetches metadata about the given replication slot, polling every `interval` until the slot's
/// `confirmed_flush_lsn` becomes available.
///
/// The returned `confirmed_flush_lsn` is the minimum LSN at which this slot can safely resume.
async fn fetch_slot_metadata(
    client: &Client,
    slot: &str,
    interval: Duration,
) -> Result<SlotMetadata, TransientError> {
    loop {
        let query = "SELECT active_pid, confirmed_flush_lsn
                FROM pg_replication_slots WHERE slot_name = $1";
        // No row at all means the slot does not exist; that is a transient error rather than a
        // reason to keep polling.
        let Some(row) = client.query_opt(query, &[&slot]).await? else {
            return Err(TransientError::MissingReplicationSlot);
        };

        match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
            // For postgres, `confirmed_flush_lsn` means that the slot is able to produce
            // all transactions that happen at tx_lsn >= confirmed_flush_lsn. Therefore this value
            // already has "upper" semantics.
            Some(lsn) => {
                return Ok(SlotMetadata {
                    confirmed_flush_lsn: MzOffset::from(lsn),
                    active_pid: row.get("active_pid"),
                });
            }
            // It can happen that `confirmed_flush_lsn` is NULL while the slot initializes, so we
            // sleep and retry. This could probably be a `tokio::time::interval`, but it is only
            // called twice, so it's fine like this.
            None => tokio::time::sleep(interval).await,
        };
    }
}
449
450/// Fetch the `pg_current_wal_lsn`, used to report metrics.
451async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
452    let query = "SELECT pg_current_wal_lsn()";
453    let row = simple_query_opt(client, query).await?;
454
455    match row.and_then(|row| {
456        row.get("pg_current_wal_lsn")
457            .map(|lsn| lsn.parse::<PgLsn>().unwrap())
458    }) {
459        // Based on the documentation, it appears that `pg_current_wal_lsn` has
460        // the same "upper" semantics of `confirmed_flush_lsn`:
461        // <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADMIN-BACKUP>
462        // We may need to revisit this and use `pg_current_wal_flush_lsn`.
463        Some(lsn) => Ok(MzOffset::from(lsn)),
464        None => Err(TransientError::Generic(anyhow::anyhow!(
465            "pg_current_wal_lsn() mysteriously has no value"
466        ))),
467    }
468}
469
470// Ensures that the table with oid `oid` and expected schema `expected_schema` is still compatible
471// with the current upstream schema `upstream_info`.
472fn verify_schema(
473    oid: u32,
474    info: &SourceOutputInfo,
475    upstream_info: &BTreeMap<u32, PostgresTableDesc>,
476) -> Result<(), DefiniteError> {
477    let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
478
479    let allow_oids_to_change_by_col_num = info
480        .desc
481        .columns
482        .iter()
483        .zip_eq(info.casts.iter())
484        .flat_map(|(col, (cast_type, _))| match cast_type {
485            CastType::Text => Some(col.col_num),
486            CastType::Natural => None,
487        })
488        .collect();
489
490    match info
491        .desc
492        .determine_compatibility(current_desc, &allow_oids_to_change_by_col_num)
493    {
494        Ok(()) => Ok(()),
495        Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
496    }
497}
498
499/// Casts a text row into the target types
500fn cast_row(
501    casts: &[(CastType, StorageScalarExpr)],
502    datums: &[Datum<'_>],
503    row: &mut Row,
504) -> Result<(), DefiniteError> {
505    let arena = mz_repr::RowArena::new();
506    let mut packer = row.packer();
507    for (_, column_cast) in casts {
508        let datum = column_cast
509            .eval(datums, &arena)
510            .map_err(DefiniteError::CastError)?;
511        packer.push(datum);
512    }
513    Ok(())
514}
515
516/// Converts raw bytes that are expected to be UTF8 encoded into a `Datum::String`
517fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
518    match std::str::from_utf8(bytes) {
519        Ok(text) => Ok(Datum::String(text)),
520        Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
521    }
522}