Skip to main content

mz_storage/source/postgres/
replication.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
11//!
12//! ```text
13//!              o
14//!              │rewind
15//!              │requests
16//!          ╭───┴────╮
17//!          │exchange│ (collect all requests to one worker)
18//!          ╰───┬────╯
19//!           ┏━━v━━━━━━━━━━┓
20//!           ┃ replication ┃ (single worker)
21//!           ┃   reader    ┃
22//!           ┗━┯━━━━━━━━┯━━┛
23//!             │raw     │
24//!             │data    │
25//!        ╭────┴─────╮  │
26//!        │distribute│  │ (distribute to all workers)
27//!        ╰────┬─────╯  │
28//! ┏━━━━━━━━━━━┷━┓      │
29//! ┃ replication ┃      │ (parallel decode)
30//! ┃   decoder   ┃      │
31//! ┗━━━━━┯━━━━━━━┛      │
32//!       │ replication  │ progress
33//!       │ updates      │ output
34//!       v              v
35//! ```
36//!
37//! # Progress tracking
38//!
39//! In order to avoid causing excessive resource usage in the upstream server it's important to
40//! track the LSN that we have successfully committed to persist and communicate that back to
41//! PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
42//! transactions themselves. Since at a given LSN offset there can be only a single message, when a
//! transaction is received and processed we can infer that we have seen all the messages at LSNs
//! up to and including its `commit_lsn`, i.e. everything strictly below `commit_lsn + 1`.
45//!
//! Things are a bit more complicated in the absence of transactions, though: even though we
//! don't receive any, the server might very well be generating WAL records. This can happen if
//! there is a separate logical database performing writes (which is the case for RDS databases),
//! or, in servers running PostgreSQL version 15 or greater, because the logical replication
//! process includes an optimization that omits empty transactions, which can happen if you're only
//! replicating a subset of the tables and there are writes going to the other ones.
52//!
53//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
54//! will be forced to keep around WAL data that can eventually lead to disk space exhaustion.
55//!
56//! In the absence of transactions the only available piece of information in the replication
57//! stream are keepalive messages. Keepalive messages are documented[1] to contain the current end
58//! of WAL on the server. That is a useless number when it comes to progress tracking because there
59//! might be pending messages at LSNs between the last received commit_lsn and the current end of
60//! WAL.
61//!
62//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
//! contains the last *sent* LSN[2]. Here "sent" doesn't necessarily mean sent over the wire, but
//! sent to the upstream process responsible for producing the logical stream. Therefore, if we
65//! receive a keepalive with a particular LSN we can be certain that there are no other replication
66//! messages at previous LSNs, because they would have been already generated and received. We
67//! therefore connect the keepalive messages directly to our capability.
68//!
69//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
70//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com
71
72use std::collections::BTreeMap;
73use std::convert::Infallible;
74use std::pin::pin;
75use std::rc::Rc;
76use std::str::FromStr;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::time::Instant;
80use std::time::{Duration, SystemTime, UNIX_EPOCH};
81
82use differential_dataflow::AsCollection;
83use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
84use mz_dyncfg::ConfigSet;
85use mz_ore::cast::CastFrom;
86use mz_ore::future::InTask;
87use mz_postgres_util::PostgresError;
88use mz_postgres_util::{Client, Sql, execute, query_opt, simple_query_opt, sql};
89use mz_repr::{Datum, DatumVec, Diff, Row};
90use mz_storage_types::dyncfgs::PG_SCHEMA_VALIDATION_INTERVAL;
91use mz_storage_types::dyncfgs::PG_SOURCE_VALIDATE_TIMELINE;
92use mz_storage_types::errors::DataflowError;
93use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
94use mz_timely_util::builder_async::{
95    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
96    PressOnDropButton,
97};
98use postgres_replication::LogicalReplicationStream;
99use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
100use serde::{Deserialize, Serialize};
101use timely::container::CapacityContainerBuilder;
102use timely::dataflow::channels::pact::{Exchange, Pipeline};
103use timely::dataflow::operators::Capability;
104use timely::dataflow::operators::Concat;
105use timely::dataflow::operators::Operator;
106use timely::dataflow::operators::core::Map;
107use timely::dataflow::{Scope, StreamVec};
108use timely::progress::Antichain;
109use tokio::sync::{mpsc, watch};
110use tokio_postgres::error::SqlState;
111use tokio_postgres::types::PgLsn;
112use tracing::{error, trace};
113
114use crate::metrics::source::postgres::PgSourceMetrics;
115use crate::source::RawSourceCreationConfig;
116use crate::source::postgres::verify_schema;
117use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
118use crate::source::probe;
119use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceMessage, StackedCollection};
120
121/// A logical replication message from the server.
122type LogicalReplMsg = ReplicationMessage<LogicalReplicationMessage>;
123
124/// A decoded row from a transaction with source information.
125type DecodedRow = (u32, usize, Result<Row, DefiniteError>, Diff);
126
127/// Postgres epoch is 2000-01-01T00:00:00Z
128static PG_EPOCH: LazyLock<SystemTime> =
129    LazyLock::new(|| UNIX_EPOCH + Duration::from_secs(946_684_800));
130
131// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
132// slot. This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
133// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub(crate) struct RewindRequest {
136    /// The output index that should be rewound.
137    pub(crate) output_index: usize,
138    /// The LSN that the snapshot was taken at.
139    pub(crate) snapshot_lsn: MzOffset,
140}
141
142/// Renders the replication dataflow. See the module documentation for more information.
143pub(crate) fn render<'scope>(
144    scope: Scope<'scope, MzOffset>,
145    config: RawSourceCreationConfig,
146    connection: PostgresSourceConnection,
147    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
148    rewind_stream: StreamVec<'scope, MzOffset, RewindRequest>,
149    slot_ready_stream: StreamVec<'scope, MzOffset, Infallible>,
150    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
151    metrics: PgSourceMetrics,
152) -> (
153    StackedCollection<'scope, MzOffset, (usize, Result<SourceMessage, DataflowError>)>,
154    StreamVec<'scope, MzOffset, Probe<MzOffset>>,
155    StreamVec<'scope, MzOffset, ReplicationError>,
156    PressOnDropButton,
157) {
158    let op_name = format!("ReplicationReader({})", config.id);
159    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());
160
161    let slot_reader = u64::cast_from(config.responsible_worker("slot"));
162    let (data_output, data_stream) = builder.new_output();
163    let (definite_error_handle, definite_errors) =
164        builder.new_output::<CapacityContainerBuilder<_>>();
165    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
166
167    let mut rewind_input =
168        builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
169    let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
170    let output_uppers = table_info
171        .iter()
172        .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
173        .collect::<Vec<_>>();
174    metrics.tables.set(u64::cast_from(output_uppers.len()));
175
176    let reader_table_info = table_info.clone();
177    let (button, transient_errors) = builder.build_fallible(move |caps| {
178        let mut table_info = reader_table_info;
179        let busy_signal = Arc::clone(&config.busy_signal);
180        Box::pin(SignaledFuture::new(busy_signal, async move {
181            let (id, worker_id) = (config.id, config.worker_id);
182            let [data_cap_set, definite_error_cap_set, probe_cap]: &mut [_; 3] =
183                caps.try_into().unwrap();
184
185            if !config.responsible_for("slot") {
186                // Emit 0, to mark this worker as having started up correctly.
187                for stat in config.statistics.values() {
188                    stat.set_offset_known(0);
189                    stat.set_offset_committed(0);
190                }
191                return Ok(());
192            }
193
194            // Determine the slot lsn.
195            let connection_config = connection
196                .connection
197                .config(
198                    &config.config.connection_context.secrets_reader,
199                    &config.config,
200                    InTask::Yes,
201                )
202                .await?;
203
204            let slot = &connection.publication_details.slot;
205            let replication_client = connection_config
206                .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
207                .await?;
208
209            let metadata_client = connection_config
210                .connect(
211                    "replication metadata",
212                    &config.config.connection_context.ssh_tunnel_manager,
213                )
214                .await?;
215            let metadata_client = Arc::new(metadata_client);
216
217            while let Some(_) = slot_ready_input.next().await {
218                // Wait for the slot to be created
219            }
220
221            // The slot is always created by the snapshot operator. If the slot doesn't exist,
222            // when this check runs, this operator will return an error.
223            let slot_metadata = super::fetch_slot_metadata(
224                &*metadata_client,
225                slot,
226                mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
227                    .get(config.config.config_set()),
228            )
229            .await?;
230
231            // We're the only application that should be using this replication
232            // slot. The only way that there can be another connection using
233            // this slot under normal operation is if there's a stale TCP
234            // connection from a prior incarnation of the source holding on to
235            // the slot. We don't want to wait for the WAL sender timeout and/or
236            // TCP keepalives to time out that connection, because these values
237            // are generally under the control of the DBA and may not time out
238            // the connection for multiple minutes, or at all. Instead we just
239            // force kill the connection that's using the slot.
240            //
241            // Note that there's a small risk that *we're* the zombie cluster
242            // that should not be using the replication slot. Kubernetes cannot
243            // 100% guarantee that only one cluster is alive at a time. However,
244            // this situation should not last long, and the worst that can
245            // happen is a bit of transient thrashing over ownership of the
246            // replication slot.
247            if let Some(active_pid) = slot_metadata.active_pid {
248                tracing::warn!(
249                    %id, %active_pid,
250                    "replication slot already in use; will attempt to kill existing connection",
251                );
252
253                match execute(
254                    &**metadata_client,
255                    sql!("SELECT pg_terminate_backend($1)"),
256                    &[&active_pid],
257                )
258                .await
259                {
260                    Ok(_) => {
261                        tracing::info!(
262                            "successfully killed existing connection; \
263                            starting replication is likely to succeed"
264                        );
265                        // Note that `pg_terminate_backend` does not wait for
266                        // the termination of the targeted connection to
267                        // complete. We may try to start replication before the
268                        // targeted connection has cleaned up its state. That's
269                        // okay. If that happens we'll just try again from the
270                        // top via the suspend-and-restart flow.
271                    }
272                    Err(e) => {
273                        tracing::warn!(
274                            %e,
275                            "failed to kill existing replication connection; \
276                            replication will likely fail to start"
277                        );
278                        // Continue on anyway, just in case the replication slot
279                        // is actually available. Maybe PostgreSQL has some
280                        // staleness when it reports `active_pid`, for example.
281                    }
282                }
283            }
284
285            // The overall resumption point for this source is the minimum of the resumption points
286            // contributed by each of the outputs.
287            let resume_lsn = output_uppers
288                .iter()
289                .flat_map(|f| f.elements())
290                .map(|&lsn| {
291                    // An output is either an output that has never had data committed to it or one
292                    // that has and needs to resume. We differentiate between the two by checking
293                    // whether an output wishes to "resume" from the minimum timestamp. In that case
294                    // its contribution to the overal resumption point is the earliest point available
295                    // in the slot. This information would normally be something that the storage
296                    // controller figures out in the form of an as-of frontier, but at the moment the
297                    // storage controller does not have visibility into what the replication slot is
298                    // doing.
299                    if lsn == MzOffset::from(0) {
300                        slot_metadata.confirmed_flush_lsn
301                    } else {
302                        lsn
303                    }
304                })
305                .min();
306            let Some(resume_lsn) = resume_lsn else {
307                std::future::pending::<()>().await;
308                return Ok(());
309            };
310            // If we don't set "offset_committed" now, it'll be stuck at 0 (the default value)
311            // until we finish processing the table snapshot. If the snapshot is large, that could be a long time.
312            // This confuses the ingestion lag calculation in the UI, causing it to yield erroneously high values.
313            for stat in config.statistics.values() {
314                stat.set_offset_committed(resume_lsn.offset);
315            }
316            trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");
317
318            // Emitting an initial probe before we start waiting for rewinds ensures that we will
319            // have a timestamp binding in the remap collection while the snapshot is processed.
320            // This is important because otherwise the snapshot updates would need to be buffered
321            // in the reclock operator, instead of being spilled to S3 in the persist sink.
322            //
323            // Note that we need to fetch the probe LSN _after_ having created the replication
324            // slot, to make sure the fetched LSN will be included in the replication stream.
325            let probe_ts = (config.now_fn)().into();
326            let max_lsn = mz_postgres_util::fetch_max_lsn(
327                &*metadata_client,
328                connection.publication_details.get_is_physical_replica(),
329            )
330            .await?;
331            let probe = Probe {
332                probe_ts,
333                upstream_frontier: Antichain::from_elem(MzOffset::from(max_lsn)),
334            };
335            probe_output.give(&probe_cap[0], probe);
336
337            let mut rewinds = BTreeMap::new();
338            while let Some(event) = rewind_input.next().await {
339                if let AsyncEvent::Data(_, data) = event {
340                    for req in data {
341                        if resume_lsn > req.snapshot_lsn + 1 {
342                            let err = DefiniteError::SlotCompactedPastResumePoint(
343                                req.snapshot_lsn + 1,
344                                resume_lsn,
345                            );
346                            // If the replication stream cannot be obtained from the resume point there is nothing
347                            // else to do. These errors are not retractable.
348                            for (oid, outputs) in table_info.iter() {
349                                for output_index in outputs.keys() {
350                                    // We pick `u64::MAX` as the LSN which will (in practice) never conflict
351                                    // any previously revealed portions of the TVC.
352                                    let update = (
353                                        (
354                                            *oid,
355                                            *output_index,
356                                            Err(DataflowError::from(err.clone())),
357                                        ),
358                                        MzOffset::from(u64::MAX),
359                                        Diff::ONE,
360                                    );
361                                    let size = update.fuel_size();
362                                    data_output
363                                        .give_fueled(&data_cap_set[0], update, size)
364                                        .await;
365                                }
366                            }
367                            definite_error_handle.give(
368                                &definite_error_cap_set[0],
369                                ReplicationError::Definite(Rc::new(err)),
370                            );
371                            return Ok(());
372                        }
373                        rewinds.insert(req.output_index, req);
374                    }
375                }
376            }
377            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");
378
379            let mut committed_uppers = pin!(committed_uppers);
380
381            let stream_result = raw_stream(
382                &config,
383                replication_client,
384                Arc::clone(&metadata_client),
385                &connection.publication_details.slot,
386                &connection.publication_details.timeline_id,
387                &connection.publication,
388                resume_lsn,
389                committed_uppers.as_mut(),
390                &probe_output,
391                &probe_cap[0],
392                connection.publication_details.get_is_physical_replica(),
393            )
394            .await?;
395
396            let stream = match stream_result {
397                Ok(stream) => stream,
398                Err(err) => {
399                    // If the replication stream cannot be obtained in a definite way there is
400                    // nothing else to do. These errors are not retractable.
401                    for (oid, outputs) in table_info.iter() {
402                        for output_index in outputs.keys() {
403                            // We pick `u64::MAX` as the LSN which will (in practice) never conflict
404                            // any previously revealed portions of the TVC.
405                            let update = (
406                                (*oid, *output_index, Err(DataflowError::from(err.clone()))),
407                                MzOffset::from(u64::MAX),
408                                Diff::ONE,
409                            );
410                            let size = update.fuel_size();
411                            data_output
412                                .give_fueled(&data_cap_set[0], update, size)
413                                .await;
414                        }
415                    }
416
417                    definite_error_handle.give(
418                        &definite_error_cap_set[0],
419                        ReplicationError::Definite(Rc::new(err)),
420                    );
421                    return Ok(());
422                }
423            };
424            let mut stream = pin!(stream.peekable());
425
426            // Run the periodic schema validation on a separate task using a separate client,
427            // to prevent it from blocking the replication reading progress.
428            let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
429            let client = connection_config
430                .connect("schema validation", ssh_tunnel_manager)
431                .await?;
432            let mut schema_errors = spawn_schema_validator(
433                client,
434                &config,
435                connection.publication.clone(),
436                table_info.clone(),
437                connection.publication_details.get_is_physical_replica(),
438            );
439
440            // Instead of downgrading the capability for every transaction we process we only do it
441            // if we're about to yield, which is checked at the bottom of the loop. This avoids
442            // creating excessive progress tracking traffic when there are multiple small
443            // transactions ready to go.
444            let mut data_upper = resume_lsn;
445            while let Some(event) = stream.as_mut().next().await {
446                use LogicalReplicationMessage::*;
447                use ReplicationMessage::*;
448                match event {
449                    Ok(XLogData(data)) => match data.data() {
450                        Begin(begin) => {
451                            let commit_lsn = MzOffset::from(begin.final_lsn());
452
453                            let mut tx = pin!(extract_transaction(
454                                stream.by_ref(),
455                                &*metadata_client,
456                                commit_lsn,
457                                &mut table_info,
458                                &metrics,
459                                &connection.publication,
460                            ));
461
462                            trace!(
463                                %id,
464                                "timely-{worker_id} extracting transaction \
465                                    at {commit_lsn}"
466                            );
467                            assert!(
468                                data_upper <= commit_lsn,
469                                "new_upper={data_upper} tx_lsn={commit_lsn}",
470                            );
471                            data_upper = commit_lsn + 1;
472                            while let Some((oid, output_index, event, diff)) = tx.try_next().await?
473                            {
474                                let event = event.map_err(Into::into);
475                                let data = (oid, output_index, event);
476                                if let Some(req) = rewinds.get(&output_index) {
477                                    if commit_lsn <= req.snapshot_lsn {
478                                        let update = (data.clone(), MzOffset::from(0), -diff);
479                                        let size = update.fuel_size();
480                                        data_output
481                                            .give_fueled(&data_cap_set[0], update, size)
482                                            .await;
483                                    }
484                                }
485                                let update = (data, commit_lsn, diff);
486                                let size = update.fuel_size();
487                                data_output
488                                    .give_fueled(&data_cap_set[0], update, size)
489                                    .await;
490                            }
491                        }
492                        _ => return Err(TransientError::BareTransactionEvent),
493                    },
494                    Ok(PrimaryKeepAlive(keepalive)) => {
495                        trace!( %id,
496                            "timely-{worker_id} received keepalive lsn={}",
497                            keepalive.wal_end()
498                        );
499
500                        // Take the opportunity to report any schema validation errors.
501                        while let Ok(error) = schema_errors.try_recv() {
502                            use SchemaValidationError::*;
503                            match error {
504                                Postgres(PostgresError::PublicationMissing(publication)) => {
505                                    let err = DefiniteError::PublicationDropped(publication);
506                                    for (oid, outputs) in table_info.iter() {
507                                        for output_index in outputs.keys() {
508                                            let update = (
509                                                (
510                                                    *oid,
511                                                    *output_index,
512                                                    Err(DataflowError::from(err.clone())),
513                                                ),
514                                                data_cap_set[0].time().clone(),
515                                                Diff::ONE,
516                                            );
517                                            let size = update.fuel_size();
518                                            data_output
519                                                .give_fueled(&data_cap_set[0], update, size)
520                                                .await;
521                                        }
522                                    }
523                                    definite_error_handle.give(
524                                        &definite_error_cap_set[0],
525                                        ReplicationError::Definite(Rc::new(err)),
526                                    );
527                                    return Ok(());
528                                }
529                                Postgres(pg_error) => Err(TransientError::from(pg_error))?,
530                                PhysicalReplicaChanged { expected, actual } => {
531                                    // The upstream is no longer the kind of node the
532                                    // source was created against (e.g. a physical
533                                    // replica was promoted to a primary). Logical
534                                    // decoding cannot safely continue, so stall with a
535                                    // definite, non-retryable error.
536                                    let err =
537                                        DefiniteError::InvalidPhysicalReplica { expected, actual };
538                                    for (oid, outputs) in table_info.iter() {
539                                        for output_index in outputs.keys() {
540                                            let update = (
541                                                (
542                                                    *oid,
543                                                    *output_index,
544                                                    Err(DataflowError::from(err.clone())),
545                                                ),
546                                                // We don't have a clean way to align on when the replica changed so jump straight to u64::MAX to avoid conflicts.
547                                                MzOffset::from(u64::MAX),
548                                                Diff::ONE,
549                                            );
550                                            let size = update.fuel_size();
551                                            data_output
552                                                .give_fueled(&data_cap_set[0], update, size)
553                                                .await;
554                                        }
555                                    }
556                                    definite_error_handle.give(
557                                        &definite_error_cap_set[0],
558                                        ReplicationError::Definite(Rc::new(err)),
559                                    );
560                                    return Ok(());
561                                }
562                                Schema {
563                                    oid,
564                                    output_index,
565                                    error,
566                                } => {
567                                    let table = table_info.get_mut(&oid).unwrap();
568                                    if table.remove(&output_index).is_none() {
569                                        continue;
570                                    }
571
572                                    let update = (
573                                        (oid, output_index, Err(error.into())),
574                                        data_cap_set[0].time().clone(),
575                                        Diff::ONE,
576                                    );
577                                    let size = update.fuel_size();
578                                    data_output
579                                        .give_fueled(&data_cap_set[0], update, size)
580                                        .await;
581                                }
582                            }
583                        }
584                        data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
585                    }
586                    Ok(_) => return Err(TransientError::UnknownReplicationMessage),
587                    Err(err) => return Err(err),
588                }
589
590                let will_yield = stream.as_mut().peek().now_or_never().is_none();
591                if will_yield {
592                    trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
593                    rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
594                    // As long as there are pending rewinds we can't downgrade our data capability
595                    // since we must be able to produce data at offset 0.
596                    if rewinds.is_empty() {
597                        data_cap_set.downgrade([&data_upper]);
598                    }
599                }
600            }
601            // We never expect the replication stream to gracefully end
602            Err(TransientError::ReplicationEOF)
603        }))
604    });
605
606    // We now process the slot updates and apply the cast expressions
607    let mut final_row = Row::default();
608    let mut datum_vec = DatumVec::new();
609    let mut next_worker = (0..u64::cast_from(scope.peers()))
610        // Round robin on 1000-records basis to avoid creating tiny containers when there are a
611        // small number of updates and a large number of workers.
612        .flat_map(|w| std::iter::repeat_n(w, 1000))
613        .cycle();
614    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
615    let replication_updates = data_stream
616        .unary(round_robin, "PgCastReplicationRows", |_, _| {
617            move |input, output| {
618                input.for_each_time(|time, data| {
619                    let mut session = output.session(&time);
620                    for ((oid, output_index, event), time, diff) in
621                        data.flat_map(|data| data.drain(..))
622                    {
623                        let output = &table_info
624                            .get(&oid)
625                            .and_then(|outputs| outputs.get(&output_index))
626                            .expect("table_info contains all outputs");
627                        let event = event.and_then(|row| {
628                            let datums = datum_vec.borrow_with(&row);
629                            super::cast_row(&output.casts, &datums, &mut final_row)?;
630                            Ok(SourceMessage {
631                                key: Row::default(),
632                                value: final_row.clone(),
633                                metadata: Row::default(),
634                            })
635                        });
636
637                        session.give(((output_index, event), time, diff));
638                    }
639                });
640            }
641        })
642        .as_collection();
643
644    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));
645
646    (
647        replication_updates,
648        probe_stream,
649        errors,
650        button.press_on_drop(),
651    )
652}
653
/// Produces the logical replication stream while taking care of regularly sending standby
/// keepalive messages with the provided `uppers` stream.
///
/// The returned stream will contain all transactions whose commit LSN is at or beyond
/// `resume_lsn`.
///
/// Before replication starts, `metadata_client` is used to verify two things: that the
/// publication still exists, and that the upstream's physical-replica status matches
/// `is_physical_replica`. When `timeline_id` is known, the replication timeline is also checked
/// against it. Any of these checks failing is reported as an inner [`DefiniteError`]. So is the
/// server rejecting `START_REPLICATION` with `OBJECT_NOT_IN_PREREQUISITE_STATE`. Connection
/// problems are reported as an outer [`TransientError`].
///
/// Besides forwarding `XLogData` and `PrimaryKeepAlive` messages, the returned stream:
/// * sends standby status updates every feedback interval, acknowledging the largest LSN
///   received so far on `uppers`;
/// * emits upstream frontier probes on `probe_output` at `probe_cap`. A background task feeds
///   these by repeatedly fetching the upstream's max LSN, and the task is aborted when the
///   stream is dropped;
/// * fails with [`TransientError::OvercompactedReplicationSlot`] when `resume_lsn` is non-zero
///   and lies before the slot's `confirmed_flush_lsn`.
async fn raw_stream<'a>(
    config: &'a RawSourceCreationConfig,
    replication_client: Client,
    metadata_client: Arc<Client>,
    slot: &'a str,
    timeline_id: &'a Option<u64>,
    publication: &'a str,
    resume_lsn: MzOffset,
    uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
    probe_output: &'a AsyncOutputHandle<MzOffset, CapacityContainerBuilder<Vec<Probe<MzOffset>>>>,
    probe_cap: &'a Capability<MzOffset>,
    is_physical_replica: bool,
) -> Result<
    Result<impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a, DefiniteError>,
    TransientError,
> {
    if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
        // If the publication gets deleted there is nothing else to do. These errors
        // are not retractable.
        return Ok(Err(err));
    }

    // Ensure the upstream server's physical replica status hasn't changed since the source was
    // created. We use the metadata client here for the same reason as "SHOW wal_sender_timeout" below.
    //
    // This runs before the timeline ID check on purpose. Promoting a physical replica to a primary
    // both flips pg_is_in_recovery() and switches the timeline, so either check could fire. Running
    // this one first means a promotion always surfaces as the specific InvalidPhysicalReplica error
    // (rather than the more generic timeline mismatch), which is the clearer signal for an operator.
    if let Err(err) = ensure_physical_replica(&*metadata_client, is_physical_replica).await? {
        return Ok(Err(err));
    }

    // Skip the timeline ID check for sources without a known timeline ID
    // (sources created before the timeline ID was added to the source details)
    if let Some(expected_timeline_id) = timeline_id {
        if let Err(err) = ensure_replication_timeline_id(
            &replication_client,
            expected_timeline_id,
            config.config.config_set(),
        )
        .await?
        {
            return Ok(Err(err));
        }
    }

    // How often a proactive standby status update message should be sent to the server.
    //
    // The upstream will periodically request status updates by setting the keepalive's reply field
    // value to 1. However, we cannot rely on these messages arriving on time. For example, when
    // the upstream is sending a big transaction its keepalive messages are queued and can be
    // delayed arbitrarily.
    //
    // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
    //
    // For this reason we query the server's timeout value and proactively send a keepalive at
    // twice the frequency to have a healthy margin from the deadline.
    //
    // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
    // Postgres versions disallow SHOW commands from within a replication connection.
    // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
    let row = simple_query_opt(&*metadata_client, sql!("SHOW wal_sender_timeout;"))
        .await?
        .unwrap();
    let wal_sender_timeout = match row.get("wal_sender_timeout") {
        // When this parameter is zero the timeout mechanism is disabled
        Some("0") => None,
        Some(value) => Some(
            mz_repr::adt::interval::Interval::from_str(value)
                .unwrap()
                .duration()
                .unwrap(),
        ),
        None => panic!("ubiquitous parameter missing"),
    };

    // This interval controls the cadence at which we send back status updates and, crucially,
    // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
    // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow us
    // down. For this reason the feedback interval is set to one second, or less if the
    // wal_sender_timeout is less than 2 seconds.
    let feedback_interval = match wal_sender_timeout {
        Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
        None => Duration::from_secs(1),
    };

    let mut feedback_timer = tokio::time::interval(feedback_interval);
    // 'Delay' ensures we always tick at least 'feedback_interval'.
    feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    // Postgres will return all transactions that commit *at or after* the provided LSN,
    // following the timely upper semantics.
    let lsn = PgLsn::from(resume_lsn.offset);
    let query = sql!(
        "START_REPLICATION SLOT {} LOGICAL {} (\"proto_version\" '1', \"publication_names\" {})",
        Sql::ident(slot),
        Sql::raw_unchecked(lsn.to_string()),
        Sql::literal(publication)
    );
    let copy_stream = match replication_client.copy_both_simple(query.as_str()).await {
        Ok(copy_stream) => copy_stream,
        Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
            return Ok(Err(DefiniteError::InvalidReplicationSlot));
        }
        Err(err) => return Err(err.into()),
    };

    // According to the documentation [1] we must check that the slot LSN matches our
    // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
    // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
    // cannot use the replication client to do that because it's already in CopyBoth mode.
    // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
    let slot_metadata = super::fetch_slot_metadata(
        &*metadata_client,
        slot,
        mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
            .get(config.config.config_set()),
    )
    .await?;
    let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
    tracing::info!(
        %config.id,
        "started replication using backend PID={:?}. wal_sender_timeout={:?}",
        slot_metadata.active_pid, wal_sender_timeout
    );

    // Background task that repeatedly fetches the upstream's max LSN and publishes the latest
    // probe (or error) through a watch channel. The select! loop below forwards it to
    // `probe_output`. The task keeps going until the receiving side is dropped.
    let (probe_tx, mut probe_rx) = watch::channel(None);
    let timestamp_interval = config.timestamp_interval;
    let now_fn = config.now_fn.clone();
    let max_lsn_task_handle =
        mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
            let mut probe_ticker = probe::Ticker::new(move || timestamp_interval, now_fn);

            while !probe_tx.is_closed() {
                let probe_ts = probe_ticker.tick().await;
                let probe_or_err =
                    mz_postgres_util::fetch_max_lsn(&*metadata_client, is_physical_replica)
                        .await
                        .map(|lsn| Probe {
                            probe_ts,
                            upstream_frontier: Antichain::from_elem(MzOffset::from(lsn)),
                        });
                let _ = probe_tx.send(Some(probe_or_err));
            }
        })
        .abort_on_drop();

    let stream = async_stream::try_stream!({
        // Ensure we don't pre-drop the task
        let _max_lsn_task_handle = max_lsn_task_handle;

        // ensure we don't drop the replication client!
        let _replication_client = replication_client;

        let mut uppers = pin!(uppers);
        let mut last_committed_upper = resume_lsn;

        let mut stream = pin!(LogicalReplicationStream::new(copy_stream));

        // The slot cannot serve data from before its confirmed_flush_lsn. If we were asked to
        // resume earlier than that (a resume LSN of 0 is exempt), data has been lost upstream.
        if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
            let err = TransientError::OvercompactedReplicationSlot {
                available_lsn: min_resume_lsn,
                requested_lsn: resume_lsn,
            };
            error!("timely-{} ({}) {err}", config.worker_id, config.id);
            Err(err)?;
        }

        loop {
            tokio::select! {
                Some(next_message) = stream.next() => match next_message {
                    Ok(ReplicationMessage::XLogData(data)) => {
                        yield ReplicationMessage::XLogData(data);
                        Ok(())
                    }
                    Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
                        yield ReplicationMessage::PrimaryKeepAlive(keepalive);
                        Ok(())
                    }
                    Err(err) => Err(err.into()),
                    _ => Err(TransientError::UnknownReplicationMessage),
                },
                _ = feedback_timer.tick() => {
                    let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
                    let lsn = PgLsn::from(last_committed_upper.offset);
                    trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
                    // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply, which
                    // happens when our status update is late. Since we send them proactively this
                    // may never happen. It is therefore *crucial* that we set the last parameter
                    // (the reply flag) to 1 here. This will cause the upstream server to send us a
                    // PrimaryKeepAlive message promptly which will give us frontier advancement
                    // information in the absence of data updates.
                    let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
                    res.map_err(|e| e.into())
                },
                // Track the largest committed upper; it is what the feedback branch above
                // acknowledges to the server.
                Some(upper) = uppers.next() => match upper.into_option() {
                    Some(lsn) => {
                        if last_committed_upper < lsn {
                            last_committed_upper = lsn;
                            for stat in config.statistics.values() {
                                stat.set_offset_committed(last_committed_upper.offset);
                            }
                        }
                        Ok(())
                    }
                    None => Ok(()),
                },
                Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
                    Some(Ok(probe)) => {
                        if let Some(offset_known) = probe.upstream_frontier.as_option() {
                            for stat in config.statistics.values() {
                                stat.set_offset_known(offset_known.offset);
                            }
                        }
                        probe_output.give(probe_cap, probe);
                        Ok(())
                    },
                    Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
                    None => Ok(()),
                },
                else => return
            }?;
        }
    });
    Ok(Ok(stream))
}
885
/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
/// message. The BEGIN message must have already been consumed from the stream before calling this
/// function.
///
/// For every `Insert`, `Update` and `Delete` on a table present in `table_info`, one item is
/// yielded per output of that table. The row is projected through the output's `projection`;
/// updates yield a `-1`/`+1` pair. An update or delete that carries no old tuple yields a
/// `DefaultReplicaIdentity` error for each output instead. The following are counted as ignored
/// in `metrics` and otherwise skipped:
/// * messages for tables not in `table_info`,
/// * keepalives,
/// * `Origin` and `Type` messages.
///
/// A `Relation` message re-checks the table's outputs against the current upstream schema,
/// fetched through `metadata_client` for `publication`. Outputs that fail `verify_schema` are
/// removed from `table_info` and reported as errors. The projections of the remaining outputs are
/// rebuilt from the column names in the message. A `Truncate` removes every output of the affected
/// tables and yields a `TableTruncated` error for each.
///
/// The stream ends after the `Commit` message. It fails with a transient error in these cases:
/// * the commit LSN does not match `commit_lsn`,
/// * a nested `Begin` or an unknown message is seen,
/// * the underlying stream ends before the commit.
fn extract_transaction<'a>(
    stream: impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a,
    metadata_client: &'a Client,
    commit_lsn: MzOffset,
    table_info: &'a mut BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: &'a PgSourceMetrics,
    publication: &'a str,
) -> impl AsyncStream<Item = Result<DecodedRow, TransientError>> + 'a {
    use LogicalReplicationMessage::*;
    // Scratch row reused by `unpack_tuple` across all messages of this transaction.
    let mut row = Row::default();
    async_stream::try_stream!({
        let mut stream = pin!(stream);
        metrics.transactions.inc();
        metrics.lsn.set(commit_lsn.offset);
        while let Some(event) = stream.try_next().await? {
            // We can ignore keepalive messages while processing a transaction because the
            // commit_lsn will drive progress.
            let message = match event {
                ReplicationMessage::XLogData(data) => data.into_data(),
                ReplicationMessage::PrimaryKeepAlive(_) => {
                    metrics.ignored.inc();
                    continue;
                }
                _ => Err(TransientError::UnknownReplicationMessage)?,
            };
            metrics.total.inc();
            match message {
                // Changes to tables we are not ingesting are only counted.
                Insert(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Update(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Delete(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Relation(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Insert(body) => {
                    metrics.inserts.inc();
                    let rel = body.rel_id();
                    for (output, info) in table_info.get(&rel).into_iter().flatten() {
                        let tuple_data = body.tuple().tuple_data();
                        let Some(ref projection) = info.projection else {
                            panic!("missing projection for {rel}");
                        };
                        let datums = projection.iter().map(|idx| &tuple_data[*idx]);
                        let row = unpack_tuple(datums, &mut row);
                        yield (rel, *output, row, Diff::ONE);
                    }
                }
                Update(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.updates.inc();
                        let new_tuple = body.new_tuple();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let old_tuple =
                                projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            // If the new tuple contains unchanged toast values we reference the old ones
                            let new_tuple = std::iter::zip(
                                projection.iter().map(|idx| &new_tuple.tuple_data()[*idx]),
                                old_tuple.clone(),
                            )
                            .map(|(new, old)| match new {
                                TupleData::UnchangedToast => old,
                                _ => new,
                            });
                            let old_row = unpack_tuple(old_tuple, &mut row);
                            let new_row = unpack_tuple(new_tuple, &mut row);

                            yield (rel, *output, old_row, Diff::MINUS_ONE);
                            yield (rel, *output, new_row, Diff::ONE);
                        }
                    }
                    // Without the old tuple we cannot retract the previous row version.
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Delete(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.deletes.inc();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let datums = projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            let row = unpack_tuple(datums, &mut row);
                            yield (rel, *output, row, Diff::MINUS_ONE);
                        }
                    }
                    // Without the old tuple we cannot retract the deleted row.
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Relation(body) => {
                    let rel_id = body.rel_id();
                    if let Some(outputs) = table_info.get_mut(&body.rel_id()) {
                        // Because the replication stream doesn't include columns' attnums, we need
                        // to check the current local schema against the current remote schema to
                        // ensure e.g. we haven't received a schema update with the same terminal
                        // column name which is actually a different column.
                        let upstream_info = mz_postgres_util::publication_info(
                            metadata_client,
                            publication,
                            Some(&[rel_id]),
                        )
                        .await?;

                        let mut schema_errors = vec![];

                        outputs.retain(|output_index, info| {
                            match verify_schema(rel_id, info, &upstream_info) {
                                Ok(()) => true,
                                Err(err) => {
                                    schema_errors.push((
                                        rel_id,
                                        *output_index,
                                        Err(err),
                                        Diff::ONE,
                                    ));
                                    false
                                }
                            }
                        });
                        // Recalculate projection vector for the retained valid outputs. Here we
                        // must use the column names in the RelationBody message and not the
                        // upstream_info obtained above, since that one represents the current
                        // schema upstream which may be many versions ahead of the one we're about
                        // to receive after this Relation message.
                        let column_positions: BTreeMap<_, _> = body
                            .columns()
                            .iter()
                            .enumerate()
                            .map(|(idx, col)| (col.name().unwrap(), idx))
                            .collect();
                        // projection[i] is the position, within the message's tuple data, of the
                        // i-th column of the output's description.
                        for info in outputs.values_mut() {
                            let mut projection = vec![];
                            for col in info.desc.columns.iter() {
                                projection.push(column_positions[&*col.name]);
                            }
                            info.projection = Some(projection);
                        }
                        for schema_error in schema_errors {
                            yield schema_error;
                        }
                    }
                }
                Truncate(body) => {
                    for &rel_id in body.rel_ids() {
                        if let Some(outputs) = table_info.get_mut(&rel_id) {
                            // `take` leaves the table with no outputs, so later changes to it
                            // produce nothing.
                            for (output, _) in std::mem::take(outputs) {
                                yield (
                                    rel_id,
                                    output,
                                    Err(DefiniteError::TableTruncated),
                                    Diff::ONE,
                                );
                            }
                        }
                    }
                }
                Commit(body) => {
                    if commit_lsn != body.commit_lsn().into() {
                        Err(TransientError::InvalidTransaction)?
                    }
                    // The transaction is complete; end this stream.
                    return;
                }
                // TODO: We should handle origin messages and emit an error as they indicate that
                // the upstream performed a point in time restore so all bets are off about the
                // continuity of the stream.
                Origin(_) | Type(_) => metrics.ignored.inc(),
                Begin(_) => Err(TransientError::NestedTransaction)?,
                // The enum is marked as non_exhaustive. Better to be conservative
                _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
            }
        }
        // The underlying stream ended before we saw the COMMIT.
        Err(TransientError::ReplicationEOF)?;
    })
}
1082
1083/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
1084/// done.
1085#[inline]
1086fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
1087where
1088    I: IntoIterator<Item = &'a TupleData>,
1089    I::IntoIter: ExactSizeIterator,
1090{
1091    let iter = tuple_data.into_iter();
1092    let mut packer = row.packer();
1093    for data in iter {
1094        let datum = match data {
1095            TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
1096            TupleData::Null => Datum::Null,
1097            TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
1098            TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
1099        };
1100        packer.push(datum);
1101    }
1102    Ok(row.clone())
1103}
1104
1105/// Ensures the publication exists on the server. It returns an outer transient error in case of
1106/// connection issues and an inner definite error if the publication is dropped.
1107async fn ensure_publication_exists(
1108    client: &Client,
1109    publication: &str,
1110) -> Result<Result<(), DefiniteError>, TransientError> {
1111    // Figure out the last written LSN and then add one to convert it into an upper.
1112    let result = query_opt(
1113        &**client,
1114        sql!("SELECT 1 FROM pg_publication WHERE pubname = $1;"),
1115        &[&publication],
1116    )
1117    .await?;
1118    match result {
1119        Some(_) => Ok(Ok(())),
1120        None => Ok(Err(DefiniteError::PublicationDropped(
1121            publication.to_owned(),
1122        ))),
1123    }
1124}
1125
1126/// Ensure the active replication timeline_id matches the one we expect such that we can safely
1127/// resume replication. It returns an outer transient error in case of
1128/// connection issues and an inner definite error if the timeline id does not match.
1129async fn ensure_replication_timeline_id(
1130    replication_client: &Client,
1131    expected_timeline_id: &u64,
1132    config_set: &ConfigSet,
1133) -> Result<Result<(), DefiniteError>, TransientError> {
1134    let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
1135    if timeline_id == *expected_timeline_id {
1136        Ok(Ok(()))
1137    } else {
1138        if PG_SOURCE_VALIDATE_TIMELINE.get(config_set) {
1139            Ok(Err(DefiniteError::InvalidTimelineId {
1140                expected: *expected_timeline_id,
1141                actual: timeline_id,
1142            }))
1143        } else {
1144            tracing::warn!(
1145                "Timeline ID mismatch ignored: expected={expected_timeline_id} actual={timeline_id}"
1146            );
1147            Ok(Ok(()))
1148        }
1149    }
1150}
1151
1152async fn ensure_physical_replica(
1153    metadata_client: &Client,
1154    expected_is_physical_replica: bool,
1155) -> Result<Result<(), DefiniteError>, TransientError> {
1156    let is_physical_replica = mz_postgres_util::get_is_in_recovery(metadata_client).await?;
1157    if is_physical_replica == expected_is_physical_replica {
1158        Ok(Ok(()))
1159    } else {
1160        Ok(Err(DefiniteError::InvalidPhysicalReplica {
1161            expected: expected_is_physical_replica,
1162            actual: is_physical_replica,
1163        }))
1164    }
1165}
1166
/// A problem reported by the background task started by `spawn_schema_validator`.
enum SchemaValidationError {
    /// An upstream query made during validation failed.
    Postgres(PostgresError),
    /// The output at `output_index` of table `oid` no longer passes `verify_schema` against the
    /// current upstream publication info; `error` describes the failure.
    Schema {
        oid: u32,
        output_index: usize,
        error: DefiniteError,
    },
    /// The upstream's physical-replica status changed out from under us (e.g. a
    /// physical replica was promoted to a primary). `expected` is the status the
    /// source was created against; `actual` is what the upstream reports now.
    PhysicalReplicaChanged {
        expected: bool,
        actual: bool,
    },
}
1182
1183fn spawn_schema_validator(
1184    client: Client,
1185    config: &RawSourceCreationConfig,
1186    publication: String,
1187    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
1188    is_physical_replica: bool,
1189) -> mpsc::UnboundedReceiver<SchemaValidationError> {
1190    let (tx, rx) = mpsc::unbounded_channel();
1191    let source_id = config.id;
1192    let config_set = Arc::clone(config.config.config_set());
1193
1194    mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
1195        while !tx.is_closed() {
1196            trace!(%source_id, "validating schemas");
1197
1198            let validation_start = Instant::now();
1199
1200            // Detect a physical replica being promoted to a primary (or vice
1201            // versa) while the stream is live. The startup check in raw_stream
1202            // only catches this across a restart, so we re-check periodically
1203            // here to surface a definite error proactively.
1204            match mz_postgres_util::get_is_in_recovery(&*client).await {
1205                Ok(actual) if actual != is_physical_replica => {
1206                    let _ = tx.send(SchemaValidationError::PhysicalReplicaChanged {
1207                        expected: is_physical_replica,
1208                        actual,
1209                    });
1210                }
1211                Ok(_) => {}
1212                Err(error) => {
1213                    let _ = tx.send(SchemaValidationError::Postgres(error));
1214                    continue;
1215                }
1216            }
1217
1218            let upstream_info = match mz_postgres_util::publication_info(
1219                &*client,
1220                &publication,
1221                Some(&table_info.keys().copied().collect::<Vec<_>>()),
1222            )
1223            .await
1224            {
1225                Ok(info) => info,
1226                Err(error) => {
1227                    let _ = tx.send(SchemaValidationError::Postgres(error));
1228                    continue;
1229                }
1230            };
1231
1232            for (&oid, outputs) in table_info.iter() {
1233                for (&output_index, info) in outputs {
1234                    if let Err(error) = verify_schema(oid, info, &upstream_info) {
1235                        trace!(
1236                            %source_id,
1237                            "schema of output index {output_index} for oid {oid} invalid",
1238                        );
1239                        let _ = tx.send(SchemaValidationError::Schema {
1240                            oid,
1241                            output_index,
1242                            error,
1243                        });
1244                    } else {
1245                        trace!(
1246                            %source_id,
1247                            "schema of output index {output_index} for oid {oid} valid",
1248                        );
1249                    }
1250                }
1251            }
1252
1253            let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
1254            let elapsed = validation_start.elapsed();
1255            let wait = interval.saturating_sub(elapsed);
1256            tokio::time::sleep(wait).await;
1257        }
1258    });
1259
1260    rx
1261}