// mz_storage/source/postgres/replication.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
//!
//! ```text
//!              o
//!              │rewind
//!              │requests
//!          ╭───┴────╮
//!          │exchange│ (collect all requests to one worker)
//!          ╰───┬────╯
//!           ┏━━v━━━━━━━━━━┓
//!           ┃ replication ┃ (single worker)
//!           ┃   reader    ┃
//!           ┗━┯━━━━━━━━┯━━┛
//!             │raw     │
//!             │data    │
//!        ╭────┴─────╮  │
//!        │distribute│  │ (distribute to all workers)
//!        ╰────┬─────╯  │
//! ┏━━━━━━━━━━━┷━┓      │
//! ┃ replication ┃      │ (parallel decode)
//! ┃   decoder   ┃      │
//! ┗━━━━━┯━━━━━━━┛      │
//!       │ replication  │ progress
//!       │ updates      │ output
//!       v              v
//! ```
//!
//! # Progress tracking
//!
//! In order to avoid causing excessive resource usage in the upstream server it's important to
//! track the LSN that we have successfully committed to persist and communicate that back to
//! PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
//! transactions themselves. Since at a given LSN offset there can be only a single message, when a
//! transaction is received and processed we can infer that we have seen all the messages that are
//! not beyond `commit_lsn + 1`.
//!
//! Things are a bit more complicated in the absence of transactions though because even though we
//! don't receive any the server might very well be generating WAL records. This can happen if
//! there is a separate logical database performing writes (which is the case for RDS databases),
//! or, in servers running PostgreSQL version 15 or greater, the logical replication process
//! includes an optimization that omits empty transactions, which can happen if you're only
//! replicating a subset of the tables and there are writes going to the other ones.
//!
//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
//! will be forced to keep around WAL data that can eventually lead to disk space exhaustion.
//!
//! In the absence of transactions the only available piece of information in the replication
//! stream are keepalive messages. Keepalive messages are documented[1] to contain the current end
//! of WAL on the server. That is a useless number when it comes to progress tracking because there
//! might be pending messages at LSNs between the last received commit_lsn and the current end of
//! WAL.
//!
//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
//! contains the last *sent* LSN[2]. Here sent doesn't necessarily mean sent over the wire, but
//! sent to the upstream process that is handling producing the logical stream. Therefore, if we
//! receive a keepalive with a particular LSN we can be certain that there are no other replication
//! messages at previous LSNs, because they would have been already generated and received. We
//! therefore connect the keepalive messages directly to our capability.
//!
//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com
71
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::pin::pin;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use differential_dataflow::AsCollection;
use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_postgres_util::PostgresError;
use mz_postgres_util::{Client, simple_query_opt};
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_sql_parser::ast::Ident;
use mz_sql_parser::ast::display::{AstDisplay, escaped_string_literal};
use mz_storage_types::dyncfgs::PG_SCHEMA_VALIDATION_INTERVAL;
use mz_storage_types::dyncfgs::PG_SOURCE_VALIDATE_TIMELINE;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use postgres_replication::LogicalReplicationStream;
use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::Concat;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::core::Map;
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Antichain;
use tokio::sync::{mpsc, watch};
use tokio_postgres::error::SqlState;
use tokio_postgres::types::PgLsn;
use tracing::{error, trace};

use crate::metrics::source::postgres::PgSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::postgres::verify_schema;
use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
use crate::source::probe;
use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceMessage, StackedCollection};
122
/// A logical replication message from the server.
type LogicalReplMsg = ReplicationMessage<LogicalReplicationMessage>;

/// A decoded row from a transaction with source information: the upstream relation OID, the
/// output index it belongs to, the decoded row (or a definite decoding error), and the diff.
type DecodedRow = (u32, usize, Result<Row, DefiniteError>, Diff);
128
/// The PostgreSQL epoch, 2000-01-01T00:00:00Z, i.e. 946,684,800 seconds after the Unix epoch.
static PG_EPOCH: LazyLock<SystemTime> = LazyLock::new(|| {
    // Seconds between 1970-01-01T00:00:00Z and 2000-01-01T00:00:00Z.
    let unix_to_pg_epoch = Duration::from_secs(946_684_800);
    UNIX_EPOCH + unix_to_pg_epoch
});
132
/// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
/// slot. This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
/// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RewindRequest {
    /// The output index that should be rewound.
    pub(crate) output_index: usize,
    /// The LSN that the snapshot was taken at.
    pub(crate) snapshot_lsn: MzOffset,
}
143
/// Renders the replication dataflow. See the module documentation for more information.
///
/// Returns, in order: the stream of decoded replication updates, a stream of probes of the
/// upstream frontier, a stream of replication errors, and a token that shuts the dataflow down
/// when pressed or dropped.
pub(crate) fn render<'scope>(
    scope: Scope<'scope, MzOffset>,
    config: RawSourceCreationConfig,
    connection: PostgresSourceConnection,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    rewind_stream: StreamVec<'scope, MzOffset, RewindRequest>,
    slot_ready_stream: StreamVec<'scope, MzOffset, Infallible>,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    metrics: PgSourceMetrics,
) -> (
    StackedCollection<'scope, MzOffset, (usize, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, MzOffset, Probe<MzOffset>>,
    StreamVec<'scope, MzOffset, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("ReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());

    let slot_reader = u64::cast_from(config.responsible_worker("slot"));
    // Three outputs: raw decoded data, definite (non-retryable) errors, and upstream probes.
    let (data_output, data_stream) = builder.new_output();
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // All rewind requests are exchanged to the single worker responsible for the slot.
    let mut rewind_input =
        builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
    let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
    // One resume upper per output; these are combined into the overall resume LSN below.
    let output_uppers = table_info
        .iter()
        .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
        .collect::<Vec<_>>();
    metrics.tables.set(u64::cast_from(output_uppers.len()));

    // The reader operator gets its own copy of `table_info`; the original is captured by the
    // decoding operator at the bottom of this function.
    let reader_table_info = table_info.clone();
    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let mut table_info = reader_table_info;
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            let [data_cap_set, definite_error_cap_set, probe_cap]: &mut [_; 3] =
                caps.try_into().unwrap();

            // Only the worker responsible for the slot reads the replication stream.
            if !config.responsible_for("slot") {
                // Emit 0, to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

            // Determine the slot lsn.
            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let slot = &connection.publication_details.slot;
            let replication_client = connection_config
                .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                .await?;

            let metadata_client = connection_config
                .connect(
                    "replication metadata",
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let metadata_client = Arc::new(metadata_client);

            while let Some(_) = slot_ready_input.next().await {
                // Wait for the slot to be created
            }

            // The slot is always created by the snapshot operator. If the slot doesn't exist,
            // when this check runs, this operator will return an error.
            let slot_metadata = super::fetch_slot_metadata(
                &*metadata_client,
                slot,
                mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
                    .get(config.config.config_set()),
            )
            .await?;

            // We're the only application that should be using this replication
            // slot. The only way that there can be another connection using
            // this slot under normal operation is if there's a stale TCP
            // connection from a prior incarnation of the source holding on to
            // the slot. We don't want to wait for the WAL sender timeout and/or
            // TCP keepalives to time out that connection, because these values
            // are generally under the control of the DBA and may not time out
            // the connection for multiple minutes, or at all. Instead we just
            // force kill the connection that's using the slot.
            //
            // Note that there's a small risk that *we're* the zombie cluster
            // that should not be using the replication slot. Kubernetes cannot
            // 100% guarantee that only one cluster is alive at a time. However,
            // this situation should not last long, and the worst that can
            // happen is a bit of transient thrashing over ownership of the
            // replication slot.
            if let Some(active_pid) = slot_metadata.active_pid {
                tracing::warn!(
                    %id, %active_pid,
                    "replication slot already in use; will attempt to kill existing connection",
                );

                match metadata_client
                    .execute("SELECT pg_terminate_backend($1)", &[&active_pid])
                    .await
                {
                    Ok(_) => {
                        tracing::info!(
                            "successfully killed existing connection; \
                            starting replication is likely to succeed"
                        );
                        // Note that `pg_terminate_backend` does not wait for
                        // the termination of the targeted connection to
                        // complete. We may try to start replication before the
                        // targeted connection has cleaned up its state. That's
                        // okay. If that happens we'll just try again from the
                        // top via the suspend-and-restart flow.
                    }
                    Err(e) => {
                        tracing::warn!(
                            %e,
                            "failed to kill existing replication connection; \
                            replication will likely fail to start"
                        );
                        // Continue on anyway, just in case the replication slot
                        // is actually available. Maybe PostgreSQL has some
                        // staleness when it reports `active_pid`, for example.
                    }
                }
            }

            // The overall resumption point for this source is the minimum of the resumption points
            // contributed by each of the outputs.
            let resume_lsn = output_uppers
                .iter()
                .flat_map(|f| f.elements())
                .map(|&lsn| {
                    // An output is either an output that has never had data committed to it or one
                    // that has and needs to resume. We differentiate between the two by checking
                    // whether an output wishes to "resume" from the minimum timestamp. In that case
                    // its contribution to the overall resumption point is the earliest point available
                    // in the slot. This information would normally be something that the storage
                    // controller figures out in the form of an as-of frontier, but at the moment the
                    // storage controller does not have visibility into what the replication slot is
                    // doing.
                    if lsn == MzOffset::from(0) {
                        slot_metadata.confirmed_flush_lsn
                    } else {
                        lsn
                    }
                })
                .min();
            // With no outputs there is no resume point to compute; park this worker forever.
            let Some(resume_lsn) = resume_lsn else {
                std::future::pending::<()>().await;
                return Ok(());
            };
            // If we don't set "offset_committed" now, it'll be stuck at 0 (the default value)
            // until we finish processing the table snapshot. If the snapshot is large, that could be a long time.
            // This confuses the ingestion lag calculation in the UI, causing it to yield erroneously high values.
            for stat in config.statistics.values() {
                stat.set_offset_committed(resume_lsn.offset);
            }
            trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");

            // Emitting an initial probe before we start waiting for rewinds ensures that we will
            // have a timestamp binding in the remap collection while the snapshot is processed.
            // This is important because otherwise the snapshot updates would need to be buffered
            // in the reclock operator, instead of being spilled to S3 in the persist sink.
            //
            // Note that we need to fetch the probe LSN _after_ having created the replication
            // slot, to make sure the fetched LSN will be included in the replication stream.
            let probe_ts = (config.now_fn)().into();
            let max_lsn = super::fetch_max_lsn(&*metadata_client).await?;
            let probe = Probe {
                probe_ts,
                upstream_frontier: Antichain::from_elem(max_lsn),
            };
            probe_output.give(&probe_cap[0], probe);

            // Drain all rewind requests (keyed by output index), bailing out with a definite
            // error if the slot has already compacted past any snapshot's resume point.
            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(_, data) = event {
                    for req in data {
                        if resume_lsn > req.snapshot_lsn + 1 {
                            let err = DefiniteError::SlotCompactedPastResumePoint(
                                req.snapshot_lsn + 1,
                                resume_lsn,
                            );
                            // If the replication stream cannot be obtained from the resume point there is nothing
                            // else to do. These errors are not retractable.
                            for (oid, outputs) in table_info.iter() {
                                for output_index in outputs.keys() {
                                    // We pick `u64::MAX` as the LSN which will (in practice) never conflict
                                    // any previously revealed portions of the TVC.
                                    let update = (
                                        (
                                            *oid,
                                            *output_index,
                                            Err(DataflowError::from(err.clone())),
                                        ),
                                        MzOffset::from(u64::MAX),
                                        Diff::ONE,
                                    );
                                    let size = update.fuel_size();
                                    data_output
                                        .give_fueled(&data_cap_set[0], update, size)
                                        .await;
                                }
                            }
                            definite_error_handle.give(
                                &definite_error_cap_set[0],
                                ReplicationError::Definite(Rc::new(err)),
                            );
                            return Ok(());
                        }
                        rewinds.insert(req.output_index, req);
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            let mut committed_uppers = pin!(committed_uppers);

            let stream_result = raw_stream(
                &config,
                replication_client,
                Arc::clone(&metadata_client),
                &connection.publication_details.slot,
                &connection.publication_details.timeline_id,
                &connection.publication,
                resume_lsn,
                committed_uppers.as_mut(),
                &probe_output,
                &probe_cap[0],
            )
            .await?;

            let stream = match stream_result {
                Ok(stream) => stream,
                Err(err) => {
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    for (oid, outputs) in table_info.iter() {
                        for output_index in outputs.keys() {
                            // We pick `u64::MAX` as the LSN which will (in practice) never conflict
                            // any previously revealed portions of the TVC.
                            let update = (
                                (*oid, *output_index, Err(DataflowError::from(err.clone()))),
                                MzOffset::from(u64::MAX),
                                Diff::ONE,
                            );
                            let size = update.fuel_size();
                            data_output
                                .give_fueled(&data_cap_set[0], update, size)
                                .await;
                        }
                    }

                    definite_error_handle.give(
                        &definite_error_cap_set[0],
                        ReplicationError::Definite(Rc::new(err)),
                    );
                    return Ok(());
                }
            };
            let mut stream = pin!(stream.peekable());

            // Run the periodic schema validation on a separate task using a separate client,
            // to prevent it from blocking the replication reading progress.
            let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
            let client = connection_config
                .connect("schema validation", ssh_tunnel_manager)
                .await?;
            let mut schema_errors = spawn_schema_validator(
                client,
                &config,
                connection.publication.clone(),
                table_info.clone(),
            );

            // Instead of downgrading the capability for every transaction we process we only do it
            // if we're about to yield, which is checked at the bottom of the loop. This avoids
            // creating excessive progress tracking traffic when there are multiple small
            // transactions ready to go.
            //
            // `data_upper` tracks the upper bound of the data emitted so far and drives the
            // capability downgrades below.
            let mut data_upper = resume_lsn;
            while let Some(event) = stream.as_mut().next().await {
                use LogicalReplicationMessage::*;
                use ReplicationMessage::*;
                match event {
                    Ok(XLogData(data)) => match data.data() {
                        Begin(begin) => {
                            let commit_lsn = MzOffset::from(begin.final_lsn());

                            let mut tx = pin!(extract_transaction(
                                stream.by_ref(),
                                &*metadata_client,
                                commit_lsn,
                                &mut table_info,
                                &metrics,
                                &connection.publication,
                            ));

                            trace!(
                                %id,
                                "timely-{worker_id} extracting transaction \
                                    at {commit_lsn}"
                            );
                            assert!(
                                data_upper <= commit_lsn,
                                "new_upper={data_upper} tx_lsn={commit_lsn}",
                            );
                            data_upper = commit_lsn + 1;
                            while let Some((oid, output_index, event, diff)) = tx.try_next().await?
                            {
                                let event = event.map_err(Into::into);
                                let data = (oid, output_index, event);
                                // Outputs with a pending rewind also get a retraction at LSN 0
                                // for any update at or before their snapshot LSN.
                                if let Some(req) = rewinds.get(&output_index) {
                                    if commit_lsn <= req.snapshot_lsn {
                                        let update = (data.clone(), MzOffset::from(0), -diff);
                                        let size = update.fuel_size();
                                        data_output
                                            .give_fueled(&data_cap_set[0], update, size)
                                            .await;
                                    }
                                }
                                let update = (data, commit_lsn, diff);
                                let size = update.fuel_size();
                                data_output
                                    .give_fueled(&data_cap_set[0], update, size)
                                    .await;
                            }
                        }
                        _ => return Err(TransientError::BareTransactionEvent),
                    },
                    Ok(PrimaryKeepAlive(keepalive)) => {
                        trace!( %id,
                            "timely-{worker_id} received keepalive lsn={}",
                            keepalive.wal_end()
                        );

                        // Take the opportunity to report any schema validation errors.
                        while let Ok(error) = schema_errors.try_recv() {
                            use SchemaValidationError::*;
                            match error {
                                Postgres(PostgresError::PublicationMissing(publication)) => {
                                    let err = DefiniteError::PublicationDropped(publication);
                                    for (oid, outputs) in table_info.iter() {
                                        for output_index in outputs.keys() {
                                            let update = (
                                                (
                                                    *oid,
                                                    *output_index,
                                                    Err(DataflowError::from(err.clone())),
                                                ),
                                                data_cap_set[0].time().clone(),
                                                Diff::ONE,
                                            );
                                            let size = update.fuel_size();
                                            data_output
                                                .give_fueled(&data_cap_set[0], update, size)
                                                .await;
                                        }
                                    }
                                    definite_error_handle.give(
                                        &definite_error_cap_set[0],
                                        ReplicationError::Definite(Rc::new(err)),
                                    );
                                    return Ok(());
                                }
                                Postgres(pg_error) => Err(TransientError::from(pg_error))?,
                                Schema {
                                    oid,
                                    output_index,
                                    error,
                                } => {
                                    // Emit the schema error for this output only once; drop
                                    // the output from `table_info` so it isn't reported again.
                                    let table = table_info.get_mut(&oid).unwrap();
                                    if table.remove(&output_index).is_none() {
                                        continue;
                                    }

                                    let update = (
                                        (oid, output_index, Err(error.into())),
                                        data_cap_set[0].time().clone(),
                                        Diff::ONE,
                                    );
                                    let size = update.fuel_size();
                                    data_output
                                        .give_fueled(&data_cap_set[0], update, size)
                                        .await;
                                }
                            }
                        }
                        // See the module docs: a keepalive's LSN bounds all pending messages.
                        data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
                    }
                    Ok(_) => return Err(TransientError::UnknownReplicationMessage),
                    Err(err) => return Err(err),
                }

                // Peek to see whether more input is immediately available; if not we are about
                // to yield, which is the point at which we downgrade the data capability.
                let will_yield = stream.as_mut().peek().now_or_never().is_none();
                if will_yield {
                    trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
                    rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
                    // As long as there are pending rewinds we can't downgrade our data capability
                    // since we must be able to produce data at offset 0.
                    if rewinds.is_empty() {
                        data_cap_set.downgrade([&data_upper]);
                    }
                }
            }
            // We never expect the replication stream to gracefully end
            Err(TransientError::ReplicationEOF)
        }))
    });

    // We now process the slot updates and apply the cast expressions
    let mut final_row = Row::default();
    let mut datum_vec = DatumVec::new();
    let mut next_worker = (0..u64::cast_from(scope.peers()))
        // Round robin on 1000-records basis to avoid creating tiny containers when there are a
        // small number of updates and a large number of workers.
        .flat_map(|w| std::iter::repeat_n(w, 1000))
        .cycle();
    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
    let replication_updates = data_stream
        .unary(round_robin, "PgCastReplicationRows", |_, _| {
            move |input, output| {
                input.for_each_time(|time, data| {
                    let mut session = output.session(&time);
                    for ((oid, output_index, event), time, diff) in
                        data.flat_map(|data| data.drain(..))
                    {
                        let output = &table_info
                            .get(&oid)
                            .and_then(|outputs| outputs.get(&output_index))
                            .expect("table_info contains all outputs");
                        let event = event.and_then(|row| {
                            let datums = datum_vec.borrow_with(&row);
                            super::cast_row(&output.casts, &datums, &mut final_row)?;
                            Ok(SourceMessage {
                                key: Row::default(),
                                value: final_row.clone(),
                                metadata: Row::default(),
                            })
                        });

                        session.give(((output_index, event), time, diff));
                    }
                });
            }
        })
        .as_collection();

    // Merge definite and transient errors into a single error output stream.
    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));

    (
        replication_updates,
        probe_stream,
        errors,
        button.press_on_drop(),
    )
}
614
/// Produces the logical replication stream while taking care of regularly sending standby
/// keepalive messages with the provided `uppers` stream.
///
/// The returned stream will contain all transactions whose commit LSN is beyond `resume_lsn`.
///
/// The nested `Result` return value separates error classes: the outer `Result` carries
/// transient (retryable) errors and the inner `Result` carries definite (non-retryable)
/// errors, such as a dropped publication or an invalid replication slot.
async fn raw_stream<'a>(
    config: &'a RawSourceCreationConfig,
    replication_client: Client,
    metadata_client: Arc<Client>,
    slot: &'a str,
    timeline_id: &'a Option<u64>,
    publication: &'a str,
    resume_lsn: MzOffset,
    uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
    probe_output: &'a AsyncOutputHandle<MzOffset, CapacityContainerBuilder<Vec<Probe<MzOffset>>>>,
    probe_cap: &'a Capability<MzOffset>,
) -> Result<
    Result<impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a, DefiniteError>,
    TransientError,
> {
    if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
        // If the publication gets deleted there is nothing else to do. These errors
        // are not retractable.
        return Ok(Err(err));
    }

    // Skip the timeline ID check for sources without a known timeline ID
    // (sources created before the timeline ID was added to the source details)
    if let Some(expected_timeline_id) = timeline_id {
        if let Err(err) = ensure_replication_timeline_id(
            &replication_client,
            expected_timeline_id,
            config.config.config_set(),
        )
        .await?
        {
            return Ok(Err(err));
        }
    }

    // How often a proactive standby status update message should be sent to the server.
    //
    // The upstream will periodically request status updates by setting the keepalive's reply field
    // value to 1. However, we cannot rely on these messages arriving on time. For example, when
    // the upstream is sending a big transaction its keepalive messages are queued and can be
    // delayed arbitrarily.
    //
    // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
    //
    // For this reason we query the server's timeout value and proactively send a keepalive at
    // twice the frequency to have a healthy margin from the deadline.
    //
    // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
    // Postgres versions disallow SHOW commands from within replication connection.
    // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
    let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
        .await?
        .unwrap();
    let wal_sender_timeout = match row.get("wal_sender_timeout") {
        // When this parameter is zero the timeout mechanism is disabled
        Some("0") => None,
        Some(value) => Some(
            mz_repr::adt::interval::Interval::from_str(value)
                .unwrap()
                .duration()
                .unwrap(),
        ),
        // `wal_sender_timeout` exists on every supported server version, so a missing
        // value indicates a bug rather than a recoverable condition.
        None => panic!("ubiquitous parameter missing"),
    };

    // This interval controls the cadence at which we send back status updates and, crucially,
    // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
    // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow us
    // down. For this reason the feedback interval is set to one second, or less if the
    // wal_sender_timeout is less than 2 seconds.
    let feedback_interval = match wal_sender_timeout {
        Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
        None => Duration::from_secs(1),
    };

    let mut feedback_timer = tokio::time::interval(feedback_interval);
    // 'Delay' ensures we always tick at least 'feedback_interval'.
    feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    // Postgres will return all transactions that commit *at or after* the provided LSN,
    // following the timely upper semantics.
    let lsn = PgLsn::from(resume_lsn.offset);
    let query = format!(
        r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" {})"#,
        Ident::new_unchecked(slot).to_ast_string_simple(),
        lsn,
        escaped_string_literal(publication),
    );
    let copy_stream = match replication_client.copy_both_simple(&query).await {
        Ok(copy_stream) => copy_stream,
        Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
            return Ok(Err(DefiniteError::InvalidReplicationSlot));
        }
        Err(err) => return Err(err.into()),
    };

    // According to the documentation [1] we must check that the slot LSN matches our
    // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
    // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
    // cannot use the replication client to do that because it's already in CopyBoth mode.
    // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
    let slot_metadata = super::fetch_slot_metadata(
        &*metadata_client,
        slot,
        mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
            .get(config.config.config_set()),
    )
    .await?;
    let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
    tracing::info!(
        %config.id,
        "started replication using backend PID={:?}. wal_sender_timeout={:?}",
        slot_metadata.active_pid, wal_sender_timeout
    );

    // Background task that periodically probes the upstream server for its maximum WAL LSN
    // and publishes the result on a watch channel consumed by the main select loop below.
    let (probe_tx, mut probe_rx) = watch::channel(None);
    let timestamp_interval = config.timestamp_interval;
    let now_fn = config.now_fn.clone();
    let max_lsn_task_handle =
        mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
            let mut probe_ticker = probe::Ticker::new(move || timestamp_interval, now_fn);

            while !probe_tx.is_closed() {
                let probe_ts = probe_ticker.tick().await;
                let probe_or_err = super::fetch_max_lsn(&*metadata_client)
                    .await
                    .map(|lsn| Probe {
                        probe_ts,
                        upstream_frontier: Antichain::from_elem(lsn),
                    });
                let _ = probe_tx.send(Some(probe_or_err));
            }
        })
        .abort_on_drop();

    let stream = async_stream::try_stream!({
        // Ensure we don't pre-drop the task
        let _max_lsn_task_handle = max_lsn_task_handle;

        // ensure we don't drop the replication client!
        let _replication_client = replication_client;

        let mut uppers = pin!(uppers);
        let mut last_committed_upper = resume_lsn;

        let mut stream = pin!(LogicalReplicationStream::new(copy_stream));

        // If the slot has been compacted past our resumption point we cannot continue; the
        // check is skipped for a zero `resume_lsn` (a brand new ingestion).
        if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
            let err = TransientError::OvercompactedReplicationSlot {
                available_lsn: min_resume_lsn,
                requested_lsn: resume_lsn,
            };
            error!("timely-{} ({}) {err}", config.worker_id, config.id);
            Err(err)?;
        }

        loop {
            tokio::select! {
                // Forward raw replication messages to the consumer of this stream.
                Some(next_message) = stream.next() => match next_message {
                    Ok(ReplicationMessage::XLogData(data)) => {
                        yield ReplicationMessage::XLogData(data);
                        Ok(())
                    }
                    Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
                        yield ReplicationMessage::PrimaryKeepAlive(keepalive);
                        Ok(())
                    }
                    Err(err) => Err(err.into()),
                    _ => Err(TransientError::UnknownReplicationMessage),
                },
                // Proactively report our progress back to the server.
                _ = feedback_timer.tick() => {
                    let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
                    let lsn = PgLsn::from(last_committed_upper.offset);
                    trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
                    // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply, which
                    // happens when our status update is late. Since we send them proactively this
                    // may never happen. It is therefore *crucial* that we set the last parameter
                    // (the reply flag) to 1 here. This will cause the upstream server to send us a
                    // PrimaryKeepAlive message promptly which will give us frontier advancement
                    // information in the absence of data updates.
                    let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
                    res.map_err(|e| e.into())
                },
                // Track the durably committed upper so the keepalive above reports accurate
                // progress and statistics stay up to date.
                Some(upper) = uppers.next() => match upper.into_option() {
                    Some(lsn) => {
                        if last_committed_upper < lsn {
                            last_committed_upper = lsn;
                            for stat in config.statistics.values() {
                                stat.set_offset_committed(last_committed_upper.offset);
                            }
                        }
                        Ok(())
                    }
                    None => Ok(()),
                },
                // Relay probes of the upstream WAL frontier produced by the background task.
                Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
                    Some(Ok(probe)) => {
                        if let Some(offset_known) = probe.upstream_frontier.as_option() {
                            for stat in config.statistics.values() {
                                stat.set_offset_known(offset_known.offset);
                            }
                        }
                        probe_output.give(probe_cap, probe);
                        Ok(())
                    },
                    Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
                    None => Ok(()),
                },
                else => return
            }?;
        }
    });
    Ok(Ok(stream))
}
833
/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
/// message. The BEGIN message must have already been consumed from the stream before calling this
/// function.
///
/// The returned stream yields one update per affected output of each relevant message, together
/// with its diff, and terminates when the COMMIT for `commit_lsn` is observed. A COMMIT with a
/// different LSN is treated as a transient error.
fn extract_transaction<'a>(
    stream: impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a,
    metadata_client: &'a Client,
    commit_lsn: MzOffset,
    table_info: &'a mut BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: &'a PgSourceMetrics,
    publication: &'a str,
) -> impl AsyncStream<Item = Result<DecodedRow, TransientError>> + 'a {
    use LogicalReplicationMessage::*;
    // Scratch row reused by `unpack_tuple` across all messages of the transaction.
    let mut row = Row::default();
    async_stream::try_stream!({
        let mut stream = pin!(stream);
        metrics.transactions.inc();
        metrics.lsn.set(commit_lsn.offset);
        while let Some(event) = stream.try_next().await? {
            // We can ignore keepalive messages while processing a transaction because the
            // commit_lsn will drive progress.
            let message = match event {
                ReplicationMessage::XLogData(data) => data.into_data(),
                ReplicationMessage::PrimaryKeepAlive(_) => {
                    metrics.ignored.inc();
                    continue;
                }
                _ => Err(TransientError::UnknownReplicationMessage)?,
            };
            metrics.total.inc();
            match message {
                // Messages for relations we are not ingesting are counted and dropped.
                Insert(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Update(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Delete(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Relation(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Insert(body) => {
                    metrics.inserts.inc();
                    let rel = body.rel_id();
                    for (output, info) in table_info.get(&rel).into_iter().flatten() {
                        let tuple_data = body.tuple().tuple_data();
                        // The projection maps each output column to its position in the
                        // upstream tuple; it is (re)computed on every Relation message.
                        let Some(ref projection) = info.projection else {
                            panic!("missing projection for {rel}");
                        };
                        let datums = projection.iter().map(|idx| &tuple_data[*idx]);
                        let row = unpack_tuple(datums, &mut row);
                        yield (rel, *output, row, Diff::ONE);
                    }
                }
                Update(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.updates.inc();
                        let new_tuple = body.new_tuple();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let old_tuple =
                                projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            // If the new tuple contains unchanged toast values we reference the old ones
                            let new_tuple = std::iter::zip(
                                projection.iter().map(|idx| &new_tuple.tuple_data()[*idx]),
                                old_tuple.clone(),
                            )
                            .map(|(new, old)| match new {
                                TupleData::UnchangedToast => old,
                                _ => new,
                            });
                            let old_row = unpack_tuple(old_tuple, &mut row);
                            let new_row = unpack_tuple(new_tuple, &mut row);

                            // An update is modeled as a retraction of the old row followed by
                            // an insertion of the new one.
                            yield (rel, *output, old_row, Diff::MINUS_ONE);
                            yield (rel, *output, new_row, Diff::ONE);
                        }
                    }
                    // Without REPLICA IDENTITY FULL the old tuple is absent and we cannot
                    // produce a correct retraction, so emit a definite error instead.
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Delete(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.deletes.inc();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let datums = projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            let row = unpack_tuple(datums, &mut row);
                            yield (rel, *output, row, Diff::MINUS_ONE);
                        }
                    }
                    // Same REPLICA IDENTITY limitation as for updates above.
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Relation(body) => {
                    let rel_id = body.rel_id();
                    if let Some(outputs) = table_info.get_mut(&body.rel_id()) {
                        // Because the replication stream doesn't include columns' attnums, we need
                        // to check the current local schema against the current remote schema to
                        // ensure e.g. we haven't received a schema update with the same terminal
                        // column name which is actually a different column.
                        let upstream_info = mz_postgres_util::publication_info(
                            metadata_client,
                            publication,
                            Some(&[rel_id]),
                        )
                        .await?;

                        let mut schema_errors = vec![];

                        // Drop outputs whose schema no longer verifies; their errors are
                        // yielded below, after the retained outputs are fixed up.
                        outputs.retain(|output_index, info| {
                            match verify_schema(rel_id, info, &upstream_info) {
                                Ok(()) => true,
                                Err(err) => {
                                    schema_errors.push((
                                        rel_id,
                                        *output_index,
                                        Err(err),
                                        Diff::ONE,
                                    ));
                                    false
                                }
                            }
                        });
                        // Recalculate projection vector for the retained valid outputs. Here we
                        // must use the column names in the RelationBody message and not the
                        // upstream_info obtained above, since that one represents the current
                        // schema upstream which may be many versions ahead of the one we're about
                        // to receive after this Relation message.
                        let column_positions: BTreeMap<_, _> = body
                            .columns()
                            .iter()
                            .enumerate()
                            .map(|(idx, col)| (col.name().unwrap(), idx))
                            .collect();
                        for info in outputs.values_mut() {
                            let mut projection = vec![];
                            for col in info.desc.columns.iter() {
                                projection.push(column_positions[&*col.name]);
                            }
                            info.projection = Some(projection);
                        }
                        for schema_error in schema_errors {
                            yield schema_error;
                        }
                    }
                }
                Truncate(body) => {
                    // A truncate cannot be represented as differential updates, so each
                    // affected output is permanently errored and removed from `table_info`.
                    for &rel_id in body.rel_ids() {
                        if let Some(outputs) = table_info.get_mut(&rel_id) {
                            for (output, _) in std::mem::take(outputs) {
                                yield (
                                    rel_id,
                                    output,
                                    Err(DefiniteError::TableTruncated),
                                    Diff::ONE,
                                );
                            }
                        }
                    }
                }
                Commit(body) => {
                    if commit_lsn != body.commit_lsn().into() {
                        Err(TransientError::InvalidTransaction)?
                    }
                    return;
                }
                // TODO: We should handle origin messages and emit an error as they indicate that
                // the upstream performed a point in time restore so all bets are off about the
                // continuity of the stream.
                Origin(_) | Type(_) => metrics.ignored.inc(),
                Begin(_) => Err(TransientError::NestedTransaction)?,
                // The enum is marked as non_exhaustive. Better to be conservative
                _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
            }
        }
        // The stream ended before we saw the COMMIT delimiter for this transaction.
        Err(TransientError::ReplicationEOF)?;
    })
}
1030
1031/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
1032/// done.
1033#[inline]
1034fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
1035where
1036    I: IntoIterator<Item = &'a TupleData>,
1037    I::IntoIter: ExactSizeIterator,
1038{
1039    let iter = tuple_data.into_iter();
1040    let mut packer = row.packer();
1041    for data in iter {
1042        let datum = match data {
1043            TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
1044            TupleData::Null => Datum::Null,
1045            TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
1046            TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
1047        };
1048        packer.push(datum);
1049    }
1050    Ok(row.clone())
1051}
1052
1053/// Ensures the publication exists on the server. It returns an outer transient error in case of
1054/// connection issues and an inner definite error if the publication is dropped.
1055async fn ensure_publication_exists(
1056    client: &Client,
1057    publication: &str,
1058) -> Result<Result<(), DefiniteError>, TransientError> {
1059    // Figure out the last written LSN and then add one to convert it into an upper.
1060    let result = client
1061        .query_opt(
1062            "SELECT 1 FROM pg_publication WHERE pubname = $1;",
1063            &[&publication],
1064        )
1065        .await?;
1066    match result {
1067        Some(_) => Ok(Ok(())),
1068        None => Ok(Err(DefiniteError::PublicationDropped(
1069            publication.to_owned(),
1070        ))),
1071    }
1072}
1073
1074/// Ensure the active replication timeline_id matches the one we expect such that we can safely
1075/// resume replication. It returns an outer transient error in case of
1076/// connection issues and an inner definite error if the timeline id does not match.
1077async fn ensure_replication_timeline_id(
1078    replication_client: &Client,
1079    expected_timeline_id: &u64,
1080    config_set: &ConfigSet,
1081) -> Result<Result<(), DefiniteError>, TransientError> {
1082    let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
1083    if timeline_id == *expected_timeline_id {
1084        Ok(Ok(()))
1085    } else {
1086        if PG_SOURCE_VALIDATE_TIMELINE.get(config_set) {
1087            Ok(Err(DefiniteError::InvalidTimelineId {
1088                expected: *expected_timeline_id,
1089                actual: timeline_id,
1090            }))
1091        } else {
1092            tracing::warn!(
1093                "Timeline ID mismatch ignored: expected={expected_timeline_id} actual={timeline_id}"
1094            );
1095            Ok(Ok(()))
1096        }
1097    }
1098}
1099
/// An error reported by the background schema validation task spawned by
/// `spawn_schema_validator`.
enum SchemaValidationError {
    /// Fetching the publication info from the upstream server failed.
    Postgres(PostgresError),
    /// The schema of a specific source output no longer matches the upstream table.
    Schema {
        // OID of the upstream relation whose schema failed validation.
        oid: u32,
        // Index of the affected source output for that relation.
        output_index: usize,
        error: DefiniteError,
    },
}
1108
/// Spawns a background task that periodically validates the schema of every output in
/// `table_info` against the upstream publication and reports failures on the returned channel.
///
/// The task re-runs on the `PG_SCHEMA_VALIDATION_INTERVAL` cadence and shuts itself down once
/// the returned receiver is dropped.
fn spawn_schema_validator(
    client: Client,
    config: &RawSourceCreationConfig,
    publication: String,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
) -> mpsc::UnboundedReceiver<SchemaValidationError> {
    let (tx, rx) = mpsc::unbounded_channel();
    let source_id = config.id;
    let config_set = Arc::clone(config.config.config_set());

    mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
        // Exit once the receiver is gone; nobody is listening for errors anymore.
        while !tx.is_closed() {
            trace!(%source_id, "validating schemas");

            let validation_start = Instant::now();

            // Fetch the current upstream schema for all tracked relations in one round trip.
            let upstream_info = match mz_postgres_util::publication_info(
                &*client,
                &publication,
                Some(&table_info.keys().copied().collect::<Vec<_>>()),
            )
            .await
            {
                Ok(info) => info,
                Err(error) => {
                    // Report the fetch failure and try again on the next iteration.
                    let _ = tx.send(SchemaValidationError::Postgres(error));
                    continue;
                }
            };

            for (&oid, outputs) in table_info.iter() {
                for (&output_index, info) in outputs {
                    if let Err(error) = verify_schema(oid, info, &upstream_info) {
                        trace!(
                            %source_id,
                            "schema of output index {output_index} for oid {oid} invalid",
                        );
                        // Send failures are ignored: a closed receiver ends the loop above.
                        let _ = tx.send(SchemaValidationError::Schema {
                            oid,
                            output_index,
                            error,
                        });
                    } else {
                        trace!(
                            %source_id,
                            "schema of output index {output_index} for oid {oid} valid",
                        );
                    }
                }
            }

            // Sleep for the remainder of the configured interval, accounting for the time
            // the validation pass itself took.
            let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
            let elapsed = validation_start.elapsed();
            let wait = interval.saturating_sub(elapsed);
            tokio::time::sleep(wait).await;
        }
    });

    rx
}