mz_storage/source/postgres/
replication.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
11//!
12//! ```text
13//!              o
14//!              │rewind
15//!              │requests
16//!          ╭───┴────╮
17//!          │exchange│ (collect all requests to one worker)
18//!          ╰───┬────╯
19//!           ┏━━v━━━━━━━━━━┓
20//!           ┃ replication ┃ (single worker)
21//!           ┃   reader    ┃
22//!           ┗━┯━━━━━━━━┯━━┛
23//!             │raw     │
24//!             │data    │
25//!        ╭────┴─────╮  │
26//!        │distribute│  │ (distribute to all workers)
27//!        ╰────┬─────╯  │
28//! ┏━━━━━━━━━━━┷━┓      │
29//! ┃ replication ┃      │ (parallel decode)
30//! ┃   decoder   ┃      │
31//! ┗━━━━━┯━━━━━━━┛      │
32//!       │ replication  │ progress
33//!       │ updates      │ output
34//!       v              v
35//! ```
36//!
37//! # Progress tracking
38//!
39//! In order to avoid causing excessive resource usage in the upstream server it's important to
40//! track the LSN that we have successfully committed to persist and communicate that back to
41//! PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
42//! transactions themselves. Since at a given LSN offset there can be only a single message, when a
43//! transaction is received and processed we can infer that we have seen all the messages that are
44//! not beyond `commit_lsn + 1`.
45//!
//! Things are a bit more complicated in the absence of transactions though, because even though
//! we don't receive any, the server might very well be generating WAL records. This can happen if
//! there is a separate logical database performing writes (which is the case for RDS databases),
//! or, in servers running PostgreSQL version 15 or greater, because the logical replication
//! process includes an optimization that omits empty transactions, which can happen if you're
//! only replicating a subset of the tables and there are writes going to the other ones.
52//!
53//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
54//! will be forced to keep around WAL data that can eventually lead to disk space exhaustion.
55//!
56//! In the absence of transactions the only available piece of information in the replication
57//! stream are keepalive messages. Keepalive messages are documented[1] to contain the current end
58//! of WAL on the server. That is a useless number when it comes to progress tracking because there
59//! might be pending messages at LSNs between the last received commit_lsn and the current end of
60//! WAL.
61//!
62//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
63//! contains the last *sent* LSN[2]. Here sent doesn't necessarily mean sent over the wire, but
64//! sent to the upstream process that is handling producing the logical stream. Therefore, if we
65//! receive a keepalive with a particular LSN we can be certain that there are no other replication
66//! messages at previous LSNs, because they would have been already generated and received. We
67//! therefore connect the keepalive messages directly to our capability.
68//!
69//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
70//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com
71
72use std::collections::BTreeMap;
73use std::convert::Infallible;
74use std::pin::pin;
75use std::rc::Rc;
76use std::str::FromStr;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::time::Instant;
80use std::time::{Duration, SystemTime, UNIX_EPOCH};
81
82use differential_dataflow::AsCollection;
83use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
84use mz_dyncfg::ConfigSet;
85use mz_ore::cast::CastFrom;
86use mz_ore::future::InTask;
87use mz_postgres_util::PostgresError;
88use mz_postgres_util::{Client, simple_query_opt};
89use mz_repr::{Datum, DatumVec, Diff, Row};
90use mz_sql_parser::ast::{Ident, display::AstDisplay};
91use mz_storage_types::dyncfgs::PG_SOURCE_VALIDATE_TIMELINE;
92use mz_storage_types::dyncfgs::{PG_OFFSET_KNOWN_INTERVAL, PG_SCHEMA_VALIDATION_INTERVAL};
93use mz_storage_types::errors::DataflowError;
94use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
95use mz_timely_util::builder_async::{
96    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
97    PressOnDropButton,
98};
99use postgres_replication::LogicalReplicationStream;
100use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
101use serde::{Deserialize, Serialize};
102use timely::container::CapacityContainerBuilder;
103use timely::dataflow::channels::pact::{Exchange, Pipeline};
104use timely::dataflow::operators::Capability;
105use timely::dataflow::operators::Concat;
106use timely::dataflow::operators::Operator;
107use timely::dataflow::operators::core::Map;
108use timely::dataflow::{Scope, Stream};
109use timely::progress::Antichain;
110use tokio::sync::{mpsc, watch};
111use tokio_postgres::error::SqlState;
112use tokio_postgres::types::PgLsn;
113use tracing::{error, trace};
114
115use crate::metrics::source::postgres::PgSourceMetrics;
116use crate::source::RawSourceCreationConfig;
117use crate::source::postgres::verify_schema;
118use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
119use crate::source::probe;
120use crate::source::types::{Probe, SignaledFuture, SourceMessage, StackedCollection};
121
/// The PostgreSQL epoch, 2000-01-01T00:00:00Z, expressed as a [`SystemTime`].
static PG_EPOCH: LazyLock<SystemTime> = LazyLock::new(|| {
    // Number of seconds between the Unix epoch and the PostgreSQL epoch.
    const UNIX_TO_PG_EPOCH_SECS: u64 = 946_684_800;
    UNIX_EPOCH + Duration::from_secs(UNIX_TO_PG_EPOCH_SECS)
});
125
/// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
/// slot.
///
/// This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
/// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RewindRequest {
    /// The output index that should be rewound.
    pub(crate) output_index: usize,
    /// The LSN that the snapshot was taken at. Replicated updates at or before this LSN are the
    /// ones that get negated at LSN 0.
    pub(crate) snapshot_lsn: MzOffset,
}
136
/// Renders the replication dataflow. See the module documentation for more information.
///
/// Two operators are built in `scope`:
///
/// * `ReplicationReader({id})`, an async operator whose body only does work on the worker that is
///   responsible for `"slot"`. It waits for the slot to be ready, determines the LSN to resume
///   from, collects the pending rewind requests and then forwards the transactions read from the
///   logical replication stream.
/// * `PgCastReplicationRows`, which receives the raw updates distributed round-robin across all
///   workers and applies the casts of each output to every row.
///
/// # Arguments
///
/// * `scope` - the timely scope the operators are rendered in.
/// * `config` - the creation config of the source; provides the source id, the worker
///   assignment, statistics, the connection context and dyncfgs.
/// * `connection` - the PostgreSQL connection, including the publication and its slot details.
/// * `table_info` - the outputs of the source, keyed by upstream table OID and then by output
///   index.
/// * `rewind_stream` - the rewind requests; all of them are exchanged to the slot reader worker.
/// * `slot_ready_stream` - an input that never carries data; the reader waits for it to be
///   exhausted before fetching the metadata of the slot.
/// * `committed_uppers` - a stream of frontiers that is handed to `raw_stream` as its `uppers`
///   argument.
/// * `metrics` - the metrics of this source.
///
/// # Returns
///
/// A tuple of:
/// 1. the collection of decoded updates, each tagged with its output index,
/// 2. a data-less stream whose frontier tracks the LSN that the reader has advanced to,
/// 3. the stream of upstream frontier probes (always `Some`),
/// 4. the concatenation of the definite and transient error streams, and
/// 5. a button for the reader operator that is pressed when dropped.
pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: PostgresSourceConnection,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    rewind_stream: &Stream<G, RewindRequest>,
    slot_ready_stream: &Stream<G, Infallible>,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    metrics: PgSourceMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Option<Stream<G, Probe<MzOffset>>>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("ReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());

    // The index of the single worker that reads from the replication slot. It doubles as the
    // exchange target of the rewind requests below.
    let slot_reader = u64::cast_from(config.responsible_worker("slot"));
    let (data_output, data_stream) = builder.new_output();
    // Nothing is ever given to this output (its handle is unused). Only the frontier implied by
    // its capability, which is downgraded as the reader makes progress, carries information.
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // Route every rewind request to the slot reader worker.
    let mut rewind_input =
        builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
    let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
    // The resume upper of every output of every table. Their number is also reported as the
    // number of tables in the metrics.
    let output_uppers = table_info
        .iter()
        .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
        .collect::<Vec<_>>();
    metrics.tables.set(u64::cast_from(output_uppers.len()));

    // The reader mutates its own copy of the table info (outputs with schema errors are removed
    // from it), while the original is kept for the cast operator further down.
    let reader_table_info = table_info.clone();
    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let mut table_info = reader_table_info;
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            // One capability set per output, named in the order the outputs were created above.
            let [
                data_cap_set,
                upper_cap_set,
                definite_error_cap_set,
                probe_cap,
            ]: &mut [_; 4] = caps.try_into().unwrap();

            if !config.responsible_for("slot") {
                // Emit 0, to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

            // Determine the slot lsn.
            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let slot = &connection.publication_details.slot;
            // The connection that the replication stream will be read from.
            let replication_client = connection_config
                .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                .await?;

            // A second connection, used for the metadata queries issued by this operator.
            let metadata_client = connection_config
                .connect(
                    "replication metadata",
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let metadata_client = Arc::new(metadata_client);

            while let Some(_) = slot_ready_input.next().await {
                // Wait for the slot to be created
            }

            // The slot is always created by the snapshot operator. If the slot doesn't exist,
            // when this check runs, this operator will return an error.
            let slot_metadata = super::fetch_slot_metadata(
                &*metadata_client,
                slot,
                mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
                    .get(config.config.config_set()),
            )
            .await?;

            // We're the only application that should be using this replication
            // slot. The only way that there can be another connection using
            // this slot under normal operation is if there's a stale TCP
            // connection from a prior incarnation of the source holding on to
            // the slot. We don't want to wait for the WAL sender timeout and/or
            // TCP keepalives to time out that connection, because these values
            // are generally under the control of the DBA and may not time out
            // the connection for multiple minutes, or at all. Instead we just
            // force kill the connection that's using the slot.
            //
            // Note that there's a small risk that *we're* the zombie cluster
            // that should not be using the replication slot. Kubernetes cannot
            // 100% guarantee that only one cluster is alive at a time. However,
            // this situation should not last long, and the worst that can
            // happen is a bit of transient thrashing over ownership of the
            // replication slot.
            if let Some(active_pid) = slot_metadata.active_pid {
                tracing::warn!(
                    %id, %active_pid,
                    "replication slot already in use; will attempt to kill existing connection",
                );

                match metadata_client
                    .execute("SELECT pg_terminate_backend($1)", &[&active_pid])
                    .await
                {
                    Ok(_) => {
                        tracing::info!(
                            "successfully killed existing connection; \
                            starting replication is likely to succeed"
                        );
                        // Note that `pg_terminate_backend` does not wait for
                        // the termination of the targeted connection to
                        // complete. We may try to start replication before the
                        // targeted connection has cleaned up its state. That's
                        // okay. If that happens we'll just try again from the
                        // top via the suspend-and-restart flow.
                    }
                    Err(e) => {
                        tracing::warn!(
                            %e,
                            "failed to kill existing replication connection; \
                            replication will likely fail to start"
                        );
                        // Continue on anyway, just in case the replication slot
                        // is actually available. Maybe PostgreSQL has some
                        // staleness when it reports `active_pid`, for example.
                    }
                }
            }

            // The overall resumption point for this source is the minimum of the resumption points
            // contributed by each of the outputs.
            let resume_lsn = output_uppers
                .iter()
                .flat_map(|f| f.elements())
                .map(|&lsn| {
                    // An output is either an output that has never had data committed to it or one
                    // that has and needs to resume. We differentiate between the two by checking
                    // whether an output wishes to "resume" from the minimum timestamp. In that case
                    // its contribution to the overall resumption point is the earliest point
                    // available in the slot. This information would normally be something that the
                    // storage controller figures out in the form of an as-of frontier, but at the
                    // moment the storage controller does not have visibility into what the
                    // replication slot is doing.
                    if lsn == MzOffset::from(0) {
                        slot_metadata.confirmed_flush_lsn
                    } else {
                        lsn
                    }
                })
                .min();
            // No output contributed a resumption point, so there is nothing to replicate. Park
            // this operator forever; the `return` only exists to make the `else` branch diverge.
            let Some(resume_lsn) = resume_lsn else {
                std::future::pending::<()>().await;
                return Ok(());
            };
            upper_cap_set.downgrade([&resume_lsn]);
            trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");

            // Emitting an initial probe before we start waiting for rewinds ensures that we will
            // have a timestamp binding in the remap collection while the snapshot is processed.
            // This is important because otherwise the snapshot updates would need to be buffered
            // in the reclock operator, instead of being spilled to S3 in the persist sink.
            //
            // Note that we need to fetch the probe LSN _after_ having created the replication
            // slot, to make sure the fetched LSN will be included in the replication stream.
            let probe_ts = (config.now_fn)().into();
            let max_lsn = super::fetch_max_lsn(&*metadata_client).await?;
            let probe = Probe {
                probe_ts,
                upstream_frontier: Antichain::from_elem(max_lsn),
            };
            probe_output.give(&probe_cap[0], probe);

            // Collect the rewind requests, keyed by output index, until the rewind input is
            // exhausted. Progress events on the input are ignored.
            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(_, data) = event {
                    for req in data {
                        // The slot resumes at an LSN beyond the one that directly follows the
                        // snapshot, so the updates in between are no longer available.
                        if resume_lsn > req.snapshot_lsn + 1 {
                            let err = DefiniteError::SlotCompactedPastResumePoint(
                                req.snapshot_lsn + 1,
                                resume_lsn,
                            );
                            // If the replication stream cannot be obtained from the resume point
                            // there is nothing else to do. These errors are not retractable.
                            for (oid, outputs) in table_info.iter() {
                                for output_index in outputs.keys() {
                                    // We pick `u64::MAX` as the LSN which will (in practice) never
                                    // conflict with any previously revealed portions of the TVC.
                                    let update = (
                                        (
                                            *oid,
                                            *output_index,
                                            Err(DataflowError::from(err.clone())),
                                        ),
                                        MzOffset::from(u64::MAX),
                                        Diff::ONE,
                                    );
                                    data_output.give_fueled(&data_cap_set[0], update).await;
                                }
                            }
                            definite_error_handle.give(
                                &definite_error_cap_set[0],
                                ReplicationError::Definite(Rc::new(err)),
                            );
                            return Ok(());
                        }
                        rewinds.insert(req.output_index, req);
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            let mut committed_uppers = pin!(committed_uppers);

            // The outer `Result` (unwrapped here with `?`) carries transient errors, the inner one
            // carries definite errors that have to be reported on every output.
            let stream_result = raw_stream(
                &config,
                replication_client,
                Arc::clone(&metadata_client),
                &connection.publication_details.slot,
                &connection.publication_details.timeline_id,
                &connection.publication,
                resume_lsn,
                committed_uppers.as_mut(),
                &probe_output,
                &probe_cap[0],
            )
            .await?;

            let stream = match stream_result {
                Ok(stream) => stream,
                Err(err) => {
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    for (oid, outputs) in table_info.iter() {
                        for output_index in outputs.keys() {
                            // We pick `u64::MAX` as the LSN which will (in practice) never
                            // conflict with any previously revealed portions of the TVC.
                            let update = (
                                (*oid, *output_index, Err(DataflowError::from(err.clone()))),
                                MzOffset::from(u64::MAX),
                                Diff::ONE,
                            );
                            data_output.give_fueled(&data_cap_set[0], update).await;
                        }
                    }

                    definite_error_handle.give(
                        &definite_error_cap_set[0],
                        ReplicationError::Definite(Rc::new(err)),
                    );
                    return Ok(());
                }
            };
            // The stream is made peekable so that the loop below can tell whether another message
            // is immediately available.
            let mut stream = pin!(stream.peekable());

            // Run the periodic schema validation on a separate task using a separate client,
            // to prevent it from blocking the replication reading progress.
            let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
            let client = connection_config
                .connect("schema validation", ssh_tunnel_manager)
                .await?;
            let mut schema_errors = spawn_schema_validator(
                client,
                &config,
                connection.publication.clone(),
                table_info.clone(),
            );

            // Instead of downgrading the capability for every transaction we process we only do it
            // if we're about to yield, which is checked at the bottom of the loop. This avoids
            // creating excessive progress tracking traffic when there are multiple small
            // transactions ready to go.
            let mut data_upper = resume_lsn;
            // Main loop: every message of the replication stream is either the start of a
            // transaction, which is then extracted and emitted in full, or a keepalive.
            while let Some(event) = stream.as_mut().next().await {
                use LogicalReplicationMessage::*;
                use ReplicationMessage::*;
                match event {
                    Ok(XLogData(data)) => match data.data() {
                        Begin(begin) => {
                            let commit_lsn = MzOffset::from(begin.final_lsn());

                            let mut tx = pin!(extract_transaction(
                                stream.by_ref(),
                                &*metadata_client,
                                commit_lsn,
                                &mut table_info,
                                &metrics,
                                &connection.publication,
                            ));

                            trace!(
                                %id,
                                "timely-{worker_id} extracting transaction \
                                    at {commit_lsn}"
                            );
                            // The commit LSN must not be below the upper that was already
                            // established.
                            assert!(
                                data_upper <= commit_lsn,
                                "new_upper={data_upper} tx_lsn={commit_lsn}",
                            );
                            data_upper = commit_lsn + 1;
                            // We are about to ingest a transaction which has the possibility to be
                            // very big and we certainly don't want to hold the data in memory. For
                            // this reason we eagerly downgrade the upper capability in order for
                            // the reclocking machinery to mint a binding that includes
                            // this transaction and therefore be able to pass the data of the
                            // transaction through as we stream it.
                            upper_cap_set.downgrade([&data_upper]);
                            while let Some((oid, output_index, event, diff)) = tx.try_next().await?
                            {
                                let event = event.map_err(Into::into);
                                let mut data = (oid, output_index, event);
                                // If the output has a pending rewind and this transaction is not
                                // beyond its snapshot LSN, additionally emit the negated update at
                                // LSN 0 (see `RewindRequest`).
                                if let Some(req) = rewinds.get(&output_index) {
                                    if commit_lsn <= req.snapshot_lsn {
                                        let update = (data, MzOffset::from(0), -diff);
                                        data_output.give_fueled(&data_cap_set[0], &update).await;
                                        // The update was given by reference; take the data back
                                        // so that it can be emitted again at `commit_lsn`.
                                        data = update.0;
                                    }
                                }
                                let update = (data, commit_lsn, diff);
                                data_output.give_fueled(&data_cap_set[0], &update).await;
                            }
                        }
                        // Any other logical message is only expected within a transaction, where
                        // it is consumed by `extract_transaction`.
                        _ => return Err(TransientError::BareTransactionEvent),
                    },
                    Ok(PrimaryKeepAlive(keepalive)) => {
                        trace!( %id,
                            "timely-{worker_id} received keepalive lsn={}",
                            keepalive.wal_end()
                        );

                        // Take the opportunity to report any schema validation errors.
                        while let Ok(error) = schema_errors.try_recv() {
                            use SchemaValidationError::*;
                            match error {
                                // A missing publication is a definite error that affects every
                                // output; report it everywhere and stop.
                                Postgres(PostgresError::PublicationMissing(publication)) => {
                                    let err = DefiniteError::PublicationDropped(publication);
                                    for (oid, outputs) in table_info.iter() {
                                        for output_index in outputs.keys() {
                                            let update = (
                                                (
                                                    *oid,
                                                    *output_index,
                                                    Err(DataflowError::from(err.clone())),
                                                ),
                                                data_cap_set[0].time().clone(),
                                                Diff::ONE,
                                            );
                                            data_output.give_fueled(&data_cap_set[0], update).await;
                                        }
                                    }
                                    definite_error_handle.give(
                                        &definite_error_cap_set[0],
                                        ReplicationError::Definite(Rc::new(err)),
                                    );
                                    return Ok(());
                                }
                                // Every other PostgreSQL error is treated as transient.
                                Postgres(pg_error) => Err(TransientError::from(pg_error))?,
                                Schema {
                                    oid,
                                    output_index,
                                    error,
                                } => {
                                    // Remove the output from the table info. If it was already
                                    // removed there is nothing left to report for it.
                                    let table = table_info.get_mut(&oid).unwrap();
                                    if table.remove(&output_index).is_none() {
                                        continue;
                                    }

                                    let update = (
                                        (oid, output_index, Err(error.into())),
                                        data_cap_set[0].time().clone(),
                                        Diff::ONE,
                                    );
                                    data_output.give_fueled(&data_cap_set[0], update).await;
                                }
                            }
                        }
                        // Keepalives advance the upper as well (see the module documentation),
                        // but they never move it backwards.
                        data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
                    }
                    Ok(_) => return Err(TransientError::UnknownReplicationMessage),
                    Err(err) => return Err(err),
                }

                // If peeking at the next message doesn't resolve immediately, the next iteration
                // will have to wait for it.
                let will_yield = stream.as_mut().peek().now_or_never().is_none();
                if will_yield {
                    trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
                    // Forget the rewind requests whose snapshot LSN is now below the upper.
                    rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
                    // As long as there are pending rewinds we can't downgrade our data capability
                    // since we must be able to produce data at offset 0.
                    if rewinds.is_empty() {
                        data_cap_set.downgrade([&data_upper]);
                    }
                    upper_cap_set.downgrade([&data_upper]);
                }
            }
            // We never expect the replication stream to gracefully end
            Err(TransientError::ReplicationEOF)
        }))
    });

    // We now process the slot updates and apply the cast expressions
    let mut final_row = Row::default();
    let mut datum_vec = DatumVec::new();
    let mut next_worker = (0..u64::cast_from(scope.peers()))
        // Round robin on 1000-records basis to avoid creating tiny containers when there are a
        // small number of updates and a large number of workers.
        .flat_map(|w| std::iter::repeat_n(w, 1000))
        .cycle();
    // The routing ignores the record itself and only depends on how many records were routed
    // before it.
    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
    let replication_updates = data_stream
        .map::<Vec<_>, _, _>(Clone::clone)
        .unary(round_robin, "PgCastReplicationRows", |_, _| {
            move |input, output| {
                input.for_each_time(|time, data| {
                    let mut session = output.session(&time);
                    for ((oid, output_index, event), time, diff) in
                        data.flat_map(|data| data.drain(..))
                    {
                        let output = &table_info
                            .get(&oid)
                            .and_then(|outputs| outputs.get(&output_index))
                            .expect("table_info contains all outputs");
                        // Cast successfully decoded rows into `final_row`; errors pass through
                        // unchanged and a failing cast turns the event into an error.
                        let event = event.and_then(|row| {
                            let datums = datum_vec.borrow_with(&row);
                            super::cast_row(&output.casts, &datums, &mut final_row)?;
                            Ok(SourceMessage {
                                key: Row::default(),
                                value: final_row.clone(),
                                metadata: Row::default(),
                            })
                        });

                        // The table OID is dropped here; downstream only sees the output index.
                        session.give(((output_index, event), time, diff));
                    }
                });
            }
        })
        .as_collection();

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        replication_updates,
        upper_stream,
        Some(probe_stream),
        errors,
        button.press_on_drop(),
    )
}
603
604/// Produces the logical replication stream while taking care of regularly sending standby
605/// keepalive messages with the provided `uppers` stream.
606///
/// The returned stream will contain all transactions whose commit LSN is beyond `resume_lsn`.
608async fn raw_stream<'a>(
609    config: &'a RawSourceCreationConfig,
610    replication_client: Client,
611    metadata_client: Arc<Client>,
612    slot: &'a str,
613    timeline_id: &'a Option<u64>,
614    publication: &'a str,
615    resume_lsn: MzOffset,
616    uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
617    probe_output: &'a AsyncOutputHandle<MzOffset, CapacityContainerBuilder<Vec<Probe<MzOffset>>>>,
618    probe_cap: &'a Capability<MzOffset>,
619) -> Result<
620    Result<
621        impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>>
622        + 'a,
623        DefiniteError,
624    >,
625    TransientError,
626> {
627    if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
628        // If the publication gets deleted there is nothing else to do. These errors
629        // are not retractable.
630        return Ok(Err(err));
631    }
632
633    // Skip the timeline ID check for sources without a known timeline ID
634    // (sources created before the timeline ID was added to the source details)
635    if let Some(expected_timeline_id) = timeline_id {
636        if let Err(err) = ensure_replication_timeline_id(
637            &replication_client,
638            expected_timeline_id,
639            config.config.config_set(),
640        )
641        .await?
642        {
643            return Ok(Err(err));
644        }
645    }
646
647    // How often a proactive standby status update message should be sent to the server.
648    //
649    // The upstream will periodically request status updates by setting the keepalive's reply field
650    // value to 1. However, we cannot rely on these messages arriving on time. For example, when
651    // the upstream is sending a big transaction its keepalive messages are queued and can be
652    // delayed arbitrarily.
653    //
654    // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
655    //
656    // For this reason we query the server's timeout value and proactively send a keepalive at
657    // twice the frequency to have a healthy margin from the deadline.
658    //
659    // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
660    // Postgres versions disallow SHOW commands from within replication connection.
661    // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
662    let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
663        .await?
664        .unwrap();
665    let wal_sender_timeout = match row.get("wal_sender_timeout") {
666        // When this parameter is zero the timeout mechanism is disabled
667        Some("0") => None,
668        Some(value) => Some(
669            mz_repr::adt::interval::Interval::from_str(value)
670                .unwrap()
671                .duration()
672                .unwrap(),
673        ),
674        None => panic!("ubiquitous parameter missing"),
675    };
676
677    // This interval controls the cadence at which we send back status updates and, crucially,
678    // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
679    // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow us
680    // down. For this reason the feedback interval is set to one second, or less if the
681    // wal_sender_timeout is less than 2 seconds.
682    let feedback_interval = match wal_sender_timeout {
683        Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
684        None => Duration::from_secs(1),
685    };
686
687    let mut feedback_timer = tokio::time::interval(feedback_interval);
688    // 'Delay' ensures we always tick at least 'feedback_interval'.
689    feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
690
691    // Postgres will return all transactions that commit *at or after* after the provided LSN,
692    // following the timely upper semantics.
693    let lsn = PgLsn::from(resume_lsn.offset);
694    let query = format!(
695        r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
696        Ident::new_unchecked(slot).to_ast_string_simple(),
697        lsn,
698        publication,
699    );
700    let copy_stream = match replication_client.copy_both_simple(&query).await {
701        Ok(copy_stream) => copy_stream,
702        Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
703            return Ok(Err(DefiniteError::InvalidReplicationSlot));
704        }
705        Err(err) => return Err(err.into()),
706    };
707
708    // According to the documentation [1] we must check that the slot LSN matches our
709    // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
710    // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
711    // cannot use the replication client to do that because it's already in CopyBoth mode.
712    // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
713    let slot_metadata = super::fetch_slot_metadata(
714        &*metadata_client,
715        slot,
716        mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
717            .get(config.config.config_set()),
718    )
719    .await?;
720    let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
721    tracing::info!(
722        %config.id,
723        "started replication using backend PID={:?}. wal_sender_timeout={:?}",
724        slot_metadata.active_pid, wal_sender_timeout
725    );
726
727    let (probe_tx, mut probe_rx) = watch::channel(None);
728    let config_set = Arc::clone(config.config.config_set());
729    let now_fn = config.now_fn.clone();
730    let max_lsn_task_handle =
731        mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
732            let mut probe_ticker =
733                probe::Ticker::new(|| PG_OFFSET_KNOWN_INTERVAL.get(&config_set), now_fn);
734
735            while !probe_tx.is_closed() {
736                let probe_ts = probe_ticker.tick().await;
737                let probe_or_err = super::fetch_max_lsn(&*metadata_client)
738                    .await
739                    .map(|lsn| Probe {
740                        probe_ts,
741                        upstream_frontier: Antichain::from_elem(lsn),
742                    });
743                let _ = probe_tx.send(Some(probe_or_err));
744            }
745        })
746        .abort_on_drop();
747
748    let stream = async_stream::try_stream!({
749        // Ensure we don't pre-drop the task
750        let _max_lsn_task_handle = max_lsn_task_handle;
751
752        // ensure we don't drop the replication client!
753        let _replication_client = replication_client;
754
755        let mut uppers = pin!(uppers);
756        let mut last_committed_upper = resume_lsn;
757
758        let mut stream = pin!(LogicalReplicationStream::new(copy_stream));
759
760        if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
761            let err = TransientError::OvercompactedReplicationSlot {
762                available_lsn: min_resume_lsn,
763                requested_lsn: resume_lsn,
764            };
765            error!("timely-{} ({}) {err}", config.worker_id, config.id);
766            Err(err)?;
767        }
768
769        loop {
770            tokio::select! {
771                Some(next_message) = stream.next() => match next_message {
772                    Ok(ReplicationMessage::XLogData(data)) => {
773                        yield ReplicationMessage::XLogData(data);
774                        Ok(())
775                    }
776                    Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
777                        yield ReplicationMessage::PrimaryKeepAlive(keepalive);
778                        Ok(())
779                    }
780                    Err(err) => Err(err.into()),
781                    _ => Err(TransientError::UnknownReplicationMessage),
782                },
783                _ = feedback_timer.tick() => {
784                    let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
785                    let lsn = PgLsn::from(last_committed_upper.offset);
786                    trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
787                    // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply, which
788                    // happens when out status update is late. Since we send them proactively this
789                    // may never happen. It is therefore *crucial* that we set the last parameter
790                    // (the reply flag) to 1 here. This will cause the upstream server to send us a
791                    // PrimaryKeepAlive message promptly which will give us frontier advancement
792                    // information in the absence of data updates.
793                    let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
794                    res.map_err(|e| e.into())
795                },
796                Some(upper) = uppers.next() => match upper.into_option() {
797                    Some(lsn) => {
798                        if last_committed_upper < lsn {
799                            last_committed_upper = lsn;
800                            for stat in config.statistics.values() {
801                                stat.set_offset_committed(last_committed_upper.offset);
802                            }
803                        }
804                        Ok(())
805                    }
806                    None => Ok(()),
807                },
808                Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
809                    Some(Ok(probe)) => {
810                        if let Some(offset_known) = probe.upstream_frontier.as_option() {
811                            for stat in config.statistics.values() {
812                                stat.set_offset_known(offset_known.offset);
813                            }
814                        }
815                        probe_output.give(probe_cap, probe);
816                        Ok(())
817                    },
818                    Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
819                    None => Ok(()),
820                },
821                else => return
822            }?;
823        }
824    });
825    Ok(Ok(stream))
826}
827
828/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
829/// message. The BEGIN message must have already been consumed from the stream before calling this
830/// function.
831fn extract_transaction<'a>(
832    stream: impl AsyncStream<
833        Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>,
834    > + 'a,
835    metadata_client: &'a Client,
836    commit_lsn: MzOffset,
837    table_info: &'a mut BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
838    metrics: &'a PgSourceMetrics,
839    publication: &'a str,
840) -> impl AsyncStream<Item = Result<(u32, usize, Result<Row, DefiniteError>, Diff), TransientError>> + 'a
841{
842    use LogicalReplicationMessage::*;
843    let mut row = Row::default();
844    async_stream::try_stream!({
845        let mut stream = pin!(stream);
846        metrics.transactions.inc();
847        metrics.lsn.set(commit_lsn.offset);
848        while let Some(event) = stream.try_next().await? {
849            // We can ignore keepalive messages while processing a transaction because the
850            // commit_lsn will drive progress.
851            let message = match event {
852                ReplicationMessage::XLogData(data) => data.into_data(),
853                ReplicationMessage::PrimaryKeepAlive(_) => {
854                    metrics.ignored.inc();
855                    continue;
856                }
857                _ => Err(TransientError::UnknownReplicationMessage)?,
858            };
859            metrics.total.inc();
860            match message {
861                Insert(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
862                Update(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
863                Delete(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
864                Relation(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
865                Insert(body) => {
866                    metrics.inserts.inc();
867                    let rel = body.rel_id();
868                    for (output, info) in table_info.get(&rel).into_iter().flatten() {
869                        let tuple_data = body.tuple().tuple_data();
870                        let Some(ref projection) = info.projection else {
871                            panic!("missing projection for {rel}");
872                        };
873                        let datums = projection.iter().map(|idx| &tuple_data[*idx]);
874                        let row = unpack_tuple(datums, &mut row);
875                        yield (rel, *output, row, Diff::ONE);
876                    }
877                }
878                Update(body) => match body.old_tuple() {
879                    Some(old_tuple) => {
880                        metrics.updates.inc();
881                        let new_tuple = body.new_tuple();
882                        let rel = body.rel_id();
883                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
884                            let Some(ref projection) = info.projection else {
885                                panic!("missing projection for {rel}");
886                            };
887                            let old_tuple =
888                                projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
889                            // If the new tuple contains unchanged toast values we reference the old ones
890                            let new_tuple = std::iter::zip(
891                                projection.iter().map(|idx| &new_tuple.tuple_data()[*idx]),
892                                old_tuple.clone(),
893                            )
894                            .map(|(new, old)| match new {
895                                TupleData::UnchangedToast => old,
896                                _ => new,
897                            });
898                            let old_row = unpack_tuple(old_tuple, &mut row);
899                            let new_row = unpack_tuple(new_tuple, &mut row);
900
901                            yield (rel, *output, old_row, Diff::MINUS_ONE);
902                            yield (rel, *output, new_row, Diff::ONE);
903                        }
904                    }
905                    None => {
906                        let rel = body.rel_id();
907                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
908                            yield (
909                                rel,
910                                *output,
911                                Err(DefiniteError::DefaultReplicaIdentity),
912                                Diff::ONE,
913                            );
914                        }
915                    }
916                },
917                Delete(body) => match body.old_tuple() {
918                    Some(old_tuple) => {
919                        metrics.deletes.inc();
920                        let rel = body.rel_id();
921                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
922                            let Some(ref projection) = info.projection else {
923                                panic!("missing projection for {rel}");
924                            };
925                            let datums = projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
926                            let row = unpack_tuple(datums, &mut row);
927                            yield (rel, *output, row, Diff::MINUS_ONE);
928                        }
929                    }
930                    None => {
931                        let rel = body.rel_id();
932                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
933                            yield (
934                                rel,
935                                *output,
936                                Err(DefiniteError::DefaultReplicaIdentity),
937                                Diff::ONE,
938                            );
939                        }
940                    }
941                },
942                Relation(body) => {
943                    let rel_id = body.rel_id();
944                    if let Some(outputs) = table_info.get_mut(&body.rel_id()) {
945                        // Because the replication stream doesn't include columns' attnums, we need
946                        // to check the current local schema against the current remote schema to
947                        // ensure e.g. we haven't received a schema update with the same terminal
948                        // column name which is actually a different column.
949                        let upstream_info = mz_postgres_util::publication_info(
950                            metadata_client,
951                            publication,
952                            Some(&[rel_id]),
953                        )
954                        .await?;
955
956                        let mut schema_errors = vec![];
957
958                        outputs.retain(|output_index, info| {
959                            match verify_schema(rel_id, info, &upstream_info) {
960                                Ok(()) => true,
961                                Err(err) => {
962                                    schema_errors.push((
963                                        rel_id,
964                                        *output_index,
965                                        Err(err),
966                                        Diff::ONE,
967                                    ));
968                                    false
969                                }
970                            }
971                        });
972                        // Recalculate projection vector for the retained valid outputs. Here we
973                        // must use the column names in the RelationBody message and not the
974                        // upstream_info obtained above, since that one represents the current
975                        // schema upstream which may be many versions head of the one we're about
976                        // to receive after this Relation message.
977                        let column_positions: BTreeMap<_, _> = body
978                            .columns()
979                            .iter()
980                            .enumerate()
981                            .map(|(idx, col)| (col.name().unwrap(), idx))
982                            .collect();
983                        for info in outputs.values_mut() {
984                            let mut projection = vec![];
985                            for col in info.desc.columns.iter() {
986                                projection.push(column_positions[&*col.name]);
987                            }
988                            info.projection = Some(projection);
989                        }
990                        for schema_error in schema_errors {
991                            yield schema_error;
992                        }
993                    }
994                }
995                Truncate(body) => {
996                    for &rel_id in body.rel_ids() {
997                        if let Some(outputs) = table_info.get_mut(&rel_id) {
998                            for (output, _) in std::mem::take(outputs) {
999                                yield (
1000                                    rel_id,
1001                                    output,
1002                                    Err(DefiniteError::TableTruncated),
1003                                    Diff::ONE,
1004                                );
1005                            }
1006                        }
1007                    }
1008                }
1009                Commit(body) => {
1010                    if commit_lsn != body.commit_lsn().into() {
1011                        Err(TransientError::InvalidTransaction)?
1012                    }
1013                    return;
1014                }
1015                // TODO: We should handle origin messages and emit an error as they indicate that
1016                // the upstream performed a point in time restore so all bets are off about the
1017                // continuity of the stream.
1018                Origin(_) | Type(_) => metrics.ignored.inc(),
1019                Begin(_) => Err(TransientError::NestedTransaction)?,
1020                // The enum is marked as non_exhaustive. Better to be conservative
1021                _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
1022            }
1023        }
1024        Err(TransientError::ReplicationEOF)?;
1025    })
1026}
1027
1028/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
1029/// done.
1030#[inline]
1031fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
1032where
1033    I: IntoIterator<Item = &'a TupleData>,
1034    I::IntoIter: ExactSizeIterator,
1035{
1036    let iter = tuple_data.into_iter();
1037    let mut packer = row.packer();
1038    for data in iter {
1039        let datum = match data {
1040            TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
1041            TupleData::Null => Datum::Null,
1042            TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
1043            TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
1044        };
1045        packer.push(datum);
1046    }
1047    Ok(row.clone())
1048}
1049
1050/// Ensures the publication exists on the server. It returns an outer transient error in case of
1051/// connection issues and an inner definite error if the publication is dropped.
1052async fn ensure_publication_exists(
1053    client: &Client,
1054    publication: &str,
1055) -> Result<Result<(), DefiniteError>, TransientError> {
1056    // Figure out the last written LSN and then add one to convert it into an upper.
1057    let result = client
1058        .query_opt(
1059            "SELECT 1 FROM pg_publication WHERE pubname = $1;",
1060            &[&publication],
1061        )
1062        .await?;
1063    match result {
1064        Some(_) => Ok(Ok(())),
1065        None => Ok(Err(DefiniteError::PublicationDropped(
1066            publication.to_owned(),
1067        ))),
1068    }
1069}
1070
1071/// Ensure the active replication timeline_id matches the one we expect such that we can safely
1072/// resume replication. It returns an outer transient error in case of
1073/// connection issues and an inner definite error if the timeline id does not match.
1074async fn ensure_replication_timeline_id(
1075    replication_client: &Client,
1076    expected_timeline_id: &u64,
1077    config_set: &ConfigSet,
1078) -> Result<Result<(), DefiniteError>, TransientError> {
1079    let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
1080    if timeline_id == *expected_timeline_id {
1081        Ok(Ok(()))
1082    } else {
1083        if PG_SOURCE_VALIDATE_TIMELINE.get(config_set) {
1084            Ok(Err(DefiniteError::InvalidTimelineId {
1085                expected: *expected_timeline_id,
1086                actual: timeline_id,
1087            }))
1088        } else {
1089            tracing::warn!(
1090                "Timeline ID mismatch ignored: expected={expected_timeline_id} actual={timeline_id}"
1091            );
1092            Ok(Ok(()))
1093        }
1094    }
1095}
1096
/// A problem reported by the background schema validation task spawned by
/// `spawn_schema_validator`.
enum SchemaValidationError {
    /// Fetching the publication info from the upstream server failed.
    Postgres(PostgresError),
    /// The schema of a single output failed verification against the upstream schema.
    Schema {
        /// Oid of the upstream relation the output reads from.
        oid: u32,
        /// Index of the affected output.
        output_index: usize,
        /// The error produced by the schema verification.
        error: DefiniteError,
    },
}
1105
1106fn spawn_schema_validator(
1107    client: Client,
1108    config: &RawSourceCreationConfig,
1109    publication: String,
1110    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
1111) -> mpsc::UnboundedReceiver<SchemaValidationError> {
1112    let (tx, rx) = mpsc::unbounded_channel();
1113    let source_id = config.id;
1114    let config_set = Arc::clone(config.config.config_set());
1115
1116    mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
1117        while !tx.is_closed() {
1118            trace!(%source_id, "validating schemas");
1119
1120            let validation_start = Instant::now();
1121
1122            let upstream_info = match mz_postgres_util::publication_info(
1123                &*client,
1124                &publication,
1125                Some(&table_info.keys().copied().collect::<Vec<_>>()),
1126            )
1127            .await
1128            {
1129                Ok(info) => info,
1130                Err(error) => {
1131                    let _ = tx.send(SchemaValidationError::Postgres(error));
1132                    continue;
1133                }
1134            };
1135
1136            for (&oid, outputs) in table_info.iter() {
1137                for (&output_index, info) in outputs {
1138                    if let Err(error) = verify_schema(oid, info, &upstream_info) {
1139                        trace!(
1140                            %source_id,
1141                            "schema of output index {output_index} for oid {oid} invalid",
1142                        );
1143                        let _ = tx.send(SchemaValidationError::Schema {
1144                            oid,
1145                            output_index,
1146                            error,
1147                        });
1148                    } else {
1149                        trace!(
1150                            %source_id,
1151                            "schema of output index {output_index} for oid {oid} valid",
1152                        );
1153                    }
1154                }
1155            }
1156
1157            let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
1158            let elapsed = validation_start.elapsed();
1159            let wait = interval.saturating_sub(elapsed);
1160            tokio::time::sleep(wait).await;
1161        }
1162    });
1163
1164    rx
1165}