mz_storage/source/postgres/replication.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
11//!
12//! ```text
13//! o
14//! │rewind
15//! │requests
16//! ╭───┴────╮
17//! │exchange│ (collect all requests to one worker)
18//! ╰───┬────╯
19//! ┏━━v━━━━━━━━━━┓
20//! ┃ replication ┃ (single worker)
21//! ┃ reader ┃
22//! ┗━┯━━━━━━━━┯━━┛
23//! │raw │
24//! │data │
25//! ╭────┴─────╮ │
26//! │distribute│ │ (distribute to all workers)
27//! ╰────┬─────╯ │
28//! ┏━━━━━━━━━━━┷━┓ │
29//! ┃ replication ┃ │ (parallel decode)
30//! ┃ decoder ┃ │
31//! ┗━━━━━┯━━━━━━━┛ │
32//! │ replication │ progress
33//! │ updates │ output
34//! v v
35//! ```
36//!
37//! # Progress tracking
38//!
39//! In order to avoid causing excessive resource usage in the upstream server it's important to
40//! track the LSN that we have successfully committed to persist and communicate that back to
41//! PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
42//! transactions themselves. Since at a given LSN offset there can be only a single message, when a
43//! transaction is received and processed we can infer that we have seen all the messages that are
44//! not beyond `commit_lsn + 1`.
45//!
//! Things are a bit more complicated in the absence of transactions, though, because even though
//! we don't receive any, the server might very well be generating WAL records. This can happen if
//! there is a separate logical database performing writes (which is the case for RDS databases),
//! or, in servers running PostgreSQL version 15 or greater, because the logical replication
//! process includes an optimization that omits empty transactions, which can happen if you're only
//! replicating a subset of the tables and there are writes going to the other ones.
52//!
53//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
54//! will be forced to keep around WAL data that can eventually lead to disk space exhaustion.
55//!
56//! In the absence of transactions the only available piece of information in the replication
57//! stream are keepalive messages. Keepalive messages are documented[1] to contain the current end
58//! of WAL on the server. That is a useless number when it comes to progress tracking because there
59//! might be pending messages at LSNs between the last received commit_lsn and the current end of
60//! WAL.
61//!
//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
//! contains the last *sent* LSN[2]. Here "sent" doesn't necessarily mean sent over the wire, but
//! sent to the upstream process that is responsible for producing the logical stream. Therefore,
//! if we receive a keepalive with a particular LSN we can be certain that there are no other
//! replication messages at previous LSNs, because they would already have been generated and
//! received. We therefore connect the keepalive messages directly to our capability.
68//!
69//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
70//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com
71
72use std::collections::BTreeMap;
73use std::convert::Infallible;
74use std::pin::pin;
75use std::rc::Rc;
76use std::str::FromStr;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::time::Instant;
80use std::time::{Duration, SystemTime, UNIX_EPOCH};
81
82use differential_dataflow::AsCollection;
83use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
84use mz_dyncfg::ConfigSet;
85use mz_ore::cast::CastFrom;
86use mz_ore::future::InTask;
87use mz_postgres_util::PostgresError;
88use mz_postgres_util::{Client, Sql, execute, query_opt, simple_query_opt, sql};
89use mz_repr::{Datum, DatumVec, Diff, Row};
90use mz_storage_types::dyncfgs::PG_SCHEMA_VALIDATION_INTERVAL;
91use mz_storage_types::dyncfgs::PG_SOURCE_VALIDATE_TIMELINE;
92use mz_storage_types::errors::DataflowError;
93use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
94use mz_timely_util::builder_async::{
95 AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
96 PressOnDropButton,
97};
98use postgres_replication::LogicalReplicationStream;
99use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
100use serde::{Deserialize, Serialize};
101use timely::container::CapacityContainerBuilder;
102use timely::dataflow::channels::pact::{Exchange, Pipeline};
103use timely::dataflow::operators::Capability;
104use timely::dataflow::operators::Concat;
105use timely::dataflow::operators::Operator;
106use timely::dataflow::operators::core::Map;
107use timely::dataflow::{Scope, StreamVec};
108use timely::progress::Antichain;
109use tokio::sync::{mpsc, watch};
110use tokio_postgres::error::SqlState;
111use tokio_postgres::types::PgLsn;
112use tracing::{error, trace};
113
114use crate::metrics::source::postgres::PgSourceMetrics;
115use crate::source::RawSourceCreationConfig;
116use crate::source::postgres::verify_schema;
117use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
118use crate::source::probe;
119use crate::source::types::{FuelSize, Probe, SignaledFuture, SourceMessage, StackedCollection};
120
121/// A logical replication message from the server.
122type LogicalReplMsg = ReplicationMessage<LogicalReplicationMessage>;
123
124/// A decoded row from a transaction with source information.
125type DecodedRow = (u32, usize, Result<Row, DefiniteError>, Diff);
126
/// The zero point of PostgreSQL's timestamp clock: 2000-01-01T00:00:00Z.
///
/// Expressed as an offset of 946 684 800 seconds (10 957 days) past the Unix epoch.
static PG_EPOCH: LazyLock<SystemTime> = LazyLock::new(|| {
    let offset_from_unix_epoch = Duration::from_secs(946_684_800);
    UNIX_EPOCH + offset_from_unix_epoch
});
130
131// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
132// slot. This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
133// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub(crate) struct RewindRequest {
136 /// The output index that should be rewound.
137 pub(crate) output_index: usize,
138 /// The LSN that the snapshot was taken at.
139 pub(crate) snapshot_lsn: MzOffset,
140}
141
/// Renders the replication dataflow. See the module documentation for more information.
///
/// Two operators are built here:
///
/// * `ReplicationReader({id})`: an async operator whose work only happens on the worker for which
///   `config.responsible_for("slot")` holds. Every other worker reports offset 0 in its
///   statistics and finishes immediately. The responsible worker:
///   - waits for `slot_ready_stream` to end;
///   - computes the resume LSN from the outputs' resume uppers;
///   - collects all rewind requests;
///   - then reads the logical replication stream, emitting
///     `((oid, output_index, Result<Row, _>), lsn, diff)` updates, probes, and definite errors.
/// * `PgCastReplicationRows`: distributes those raw updates round-robin over all workers and
///   applies each output's cast expressions, producing `(output_index, Result<SourceMessage, _>)`.
///
/// Returns:
/// - the cast update collection;
/// - the probe stream;
/// - the definite and transient errors concatenated into one stream;
/// - the reader's `PressOnDropButton` (NOTE(review): presumably it shuts the reader down when
///   dropped; confirm against `builder_async`).
pub(crate) fn render<'scope>(
    scope: Scope<'scope, MzOffset>,
    config: RawSourceCreationConfig,
    connection: PostgresSourceConnection,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    rewind_stream: StreamVec<'scope, MzOffset, RewindRequest>,
    slot_ready_stream: StreamVec<'scope, MzOffset, Infallible>,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    metrics: PgSourceMetrics,
) -> (
    StackedCollection<'scope, MzOffset, (usize, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, MzOffset, Probe<MzOffset>>,
    StreamVec<'scope, MzOffset, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("ReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());

    // The single worker that owns the replication slot. Every rewind request is routed to it.
    let slot_reader = u64::cast_from(config.responsible_worker("slot"));
    let (data_output, data_stream) = builder.new_output();
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    // NOTE(review): judging by the constructor name, these inputs are not connected to the
    // outputs' frontiers; confirm against `builder_async`.
    let mut rewind_input =
        builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
    let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
    // One resume upper per output, across all tables.
    let output_uppers = table_info
        .iter()
        .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
        .collect::<Vec<_>>();
    // Note that this counts outputs, not distinct upstream tables.
    metrics.tables.set(u64::cast_from(output_uppers.len()));

    // The reader works on its own copy of `table_info`, because it removes outputs when they hit
    // schema errors. The original map is moved into the cast operator further down.
    let reader_table_info = table_info.clone();
    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let mut table_info = reader_table_info;
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            // Capability sets for the data, definite error and probe outputs, in the order in
            // which the outputs were created above.
            let [data_cap_set, definite_error_cap_set, probe_cap]: &mut [_; 3] =
                caps.try_into().unwrap();

            if !config.responsible_for("slot") {
                // Emit 0, to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

            // Determine the slot lsn. First open two connections to the upstream:
            // - a replication-mode client that will carry the logical stream;
            // - a regular client for metadata queries.
            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let slot = &connection.publication_details.slot;
            let replication_client = connection_config
                .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                .await?;

            let metadata_client = connection_config
                .connect(
                    "replication metadata",
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let metadata_client = Arc::new(metadata_client);

            // The slot-ready stream carries `Infallible`, so no data ever arrives on it. Only its
            // completion matters, and it signals that the slot exists.
            while let Some(_) = slot_ready_input.next().await {
                // Wait for the slot to be created
            }

            // The slot is always created by the snapshot operator. If the slot doesn't exist,
            // when this check runs, this operator will return an error.
            let slot_metadata = super::fetch_slot_metadata(
                &*metadata_client,
                slot,
                mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
                    .get(config.config.config_set()),
            )
            .await?;

            // We're the only application that should be using this replication
            // slot. The only way that there can be another connection using
            // this slot under normal operation is if there's a stale TCP
            // connection from a prior incarnation of the source holding on to
            // the slot. We don't want to wait for the WAL sender timeout and/or
            // TCP keepalives to time out that connection, because these values
            // are generally under the control of the DBA and may not time out
            // the connection for multiple minutes, or at all. Instead we just
            // force kill the connection that's using the slot.
            //
            // Note that there's a small risk that *we're* the zombie cluster
            // that should not be using the replication slot. Kubernetes cannot
            // 100% guarantee that only one cluster is alive at a time. However,
            // this situation should not last long, and the worst that can
            // happen is a bit of transient thrashing over ownership of the
            // replication slot.
            if let Some(active_pid) = slot_metadata.active_pid {
                tracing::warn!(
                    %id, %active_pid,
                    "replication slot already in use; will attempt to kill existing connection",
                );

                match execute(
                    &**metadata_client,
                    sql!("SELECT pg_terminate_backend($1)"),
                    &[&active_pid],
                )
                .await
                {
                    Ok(_) => {
                        tracing::info!(
                            "successfully killed existing connection; \
                            starting replication is likely to succeed"
                        );
                        // Note that `pg_terminate_backend` does not wait for
                        // the termination of the targeted connection to
                        // complete. We may try to start replication before the
                        // targeted connection has cleaned up its state. That's
                        // okay. If that happens we'll just try again from the
                        // top via the suspend-and-restart flow.
                    }
                    Err(e) => {
                        tracing::warn!(
                            %e,
                            "failed to kill existing replication connection; \
                            replication will likely fail to start"
                        );
                        // Continue on anyway, just in case the replication slot
                        // is actually available. Maybe PostgreSQL has some
                        // staleness when it reports `active_pid`, for example.
                    }
                }
            }

            // The overall resumption point for this source is the minimum of the resumption points
            // contributed by each of the outputs.
            let resume_lsn = output_uppers
                .iter()
                .flat_map(|f| f.elements())
                .map(|&lsn| {
                    // An output is either an output that has never had data committed to it or one
                    // that has and needs to resume. We differentiate between the two by checking
                    // whether an output wishes to "resume" from the minimum timestamp. In that case
                    // its contribution to the overall resumption point is the earliest point available
                    // in the slot. This information would normally be something that the storage
                    // controller figures out in the form of an as-of frontier, but at the moment the
                    // storage controller does not have visibility into what the replication slot is
                    // doing.
                    if lsn == MzOffset::from(0) {
                        slot_metadata.confirmed_flush_lsn
                    } else {
                        lsn
                    }
                })
                .min();
            let Some(resume_lsn) = resume_lsn else {
                // No output contributed a resume point. Either there are no outputs, or every
                // resume upper is the empty antichain. There is nothing to replicate, so park
                // forever instead of finishing.
                std::future::pending::<()>().await;
                return Ok(());
            };
            // If we don't set "offset_committed" now, it'll be stuck at 0 (the default value)
            // until we finish processing the table snapshot. If the snapshot is large, that could be a long time.
            // This confuses the ingestion lag calculation in the UI, causing it to yield erroneously high values.
            for stat in config.statistics.values() {
                stat.set_offset_committed(resume_lsn.offset);
            }
            trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");

            // Emitting an initial probe before we start waiting for rewinds ensures that we will
            // have a timestamp binding in the remap collection while the snapshot is processed.
            // This is important because otherwise the snapshot updates would need to be buffered
            // in the reclock operator, instead of being spilled to S3 in the persist sink.
            //
            // Note that we need to fetch the probe LSN _after_ having created the replication
            // slot, to make sure the fetched LSN will be included in the replication stream.
            let probe_ts = (config.now_fn)().into();
            let max_lsn = mz_postgres_util::fetch_max_lsn(
                &*metadata_client,
                connection.publication_details.get_is_physical_replica(),
            )
            .await?;
            let probe = Probe {
                probe_ts,
                upstream_frontier: Antichain::from_elem(MzOffset::from(max_lsn)),
            };
            probe_output.give(&probe_cap[0], probe);

            // Collect every rewind request (keyed by output index) before starting replication.
            // This loop only ends once the rewind input is exhausted. A request fails with a
            // definite error if replication would resume beyond `snapshot_lsn + 1`, because the
            // updates between the snapshot and the resume point are no longer available.
            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(_, data) = event {
                    for req in data {
                        if resume_lsn > req.snapshot_lsn + 1 {
                            let err = DefiniteError::SlotCompactedPastResumePoint(
                                req.snapshot_lsn + 1,
                                resume_lsn,
                            );
                            // If the replication stream cannot be obtained from the resume point there is nothing
                            // else to do. These errors are not retractable.
                            for (oid, outputs) in table_info.iter() {
                                for output_index in outputs.keys() {
                                    // We pick `u64::MAX` as the LSN which will (in practice) never conflict with
                                    // any previously revealed portions of the TVC.
                                    let update = (
                                        (
                                            *oid,
                                            *output_index,
                                            Err(DataflowError::from(err.clone())),
                                        ),
                                        MzOffset::from(u64::MAX),
                                        Diff::ONE,
                                    );
                                    let size = update.fuel_size();
                                    data_output
                                        .give_fueled(&data_cap_set[0], update, size)
                                        .await;
                                }
                            }
                            definite_error_handle.give(
                                &definite_error_cap_set[0],
                                ReplicationError::Definite(Rc::new(err)),
                            );
                            return Ok(());
                        }
                        rewinds.insert(req.output_index, req);
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            let mut committed_uppers = pin!(committed_uppers);

            // The outer `?` propagates transient errors. The inner `Result` carries definite ones.
            let stream_result = raw_stream(
                &config,
                replication_client,
                Arc::clone(&metadata_client),
                &connection.publication_details.slot,
                &connection.publication_details.timeline_id,
                &connection.publication,
                resume_lsn,
                committed_uppers.as_mut(),
                &probe_output,
                &probe_cap[0],
                connection.publication_details.get_is_physical_replica(),
            )
            .await?;

            let stream = match stream_result {
                Ok(stream) => stream,
                Err(err) => {
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    for (oid, outputs) in table_info.iter() {
                        for output_index in outputs.keys() {
                            // We pick `u64::MAX` as the LSN which will (in practice) never conflict with
                            // any previously revealed portions of the TVC.
                            let update = (
                                (*oid, *output_index, Err(DataflowError::from(err.clone()))),
                                MzOffset::from(u64::MAX),
                                Diff::ONE,
                            );
                            let size = update.fuel_size();
                            data_output
                                .give_fueled(&data_cap_set[0], update, size)
                                .await;
                        }
                    }

                    definite_error_handle.give(
                        &definite_error_cap_set[0],
                        ReplicationError::Definite(Rc::new(err)),
                    );
                    return Ok(());
                }
            };
            // Peekable so that the bottom of the loop can check whether another message is
            // immediately available before deciding to downgrade the capability.
            let mut stream = pin!(stream.peekable());

            // Run the periodic schema validation on a separate task using a separate client,
            // to prevent it from blocking the replication reading progress.
            let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
            let client = connection_config
                .connect("schema validation", ssh_tunnel_manager)
                .await?;
            let mut schema_errors = spawn_schema_validator(
                client,
                &config,
                connection.publication.clone(),
                table_info.clone(),
                connection.publication_details.get_is_physical_replica(),
            );

            // Instead of downgrading the capability for every transaction we process we only do it
            // if we're about to yield, which is checked at the bottom of the loop. This avoids
            // creating excessive progress tracking traffic when there are multiple small
            // transactions ready to go.
            let mut data_upper = resume_lsn;
            while let Some(event) = stream.as_mut().next().await {
                use LogicalReplicationMessage::*;
                use ReplicationMessage::*;
                match event {
                    Ok(XLogData(data)) => match data.data() {
                        Begin(begin) => {
                            let commit_lsn = MzOffset::from(begin.final_lsn());

                            // Consumes the rest of the transaction from `stream`, yielding
                            // decoded rows.
                            let mut tx = pin!(extract_transaction(
                                stream.by_ref(),
                                &*metadata_client,
                                commit_lsn,
                                &mut table_info,
                                &metrics,
                                &connection.publication,
                            ));

                            trace!(
                                %id,
                                "timely-{worker_id} extracting transaction \
                                at {commit_lsn}"
                            );
                            // Transactions must arrive in commit LSN order. Once this one is
                            // processed, nothing else can appear before `commit_lsn + 1`.
                            assert!(
                                data_upper <= commit_lsn,
                                "new_upper={data_upper} tx_lsn={commit_lsn}",
                            );
                            data_upper = commit_lsn + 1;
                            while let Some((oid, output_index, event, diff)) = tx.try_next().await?
                            {
                                let event = event.map_err(Into::into);
                                let data = (oid, output_index, event);
                                // For an output with a pending rewind, every update committed at
                                // or before the snapshot LSN is also retracted at LSN 0 (see
                                // `RewindRequest`).
                                if let Some(req) = rewinds.get(&output_index) {
                                    if commit_lsn <= req.snapshot_lsn {
                                        let update = (data.clone(), MzOffset::from(0), -diff);
                                        let size = update.fuel_size();
                                        data_output
                                            .give_fueled(&data_cap_set[0], update, size)
                                            .await;
                                    }
                                }
                                let update = (data, commit_lsn, diff);
                                let size = update.fuel_size();
                                data_output
                                    .give_fueled(&data_cap_set[0], update, size)
                                    .await;
                            }
                        }
                        // Any other message must be part of a transaction, which
                        // `extract_transaction` consumes after its `Begin`.
                        _ => return Err(TransientError::BareTransactionEvent),
                    },
                    Ok(PrimaryKeepAlive(keepalive)) => {
                        trace!(%id,
                            "timely-{worker_id} received keepalive lsn={}",
                            keepalive.wal_end()
                        );

                        // Take the opportunity to report any schema validation errors.
                        while let Ok(error) = schema_errors.try_recv() {
                            use SchemaValidationError::*;
                            match error {
                                Postgres(PostgresError::PublicationMissing(publication)) => {
                                    // The publication was dropped upstream. This is definite and
                                    // applies to every output, so stop here.
                                    let err = DefiniteError::PublicationDropped(publication);
                                    for (oid, outputs) in table_info.iter() {
                                        for output_index in outputs.keys() {
                                            let update = (
                                                (
                                                    *oid,
                                                    *output_index,
                                                    Err(DataflowError::from(err.clone())),
                                                ),
                                                data_cap_set[0].time().clone(),
                                                Diff::ONE,
                                            );
                                            let size = update.fuel_size();
                                            data_output
                                                .give_fueled(&data_cap_set[0], update, size)
                                                .await;
                                        }
                                    }
                                    definite_error_handle.give(
                                        &definite_error_cap_set[0],
                                        ReplicationError::Definite(Rc::new(err)),
                                    );
                                    return Ok(());
                                }
                                // Any other upstream error is treated as transient.
                                Postgres(pg_error) => Err(TransientError::from(pg_error))?,
                                PhysicalReplicaChanged { expected, actual } => {
                                    // The upstream is no longer the kind of node the
                                    // source was created against (e.g. a physical
                                    // replica was promoted to a primary). Logical
                                    // decoding cannot safely continue, so stall with a
                                    // definite, non-retryable error.
                                    let err =
                                        DefiniteError::InvalidPhysicalReplica { expected, actual };
                                    for (oid, outputs) in table_info.iter() {
                                        for output_index in outputs.keys() {
                                            let update = (
                                                (
                                                    *oid,
                                                    *output_index,
                                                    Err(DataflowError::from(err.clone())),
                                                ),
                                                // We don't have a clean way to align on when the replica changed so jump straight to u64::MAX to avoid conflicts.
                                                MzOffset::from(u64::MAX),
                                                Diff::ONE,
                                            );
                                            let size = update.fuel_size();
                                            data_output
                                                .give_fueled(&data_cap_set[0], update, size)
                                                .await;
                                        }
                                    }
                                    definite_error_handle.give(
                                        &definite_error_cap_set[0],
                                        ReplicationError::Definite(Rc::new(err)),
                                    );
                                    return Ok(());
                                }
                                Schema {
                                    oid,
                                    output_index,
                                    error,
                                } => {
                                    // Only this output is affected. Drop it from the reader's
                                    // `table_info` (which `extract_transaction` also receives)
                                    // and emit the error on it at the current data time. If the
                                    // output is already gone, most likely because an earlier
                                    // schema error removed it, skip it.
                                    let table = table_info.get_mut(&oid).unwrap();
                                    if table.remove(&output_index).is_none() {
                                        continue;
                                    }

                                    let update = (
                                        (oid, output_index, Err(error.into())),
                                        data_cap_set[0].time().clone(),
                                        Diff::ONE,
                                    );
                                    let size = update.fuel_size();
                                    data_output
                                        .give_fueled(&data_cap_set[0], update, size)
                                        .await;
                                }
                            }
                        }
                        // A keepalive reports the last LSN sent by the upstream (see the module
                        // docs). This lets the upper advance even when no transactions arrive.
                        data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
                    }
                    Ok(_) => return Err(TransientError::UnknownReplicationMessage),
                    Err(err) => return Err(err),
                }

                let will_yield = stream.as_mut().peek().now_or_never().is_none();
                if will_yield {
                    trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
                    // A rewind is finished once no future commit can be at or before its
                    // snapshot LSN.
                    rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
                    // As long as there are pending rewinds we can't downgrade our data capability
                    // since we must be able to produce data at offset 0.
                    if rewinds.is_empty() {
                        data_cap_set.downgrade([&data_upper]);
                    }
                }
            }
            // We never expect the replication stream to gracefully end
            Err(TransientError::ReplicationEOF)
        }))
    });

    // We now process the slot updates and apply the cast expressions
    let mut final_row = Row::default();
    let mut datum_vec = DatumVec::new();
    let mut next_worker = (0..u64::cast_from(scope.peers()))
        // Round robin on 1000-records basis to avoid creating tiny containers when there are a
        // small number of updates and a large number of workers.
        .flat_map(|w| std::iter::repeat_n(w, 1000))
        .cycle();
    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
    let replication_updates = data_stream
        .unary(round_robin, "PgCastReplicationRows", |_, _| {
            move |input, output| {
                input.for_each_time(|time, data| {
                    let mut session = output.session(&time);
                    for ((oid, output_index, event), time, diff) in
                        data.flat_map(|data| data.drain(..))
                    {
                        // This is the original, unpruned `table_info`, so outputs removed by
                        // the reader after a schema error are still found here.
                        let output = &table_info
                            .get(&oid)
                            .and_then(|outputs| outputs.get(&output_index))
                            .expect("table_info contains all outputs");
                        let event = event.and_then(|row| {
                            let datums = datum_vec.borrow_with(&row);
                            super::cast_row(&output.casts, &datums, &mut final_row)?;
                            Ok(SourceMessage {
                                key: Row::default(),
                                value: final_row.clone(),
                                metadata: Row::default(),
                            })
                        });

                        session.give(((output_index, event), time, diff));
                    }
                });
            }
        })
        .as_collection();

    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));

    (
        replication_updates,
        probe_stream,
        errors,
        button.press_on_drop(),
    )
}
653
654/// Produces the logical replication stream while taking care of regularly sending standby
655/// keepalive messages with the provided `uppers` stream.
656///
657/// The returned stream will contain all transactions that whose commit LSN is beyond `resume_lsn`.
658async fn raw_stream<'a>(
659 config: &'a RawSourceCreationConfig,
660 replication_client: Client,
661 metadata_client: Arc<Client>,
662 slot: &'a str,
663 timeline_id: &'a Option<u64>,
664 publication: &'a str,
665 resume_lsn: MzOffset,
666 uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
667 probe_output: &'a AsyncOutputHandle<MzOffset, CapacityContainerBuilder<Vec<Probe<MzOffset>>>>,
668 probe_cap: &'a Capability<MzOffset>,
669 is_physical_replica: bool,
670) -> Result<
671 Result<impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a, DefiniteError>,
672 TransientError,
673> {
674 if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
675 // If the publication gets deleted there is nothing else to do. These errors
676 // are not retractable.
677 return Ok(Err(err));
678 }
679
680 // Ensure the upstream server's physical replica status hasn't changed since the source was
681 // created. We use the metadata client here for the same reason as "SHOW wal_sender_timeout" below.
682 //
683 // This runs before the timeline ID check on purpose. Promoting a physical replica to a primary
684 // both flips pg_is_in_recovery() and switches the timeline, so either check could fire. Running
685 // this one first means a promotion always surfaces as the specific InvalidPhysicalReplica error
686 // (rather than the more generic timeline mismatch), which is the clearer signal for an operator.
687 if let Err(err) = ensure_physical_replica(&*metadata_client, is_physical_replica).await? {
688 return Ok(Err(err));
689 }
690
691 // Skip the timeline ID check for sources without a known timeline ID
692 // (sources created before the timeline ID was added to the source details)
693 if let Some(expected_timeline_id) = timeline_id {
694 if let Err(err) = ensure_replication_timeline_id(
695 &replication_client,
696 expected_timeline_id,
697 config.config.config_set(),
698 )
699 .await?
700 {
701 return Ok(Err(err));
702 }
703 }
704
705 // How often a proactive standby status update message should be sent to the server.
706 //
707 // The upstream will periodically request status updates by setting the keepalive's reply field
708 // value to 1. However, we cannot rely on these messages arriving on time. For example, when
709 // the upstream is sending a big transaction its keepalive messages are queued and can be
710 // delayed arbitrarily.
711 //
712 // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
713 //
714 // For this reason we query the server's timeout value and proactively send a keepalive at
715 // twice the frequency to have a healthy margin from the deadline.
716 //
717 // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
718 // Postgres versions disallow SHOW commands from within replication connection.
719 // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
720 let row = simple_query_opt(&*metadata_client, sql!("SHOW wal_sender_timeout;"))
721 .await?
722 .unwrap();
723 let wal_sender_timeout = match row.get("wal_sender_timeout") {
724 // When this parameter is zero the timeout mechanism is disabled
725 Some("0") => None,
726 Some(value) => Some(
727 mz_repr::adt::interval::Interval::from_str(value)
728 .unwrap()
729 .duration()
730 .unwrap(),
731 ),
732 None => panic!("ubiquitous parameter missing"),
733 };
734
735 // This interval controls the cadence at which we send back status updates and, crucially,
736 // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
737 // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow us
738 // down. For this reason the feedback interval is set to one second, or less if the
739 // wal_sender_timeout is less than 2 seconds.
740 let feedback_interval = match wal_sender_timeout {
741 Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
742 None => Duration::from_secs(1),
743 };
744
745 let mut feedback_timer = tokio::time::interval(feedback_interval);
746 // 'Delay' ensures we always tick at least 'feedback_interval'.
747 feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
748
749 // Postgres will return all transactions that commit *at or after* after the provided LSN,
750 // following the timely upper semantics.
751 let lsn = PgLsn::from(resume_lsn.offset);
752 let query = sql!(
753 "START_REPLICATION SLOT {} LOGICAL {} (\"proto_version\" '1', \"publication_names\" {})",
754 Sql::ident(slot),
755 Sql::raw_unchecked(lsn.to_string()),
756 Sql::literal(publication)
757 );
758 let copy_stream = match replication_client.copy_both_simple(query.as_str()).await {
759 Ok(copy_stream) => copy_stream,
760 Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
761 return Ok(Err(DefiniteError::InvalidReplicationSlot));
762 }
763 Err(err) => return Err(err.into()),
764 };
765
766 // According to the documentation [1] we must check that the slot LSN matches our
767 // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
768 // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
769 // cannot use the replication client to do that because it's already in CopyBoth mode.
770 // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
771 let slot_metadata = super::fetch_slot_metadata(
772 &*metadata_client,
773 slot,
774 mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
775 .get(config.config.config_set()),
776 )
777 .await?;
778 let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
779 tracing::info!(
780 %config.id,
781 "started replication using backend PID={:?}. wal_sender_timeout={:?}",
782 slot_metadata.active_pid, wal_sender_timeout
783 );
784
785 let (probe_tx, mut probe_rx) = watch::channel(None);
786 let timestamp_interval = config.timestamp_interval;
787 let now_fn = config.now_fn.clone();
788 let max_lsn_task_handle =
789 mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
790 let mut probe_ticker = probe::Ticker::new(move || timestamp_interval, now_fn);
791
792 while !probe_tx.is_closed() {
793 let probe_ts = probe_ticker.tick().await;
794 let probe_or_err =
795 mz_postgres_util::fetch_max_lsn(&*metadata_client, is_physical_replica)
796 .await
797 .map(|lsn| Probe {
798 probe_ts,
799 upstream_frontier: Antichain::from_elem(MzOffset::from(lsn)),
800 });
801 let _ = probe_tx.send(Some(probe_or_err));
802 }
803 })
804 .abort_on_drop();
805
806 let stream = async_stream::try_stream!({
807 // Ensure we don't pre-drop the task
808 let _max_lsn_task_handle = max_lsn_task_handle;
809
810 // ensure we don't drop the replication client!
811 let _replication_client = replication_client;
812
813 let mut uppers = pin!(uppers);
814 let mut last_committed_upper = resume_lsn;
815
816 let mut stream = pin!(LogicalReplicationStream::new(copy_stream));
817
818 if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
819 let err = TransientError::OvercompactedReplicationSlot {
820 available_lsn: min_resume_lsn,
821 requested_lsn: resume_lsn,
822 };
823 error!("timely-{} ({}) {err}", config.worker_id, config.id);
824 Err(err)?;
825 }
826
827 loop {
828 tokio::select! {
829 Some(next_message) = stream.next() => match next_message {
830 Ok(ReplicationMessage::XLogData(data)) => {
831 yield ReplicationMessage::XLogData(data);
832 Ok(())
833 }
834 Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
835 yield ReplicationMessage::PrimaryKeepAlive(keepalive);
836 Ok(())
837 }
838 Err(err) => Err(err.into()),
839 _ => Err(TransientError::UnknownReplicationMessage),
840 },
841 _ = feedback_timer.tick() => {
842 let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
843 let lsn = PgLsn::from(last_committed_upper.offset);
844 trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
845 // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply, which
846 // happens when out status update is late. Since we send them proactively this
847 // may never happen. It is therefore *crucial* that we set the last parameter
848 // (the reply flag) to 1 here. This will cause the upstream server to send us a
849 // PrimaryKeepAlive message promptly which will give us frontier advancement
850 // information in the absence of data updates.
851 let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
852 res.map_err(|e| e.into())
853 },
854 Some(upper) = uppers.next() => match upper.into_option() {
855 Some(lsn) => {
856 if last_committed_upper < lsn {
857 last_committed_upper = lsn;
858 for stat in config.statistics.values() {
859 stat.set_offset_committed(last_committed_upper.offset);
860 }
861 }
862 Ok(())
863 }
864 None => Ok(()),
865 },
866 Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
867 Some(Ok(probe)) => {
868 if let Some(offset_known) = probe.upstream_frontier.as_option() {
869 for stat in config.statistics.values() {
870 stat.set_offset_known(offset_known.offset);
871 }
872 }
873 probe_output.give(probe_cap, probe);
874 Ok(())
875 },
876 Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
877 None => Ok(()),
878 },
879 else => return
880 }?;
881 }
882 });
883 Ok(Ok(stream))
884}
885
/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
/// message. The BEGIN message must have already been consumed from the stream before calling this
/// function.
///
/// Every decoded change is yielded as a `(rel_id, output_index, row_or_error, diff)` tuple. The
/// returned stream ends normally once the COMMIT message is consumed.
///
/// Arguments:
/// * `stream` - the replication message stream, positioned just after the BEGIN message.
/// * `metadata_client` - client used to re-fetch publication metadata whenever a `Relation`
///   message for a tracked table arrives.
/// * `commit_lsn` - the LSN this transaction is expected to commit at; the COMMIT message must
///   carry the same LSN.
/// * `table_info` - map from relation OID to (output index -> output info). It is mutated in
///   place: `Relation` messages drop outputs whose schema no longer verifies and recompute the
///   projections of the remaining ones, and `Truncate` messages remove every output of the
///   truncated table.
/// * `metrics` - counters updated for every message processed.
/// * `publication` - the publication whose metadata is re-fetched on `Relation` messages.
///
/// # Errors
///
/// Yields a [`TransientError`] and stops if the underlying stream fails, if it ends before a
/// COMMIT (`ReplicationEOF`), if the COMMIT LSN differs from `commit_lsn`
/// (`InvalidTransaction`), if a nested BEGIN is seen (`NestedTransaction`), if fetching
/// publication metadata fails, or on unknown message kinds. Per-output problems (missing full
/// replica identity, schema mismatches, truncation, value decode failures) are instead yielded
/// inline as `Err(DefiniteError)` in the row position.
///
/// # Panics
///
/// Panics if a row change arrives for an output whose projection has not been computed, if a
/// projection index is out of range for the received tuple, or if a `Relation` message lacks a
/// column that an output's description expects.
fn extract_transaction<'a>(
    stream: impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a,
    metadata_client: &'a Client,
    commit_lsn: MzOffset,
    table_info: &'a mut BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: &'a PgSourceMetrics,
    publication: &'a str,
) -> impl AsyncStream<Item = Result<DecodedRow, TransientError>> + 'a {
    use LogicalReplicationMessage::*;
    // Scratch buffer shared by every `unpack_tuple` call in this transaction; `unpack_tuple`
    // packs into it and hands back a copy, so one buffer can be reused across rows.
    let mut row = Row::default();
    async_stream::try_stream!({
        let mut stream = pin!(stream);
        metrics.transactions.inc();
        metrics.lsn.set(commit_lsn.offset);
        while let Some(event) = stream.try_next().await? {
            // We can ignore keepalive messages while processing a transaction because the
            // commit_lsn will drive progress.
            let message = match event {
                ReplicationMessage::XLogData(data) => data.into_data(),
                ReplicationMessage::PrimaryKeepAlive(_) => {
                    metrics.ignored.inc();
                    continue;
                }
                _ => Err(TransientError::UnknownReplicationMessage)?,
            };
            metrics.total.inc();
            match message {
                // Messages about relations whose OID is not a key of `table_info` are not
                // ingested by this source; count them as ignored and move on.
                Insert(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Update(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Delete(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Relation(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Insert(body) => {
                    metrics.inserts.inc();
                    let rel = body.rel_id();
                    for (output, info) in table_info.get(&rel).into_iter().flatten() {
                        let tuple_data = body.tuple().tuple_data();
                        let Some(ref projection) = info.projection else {
                            panic!("missing projection for {rel}");
                        };
                        // Select and order the upstream columns the way this output expects.
                        let datums = projection.iter().map(|idx| &tuple_data[*idx]);
                        let row = unpack_tuple(datums, &mut row);
                        yield (rel, *output, row, Diff::ONE);
                    }
                }
                Update(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.updates.inc();
                        let new_tuple = body.new_tuple();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let old_tuple =
                                projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            // Unchanged TOAST values are not resent in the new tuple; when we
                            // see that placeholder, reuse the corresponding old value instead.
                            let new_tuple = std::iter::zip(
                                projection.iter().map(|idx| &new_tuple.tuple_data()[*idx]),
                                old_tuple.clone(),
                            )
                            .map(|(new, old)| match new {
                                TupleData::UnchangedToast => old,
                                _ => new,
                            });
                            let old_row = unpack_tuple(old_tuple, &mut row);
                            let new_row = unpack_tuple(new_tuple, &mut row);

                            // An update is represented as a retraction of the old row
                            // followed by an insertion of the new row.
                            yield (rel, *output, old_row, Diff::MINUS_ONE);
                            yield (rel, *output, new_row, Diff::ONE);
                        }
                    }
                    // Without the full old tuple (which PostgreSQL only sends under
                    // `REPLICA IDENTITY FULL`) the previous row cannot be retracted, so every
                    // output of the table gets a definite error.
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Delete(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.deletes.inc();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let datums = projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            let row = unpack_tuple(datums, &mut row);
                            yield (rel, *output, row, Diff::MINUS_ONE);
                        }
                    }
                    // Same situation as the `Update` case above: no full old tuple means the
                    // deleted row cannot be retracted.
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Relation(body) => {
                    let rel_id = body.rel_id();
                    if let Some(outputs) = table_info.get_mut(&body.rel_id()) {
                        // Because the replication stream doesn't include columns' attnums, we need
                        // to check the current local schema against the current remote schema to
                        // ensure e.g. we haven't received a schema update with the same terminal
                        // column name which is actually a different column.
                        let upstream_info = mz_postgres_util::publication_info(
                            metadata_client,
                            publication,
                            Some(&[rel_id]),
                        )
                        .await?;

                        // Errors are collected here and yielded after `retain` returns, because
                        // `yield` cannot be used from inside the `retain` closure.
                        let mut schema_errors = vec![];

                        // Drop every output whose schema no longer verifies against upstream.
                        outputs.retain(|output_index, info| {
                            match verify_schema(rel_id, info, &upstream_info) {
                                Ok(()) => true,
                                Err(err) => {
                                    schema_errors.push((
                                        rel_id,
                                        *output_index,
                                        Err(err),
                                        Diff::ONE,
                                    ));
                                    false
                                }
                            }
                        });
                        // Recalculate projection vector for the retained valid outputs. Here we
                        // must use the column names in the RelationBody message and not the
                        // upstream_info obtained above, since that one represents the current
                        // schema upstream which may be many versions ahead of the one we're about
                        // to receive after this Relation message.
                        let column_positions: BTreeMap<_, _> = body
                            .columns()
                            .iter()
                            .enumerate()
                            .map(|(idx, col)| (col.name().unwrap(), idx))
                            .collect();
                        for info in outputs.values_mut() {
                            let mut projection = vec![];
                            for col in info.desc.columns.iter() {
                                projection.push(column_positions[&*col.name]);
                            }
                            info.projection = Some(projection);
                        }
                        for schema_error in schema_errors {
                            yield schema_error;
                        }
                    }
                }
                // A truncated table can no longer be tracked. Every output of each truncated
                // table gets a definite error and is removed. `mem::take` leaves an empty map
                // behind, so the OID stays a key of `table_info`.
                Truncate(body) => {
                    for &rel_id in body.rel_ids() {
                        if let Some(outputs) = table_info.get_mut(&rel_id) {
                            for (output, _) in std::mem::take(outputs) {
                                yield (
                                    rel_id,
                                    output,
                                    Err(DefiniteError::TableTruncated),
                                    Diff::ONE,
                                );
                            }
                        }
                    }
                }
                // End of the transaction. The COMMIT must agree with the LSN we were asked to
                // expect; otherwise the stream is inconsistent.
                Commit(body) => {
                    if commit_lsn != body.commit_lsn().into() {
                        Err(TransientError::InvalidTransaction)?
                    }
                    return;
                }
                // TODO: We should handle origin messages and emit an error as they indicate that
                // the upstream performed a point in time restore so all bets are off about the
                // continuity of the stream.
                Origin(_) | Type(_) => metrics.ignored.inc(),
                Begin(_) => Err(TransientError::NestedTransaction)?,
                // The enum is marked as non_exhaustive. Better to be conservative
                _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
            }
        }
        // The upstream stream ended before this transaction's COMMIT arrived.
        Err(TransientError::ReplicationEOF)?;
    })
}
1082
1083/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
1084/// done.
1085#[inline]
1086fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
1087where
1088 I: IntoIterator<Item = &'a TupleData>,
1089 I::IntoIter: ExactSizeIterator,
1090{
1091 let iter = tuple_data.into_iter();
1092 let mut packer = row.packer();
1093 for data in iter {
1094 let datum = match data {
1095 TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
1096 TupleData::Null => Datum::Null,
1097 TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
1098 TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
1099 };
1100 packer.push(datum);
1101 }
1102 Ok(row.clone())
1103}
1104
1105/// Ensures the publication exists on the server. It returns an outer transient error in case of
1106/// connection issues and an inner definite error if the publication is dropped.
1107async fn ensure_publication_exists(
1108 client: &Client,
1109 publication: &str,
1110) -> Result<Result<(), DefiniteError>, TransientError> {
1111 // Figure out the last written LSN and then add one to convert it into an upper.
1112 let result = query_opt(
1113 &**client,
1114 sql!("SELECT 1 FROM pg_publication WHERE pubname = $1;"),
1115 &[&publication],
1116 )
1117 .await?;
1118 match result {
1119 Some(_) => Ok(Ok(())),
1120 None => Ok(Err(DefiniteError::PublicationDropped(
1121 publication.to_owned(),
1122 ))),
1123 }
1124}
1125
1126/// Ensure the active replication timeline_id matches the one we expect such that we can safely
1127/// resume replication. It returns an outer transient error in case of
1128/// connection issues and an inner definite error if the timeline id does not match.
1129async fn ensure_replication_timeline_id(
1130 replication_client: &Client,
1131 expected_timeline_id: &u64,
1132 config_set: &ConfigSet,
1133) -> Result<Result<(), DefiniteError>, TransientError> {
1134 let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
1135 if timeline_id == *expected_timeline_id {
1136 Ok(Ok(()))
1137 } else {
1138 if PG_SOURCE_VALIDATE_TIMELINE.get(config_set) {
1139 Ok(Err(DefiniteError::InvalidTimelineId {
1140 expected: *expected_timeline_id,
1141 actual: timeline_id,
1142 }))
1143 } else {
1144 tracing::warn!(
1145 "Timeline ID mismatch ignored: expected={expected_timeline_id} actual={timeline_id}"
1146 );
1147 Ok(Ok(()))
1148 }
1149 }
1150}
1151
1152async fn ensure_physical_replica(
1153 metadata_client: &Client,
1154 expected_is_physical_replica: bool,
1155) -> Result<Result<(), DefiniteError>, TransientError> {
1156 let is_physical_replica = mz_postgres_util::get_is_in_recovery(metadata_client).await?;
1157 if is_physical_replica == expected_is_physical_replica {
1158 Ok(Ok(()))
1159 } else {
1160 Ok(Err(DefiniteError::InvalidPhysicalReplica {
1161 expected: expected_is_physical_replica,
1162 actual: is_physical_replica,
1163 }))
1164 }
1165}
1166
/// A problem found by the background task started in `spawn_schema_validator`, sent over the
/// unbounded channel that function returns.
enum SchemaValidationError {
    /// An upstream query made by the validator failed. This covers both the recovery-status
    /// check and the publication metadata fetch.
    Postgres(PostgresError),
    /// `verify_schema` rejected the output at `output_index` of the upstream table `oid`;
    /// `error` describes why.
    Schema {
        oid: u32,
        output_index: usize,
        error: DefiniteError,
    },
    /// The upstream's physical-replica status changed out from under us (e.g. a
    /// physical replica was promoted to a primary). `expected` is the status the
    /// source was created against; `actual` is what the upstream reports now.
    PhysicalReplicaChanged {
        expected: bool,
        actual: bool,
    },
}
1182
1183fn spawn_schema_validator(
1184 client: Client,
1185 config: &RawSourceCreationConfig,
1186 publication: String,
1187 table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
1188 is_physical_replica: bool,
1189) -> mpsc::UnboundedReceiver<SchemaValidationError> {
1190 let (tx, rx) = mpsc::unbounded_channel();
1191 let source_id = config.id;
1192 let config_set = Arc::clone(config.config.config_set());
1193
1194 mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
1195 while !tx.is_closed() {
1196 trace!(%source_id, "validating schemas");
1197
1198 let validation_start = Instant::now();
1199
1200 // Detect a physical replica being promoted to a primary (or vice
1201 // versa) while the stream is live. The startup check in raw_stream
1202 // only catches this across a restart, so we re-check periodically
1203 // here to surface a definite error proactively.
1204 match mz_postgres_util::get_is_in_recovery(&*client).await {
1205 Ok(actual) if actual != is_physical_replica => {
1206 let _ = tx.send(SchemaValidationError::PhysicalReplicaChanged {
1207 expected: is_physical_replica,
1208 actual,
1209 });
1210 }
1211 Ok(_) => {}
1212 Err(error) => {
1213 let _ = tx.send(SchemaValidationError::Postgres(error));
1214 continue;
1215 }
1216 }
1217
1218 let upstream_info = match mz_postgres_util::publication_info(
1219 &*client,
1220 &publication,
1221 Some(&table_info.keys().copied().collect::<Vec<_>>()),
1222 )
1223 .await
1224 {
1225 Ok(info) => info,
1226 Err(error) => {
1227 let _ = tx.send(SchemaValidationError::Postgres(error));
1228 continue;
1229 }
1230 };
1231
1232 for (&oid, outputs) in table_info.iter() {
1233 for (&output_index, info) in outputs {
1234 if let Err(error) = verify_schema(oid, info, &upstream_info) {
1235 trace!(
1236 %source_id,
1237 "schema of output index {output_index} for oid {oid} invalid",
1238 );
1239 let _ = tx.send(SchemaValidationError::Schema {
1240 oid,
1241 output_index,
1242 error,
1243 });
1244 } else {
1245 trace!(
1246 %source_id,
1247 "schema of output index {output_index} for oid {oid} valid",
1248 );
1249 }
1250 }
1251 }
1252
1253 let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
1254 let elapsed = validation_start.elapsed();
1255 let wait = interval.saturating_sub(elapsed);
1256 tokio::time::sleep(wait).await;
1257 }
1258 });
1259
1260 rx
1261}