mz_storage/source/postgres/
replication.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
11//!
12//! ```text
13//!              o
14//!              │rewind
15//!              │requests
16//!          ╭───┴────╮
17//!          │exchange│ (collect all requests to one worker)
18//!          ╰───┬────╯
19//!           ┏━━v━━━━━━━━━━┓
20//!           ┃ replication ┃ (single worker)
21//!           ┃   reader    ┃
22//!           ┗━┯━━━━━━━━┯━━┛
23//!             │raw     │
24//!             │data    │
25//!        ╭────┴─────╮  │
26//!        │distribute│  │ (distribute to all workers)
27//!        ╰────┬─────╯  │
28//! ┏━━━━━━━━━━━┷━┓      │
29//! ┃ replication ┃      │ (parallel decode)
30//! ┃   decoder   ┃      │
31//! ┗━━━━━┯━━━━━━━┛      │
32//!       │ replication  │ progress
33//!       │ updates      │ output
34//!       v              v
35//! ```
36//!
37//! # Progress tracking
38//!
39//! In order to avoid causing excessive resource usage in the upstream server it's important to
40//! track the LSN that we have successfully committed to persist and communicate that back to
41//! PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
42//! transactions themselves. Since at a given LSN offset there can be only a single message, when a
43//! transaction is received and processed we can infer that we have seen all the messages that are
44//! not beyond `commit_lsn + 1`.
45//!
46//! Things are a bit more complicated in the absence of transactions though, because even though
47//! we don't receive any, the server might very well be generating WAL records. This can happen if
48//! there is a separate logical database performing writes (which is the case for RDS databases),
49//! or, in servers running PostgreSQL version 15 or greater, the logical replication process
50//! includes an optimization that omits empty transactions, which can happen if you're only
51//! replicating a subset of the tables and there are writes going to the other ones.
52//!
53//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
54//! will be forced to keep around WAL data that can eventually lead to disk space exhaustion.
55//!
56//! In the absence of transactions the only available piece of information in the replication
57//! stream are keepalive messages. Keepalive messages are documented[1] to contain the current end
58//! of WAL on the server. That is a useless number when it comes to progress tracking because there
59//! might be pending messages at LSNs between the last received commit_lsn and the current end of
60//! WAL.
61//!
62//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
63//! contains the last *sent* LSN[2]. Here sent doesn't necessarily mean sent over the wire, but
64//! sent to the upstream process that is handling producing the logical stream. Therefore, if we
65//! receive a keepalive with a particular LSN we can be certain that there are no other replication
66//! messages at previous LSNs, because they would have been already generated and received. We
67//! therefore connect the keepalive messages directly to our capability.
68//!
69//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
70//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com
71
72use std::collections::BTreeMap;
73use std::convert::Infallible;
74use std::pin::pin;
75use std::rc::Rc;
76use std::str::FromStr;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::time::Instant;
80use std::time::{Duration, SystemTime, UNIX_EPOCH};
81
82use differential_dataflow::AsCollection;
83use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
84use mz_ore::cast::CastFrom;
85use mz_ore::collections::HashSet;
86use mz_ore::future::InTask;
87use mz_ore::iter::IteratorExt;
88use mz_postgres_util::PostgresError;
89use mz_postgres_util::{Client, simple_query_opt};
90use mz_repr::{Datum, DatumVec, Diff, Row};
91use mz_sql_parser::ast::{Ident, display::AstDisplay};
92use mz_storage_types::dyncfgs::{PG_OFFSET_KNOWN_INTERVAL, PG_SCHEMA_VALIDATION_INTERVAL};
93use mz_storage_types::errors::DataflowError;
94use mz_storage_types::sources::SourceTimestamp;
95use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
96use mz_timely_util::builder_async::{
97    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
98    PressOnDropButton,
99};
100use postgres_replication::LogicalReplicationStream;
101use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
102use serde::{Deserialize, Serialize};
103use timely::container::CapacityContainerBuilder;
104use timely::dataflow::channels::pact::{Exchange, Pipeline};
105use timely::dataflow::channels::pushers::Tee;
106use timely::dataflow::operators::Capability;
107use timely::dataflow::operators::Concat;
108use timely::dataflow::operators::Operator;
109use timely::dataflow::operators::core::Map;
110use timely::dataflow::{Scope, Stream};
111use timely::progress::Antichain;
112use tokio::sync::{mpsc, watch};
113use tokio_postgres::error::SqlState;
114use tokio_postgres::types::PgLsn;
115use tracing::{error, trace};
116
117use crate::metrics::source::postgres::PgSourceMetrics;
118use crate::source::RawSourceCreationConfig;
119use crate::source::postgres::verify_schema;
120use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
121use crate::source::probe;
122use crate::source::types::{Probe, SignaledFuture, SourceMessage, StackedCollection};
123
124/// The PostgreSQL epoch, 2000-01-01T00:00:00Z, expressed as a [`SystemTime`].
///
/// Lazily computed on first access as [`UNIX_EPOCH`] plus 946,684,800 seconds, which is the
/// number of seconds between 1970-01-01T00:00:00Z and 2000-01-01T00:00:00Z (10,957 days).
125static PG_EPOCH: LazyLock<SystemTime> =
126    LazyLock::new(|| UNIX_EPOCH + Duration::from_secs(946_684_800));
127
128/// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
129/// slot. This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
130/// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
///
/// The replication reader keeps at most one pending request per `output_index`; a later request
/// for the same output replaces the earlier one.
131#[derive(Debug, Clone, Serialize, Deserialize)]
132pub(crate) struct RewindRequest {
133    /// The output index that should be rewound.
134    pub(crate) output_index: usize,
135    /// The LSN that the snapshot was taken at.
136    pub(crate) snapshot_lsn: MzOffset,
137}
138
139/// Renders the replication dataflow. See the module documentation for more information.
///
/// Two operators are built:
///
/// * An async `ReplicationReader({id})` operator with four outputs (raw data, upper, definite
///   errors and probes). Rewind requests are exchanged to the worker returned by
///   `config.responsible_worker("slot")`; workers for which `config.responsible_for("slot")` is
///   false only initialize their statistics and return immediately.
/// * A `PgCastReplicationRows` operator that receives the raw data through a round-robin exchange
///   and applies the casts of the corresponding output to every row.
///
/// # Parameters
///
/// * `table_info`: per upstream table OID, the outputs (keyed by output index) fed by that table.
/// * `rewind_stream`: the [`RewindRequest`]s that must be collected before replication starts.
/// * `slot_ready_stream`: an input that is drained to completion before the slot metadata is
///   fetched.
/// * `committed_uppers`: forwarded to `raw_stream`, which documents it as the source of the
///   standby keepalive feedback.
///
/// # Returns
///
/// A tuple of, in order: the collection of decoded updates tagged with their output index, the
/// upper stream (which carries no data, only progress), the probe stream (always `Some`), the
/// concatenation of definite and transient errors, and the operator's press-on-drop button.
140pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
141    scope: G,
142    config: RawSourceCreationConfig,
143    connection: PostgresSourceConnection,
144    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
145    rewind_stream: &Stream<G, RewindRequest>,
146    slot_ready_stream: &Stream<G, Infallible>,
147    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
148    metrics: PgSourceMetrics,
149) -> (
150    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
151    Stream<G, Infallible>,
152    Option<Stream<G, Probe<MzOffset>>>,
153    Stream<G, ReplicationError>,
154    PressOnDropButton,
155) {
156    let op_name = format!("ReplicationReader({})", config.id);
157    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());
158
    // The index of the worker that reads the slot. All rewind requests are routed to it below.
159    let slot_reader = u64::cast_from(config.responsible_worker("slot"));
160    let (data_output, data_stream) = builder.new_output();
    // The handle of the upper output is never used to give data; only the capability of this
    // output is downgraded in order to communicate progress.
161    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
162    let (definite_error_handle, definite_errors) = builder.new_output();
163    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
164
165    let mut rewind_input =
166        builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
167    let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
    // Gather the resume upper of every output of every table.
168    let mut output_uppers = table_info
169        .iter()
170        .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
171        .collect::<Vec<_>>();
172    metrics.tables.set(u64::cast_from(output_uppers.len()));
173
174    // Include the upper of the main source output for use in calculating the initial
175    // resume upper.
176    output_uppers.push(Antichain::from_iter(
177        config
178            .source_resume_uppers
179            .get(&config.id)
180            .expect("id exists")
181            .iter()
182            .map(MzOffset::decode_row),
183    ));
184
    // The reader closure takes ownership of a clone; the original `table_info` is used by the
    // cast operator at the bottom of this function.
185    let reader_table_info = table_info.clone();
186    let (button, transient_errors) = builder.build_fallible(move |caps| {
187        let table_info = reader_table_info;
188        let busy_signal = Arc::clone(&config.busy_signal);
189        Box::pin(SignaledFuture::new(busy_signal, async move {
190            let (id, worker_id) = (config.id, config.worker_id);
            // One capability set per output, in the order the outputs were created above.
191            let [
192                data_cap_set,
193                upper_cap_set,
194                definite_error_cap_set,
195                probe_cap,
196            ]: &mut [_; 4] = caps.try_into().unwrap();
197
198            if !config.responsible_for("slot") {
199                // Emit 0, to mark this worker as having started up correctly.
200                for stat in config.statistics.values() {
201                    stat.set_offset_known(0);
202                    stat.set_offset_committed(0);
203                }
204                return Ok(());
205            }
206
207            // Determine the slot lsn.
208            let connection_config = connection
209                .connection
210                .config(
211                    &config.config.connection_context.secrets_reader,
212                    &config.config,
213                    InTask::Yes,
214                )
215                .await?;
216
217            let slot = &connection.publication_details.slot;
218            let replication_client = connection_config
219                .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
220                .await?;
221
            // A second, regular connection used for metadata queries (slot metadata, max LSN,
            // terminating stale backends). It is shared with `raw_stream` through the `Arc`.
222            let metadata_client = connection_config
223                .connect(
224                    "replication metadata",
225                    &config.config.connection_context.ssh_tunnel_manager,
226                )
227                .await?;
228            let metadata_client = Arc::new(metadata_client);
229
230            while let Some(_) = slot_ready_input.next().await {
231                // Wait for the slot to be created. The loop only ends once the input is exhausted.
232            }
233
234            // The slot is always created by the snapshot operator. If the slot doesn't exist,
235            // when this check runs, this operator will return an error.
236            let slot_metadata = super::fetch_slot_metadata(
237                &*metadata_client,
238                slot,
239                mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
240                    .get(config.config.config_set()),
241            )
242            .await?;
243
244            // We're the only application that should be using this replication
245            // slot. The only way that there can be another connection using
246            // this slot under normal operation is if there's a stale TCP
247            // connection from a prior incarnation of the source holding on to
248            // the slot. We don't want to wait for the WAL sender timeout and/or
249            // TCP keepalives to time out that connection, because these values
250            // are generally under the control of the DBA and may not time out
251            // the connection for multiple minutes, or at all. Instead we just
252            // force kill the connection that's using the slot.
253            //
254            // Note that there's a small risk that *we're* the zombie cluster
255            // that should not be using the replication slot. Kubernetes cannot
256            // 100% guarantee that only one cluster is alive at a time. However,
257            // this situation should not last long, and the worst that can
258            // happen is a bit of transient thrashing over ownership of the
259            // replication slot.
260            if let Some(active_pid) = slot_metadata.active_pid {
261                tracing::warn!(
262                    %id, %active_pid,
263                    "replication slot already in use; will attempt to kill existing connection",
264                );
265
266                match metadata_client
267                    .execute("SELECT pg_terminate_backend($1)", &[&active_pid])
268                    .await
269                {
270                    Ok(_) => {
271                        tracing::info!(
272                            "successfully killed existing connection; \
273                            starting replication is likely to succeed"
274                        );
275                        // Note that `pg_terminate_backend` does not wait for
276                        // the termination of the targeted connection to
277                        // complete. We may try to start replication before the
278                        // targeted connection has cleaned up its state. That's
279                        // okay. If that happens we'll just try again from the
280                        // top via the suspend-and-restart flow.
281                    }
282                    Err(e) => {
283                        tracing::warn!(
284                            %e,
285                            "failed to kill existing replication connection; \
286                            replication will likely fail to start"
287                        );
288                        // Continue on anyway, just in case the replication slot
289                        // is actually available. Maybe PostgreSQL has some
290                        // staleness when it reports `active_pid`, for example.
291                    }
292                }
293            }
294
295            // The overall resumption point for this source is the minimum of the resumption points
296            // contributed by each of the outputs.
297            let resume_lsn = output_uppers
298                .iter()
299                .flat_map(|f| f.elements())
300                .map(|&lsn| {
301                    // An output is either an output that has never had data committed to it or one
302                    // that has and needs to resume. We differentiate between the two by checking
303                    // whether an output wishes to "resume" from the minimum timestamp. In that case
304                    // its contribution to the overall resumption point is the earliest point available
305                    // in the slot. This information would normally be something that the storage
306                    // controller figures out in the form of an as-of frontier, but at the moment the
307                    // storage controller does not have visibility into what the replication slot is
308                    // doing.
309                    if lsn == MzOffset::from(0) {
310                        slot_metadata.confirmed_flush_lsn
311                    } else {
312                        lsn
313                    }
314                })
315                .min();
            // `min` only yields `None` when none of the resume uppers contains an element, in
            // which case there is nothing to replicate and the operator finishes.
316            let Some(resume_lsn) = resume_lsn else {
317                return Ok(());
318            };
319            upper_cap_set.downgrade([&resume_lsn]);
320            trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");
321
322            // Emitting an initial probe before we start waiting for rewinds ensures that we will
323            // have a timestamp binding in the remap collection while the snapshot is processed.
324            // This is important because otherwise the snapshot updates would need to be buffered
325            // in the reclock operator, instead of being spilled to S3 in the persist sink.
326            //
327            // Note that we need to fetch the probe LSN _after_ having created the replication
328            // slot, to make sure the fetched LSN will be included in the replication stream.
329            let probe_ts = (config.now_fn)().into();
330            let max_lsn = super::fetch_max_lsn(&*metadata_client).await?;
331            let probe = Probe {
332                probe_ts,
333                upstream_frontier: Antichain::from_elem(max_lsn),
334            };
335            probe_output.give(&probe_cap[0], probe);
336
            // Collect every rewind request, keyed by output index, until the rewind input is
            // exhausted. A later request for the same output replaces an earlier one.
337            let mut rewinds = BTreeMap::new();
338            while let Some(event) = rewind_input.next().await {
339                if let AsyncEvent::Data(_, data) = event {
340                    for req in data {
                        // The slot resumes strictly after the point right past the snapshot, so
                        // the updates needed to rewind this snapshot are no longer available.
341                        if resume_lsn > req.snapshot_lsn + 1 {
342                            let err = DefiniteError::SlotCompactedPastResumePoint(
343                                req.snapshot_lsn + 1,
344                                resume_lsn,
345                            );
346                            // If the replication stream cannot be obtained from the resume point there is nothing
347                            // else to do. These errors are not retractable.
348                            for (oid, outputs) in table_info.iter() {
349                                for output_index in outputs.keys() {
350                                    // We pick `u64::MAX` as the LSN which will (in practice) never conflict
351                                    // with any previously revealed portions of the TVC.
352                                    let update = (
353                                        (
354                                            *oid,
355                                            *output_index,
356                                            Err(DataflowError::from(err.clone())),
357                                        ),
358                                        MzOffset::from(u64::MAX),
359                                        Diff::ONE,
360                                    );
361                                    data_output.give_fueled(&data_cap_set[0], update).await;
362                                }
363                            }
364                            definite_error_handle.give(
365                                &definite_error_cap_set[0],
366                                ReplicationError::Definite(Rc::new(err)),
367                            );
368                            return Ok(());
369                        }
370                        rewinds.insert(req.output_index, req);
371                    }
372                }
373            }
374            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");
375
376            let mut committed_uppers = pin!(committed_uppers);
377
            // `raw_stream` returns a nested result: the outer error is transient and propagated
            // with `?`, the inner error is definite and handled below.
378            let stream_result = raw_stream(
379                &config,
380                replication_client,
381                Arc::clone(&metadata_client),
382                &connection.publication_details.slot,
383                &connection.publication_details.timeline_id,
384                &connection.publication,
385                resume_lsn,
386                committed_uppers.as_mut(),
387                &probe_output,
388                &probe_cap[0],
389            )
390            .await?;
391
392            let stream = match stream_result {
393                Ok(stream) => stream,
394                Err(err) => {
395                    // If the replication stream cannot be obtained in a definite way there is
396                    // nothing else to do. These errors are not retractable.
397                    for (oid, outputs) in table_info.iter() {
398                        for output_index in outputs.keys() {
399                            // We pick `u64::MAX` as the LSN which will (in practice) never conflict
400                            // with any previously revealed portions of the TVC.
401                            let update = (
402                                (*oid, *output_index, Err(DataflowError::from(err.clone()))),
403                                MzOffset::from(u64::MAX),
404                                Diff::ONE,
405                            );
406                            data_output.give_fueled(&data_cap_set[0], update).await;
407                        }
408                    }
409
410                    definite_error_handle.give(
411                        &definite_error_cap_set[0],
412                        ReplicationError::Definite(Rc::new(err)),
413                    );
414                    return Ok(());
415                }
416            };
            // The stream is made peekable so that the bottom of the main loop can check whether
            // another message is immediately available.
417            let mut stream = pin!(stream.peekable());
418
419            // Run the periodic schema validation on a separate task using a separate client,
420            // to prevent it from blocking the replication reading progress.
421            let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
422            let client = connection_config
423                .connect("schema validation", ssh_tunnel_manager)
424                .await?;
425            let mut schema_errors = spawn_schema_validator(
426                client,
427                &config,
428                connection.publication.clone(),
429                table_info.clone(),
430            );
431
            // Output indexes for which a schema error has already been emitted. It is consulted
            // below to avoid emitting a second error and is also handed to `extract_transaction`.
432            let mut errored = HashSet::new();
433            // Instead of downgrading the capability for every transaction we process we only do it
434            // if we're about to yield, which is checked at the bottom of the loop. This avoids
435            // creating excessive progress tracking traffic when there are multiple small
436            // transactions ready to go.
437            let mut data_upper = resume_lsn;
438            // Main loop: consume replication messages one at a time. Each iteration handles either
439            // a whole transaction (starting at its `Begin` message) or a keepalive.
440            while let Some(event) = stream.as_mut().next().await {
441                use LogicalReplicationMessage::*;
442                use ReplicationMessage::*;
443                match event {
444                    Ok(XLogData(data)) => match data.data() {
445                        Begin(begin) => {
446                            let commit_lsn = MzOffset::from(begin.final_lsn());
447
448                            let mut tx = pin!(extract_transaction(
449                                stream.by_ref(),
450                                &*metadata_client,
451                                commit_lsn,
452                                &table_info,
453                                &metrics,
454                                &connection.publication,
455                                &mut errored
456                            ));
457
458                            trace!(
459                                %id,
460                                "timely-{worker_id} extracting transaction \
461                                    at {commit_lsn}"
462                            );
463                            assert!(
464                                data_upper <= commit_lsn,
465                                "new_upper={data_upper} tx_lsn={commit_lsn}",
466                            );
467                            data_upper = commit_lsn + 1;
468                            // We are about to ingest a transaction which has the possibility to be
469                            // very big and we certainly don't want to hold the data in memory. For
470                            // this reason we eagerly downgrade the upper capability in order for
471                            // the reclocking machinery to mint a binding that includes
472                            // this transaction and therefore be able to pass the data of the
473                            // transaction through as we stream it.
474                            upper_cap_set.downgrade([&data_upper]);
475                            while let Some((oid, output_index, event, diff)) = tx.try_next().await?
476                            {
477                                let event = event.map_err(Into::into);
478                                let mut data = (oid, output_index, event);
                                // If this output has a pending rewind that covers this
                                // transaction, first emit the negated update at offset 0. The
                                // payload is then moved back out of the tuple so that it can be
                                // emitted again at its real LSN.
479                                if let Some(req) = rewinds.get(&output_index) {
480                                    if commit_lsn <= req.snapshot_lsn {
481                                        let update = (data, MzOffset::from(0), -diff);
482                                        data_output.give_fueled(&data_cap_set[0], &update).await;
483                                        data = update.0;
484                                    }
485                                }
486                                let update = (data, commit_lsn, diff);
487                                data_output.give_fueled(&data_cap_set[0], &update).await;
488                            }
489                        }
                        // Any other logical message at this level arrived outside of a
                        // `Begin`-delimited transaction.
490                        _ => return Err(TransientError::BareTransactionEvent),
491                    },
492                    Ok(PrimaryKeepAlive(keepalive)) => {
493                        trace!( %id,
494                            "timely-{worker_id} received keepalive lsn={}",
495                            keepalive.wal_end()
496                        );
497
498                        // Take the opportunity to report any schema validation errors.
499                        while let Ok(error) = schema_errors.try_recv() {
500                            use SchemaValidationError::*;
501                            match error {
                                // A missing publication is a definite error for every output.
502                                Postgres(PostgresError::PublicationMissing(publication)) => {
503                                    let err = DefiniteError::PublicationDropped(publication);
504                                    for (oid, outputs) in table_info.iter() {
505                                        for output_index in outputs.keys() {
506                                            let update = (
507                                                (
508                                                    *oid,
509                                                    *output_index,
510                                                    Err(DataflowError::from(err.clone())),
511                                                ),
512                                                data_cap_set[0].time().clone(),
513                                                Diff::ONE,
514                                            );
515                                            data_output.give_fueled(&data_cap_set[0], update).await;
516                                        }
517                                    }
518                                    definite_error_handle.give(
519                                        &definite_error_cap_set[0],
520                                        ReplicationError::Definite(Rc::new(err)),
521                                    );
522                                    return Ok(());
523                                }
                                // Every other PostgreSQL error is treated as transient.
524                                Postgres(pg_error) => Err(TransientError::from(pg_error))?,
525                                Schema {
526                                    oid,
527                                    output_index,
528                                    error,
529                                } => {
                                    // Emit at most one schema error per output.
530                                    if errored.contains(&output_index) {
531                                        continue;
532                                    }
533
534                                    let update = (
535                                        (oid, output_index, Err(error.into())),
536                                        data_cap_set[0].time().clone(),
537                                        Diff::ONE,
538                                    );
539                                    data_output.give_fueled(&data_cap_set[0], update).await;
540                                    errored.insert(output_index);
541                                }
542                            }
543                        }
                        // See the module documentation for why the keepalive LSN can be used to
                        // advance the upper. The upper is never moved backwards.
544                        data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
545                    }
546                    Ok(_) => return Err(TransientError::UnknownReplicationMessage),
547                    Err(err) => return Err(err),
548                }
549
                // If the next message is not immediately available, awaiting it at the top of
                // the loop will yield, so publish the accumulated progress now.
550                let will_yield = stream.as_mut().peek().now_or_never().is_none();
551                if will_yield {
552                    trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
                    // Drop the rewind requests whose snapshot LSN is now below the upper.
553                    rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
554                    // As long as there are pending rewinds we can't downgrade our data capability
555                    // since we must be able to produce data at offset 0.
556                    if rewinds.is_empty() {
557                        data_cap_set.downgrade([&data_upper]);
558                    }
559                    upper_cap_set.downgrade([&data_upper]);
560                }
561            }
562            // We never expect the replication stream to gracefully end
563            Err(TransientError::ReplicationEOF)
564        }))
565    });
566
567    // We now process the slot updates and apply the cast expressions
    // `final_row` and `datum_vec` are allocated once and reused for every row that is cast.
568    let mut final_row = Row::default();
569    let mut datum_vec = DatumVec::new();
570    let mut next_worker = (0..u64::cast_from(scope.peers()))
571        // Round robin on 1000-records basis to avoid creating tiny containers when there are a
572        // small number of updates and a large number of workers.
573        .flat_map(|w| std::iter::repeat_n(w, 1000))
574        .cycle();
575    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
576    let replication_updates = data_stream
577        .map::<Vec<_>, _, _>(Clone::clone)
578        .unary(round_robin, "PgCastReplicationRows", |_, _| {
579            move |input, output| {
580                while let Some((time, data)) = input.next() {
581                    let mut session = output.session(&time);
                    // Note that the inner `time` shadows the outer one: it is the timestamp of
                    // the individual update rather than the capability time of the batch.
582                    for ((oid, output_index, event), time, diff) in data.drain(..) {
583                        let output = &table_info
584                            .get(&oid)
585                            .and_then(|outputs| outputs.get(&output_index))
586                            .expect("table_info contains all outputs");
                        // Errors pass through untouched; only successfully decoded rows are cast.
587                        let event = event.and_then(|row| {
588                            let datums = datum_vec.borrow_with(&row);
589                            super::cast_row(&output.casts, &datums, &mut final_row)?;
590                            Ok(SourceMessage {
591                                key: Row::default(),
592                                value: final_row.clone(),
593                                metadata: Row::default(),
594                            })
595                        });
596
                        // The table OID is dropped here; downstream only sees the output index.
597                        session.give(((output_index, event), time, diff));
598                    }
599                }
600            }
601        })
602        .as_collection();
603
    // Merge the definite errors with the transient errors reported by the fallible operator.
604    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));
605
606    (
607        replication_updates,
608        upper_stream,
609        Some(probe_stream),
610        errors,
611        button.press_on_drop(),
612    )
613}
614
/// Produces the logical replication stream while taking care of regularly sending standby
/// keepalive messages with the provided `uppers` stream.
///
/// The returned stream will contain all transactions whose commit LSN is beyond `resume_lsn`.
///
/// The return value is a nested `Result`: the outer `Err` is a [`TransientError`] (for example
/// a failed query or connection error), while the inner `Err` is a [`DefiniteError`] (the
/// publication is missing, the timeline ID does not match, or the replication slot is
/// invalid).
///
/// Besides replication messages, the returned stream also drives a few side effects while it
/// is polled: it sends standby status updates on a timer, records `offset_committed` and
/// `offset_known` statistics, and forwards upstream frontier probes to `probe_output`.
async fn raw_stream<'a>(
    config: &'a RawSourceCreationConfig,
    replication_client: Client,
    metadata_client: Arc<Client>,
    slot: &'a str,
    timeline_id: &'a Option<u64>,
    publication: &'a str,
    resume_lsn: MzOffset,
    uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
    probe_output: &'a AsyncOutputHandle<
        MzOffset,
        CapacityContainerBuilder<Vec<Probe<MzOffset>>>,
        Tee<MzOffset, Vec<Probe<MzOffset>>>,
    >,
    probe_cap: &'a Capability<MzOffset>,
) -> Result<
    Result<
        impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>>
        + 'a,
        DefiniteError,
    >,
    TransientError,
> {
    if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
        // If the publication gets deleted there is nothing else to do. These errors
        // are not retractable.
        return Ok(Err(err));
    }

    // Skip the timeline ID check for sources without a known timeline ID
    // (sources created before the timeline ID was added to the source details)
    if let Some(expected_timeline_id) = timeline_id {
        if let Err(err) =
            ensure_replication_timeline_id(&replication_client, expected_timeline_id).await?
        {
            return Ok(Err(err));
        }
    }

    // How often a proactive standby status update message should be sent to the server.
    //
    // The upstream will periodically request status updates by setting the keepalive's reply field
    // value to 1. However, we cannot rely on these messages arriving on time. For example, when
    // the upstream is sending a big transaction its keepalive messages are queued and can be
    // delayed arbitrarily.
    //
    // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
    //
    // For this reason we query the server's timeout value and proactively send a keepalive at
    // twice the frequency to have a healthy margin from the deadline.
    //
    // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
    // Postgres versions disallow SHOW commands from within replication connection.
    // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
    let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
        .await?
        .unwrap();
    let wal_sender_timeout = match row.get("wal_sender_timeout") {
        // When this parameter is zero the timeout mechanism is disabled
        Some("0") => None,
        // Any other value is parsed as an interval and converted to a `Duration`.
        Some(value) => Some(
            mz_repr::adt::interval::Interval::from_str(value)
                .unwrap()
                .duration()
                .unwrap(),
        ),
        None => panic!("ubiquitous parameter missing"),
    };

    // This interval controls the cadence at which we send back status updates and, crucially,
    // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
    // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow us
    // down. For this reason the feedback interval is set to one second, or less if the
    // wal_sender_timeout is less than 2 seconds.
    let feedback_interval = match wal_sender_timeout {
        Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
        None => Duration::from_secs(1),
    };

    let mut feedback_timer = tokio::time::interval(feedback_interval);
    // 'Delay' ensures we always tick at least 'feedback_interval'.
    feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    // Postgres will return all transactions that commit *at or after* the provided LSN,
    // following the timely upper semantics.
    let lsn = PgLsn::from(resume_lsn.offset);
    let query = format!(
        r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
        Ident::new_unchecked(slot).to_ast_string_simple(),
        lsn,
        publication,
    );
    let copy_stream = match replication_client.copy_both_simple(&query).await {
        Ok(copy_stream) => copy_stream,
        // This SQLSTATE is reported as a definite error about the slot rather than as a
        // transient connection problem.
        Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
            return Ok(Err(DefiniteError::InvalidReplicationSlot));
        }
        Err(err) => return Err(err.into()),
    };

    // According to the documentation [1] we must check that the slot LSN matches our
    // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
    // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
    // cannot use the replication client to do that because it's already in CopyBoth mode.
    // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
    let slot_metadata = super::fetch_slot_metadata(
        &*metadata_client,
        slot,
        mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
            .get(config.config.config_set()),
    )
    .await?;
    let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
    tracing::info!(
        %config.id,
        "started replication using backend PID={:?}. wal_sender_timeout={:?}",
        slot_metadata.active_pid, wal_sender_timeout
    );

    // Background task that periodically fetches the upstream's maximum LSN and publishes the
    // result (or the error) through a watch channel. It stops once the receiver is gone, and
    // the handle aborts it when dropped.
    let (probe_tx, mut probe_rx) = watch::channel(None);
    let config_set = Arc::clone(config.config.config_set());
    let now_fn = config.now_fn.clone();
    let max_lsn_task_handle =
        mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
            let mut probe_ticker =
                probe::Ticker::new(|| PG_OFFSET_KNOWN_INTERVAL.get(&config_set), now_fn);

            while !probe_tx.is_closed() {
                let probe_ts = probe_ticker.tick().await;
                let probe_or_err = super::fetch_max_lsn(&*metadata_client)
                    .await
                    .map(|lsn| Probe {
                        probe_ts,
                        upstream_frontier: Antichain::from_elem(lsn),
                    });
                let _ = probe_tx.send(Some(probe_or_err));
            }
        })
        .abort_on_drop();

    let stream = async_stream::try_stream!({
        // Ensure we don't pre-drop the task
        let _max_lsn_task_handle = max_lsn_task_handle;

        // ensure we don't drop the replication client!
        let _replication_client = replication_client;

        let mut uppers = pin!(uppers);
        let mut last_committed_upper = resume_lsn;

        let mut stream = pin!(LogicalReplicationStream::new(copy_stream));

        // Unless we are starting from LSN 0, the slot's confirmed flush LSN must not be ahead
        // of the LSN we want to resume from; otherwise the stream fails with an
        // `OvercompactedReplicationSlot` error.
        if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
            let err = TransientError::OvercompactedReplicationSlot {
                available_lsn: min_resume_lsn,
                requested_lsn: resume_lsn,
            };
            error!("timely-{} ({}) {err}", config.worker_id, config.id);
            Err(err)?;
        }

        // Each branch evaluates to a `Result`; the trailing `?` on the `select!` ends the
        // stream with the error of whichever branch failed.
        loop {
            tokio::select! {
                // Forward data and keepalive messages from the upstream as-is.
                Some(next_message) = stream.next() => match next_message {
                    Ok(ReplicationMessage::XLogData(data)) => {
                        yield ReplicationMessage::XLogData(data);
                        Ok(())
                    }
                    Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
                        yield ReplicationMessage::PrimaryKeepAlive(keepalive);
                        Ok(())
                    }
                    Err(err) => Err(err.into()),
                    _ => Err(TransientError::UnknownReplicationMessage),
                },
                // Periodically report the last committed upper back to the upstream.
                _ = feedback_timer.tick() => {
                    let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
                    let lsn = PgLsn::from(last_committed_upper.offset);
                    trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
                    // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply, which
                    // happens when our status update is late. Since we send them proactively this
                    // may never happen. It is therefore *crucial* that we set the last parameter
                    // (the reply flag) to 1 here. This will cause the upstream server to send us a
                    // PrimaryKeepAlive message promptly which will give us frontier advancement
                    // information in the absence of data updates.
                    let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
                    res.map_err(|e| e.into())
                },
                // Track the most recent committed upper; it only ever moves forward. An empty
                // antichain is ignored.
                Some(upper) = uppers.next() => match upper.into_option() {
                    Some(lsn) => {
                        if last_committed_upper < lsn {
                            last_committed_upper = lsn;
                            for stat in config.statistics.values() {
                                stat.set_offset_committed(last_committed_upper.offset);
                            }
                        }
                        Ok(())
                    }
                    None => Ok(()),
                },
                // A new probe result from the background task: record the known offset and
                // forward the probe, or fail the stream if the probe itself failed.
                Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
                    Some(Ok(probe)) => {
                        if let Some(offset_known) = probe.upstream_frontier.as_option() {
                            for stat in config.statistics.values() {
                                stat.set_offset_known(offset_known.offset);
                            }
                        }
                        probe_output.give(probe_cap, probe);
                        Ok(())
                    },
                    Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
                    None => Ok(()),
                },
                // No branch pattern matched any more: end the stream.
                else => return
            }?;
        }
    });
    Ok(Ok(stream))
}
838
/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
/// message. The BEGIN message must have already been consumed from the stream before calling this
/// function.
///
/// Each item yielded by the returned stream is a tuple of
/// `(relation OID, output index, decoded row or definite error, diff)`. A row change is
/// emitted once for every output of the relation that is not in `errored_outputs`.
///
/// The stream ends normally when the COMMIT message is seen. It fails with a
/// [`TransientError`] if the COMMIT LSN differs from `commit_lsn`, if another BEGIN is seen,
/// if an unknown message is seen, or if the input ends before a COMMIT.
///
/// Outputs that hit a schema, dropped-table or truncate error are added to `errored_outputs`
/// so that later messages skip them.
fn extract_transaction<'a>(
    stream: impl AsyncStream<
        Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>,
    > + 'a,
    metadata_client: &'a Client,
    commit_lsn: MzOffset,
    table_info: &'a BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: &'a PgSourceMetrics,
    publication: &'a str,
    errored_outputs: &'a mut HashSet<usize>,
) -> impl AsyncStream<Item = Result<(u32, usize, Result<Row, DefiniteError>, Diff), TransientError>> + 'a
{
    use LogicalReplicationMessage::*;
    // Scratch row reused by `unpack_tuple` for every tuple in the transaction.
    let mut row = Row::default();
    async_stream::try_stream!({
        let mut stream = pin!(stream);
        metrics.transactions.inc();
        metrics.lsn.set(commit_lsn.offset);
        while let Some(event) = stream.try_next().await? {
            // We can ignore keepalive messages while processing a transaction because the
            // commit_lsn will drive progress.
            let message = match event {
                ReplicationMessage::XLogData(data) => data.into_data(),
                ReplicationMessage::PrimaryKeepAlive(_) => {
                    metrics.ignored.inc();
                    continue;
                }
                _ => Err(TransientError::UnknownReplicationMessage)?,
            };
            metrics.total.inc();
            match message {
                // Messages about relations we have no outputs for are only counted.
                Insert(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Update(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Delete(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Relation(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Insert(body) => {
                    metrics.inserts.inc();
                    let row = unpack_tuple(body.tuple().tuple_data(), &mut row);
                    let rel = body.rel_id();
                    // Emit the same decoded row (or decode error) to every non-errored output.
                    for ((output, _), row) in table_info
                        .get(&rel)
                        .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
                        .into_iter()
                        .flatten()
                        .repeat_clone(row)
                    {
                        yield (rel, *output, row, Diff::ONE);
                    }
                }
                Update(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.updates.inc();
                        // If the new tuple contains unchanged toast values we reference the old ones
                        let new_tuple =
                            std::iter::zip(body.new_tuple().tuple_data(), old_tuple.tuple_data())
                                .map(|(new, old)| match new {
                                    TupleData::UnchangedToast => old,
                                    _ => new,
                                });
                        let old_row = unpack_tuple(old_tuple.tuple_data(), &mut row);
                        let new_row = unpack_tuple(new_tuple, &mut row);
                        let rel = body.rel_id();
                        // An update is emitted as a retraction of the old row followed by an
                        // insertion of the new row.
                        for ((output, _), (old_row, new_row)) in table_info
                            .get(&rel)
                            .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
                            .into_iter()
                            .flatten()
                            .repeat_clone((old_row, new_row))
                        {
                            yield (rel, *output, old_row, Diff::MINUS_ONE);
                            yield (rel, *output, new_row, Diff::ONE);
                        }
                    }
                    None => {
                        // Without the old tuple there is nothing to retract, so every
                        // non-errored output receives a `DefaultReplicaIdentity` error.
                        let rel = body.rel_id();
                        for (output, _) in table_info
                            .get(&rel)
                            .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
                            .into_iter()
                            .flatten()
                        {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Delete(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.deletes.inc();
                        let row = unpack_tuple(old_tuple.tuple_data(), &mut row);
                        let rel = body.rel_id();
                        for ((output, _), row) in table_info
                            .get(&rel)
                            .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
                            .into_iter()
                            .flatten()
                            .repeat_clone(row)
                        {
                            yield (rel, *output, row, Diff::MINUS_ONE);
                        }
                    }
                    None => {
                        // Same as for updates: no old tuple means the row to retract is
                        // unknown, so an error is emitted instead.
                        let rel = body.rel_id();
                        for (output, _) in table_info
                            .get(&rel)
                            .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
                            .into_iter()
                            .flatten()
                        {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Relation(body) => {
                    let rel_id = body.rel_id();
                    // Collected eagerly so that `errored_outputs` can be mutated in the loop
                    // below.
                    let valid_outputs = table_info
                        .get(&rel_id)
                        .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
                        .into_iter()
                        .flatten()
                        .collect::<Vec<_>>();
                    if valid_outputs.len() > 0 {
                        // Because the replication stream doesn't include columns' attnums, we need
                        // to check the current local schema against the current remote schema to
                        // ensure e.g. we haven't received a schema update with the same terminal
                        // column name which is actually a different column.
                        let oids = std::iter::once(rel_id)
                            .chain(table_info.keys().copied())
                            .collect::<Vec<_>>();
                        let upstream_info = mz_postgres_util::publication_info(
                            metadata_client,
                            publication,
                            Some(&oids),
                        )
                        .await?;

                        for (output_index, output) in valid_outputs {
                            if let Err(err) =
                                verify_schema(rel_id, &output.desc, &upstream_info, &output.casts)
                            {
                                errored_outputs.insert(*output_index);
                                yield (rel_id, *output_index, Err(err), Diff::ONE);
                            }

                            // Error any dropped tables.
                            // NOTE(review): this scan runs once per valid output; the
                            // `insert` check below keeps repeated passes from emitting
                            // duplicate errors.
                            for (oid, outputs) in table_info {
                                if !upstream_info.contains_key(oid) {
                                    for output in outputs.keys() {
                                        if errored_outputs.insert(*output) {
                                            // Minimize the number of excessive errors
                                            // this will generate.
                                            yield (
                                                *oid,
                                                *output,
                                                Err(DefiniteError::TableDropped),
                                                Diff::ONE,
                                            );
                                        }
                                    }
                                }
                            }
                        }
                    }
                }
                Truncate(body) => {
                    // Every output of a truncated relation is errored, at most once each.
                    for &rel_id in body.rel_ids() {
                        if let Some(outputs) = table_info.get(&rel_id) {
                            for (output, _) in outputs {
                                if errored_outputs.insert(*output) {
                                    yield (
                                        rel_id,
                                        *output,
                                        Err(DefiniteError::TableTruncated),
                                        Diff::ONE,
                                    );
                                }
                            }
                        }
                    }
                }
                Commit(body) => {
                    // The COMMIT must belong to the transaction announced by the BEGIN.
                    if commit_lsn != body.commit_lsn().into() {
                        Err(TransientError::InvalidTransaction)?
                    }
                    return;
                }
                // TODO: We should handle origin messages and emit an error as they indicate that
                // the upstream performed a point in time restore so all bets are off about the
                // continuity of the stream.
                Origin(_) | Type(_) => metrics.ignored.inc(),
                Begin(_) => Err(TransientError::NestedTransaction)?,
                // The enum is marked as non_exhaustive. Better to be conservative
                _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
            }
        }
        // The input ended without a COMMIT message for this transaction.
        Err(TransientError::ReplicationEOF)?;
    })
}
1049
1050/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
1051/// done.
1052#[inline]
1053fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
1054where
1055    I: IntoIterator<Item = &'a TupleData>,
1056    I::IntoIter: ExactSizeIterator,
1057{
1058    let iter = tuple_data.into_iter();
1059    let mut packer = row.packer();
1060    for data in iter {
1061        let datum = match data {
1062            TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
1063            TupleData::Null => Datum::Null,
1064            TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
1065            TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
1066        };
1067        packer.push(datum);
1068    }
1069    Ok(row.clone())
1070}
1071
1072/// Ensures the publication exists on the server. It returns an outer transient error in case of
1073/// connection issues and an inner definite error if the publication is dropped.
1074async fn ensure_publication_exists(
1075    client: &Client,
1076    publication: &str,
1077) -> Result<Result<(), DefiniteError>, TransientError> {
1078    // Figure out the last written LSN and then add one to convert it into an upper.
1079    let result = client
1080        .query_opt(
1081            "SELECT 1 FROM pg_publication WHERE pubname = $1;",
1082            &[&publication],
1083        )
1084        .await?;
1085    match result {
1086        Some(_) => Ok(Ok(())),
1087        None => Ok(Err(DefiniteError::PublicationDropped(
1088            publication.to_owned(),
1089        ))),
1090    }
1091}
1092
1093/// Ensure the active replication timeline_id matches the one we expect such that we can safely
1094/// resume replication. It returns an outer transient error in case of
1095/// connection issues and an inner definite error if the timeline id does not match.
1096async fn ensure_replication_timeline_id(
1097    replication_client: &Client,
1098    expected_timeline_id: &u64,
1099) -> Result<Result<(), DefiniteError>, TransientError> {
1100    let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
1101    if timeline_id == *expected_timeline_id {
1102        Ok(Ok(()))
1103    } else {
1104        Ok(Err(DefiniteError::InvalidTimelineId {
1105            expected: *expected_timeline_id,
1106            actual: timeline_id,
1107        }))
1108    }
1109}
1110
/// Problems reported by the background task started in `spawn_schema_validator`.
enum SchemaValidationError {
    /// Fetching the upstream publication info failed.
    Postgres(PostgresError),
    /// `verify_schema` rejected the upstream schema for one output of a table.
    Schema {
        /// OID of the upstream table that was checked.
        oid: u32,
        /// Index of the source output whose expected schema did not verify.
        output_index: usize,
        /// The error returned by `verify_schema`.
        error: DefiniteError,
    },
}
1119
1120fn spawn_schema_validator(
1121    client: Client,
1122    config: &RawSourceCreationConfig,
1123    publication: String,
1124    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
1125) -> mpsc::UnboundedReceiver<SchemaValidationError> {
1126    let (tx, rx) = mpsc::unbounded_channel();
1127    let source_id = config.id;
1128    let config_set = Arc::clone(config.config.config_set());
1129
1130    mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
1131        while !tx.is_closed() {
1132            trace!(%source_id, "validating schemas");
1133
1134            let validation_start = Instant::now();
1135
1136            let upstream_info = match mz_postgres_util::publication_info(
1137                &*client,
1138                &publication,
1139                Some(&table_info.keys().copied().collect::<Vec<_>>()),
1140            )
1141            .await
1142            {
1143                Ok(info) => info,
1144                Err(error) => {
1145                    let _ = tx.send(SchemaValidationError::Postgres(error));
1146                    continue;
1147                }
1148            };
1149
1150            for (&oid, outputs) in table_info.iter() {
1151                for (&output_index, output_info) in outputs {
1152                    let expected_desc = &output_info.desc;
1153                    let casts = &output_info.casts;
1154                    if let Err(error) = verify_schema(oid, expected_desc, &upstream_info, casts) {
1155                        trace!(
1156                            %source_id,
1157                            "schema of output index {output_index} for oid {oid} invalid",
1158                        );
1159                        let _ = tx.send(SchemaValidationError::Schema {
1160                            oid,
1161                            output_index,
1162                            error,
1163                        });
1164                    } else {
1165                        trace!(
1166                            %source_id,
1167                            "schema of output index {output_index} for oid {oid} valid",
1168                        );
1169                    }
1170                }
1171            }
1172
1173            let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
1174            let elapsed = validation_start.elapsed();
1175            let wait = interval.saturating_sub(elapsed);
1176            tokio::time::sleep(wait).await;
1177        }
1178    });
1179
1180    rx
1181}