mz_storage/source/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Code to render the ingestion dataflow of a [`PostgresSourceConnection`]. The dataflow consists
11//! of multiple operators in order to take advantage of all the available workers.
12//!
13//! # Snapshot
14//!
15//! One part of the dataflow deals with snapshotting the tables involved in the ingestion. Each
16//! table that needs a snapshot is assigned to a specific worker which performs a `COPY` query
17//! and distributes the raw COPY bytes to all workers to decode the text encoded rows.
18//!
19//! For all tables that ended up being snapshotted the snapshot reader also emits a rewind request
20//! to the replication reader which will ensure that the requested portion of the replication
21//! stream is subtracted from the snapshot.
22//!
23//! See the [snapshot] module for more information on the snapshot strategy.
24//!
25//! # Replication
26//!
27//! The other part of the dataflow deals with reading the logical replication slot, which must
28//! happen from a single worker. The minimum amount of processing is performed from that worker
29//! and the data is then distributed among all workers for decoding.
30//!
31//! See the [replication] module for more information on the replication strategy.
32//!
33//! # Error handling
34//!
35//! There are two kinds of errors that can happen during ingestion that are represented as two
36//! separate error types:
37//!
38//! [`DefiniteError`]s are errors that happen during processing of a specific
39//! collection record at a specific LSN. These are the only errors that can ever end up in the
40//! error collection of a subsource.
41//!
42//! Transient errors are any errors that can happen for reasons that are unrelated to the data
43//! itself. This could be authentication failures, connection failures, etc. The only operators
44//! that can emit such errors are the `TableReader` and the `ReplicationReader` operators, which
45//! are the ones that talk to the external world. Both of these operators are built with the
46//! `AsyncOperatorBuilder::build_fallible` method which allows transient errors to be propagated
47//! upwards with the standard `?` operator without risking downgrading the capability and producing
48//! bogus frontiers.
49//!
50//! The error streams from both of those operators are published to the source status and also
51//! trigger a restart of the dataflow.
52//!
53//! ```text
54//!    ┏━━━━━━━━━━━━━━┓
55//!    ┃    table     ┃
56//!    ┃    reader    ┃
57//!    ┗━┯━━━━━━━━━━┯━┛
58//!      │          │rewind
59//!      │          │requests
60//!      │          ╰────╮
61//!      │             ┏━v━━━━━━━━━━━┓
62//!      │             ┃ replication ┃
63//!      │             ┃   reader    ┃
64//!      │             ┗━┯━━━━━━━━━┯━┛
65//!  COPY│           slot│         │
66//!  data│           data│         │
67//! ┏━━━━v━━━━━┓ ┏━━━━━━━v━━━━━┓   │
68//! ┃  COPY    ┃ ┃ replication ┃   │
69//! ┃ decoder  ┃ ┃   decoder   ┃   │
70//! ┗━━━━┯━━━━━┛ ┗━━━━━┯━━━━━━━┛   │
71//!      │snapshot     │replication│
72//!      │updates      │updates    │
73//!      ╰────╮    ╭───╯           │
74//!          ╭┴────┴╮              │
75//!          │concat│              │
76//!          ╰──┬───╯              │
77//!             │ data             │progress
78//!             │ output           │output
79//!             v                  v
80//! ```
81
82use std::collections::BTreeMap;
83use std::convert::Infallible;
84use std::rc::Rc;
85use std::time::Duration;
86
87use differential_dataflow::AsCollection;
88use itertools::Itertools as _;
89use mz_expr::{EvalError, MirScalarExpr};
90use mz_ore::cast::CastFrom;
91use mz_ore::error::ErrorExt;
92use mz_postgres_util::desc::PostgresTableDesc;
93use mz_postgres_util::{Client, PostgresError, simple_query_opt};
94use mz_repr::{Datum, Diff, GlobalId, Row};
95use mz_sql_parser::ast::Ident;
96use mz_sql_parser::ast::display::AstDisplay;
97use mz_storage_types::errors::{DataflowError, SourceError, SourceErrorDetails};
98use mz_storage_types::sources::postgres::CastType;
99use mz_storage_types::sources::{
100    MzOffset, PostgresSourceConnection, SourceExport, SourceExportDetails, SourceTimestamp,
101};
102use mz_timely_util::builder_async::PressOnDropButton;
103use serde::{Deserialize, Serialize};
104use timely::container::CapacityContainerBuilder;
105use timely::dataflow::operators::core::Partition;
106use timely::dataflow::operators::{Concat, Map, ToStream};
107use timely::dataflow::{Scope, Stream};
108use timely::progress::Antichain;
109use tokio_postgres::error::SqlState;
110use tokio_postgres::types::PgLsn;
111
112use crate::healthcheck::{HealthStatusMessage, HealthStatusUpdate, StatusNamespace};
113use crate::source::types::{Probe, ProgressStatisticsUpdate, SourceRender, StackedCollection};
114use crate::source::{RawSourceCreationConfig, SourceMessage};
115
116mod replication;
117mod snapshot;
118
119impl SourceRender for PostgresSourceConnection {
120    type Time = MzOffset;
121
122    const STATUS_NAMESPACE: StatusNamespace = StatusNamespace::Postgres;
123
124    /// Render the ingestion dataflow. This function only connects things together and contains no
125    /// actual processing logic.
126    fn render<G: Scope<Timestamp = MzOffset>>(
127        self,
128        scope: &mut G,
129        config: &RawSourceCreationConfig,
130        resume_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
131        _start_signal: impl std::future::Future<Output = ()> + 'static,
132    ) -> (
133        BTreeMap<GlobalId, StackedCollection<G, Result<SourceMessage, DataflowError>>>,
134        Stream<G, Infallible>,
135        Stream<G, HealthStatusMessage>,
136        Stream<G, ProgressStatisticsUpdate>,
137        Option<Stream<G, Probe<MzOffset>>>,
138        Vec<PressOnDropButton>,
139    ) {
140        // Collect the source outputs that we will be exporting into a per-table map.
141        let mut table_info = BTreeMap::new();
142        for (idx, (id, export)) in config.source_exports.iter().enumerate() {
143            let SourceExport {
144                details,
145                storage_metadata: _,
146                data_config: _,
147            } = export;
148            let details = match details {
149                SourceExportDetails::Postgres(details) => details,
150                // This is an export that doesn't need any data output to it.
151                SourceExportDetails::None => continue,
152                _ => panic!("unexpected source export details: {:?}", details),
153            };
154            let desc = details.table.clone();
155            let casts = details.column_casts.clone();
156            let resume_upper = Antichain::from_iter(
157                config
158                    .source_resume_uppers
159                    .get(id)
160                    .expect("all source exports must be present in source resume uppers")
161                    .iter()
162                    .map(MzOffset::decode_row),
163            );
164            let output = SourceOutputInfo {
165                desc,
166                casts,
167                resume_upper,
168            };
169            table_info
170                .entry(output.desc.oid)
171                .or_insert_with(BTreeMap::new)
172                .insert(idx, output);
173        }
174
175        let metrics = config.metrics.get_postgres_source_metrics(config.id);
176
177        let (snapshot_updates, rewinds, slot_ready, snapshot_stats, snapshot_err, snapshot_token) =
178            snapshot::render(
179                scope.clone(),
180                config.clone(),
181                self.clone(),
182                table_info.clone(),
183                metrics.snapshot_metrics.clone(),
184            );
185
186        let (repl_updates, uppers, stats_stream, probe_stream, repl_err, repl_token) =
187            replication::render(
188                scope.clone(),
189                config.clone(),
190                self,
191                table_info,
192                &rewinds,
193                &slot_ready,
194                resume_uppers,
195                metrics,
196            );
197
198        let stats_stream = stats_stream.concat(&snapshot_stats);
199
200        let updates = snapshot_updates.concat(&repl_updates);
201        let partition_count = u64::cast_from(config.source_exports.len());
202        let data_streams: Vec<_> = updates
203            .inner
204            .partition::<CapacityContainerBuilder<_>, _, _>(
205                partition_count,
206                |((output, data), time, diff): &(
207                    (usize, Result<SourceMessage, DataflowError>),
208                    MzOffset,
209                    Diff,
210                )| {
211                    let output = u64::cast_from(*output);
212                    (output, (data.clone(), time.clone(), diff.clone()))
213                },
214            );
215        let mut data_collections = BTreeMap::new();
216        for (id, data_stream) in config.source_exports.keys().zip_eq(data_streams) {
217            data_collections.insert(*id, data_stream.as_collection());
218        }
219
220        let init = std::iter::once(HealthStatusMessage {
221            id: None,
222            namespace: Self::STATUS_NAMESPACE,
223            update: HealthStatusUpdate::Running,
224        })
225        .to_stream(scope);
226
227        // N.B. Note that we don't check ssh tunnel statuses here. We could, but immediately on
228        // restart we are going to set the status to an ssh error correctly, so we don't do this
229        // extra work.
230        let errs = snapshot_err.concat(&repl_err).map(move |err| {
231            // This update will cause the dataflow to restart
232            let err_string = err.display_with_causes().to_string();
233            let update = HealthStatusUpdate::halting(err_string.clone(), None);
234
235            let namespace = match err {
236                ReplicationError::Transient(err)
237                    if matches!(
238                        &*err,
239                        TransientError::PostgresError(PostgresError::Ssh(_))
240                            | TransientError::PostgresError(PostgresError::SshIo(_))
241                    ) =>
242                {
243                    StatusNamespace::Ssh
244                }
245                _ => Self::STATUS_NAMESPACE,
246            };
247
248            HealthStatusMessage {
249                id: None,
250                namespace: namespace.clone(),
251                update,
252            }
253        });
254
255        let health = init.concat(&errs);
256
257        (
258            data_collections,
259            uppers,
260            health,
261            stats_stream,
262            probe_stream,
263            vec![snapshot_token, repl_token],
264        )
265    }
266}
267
268#[derive(Clone, Debug)]
269struct SourceOutputInfo {
270    desc: PostgresTableDesc,
271    casts: Vec<(CastType, MirScalarExpr)>,
272    resume_upper: Antichain<MzOffset>,
273}
274
275#[derive(Clone, Debug, thiserror::Error)]
276pub enum ReplicationError {
277    #[error(transparent)]
278    Transient(#[from] Rc<TransientError>),
279    #[error(transparent)]
280    Definite(#[from] Rc<DefiniteError>),
281}
282
283/// A transient error that never ends up in the collection of a specific table.
284#[derive(Debug, thiserror::Error)]
285pub enum TransientError {
286    #[error("replication slot mysteriously missing")]
287    MissingReplicationSlot,
288    #[error(
289        "slot overcompacted. Requested LSN {requested_lsn} but only LSNs >= {available_lsn} are available"
290    )]
291    OvercompactedReplicationSlot {
292        requested_lsn: MzOffset,
293        available_lsn: MzOffset,
294    },
295    #[error("replication slot already exists")]
296    ReplicationSlotAlreadyExists,
297    #[error("stream ended prematurely")]
298    ReplicationEOF,
299    #[error("unexpected replication message")]
300    UnknownReplicationMessage,
301    #[error("unexpected logical replication message")]
302    UnknownLogicalReplicationMessage,
303    #[error("received replication event outside of transaction")]
304    BareTransactionEvent,
305    #[error("lsn mismatch between BEGIN and COMMIT")]
306    InvalidTransaction,
307    #[error("BEGIN within existing BEGIN stream")]
308    NestedTransaction,
309    #[error("recoverable errors should crash the process during snapshots")]
310    SyntheticError,
311    #[error("sql client error")]
312    SQLClient(#[from] tokio_postgres::Error),
313    #[error(transparent)]
314    PostgresError(#[from] PostgresError),
315    #[error(transparent)]
316    Generic(#[from] anyhow::Error),
317}
318
319/// A definite error that always ends up in the collection of a specific table.
320#[derive(Debug, Clone, Serialize, Deserialize, thiserror::Error)]
321pub enum DefiniteError {
322    #[error("slot compacted past snapshot point. snapshot consistent point={0} resume_lsn={1}")]
323    SlotCompactedPastResumePoint(MzOffset, MzOffset),
324    #[error("table was truncated")]
325    TableTruncated,
326    #[error("table was dropped")]
327    TableDropped,
328    #[error("publication {0:?} does not exist")]
329    PublicationDropped(String),
330    #[error("replication slot has been invalidated because it exceeded the maximum reserved size")]
331    InvalidReplicationSlot,
332    #[error("unexpected number of columns while parsing COPY output")]
333    MissingColumn,
334    #[error("failed to parse COPY protocol")]
335    InvalidCopyInput,
336    #[error("invalid timeline ID from PostgreSQL server. Expected {expected} but got {actual}")]
337    InvalidTimelineId { expected: u64, actual: u64 },
338    #[error(
339        "TOASTed value missing from old row. Did you forget to set REPLICA IDENTITY to FULL for your table?"
340    )]
341    MissingToast,
342    #[error(
343        "old row missing from replication stream. Did you forget to set REPLICA IDENTITY to FULL for your table?"
344    )]
345    DefaultReplicaIdentity,
346    #[error("incompatible schema change: {0}")]
347    // TODO: proper error variants for all the expected schema violations
348    IncompatibleSchema(String),
349    #[error("invalid UTF8 string: {0:?}")]
350    InvalidUTF8(Vec<u8>),
351    #[error("failed to cast raw column: {0}")]
352    CastError(#[source] EvalError),
353    #[error("unexpected binary data in replication stream")]
354    UnexpectedBinaryData,
355}
356
357impl From<DefiniteError> for DataflowError {
358    fn from(err: DefiniteError) -> Self {
359        let m = err.to_string().into();
360        DataflowError::SourceError(Box::new(SourceError {
361            error: match &err {
362                DefiniteError::SlotCompactedPastResumePoint(_, _) => SourceErrorDetails::Other(m),
363                DefiniteError::TableTruncated => SourceErrorDetails::Other(m),
364                DefiniteError::TableDropped => SourceErrorDetails::Other(m),
365                DefiniteError::PublicationDropped(_) => SourceErrorDetails::Initialization(m),
366                DefiniteError::InvalidReplicationSlot => SourceErrorDetails::Initialization(m),
367                DefiniteError::MissingColumn => SourceErrorDetails::Other(m),
368                DefiniteError::InvalidCopyInput => SourceErrorDetails::Other(m),
369                DefiniteError::InvalidTimelineId { .. } => SourceErrorDetails::Initialization(m),
370                DefiniteError::MissingToast => SourceErrorDetails::Other(m),
371                DefiniteError::DefaultReplicaIdentity => SourceErrorDetails::Other(m),
372                DefiniteError::IncompatibleSchema(_) => SourceErrorDetails::Other(m),
373                DefiniteError::InvalidUTF8(_) => SourceErrorDetails::Other(m),
374                DefiniteError::CastError(_) => SourceErrorDetails::Other(m),
375                DefiniteError::UnexpectedBinaryData => SourceErrorDetails::Other(m),
376            },
377        }))
378    }
379}
380
381async fn ensure_replication_slot(client: &Client, slot: &str) -> Result<(), TransientError> {
382    // Note: Using unchecked here is okay because we're using it in a SQL query.
383    let slot = Ident::new_unchecked(slot).to_ast_string_simple();
384    let query = format!("CREATE_REPLICATION_SLOT {slot} LOGICAL \"pgoutput\" NOEXPORT_SNAPSHOT");
385    match simple_query_opt(client, &query).await {
386        Ok(_) => Ok(()),
387        // If the slot already exists that's still ok
388        Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
389            tracing::trace!("replication slot {slot} already existed");
390            Ok(())
391        }
392        Err(err) => Err(TransientError::PostgresError(err)),
393    }
394}
395
396/// The state of a replication slot.
397struct SlotMetadata {
398    /// The process ID of the session using this slot if the slot is currently actively being used.
399    /// None if inactive.
400    active_pid: Option<i32>,
401    /// The address (LSN) up to which the logical slot's consumer has confirmed receiving data.
402    /// Data corresponding to the transactions committed before this LSN is not available anymore.
403    confirmed_flush_lsn: MzOffset,
404}
405
406/// Fetches the minimum LSN at which this slot can safely resume.
407async fn fetch_slot_metadata(
408    client: &Client,
409    slot: &str,
410    interval: Duration,
411) -> Result<SlotMetadata, TransientError> {
412    loop {
413        let query = "SELECT active_pid, confirmed_flush_lsn
414                FROM pg_replication_slots WHERE slot_name = $1";
415        let Some(row) = client.query_opt(query, &[&slot]).await? else {
416            return Err(TransientError::MissingReplicationSlot);
417        };
418
419        match row.get::<_, Option<PgLsn>>("confirmed_flush_lsn") {
420            // For postgres, `confirmed_flush_lsn` means that the slot is able to produce
421            // all transactions that happen at tx_lsn >= confirmed_flush_lsn. Therefore this value
422            // already has "upper" semantics.
423            Some(lsn) => {
424                return Ok(SlotMetadata {
425                    confirmed_flush_lsn: MzOffset::from(lsn),
426                    active_pid: row.get("active_pid"),
427                });
428            }
429            // It can happen that confirmed_flush_lsn is NULL as the slot initializes
430            // This could probably be a `tokio::time::interval`, but its only is called twice,
431            // so its fine like this.
432            None => tokio::time::sleep(interval).await,
433        };
434    }
435}
436
437/// Fetch the `pg_current_wal_lsn`, used to report metrics.
438async fn fetch_max_lsn(client: &Client) -> Result<MzOffset, TransientError> {
439    let query = "SELECT pg_current_wal_lsn()";
440    let row = simple_query_opt(client, query).await?;
441
442    match row.and_then(|row| {
443        row.get("pg_current_wal_lsn")
444            .map(|lsn| lsn.parse::<PgLsn>().unwrap())
445    }) {
446        // Based on the documentation, it appears that `pg_current_wal_lsn` has
447        // the same "upper" semantics of `confirmed_flush_lsn`:
448        // <https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADMIN-BACKUP>
449        // We may need to revisit this and use `pg_current_wal_flush_lsn`.
450        Some(lsn) => Ok(MzOffset::from(lsn)),
451        None => Err(TransientError::Generic(anyhow::anyhow!(
452            "pg_current_wal_lsn() mysteriously has no value"
453        ))),
454    }
455}
456
457// Ensures that the table with oid `oid` and expected schema `expected_schema` is still compatible
458// with the current upstream schema `upstream_info`.
459fn verify_schema(
460    oid: u32,
461    expected_desc: &PostgresTableDesc,
462    upstream_info: &BTreeMap<u32, PostgresTableDesc>,
463    casts: &[(CastType, MirScalarExpr)],
464) -> Result<(), DefiniteError> {
465    let current_desc = upstream_info.get(&oid).ok_or(DefiniteError::TableDropped)?;
466
467    let allow_oids_to_change_by_col_num = expected_desc
468        .columns
469        .iter()
470        .zip_eq(casts.iter())
471        .flat_map(|(col, (cast_type, _))| match cast_type {
472            CastType::Text => Some(col.col_num),
473            CastType::Natural => None,
474        })
475        .collect();
476
477    match expected_desc.determine_compatibility(current_desc, &allow_oids_to_change_by_col_num) {
478        Ok(()) => Ok(()),
479        Err(err) => Err(DefiniteError::IncompatibleSchema(err.to_string())),
480    }
481}
482
483/// Casts a text row into the target types
484fn cast_row(
485    casts: &[(CastType, MirScalarExpr)],
486    datums: &[Datum<'_>],
487    row: &mut Row,
488) -> Result<(), DefiniteError> {
489    let arena = mz_repr::RowArena::new();
490    let mut packer = row.packer();
491    for (_, column_cast) in casts {
492        let datum = column_cast
493            .eval(datums, &arena)
494            .map_err(DefiniteError::CastError)?;
495        packer.push(datum);
496    }
497    Ok(())
498}
499
500/// Converts raw bytes that are expected to be UTF8 encoded into a `Datum::String`
501fn decode_utf8_text(bytes: &[u8]) -> Result<Datum<'_>, DefiniteError> {
502    match std::str::from_utf8(bytes) {
503        Ok(text) => Ok(Datum::String(text)),
504        Err(_) => Err(DefiniteError::InvalidUTF8(bytes.to_vec())),
505    }
506}