mz_storage/source/postgres/replication.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
11//!
12//! ```text
13//! o
14//! │rewind
15//! │requests
16//! ╭───┴────╮
17//! │exchange│ (collect all requests to one worker)
18//! ╰───┬────╯
19//! ┏━━v━━━━━━━━━━┓
20//! ┃ replication ┃ (single worker)
21//! ┃ reader ┃
22//! ┗━┯━━━━━━━━┯━━┛
23//! │raw │
24//! │data │
25//! ╭────┴─────╮ │
26//! │distribute│ │ (distribute to all workers)
27//! ╰────┬─────╯ │
28//! ┏━━━━━━━━━━━┷━┓ │
29//! ┃ replication ┃ │ (parallel decode)
30//! ┃ decoder ┃ │
31//! ┗━━━━━┯━━━━━━━┛ │
32//! │ replication │ progress
33//! │ updates │ output
34//! v v
35//! ```
36//!
37//! # Progress tracking
38//!
39//! In order to avoid causing excessive resource usage in the upstream server it's important to
40//! track the LSN that we have successfully committed to persist and communicate that back to
41//! PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
42//! transactions themselves. Since at a given LSN offset there can be only a single message, when a
43//! transaction is received and processed we can infer that we have seen all the messages at LSNs
44//! less than `commit_lsn + 1`.
45//!
46//! Things are a bit more complicated in the absence of transactions though, because even though
47//! we don't receive any, the server might very well be generating WAL records. This can happen if
48//! there is a separate logical database performing writes (which is the case for RDS databases),
49//! or, on servers running PostgreSQL version 15 or greater, when the logical replication process
50//! omits empty transactions as an optimization, which happens if you're only replicating a subset
51//! of the tables and there are writes going to the other ones.
52//!
53//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
54//! will be forced to keep around WAL data that can eventually lead to disk space exhaustion.
55//!
56//! In the absence of transactions the only available pieces of information in the replication
57//! stream are keepalive messages. Keepalive messages are documented[1] to contain the current end
58//! of WAL on the server. That is a useless number when it comes to progress tracking because there
59//! might be pending messages at LSNs between the last received commit_lsn and the current end of
60//! WAL.
61//!
62//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
63//! contains the last *sent* LSN[2]. Here sent doesn't necessarily mean sent over the wire, but
64//! sent to the upstream process that produces the logical stream. Therefore, if we
65//! receive a keepalive with a particular LSN we can be certain that there are no other replication
66//! messages at previous LSNs, because they would have been already generated and received. We
67//! therefore connect the keepalive messages directly to our capability.
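//!
//! To make these rules concrete, here is a small illustration with hypothetical LSN values (the
//! offsets are made up; only the arithmetic mirrors the logic described above):
//!
//! ```text
//! COMMIT with commit_lsn = 0/0100       => the upper advances to 0/0100 + 1
//! keepalive with last-sent LSN = 0/0200 => the upper advances to at least 0/0200
//! ```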
68//!
69//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
70//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com
71
72use std::collections::BTreeMap;
73use std::convert::Infallible;
74use std::pin::pin;
75use std::rc::Rc;
76use std::str::FromStr;
77use std::sync::Arc;
78use std::sync::LazyLock;
79use std::time::Instant;
80use std::time::{Duration, SystemTime, UNIX_EPOCH};
81
82use differential_dataflow::AsCollection;
83use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
84use mz_ore::cast::CastFrom;
85use mz_ore::collections::HashSet;
86use mz_ore::future::InTask;
87use mz_ore::iter::IteratorExt;
88use mz_postgres_util::PostgresError;
89use mz_postgres_util::tunnel::PostgresFlavor;
90use mz_postgres_util::{Client, simple_query_opt};
91use mz_repr::{Datum, DatumVec, Diff, Row};
92use mz_sql_parser::ast::{Ident, display::AstDisplay};
93use mz_storage_types::dyncfgs::{PG_OFFSET_KNOWN_INTERVAL, PG_SCHEMA_VALIDATION_INTERVAL};
94use mz_storage_types::errors::DataflowError;
95use mz_storage_types::sources::SourceTimestamp;
96use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
97use mz_timely_util::builder_async::{
98 AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
99 PressOnDropButton,
100};
101use mz_timely_util::operator::StreamExt as TimelyStreamExt;
102use postgres_replication::LogicalReplicationStream;
103use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
104use serde::{Deserialize, Serialize};
105use timely::container::CapacityContainerBuilder;
106use timely::dataflow::channels::pact::{Exchange, Pipeline};
107use timely::dataflow::channels::pushers::Tee;
108use timely::dataflow::operators::Capability;
109use timely::dataflow::operators::Concat;
110use timely::dataflow::operators::core::Map;
111use timely::dataflow::{Scope, Stream};
112use timely::progress::Antichain;
113use tokio::sync::{mpsc, watch};
114use tokio_postgres::error::SqlState;
115use tokio_postgres::types::PgLsn;
116use tracing::{error, trace};
117
118use crate::metrics::source::postgres::PgSourceMetrics;
119use crate::source::RawSourceCreationConfig;
120use crate::source::postgres::verify_schema;
121use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
122use crate::source::probe;
123use crate::source::types::{
124 Probe, ProgressStatisticsUpdate, SignaledFuture, SourceMessage, StackedCollection,
125};
126
127/// Postgres epoch is 2000-01-01T00:00:00Z
128static PG_EPOCH: LazyLock<SystemTime> =
129 LazyLock::new(|| UNIX_EPOCH + Duration::from_secs(946_684_800));
130
131// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
132// slot. This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
133// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub(crate) struct RewindRequest {
136 /// The output index that should be rewound.
137 pub(crate) output_index: usize,
138 /// The LSN that the snapshot was taken at.
139 pub(crate) snapshot_lsn: MzOffset,
140}
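
// For illustration (hypothetical values, not tied to any real slot): with `snapshot_lsn = 42`, an
// update that the replication stream delivers at `lsn = 40` is emitted both as `(data, 0, -diff)`
// and as `(data, 40, diff)`, so the snapshot (emitted wholesale at LSN 0) is rewound to the slot's
// start while the update reappears at its true LSN.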
141
142/// Renders the replication dataflow. See the module documentation for more information.
143pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
144 scope: G,
145 config: RawSourceCreationConfig,
146 connection: PostgresSourceConnection,
147 table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
148 rewind_stream: &Stream<G, RewindRequest>,
149 slot_ready_stream: &Stream<G, Infallible>,
150 committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
151 metrics: PgSourceMetrics,
152) -> (
153 StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
154 Stream<G, Infallible>,
155 Stream<G, ProgressStatisticsUpdate>,
156 Option<Stream<G, Probe<MzOffset>>>,
157 Stream<G, ReplicationError>,
158 PressOnDropButton,
159) {
160 let op_name = format!("ReplicationReader({})", config.id);
161 let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());
162
163 let slot_reader = u64::cast_from(config.responsible_worker("slot"));
164 let (data_output, data_stream) = builder.new_output();
165 let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
166 let (definite_error_handle, definite_errors) = builder.new_output();
167
168 let (stats_output, stats_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
169 let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
170
171 // Yugabyte doesn't support LSN probing currently.
172 let probe_stream = match connection.connection.flavor {
173 PostgresFlavor::Vanilla => Some(probe_stream),
174 PostgresFlavor::Yugabyte => None,
175 };
176
177 let mut rewind_input =
178 builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
179 let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
180 let mut output_uppers = table_info
181 .iter()
182 .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
183 .collect::<Vec<_>>();
184 metrics.tables.set(u64::cast_from(output_uppers.len()));
185
186 // Include the upper of the main source output for use in calculating the initial
187 // resume upper.
188 output_uppers.push(Antichain::from_iter(
189 config
190 .source_resume_uppers
191 .get(&config.id)
192 .expect("id exists")
193 .iter()
194 .map(MzOffset::decode_row),
195 ));
196
197 let reader_table_info = table_info.clone();
198 let (button, transient_errors) = builder.build_fallible(move |caps| {
199 let table_info = reader_table_info;
200 let busy_signal = Arc::clone(&config.busy_signal);
201 Box::pin(SignaledFuture::new(busy_signal, async move {
202 let (id, worker_id) = (config.id, config.worker_id);
203 let [
204 data_cap_set,
205 upper_cap_set,
206 definite_error_cap_set,
207 stats_cap,
208 probe_cap,
209 ]: &mut [_; 5] = caps.try_into().unwrap();
210
211 if !config.responsible_for("slot") {
212 // Emit 0, to mark this worker as having started up correctly.
213 stats_output.give(
214 &stats_cap[0],
215 ProgressStatisticsUpdate::SteadyState {
216 offset_known: 0,
217 offset_committed: 0,
218 },
219 );
220 return Ok(());
221 }
222
223 // Determine the slot lsn.
224 let connection_config = connection
225 .connection
226 .config(
227 &config.config.connection_context.secrets_reader,
228 &config.config,
229 InTask::Yes,
230 )
231 .await?;
232
233 let slot = &connection.publication_details.slot;
234 let replication_client = connection_config
235 .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
236 .await?;
237
238 let metadata_client = connection_config
239 .connect(
240 "replication metadata",
241 &config.config.connection_context.ssh_tunnel_manager,
242 )
243 .await?;
244 let metadata_client = Arc::new(metadata_client);
245
246 while let Some(_) = slot_ready_input.next().await {
247 // Wait for the slot to be created
248 }
249 tracing::info!(%id, "ensuring replication slot {slot} exists");
250 super::ensure_replication_slot(&replication_client, slot).await?;
251 let slot_metadata = super::fetch_slot_metadata(
252 &*metadata_client,
253 slot,
254 mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
255 .get(config.config.config_set()),
256 )
257 .await?;
258
259 // We're the only application that should be using this replication
260 // slot. The only way that there can be another connection using
261 // this slot under normal operation is if there's a stale TCP
262 // connection from a prior incarnation of the source holding on to
263 // the slot. We don't want to wait for the WAL sender timeout and/or
264 // TCP keepalives to time out that connection, because these values
265 // are generally under the control of the DBA and may not time out
266 // the connection for multiple minutes, or at all. Instead we just
267 // force kill the connection that's using the slot.
268 //
269 // Note that there's a small risk that *we're* the zombie cluster
270 // that should not be using the replication slot. Kubernetes cannot
271 // 100% guarantee that only one cluster is alive at a time. However,
272 // this situation should not last long, and the worst that can
273 // happen is a bit of transient thrashing over ownership of the
274 // replication slot.
275 if let Some(active_pid) = slot_metadata.active_pid {
276 tracing::warn!(
277 %id, %active_pid,
278 "replication slot already in use; will attempt to kill existing connection",
279 );
280
281 match metadata_client
282 .execute("SELECT pg_terminate_backend($1)", &[&active_pid])
283 .await
284 {
285 Ok(_) => {
286 tracing::info!(
287 "successfully killed existing connection; \
288 starting replication is likely to succeed"
289 );
290 // Note that `pg_terminate_backend` does not wait for
291 // the termination of the targeted connection to
292 // complete. We may try to start replication before the
293 // targeted connection has cleaned up its state. That's
294 // okay. If that happens we'll just try again from the
295 // top via the suspend-and-restart flow.
296 }
297 Err(e) => {
298 tracing::warn!(
299 %e,
300 "failed to kill existing replication connection; \
301 replication will likely fail to start"
302 );
303 // Continue on anyway, just in case the replication slot
304 // is actually available. Maybe PostgreSQL has some
305 // staleness when it reports `active_pid`, for example.
306 }
307 }
308 }
309
310 // The overall resumption point for this source is the minimum of the resumption points
311 // contributed by each of the outputs.
312 let resume_lsn = output_uppers
313 .iter()
314 .flat_map(|f| f.elements())
315 .map(|&lsn| {
316 // An output is either an output that has never had data committed to it or one
317 // that has and needs to resume. We differentiate between the two by checking
318 // whether an output wishes to "resume" from the minimum timestamp. In that case
319 // its contribution to the overall resumption point is the earliest point available
320 // in the slot. This information would normally be something that the storage
321 // controller figures out in the form of an as-of frontier, but at the moment the
322 // storage controller does not have visibility into what the replication slot is
323 // doing.
324 if lsn == MzOffset::from(0) {
325 slot_metadata.confirmed_flush_lsn
326 } else {
327 lsn
328 }
329 })
330 .min();
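// For example (hypothetical values): with output uppers {0, 1200} and a confirmed_flush_lsn of
// 900, the never-written output contributes 900 rather than 0, so resume_lsn = min(900, 1200) = 900.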
331 let Some(resume_lsn) = resume_lsn else {
332 return Ok(());
333 };
334 upper_cap_set.downgrade([&resume_lsn]);
335 trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");
336
337 // Emitting an initial probe before we start waiting for rewinds ensures that we will
338 // have a timestamp binding in the remap collection while the snapshot is processed.
339 // This is important because otherwise the snapshot updates would need to be buffered
340 // in the reclock operator, instead of being spilled to S3 in the persist sink.
341 //
342 // Note that we need to fetch the probe LSN _after_ having created the replication
343 // slot, to make sure the fetched LSN will be included in the replication stream.
344 let probe_ts = (config.now_fn)().into();
345 let max_lsn = super::fetch_max_lsn(&*metadata_client).await?;
346 let probe = Probe {
347 probe_ts,
348 upstream_frontier: Antichain::from_elem(max_lsn),
349 };
350 probe_output.give(&probe_cap[0], probe);
351
352 let mut rewinds = BTreeMap::new();
353 while let Some(event) = rewind_input.next().await {
354 if let AsyncEvent::Data(_, data) = event {
355 for req in data {
356 if resume_lsn > req.snapshot_lsn + 1 {
357 let err = DefiniteError::SlotCompactedPastResumePoint(
358 req.snapshot_lsn + 1,
359 resume_lsn,
360 );
361 // If the replication stream cannot be obtained from the resume point there is nothing
362 // else to do. These errors are not retractable.
363 for (oid, outputs) in table_info.iter() {
364 for output_index in outputs.keys() {
365 // We pick `u64::MAX` as the LSN which will (in practice) never conflict
366 // with any previously revealed portions of the TVC.
367 let update = (
368 (
369 *oid,
370 *output_index,
371 Err(DataflowError::from(err.clone())),
372 ),
373 MzOffset::from(u64::MAX),
374 Diff::ONE,
375 );
376 data_output.give_fueled(&data_cap_set[0], update).await;
377 }
378 }
379 definite_error_handle.give(
380 &definite_error_cap_set[0],
381 ReplicationError::Definite(Rc::new(err)),
382 );
383 return Ok(());
384 }
385 rewinds.insert(req.output_index, req);
386 }
387 }
388 }
389 trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");
390
391 let mut committed_uppers = pin!(committed_uppers);
392
393 let stream_result = raw_stream(
394 &config,
395 replication_client,
396 Arc::clone(&metadata_client),
397 &connection.publication_details.slot,
398 &connection.publication_details.timeline_id,
399 &connection.publication,
400 resume_lsn,
401 committed_uppers.as_mut(),
402 &stats_output,
403 &stats_cap[0],
404 &probe_output,
405 &probe_cap[0],
406 )
407 .await?;
408
409 let stream = match stream_result {
410 Ok(stream) => stream,
411 Err(err) => {
412 // If the replication stream cannot be obtained in a definite way there is
413 // nothing else to do. These errors are not retractable.
414 for (oid, outputs) in table_info.iter() {
415 for output_index in outputs.keys() {
416 // We pick `u64::MAX` as the LSN which will (in practice) never conflict
417 // with any previously revealed portions of the TVC.
418 let update = (
419 (*oid, *output_index, Err(DataflowError::from(err.clone()))),
420 MzOffset::from(u64::MAX),
421 Diff::ONE,
422 );
423 data_output.give_fueled(&data_cap_set[0], update).await;
424 }
425 }
426
427 definite_error_handle.give(
428 &definite_error_cap_set[0],
429 ReplicationError::Definite(Rc::new(err)),
430 );
431 return Ok(());
432 }
433 };
434 let mut stream = pin!(stream.peekable());
435
436 // Run the periodic schema validation on a separate task using a separate client,
437 // to prevent it from blocking the replication reading progress.
438 let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
439 let client = connection_config
440 .connect("schema validation", ssh_tunnel_manager)
441 .await?;
442 let mut schema_errors = spawn_schema_validator(
443 client,
444 &config,
445 connection.publication.clone(),
446 table_info.clone(),
447 );
448
449 let mut errored = HashSet::new();
450 // Instead of downgrading the capability for every transaction we process, we only do it
451 // if we're about to yield, which is checked at the bottom of the loop. This avoids
452 // creating excessive progress tracking traffic when there are multiple small
453 // transactions ready to go.
454 let mut data_upper = resume_lsn;
455 // A stash of reusable vectors to convert from bytes::Bytes based data, which is not
456 // compatible with `columnation`, to Vec<u8> data that is.
457 while let Some(event) = stream.as_mut().next().await {
458 use LogicalReplicationMessage::*;
459 use ReplicationMessage::*;
460 match event {
461 Ok(XLogData(data)) => match data.data() {
462 Begin(begin) => {
463 let commit_lsn = MzOffset::from(begin.final_lsn());
464
465 let mut tx = pin!(extract_transaction(
466 stream.by_ref(),
467 &*metadata_client,
468 commit_lsn,
469 &table_info,
470 &metrics,
471 &connection.publication,
472 &mut errored
473 ));
474
475 trace!(
476 %id,
477 "timely-{worker_id} extracting transaction \
478 at {commit_lsn}"
479 );
480 assert!(
481 data_upper <= commit_lsn,
482 "new_upper={data_upper} tx_lsn={commit_lsn}",
483 );
484 data_upper = commit_lsn + 1;
485 // We are about to ingest a transaction which could be very big and we
486 // certainly don't want to hold its data in memory. For
487 // this reason we eagerly downgrade the upper capability in order for
488 // the reclocking machinery to mint a binding that includes
489 // this transaction and therefore be able to pass the data of the
490 // transaction through as we stream it.
491 upper_cap_set.downgrade([&data_upper]);
492 while let Some((oid, output_index, event, diff)) = tx.try_next().await?
493 {
494 let event = event.map_err(Into::into);
495 let mut data = (oid, output_index, event);
496 if let Some(req) = rewinds.get(&output_index) {
497 if commit_lsn <= req.snapshot_lsn {
498 let update = (data, MzOffset::from(0), -diff);
499 data_output.give_fueled(&data_cap_set[0], &update).await;
500 data = update.0;
501 }
502 }
503 let update = (data, commit_lsn, diff);
504 data_output.give_fueled(&data_cap_set[0], &update).await;
505 }
506 }
507 _ => return Err(TransientError::BareTransactionEvent),
508 },
509 Ok(PrimaryKeepAlive(keepalive)) => {
510 trace!( %id,
511 "timely-{worker_id} received keepalive lsn={}",
512 keepalive.wal_end()
513 );
514
515 // Take the opportunity to report any schema validation errors.
516 while let Ok(error) = schema_errors.try_recv() {
517 use SchemaValidationError::*;
518 match error {
519 Postgres(PostgresError::PublicationMissing(publication)) => {
520 let err = DefiniteError::PublicationDropped(publication);
521 for (oid, outputs) in table_info.iter() {
522 for output_index in outputs.keys() {
523 let update = (
524 (
525 *oid,
526 *output_index,
527 Err(DataflowError::from(err.clone())),
528 ),
529 data_cap_set[0].time().clone(),
530 Diff::ONE,
531 );
532 data_output.give_fueled(&data_cap_set[0], update).await;
533 }
534 }
535 definite_error_handle.give(
536 &definite_error_cap_set[0],
537 ReplicationError::Definite(Rc::new(err)),
538 );
539 return Ok(());
540 }
541 Postgres(pg_error) => Err(TransientError::from(pg_error))?,
542 Schema {
543 oid,
544 output_index,
545 error,
546 } => {
547 if errored.contains(&output_index) {
548 continue;
549 }
550
551 let update = (
552 (oid, output_index, Err(error.into())),
553 data_cap_set[0].time().clone(),
554 Diff::ONE,
555 );
556 data_output.give_fueled(&data_cap_set[0], update).await;
557 errored.insert(output_index);
558 }
559 }
560 }
561 data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
562 }
563 Ok(_) => return Err(TransientError::UnknownReplicationMessage),
564 Err(err) => return Err(err),
565 }
566
567 let will_yield = stream.as_mut().peek().now_or_never().is_none();
568 if will_yield {
569 trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
570 rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
571 // As long as there are pending rewinds we can't downgrade our data capability
572 // since we must be able to produce data at offset 0.
573 if rewinds.is_empty() {
574 data_cap_set.downgrade([&data_upper]);
575 }
576 upper_cap_set.downgrade([&data_upper]);
577 }
578 }
579 // We never expect the replication stream to gracefully end
580 Err(TransientError::ReplicationEOF)
581 }))
582 });
583
584 // Distribute the raw slot data to all workers.
585 let data_stream = data_stream.map::<Vec<_>, _, _>(Clone::clone).distribute();
586
587 // We now process the slot updates and apply the cast expressions
588 let mut final_row = Row::default();
589 let mut datum_vec = DatumVec::new();
590 let replication_updates = data_stream
591 .map(move |((oid, output_index, event), time, diff)| {
592 let output = &table_info
593 .get(&oid)
594 .and_then(|outputs| outputs.get(&output_index))
595 .expect("table_info contains all outputs");
596 let event = event.and_then(|row| {
597 let datums = datum_vec.borrow_with(&row);
598 super::cast_row(&output.casts, &datums, &mut final_row)?;
599 Ok(SourceMessage {
600 key: Row::default(),
601 value: final_row.clone(),
602 metadata: Row::default(),
603 })
604 });
605
606 ((output_index, event), time, diff)
607 })
608 .as_collection();
609
610 let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));
611
612 (
613 replication_updates,
614 upper_stream,
615 stats_stream,
616 probe_stream,
617 errors,
618 button.press_on_drop(),
619 )
620}
621
622/// Produces the logical replication stream while taking care of regularly sending standby
623/// keepalive messages based on the provided `uppers` stream.
624///
625/// The returned stream will contain all transactions whose commit LSN is at or beyond `resume_lsn`.
626async fn raw_stream<'a>(
627 config: &'a RawSourceCreationConfig,
628 replication_client: Client,
629 metadata_client: Arc<Client>,
630 slot: &'a str,
631 timeline_id: &'a Option<u64>,
632 publication: &'a str,
633 resume_lsn: MzOffset,
634 uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
635 stats_output: &'a AsyncOutputHandle<
636 MzOffset,
637 CapacityContainerBuilder<Vec<ProgressStatisticsUpdate>>,
638 Tee<MzOffset, Vec<ProgressStatisticsUpdate>>,
639 >,
640 stats_cap: &'a Capability<MzOffset>,
641 probe_output: &'a AsyncOutputHandle<
642 MzOffset,
643 CapacityContainerBuilder<Vec<Probe<MzOffset>>>,
644 Tee<MzOffset, Vec<Probe<MzOffset>>>,
645 >,
646 probe_cap: &'a Capability<MzOffset>,
647) -> Result<
648 Result<
649 impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>>
650 + 'a,
651 DefiniteError,
652 >,
653 TransientError,
654> {
655 if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
656 // If the publication gets deleted there is nothing else to do. These errors
657 // are not retractable.
658 return Ok(Err(err));
659 }
660
661 // Skip the timeline ID check for sources without a known timeline ID
662 // (sources created before the timeline ID was added to the source details)
663 if let Some(expected_timeline_id) = timeline_id {
664 if let Err(err) =
665 ensure_replication_timeline_id(&replication_client, expected_timeline_id).await?
666 {
667 return Ok(Err(err));
668 }
669 }
670
671 // How often a proactive standby status update message should be sent to the server.
672 //
673 // The upstream will periodically request status updates by setting the keepalive's reply field
674 // value to 1. However, we cannot rely on these messages arriving on time. For example, when
675 // the upstream is sending a big transaction its keepalive messages are queued and can be
676 // delayed arbitrarily.
677 //
678 // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
679 //
680 // For this reason we query the server's timeout value and proactively send a keepalive at
681 // twice the frequency to have a healthy margin from the deadline.
682 //
683 // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
684 // Postgres versions disallow SHOW commands from within a replication connection.
685 // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
686 let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
687 .await?
688 .unwrap();
689 let wal_sender_timeout = match row.get("wal_sender_timeout") {
690 // When this parameter is zero the timeout mechanism is disabled
691 Some("0") => None,
692 Some(value) => Some(
693 mz_repr::adt::interval::Interval::from_str(value)
694 .unwrap()
695 .duration()
696 .unwrap(),
697 ),
698 None => panic!("ubiquitous parameter missing"),
699 };
700
701 // This interval controls the cadence at which we send back status updates and, crucially,
702 // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
703 // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow us
704 // down. For this reason the feedback interval is set to one second, or less if the
705 // wal_sender_timeout is less than 2 seconds.
706 let feedback_interval = match wal_sender_timeout {
707 Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
708 None => Duration::from_secs(1),
709 };
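// For example (hypothetical settings): wal_sender_timeout = 60s yields min(1s, 30s) = 1s between
// status updates, while wal_sender_timeout = 1s yields min(1s, 500ms) = 500ms.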
710
711 let mut feedback_timer = tokio::time::interval(feedback_interval);
712 // 'Delay' ensures that consecutive ticks are always at least 'feedback_interval' apart.
713 feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
714
715 // Postgres will return all transactions that commit *at or after* the provided LSN,
716 // following the timely upper semantics.
717 let lsn = PgLsn::from(resume_lsn.offset);
718 let query = format!(
719 r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
720 Ident::new_unchecked(slot).to_ast_string_simple(),
721 lsn,
722 publication,
723 );
724 let copy_stream = match replication_client.copy_both_simple(&query).await {
725 Ok(copy_stream) => copy_stream,
726 Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
727 return Ok(Err(DefiniteError::InvalidReplicationSlot));
728 }
729 Err(err) => return Err(err.into()),
730 };
731
732 // According to the documentation [1] we must check that the slot LSN matches our
733 // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
734 // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
735 // cannot use the replication client to do that because it's already in CopyBoth mode.
736 // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
737 let slot_metadata = super::fetch_slot_metadata(
738 &*metadata_client,
739 slot,
740 mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
741 .get(config.config.config_set()),
742 )
743 .await?;
744 let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
745 tracing::info!(
746 %config.id,
747 "started replication using backend PID={:?}. wal_sender_timeout={:?}",
748 slot_metadata.active_pid, wal_sender_timeout
749 );
750
751 let (probe_tx, mut probe_rx) = watch::channel(None);
752 let config_set = Arc::clone(config.config.config_set());
753 let now_fn = config.now_fn.clone();
754 let max_lsn_task_handle =
755 mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
756 let mut probe_ticker =
757 probe::Ticker::new(|| PG_OFFSET_KNOWN_INTERVAL.get(&config_set), now_fn);
758
759 while !probe_tx.is_closed() {
760 let probe_ts = probe_ticker.tick().await;
761 let probe_or_err = super::fetch_max_lsn(&*metadata_client)
762 .await
763 .map(|lsn| Probe {
764 probe_ts,
765 upstream_frontier: Antichain::from_elem(lsn),
766 });
767 let _ = probe_tx.send(Some(probe_or_err));
768 }
769 })
770 .abort_on_drop();
771
772 let stream = async_stream::try_stream!({
773 // Ensure we don't pre-drop the task
774 let _max_lsn_task_handle = max_lsn_task_handle;
775
776 let mut uppers = pin!(uppers);
777 let mut last_committed_upper = resume_lsn;
778
779 let mut stream = pin!(LogicalReplicationStream::new(copy_stream));
780
781 if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
782 let err = TransientError::OvercompactedReplicationSlot {
783 available_lsn: min_resume_lsn,
784 requested_lsn: resume_lsn,
785 };
786 error!("timely-{} ({}) {err}", config.worker_id, config.id);
787 Err(err)?;
788 }
789
790 loop {
791 tokio::select! {
792 Some(next_message) = stream.next() => match next_message {
793 Ok(ReplicationMessage::XLogData(data)) => {
794 yield ReplicationMessage::XLogData(data);
795 Ok(())
796 }
797 Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
798 yield ReplicationMessage::PrimaryKeepAlive(keepalive);
799 Ok(())
800 }
801 Err(err) => Err(err.into()),
802 _ => Err(TransientError::UnknownReplicationMessage),
803 },
804 _ = feedback_timer.tick() => {
805 let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
806 let lsn = PgLsn::from(last_committed_upper.offset);
807 trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
808 // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply, which
809 // happens when our status update is late. Since we send them proactively this
810 // may never happen. It is therefore *crucial* that we set the last parameter
811 // (the reply flag) to 1 here. This will cause the upstream server to send us a
812 // PrimaryKeepAlive message promptly which will give us frontier advancement
813 // information in the absence of data updates.
814 let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
815 res.map_err(|e| e.into())
816 },
817 Some(upper) = uppers.next() => match upper.into_option() {
818 Some(lsn) => {
819 last_committed_upper = std::cmp::max(last_committed_upper, lsn);
820 Ok(())
821 }
822 None => Ok(()),
823 },
824 Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
825 Some(Ok(probe)) => {
826 if let Some(offset_known) = probe.upstream_frontier.as_option() {
827 stats_output.give(
828 stats_cap,
829 ProgressStatisticsUpdate::SteadyState {
830 // Similar to the kafka source, we don't subtract 1 from the
831 // upper as we want to report the _number of bytes_ we have
832 // processed / that exist upstream.
833 offset_known: offset_known.offset,
834 offset_committed: last_committed_upper.offset,
835 },
836 );
837 }
838 probe_output.give(probe_cap, probe.clone());
839 Ok(())
840 },
841 Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
842 None => Ok(()),
843 },
844 else => return
845 }?;
846 }
847 });
848 Ok(Ok(stream))
849}
850
851/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
852/// message. The BEGIN message must have already been consumed from the stream before calling this
853/// function.
854fn extract_transaction<'a>(
855 stream: impl AsyncStream<
856 Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>,
857 > + 'a,
858 metadata_client: &'a Client,
859 commit_lsn: MzOffset,
860 table_info: &'a BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
861 metrics: &'a PgSourceMetrics,
862 publication: &'a str,
863 errored_outputs: &'a mut HashSet<usize>,
864) -> impl AsyncStream<Item = Result<(u32, usize, Result<Row, DefiniteError>, Diff), TransientError>> + 'a
865{
866 use LogicalReplicationMessage::*;
867 let mut row = Row::default();
868 async_stream::try_stream!({
869 let mut stream = pin!(stream);
870 metrics.transactions.inc();
871 metrics.lsn.set(commit_lsn.offset);
872 while let Some(event) = stream.try_next().await? {
873 // We can ignore keepalive messages while processing a transaction because the
874 // commit_lsn will drive progress.
875 let message = match event {
876 ReplicationMessage::XLogData(data) => data.into_data(),
877 ReplicationMessage::PrimaryKeepAlive(_) => continue,
878 _ => Err(TransientError::UnknownReplicationMessage)?,
879 };
880 metrics.total.inc();
881 match message {
882 Insert(body) if !table_info.contains_key(&body.rel_id()) => continue,
883 Update(body) if !table_info.contains_key(&body.rel_id()) => continue,
884 Delete(body) if !table_info.contains_key(&body.rel_id()) => continue,
885 Relation(body) if !table_info.contains_key(&body.rel_id()) => continue,
886 Insert(body) => {
887 metrics.inserts.inc();
888 let row = unpack_tuple(body.tuple().tuple_data(), &mut row);
889 let rel = body.rel_id();
890 for ((output, _), row) in table_info
891 .get(&rel)
892 .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
893 .into_iter()
894 .flatten()
895 .repeat_clone(row)
896 {
897 yield (rel, *output, row, Diff::ONE);
898 }
899 }
900 Update(body) => match body.old_tuple() {
901 Some(old_tuple) => {
902 metrics.updates.inc();
903 // If the new tuple contains unchanged toast values we reference the old ones
904 let new_tuple =
905 std::iter::zip(body.new_tuple().tuple_data(), old_tuple.tuple_data())
906 .map(|(new, old)| match new {
907 TupleData::UnchangedToast => old,
908 _ => new,
909 });
910 let old_row = unpack_tuple(old_tuple.tuple_data(), &mut row);
911 let new_row = unpack_tuple(new_tuple, &mut row);
912 let rel = body.rel_id();
913 for ((output, _), (old_row, new_row)) in table_info
914 .get(&rel)
915 .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
916 .into_iter()
917 .flatten()
918 .repeat_clone((old_row, new_row))
919 {
920 yield (rel, *output, old_row, Diff::MINUS_ONE);
921 yield (rel, *output, new_row, Diff::ONE);
922 }
923 }
924 None => {
925 let rel = body.rel_id();
926 for (output, _) in table_info
927 .get(&rel)
928 .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
929 .into_iter()
930 .flatten()
931 {
932 yield (
933 rel,
934 *output,
935 Err(DefiniteError::DefaultReplicaIdentity),
936 Diff::ONE,
937 );
938 }
939 }
940 },
941 Delete(body) => match body.old_tuple() {
942 Some(old_tuple) => {
943 metrics.deletes.inc();
944 let row = unpack_tuple(old_tuple.tuple_data(), &mut row);
945 let rel = body.rel_id();
946 for ((output, _), row) in table_info
947 .get(&rel)
948 .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
949 .into_iter()
950 .flatten()
951 .repeat_clone(row)
952 {
953 yield (rel, *output, row, Diff::MINUS_ONE);
954 }
955 }
956 None => {
957 let rel = body.rel_id();
958 for (output, _) in table_info
959 .get(&rel)
960 .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
961 .into_iter()
962 .flatten()
963 {
964 yield (
965 rel,
966 *output,
967 Err(DefiniteError::DefaultReplicaIdentity),
968 Diff::ONE,
969 );
970 }
971 }
972 },
973 Relation(body) => {
974 let rel_id = body.rel_id();
975 let valid_outputs = table_info
976 .get(&rel_id)
977 .map(|o| o.iter().filter(|(o, _)| !errored_outputs.contains(o)))
978 .into_iter()
979 .flatten()
980 .collect::<Vec<_>>();
981 if !valid_outputs.is_empty() {
982 // Because the replication stream doesn't include columns' attnums, we need
983 // to check the current local schema against the current remote schema to
984 // ensure e.g. we haven't received a schema update with the same terminal
985 // column name which is actually a different column.
986 let oids = std::iter::once(rel_id)
987 .chain(table_info.keys().copied())
988 .collect::<Vec<_>>();
989 let upstream_info = mz_postgres_util::publication_info(
990 metadata_client,
991 publication,
992 Some(&oids),
993 )
994 .await?;
995
996 for (output_index, output) in valid_outputs {
997 if let Err(err) =
998 verify_schema(rel_id, &output.desc, &upstream_info, &output.casts)
999 {
1000 errored_outputs.insert(*output_index);
1001 yield (rel_id, *output_index, Err(err), Diff::ONE);
1002 }
1003
1004 // Error any dropped tables.
1005 for (oid, outputs) in table_info {
1006 if !upstream_info.contains_key(oid) {
1007 for output in outputs.keys() {
1008 if errored_outputs.insert(*output) {
1009 // Minimize the number of excessive errors
1010 // this will generate.
1011 yield (
1012 *oid,
1013 *output,
1014 Err(DefiniteError::TableDropped),
1015 Diff::ONE,
1016 );
1017 }
1018 }
1019 }
1020 }
1021 }
1022 }
1023 }
1024 Truncate(body) => {
1025 for &rel_id in body.rel_ids() {
1026 if let Some(outputs) = table_info.get(&rel_id) {
1027 for (output, _) in outputs {
1028 if errored_outputs.insert(*output) {
1029 yield (
1030 rel_id,
1031 *output,
1032 Err(DefiniteError::TableTruncated),
1033 Diff::ONE,
1034 );
1035 }
1036 }
1037 }
1038 }
1039 }
1040 Commit(body) => {
1041 if commit_lsn != body.commit_lsn().into() {
1042 Err(TransientError::InvalidTransaction)?
1043 }
1044 return;
1045 }
1046 // TODO: We should handle origin messages and emit an error as they indicate that
1047 // the upstream performed a point in time restore so all bets are off about the
1048 // continuity of the stream.
1049 Origin(_) | Type(_) => metrics.ignored.inc(),
1050 Begin(_) => Err(TransientError::NestedTransaction)?,
1051 // The enum is marked as non_exhaustive. Better to be conservative
1052 _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
1053 }
1054 }
1055 Err(TransientError::ReplicationEOF)?;
1056 })
1057}
1058
1059/// Unpacks an iterator of TupleData into a `Row` of datums, or an error if this can't be
1060/// done.
1061#[inline]
1062fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
1063where
1064 I: IntoIterator<Item = &'a TupleData>,
1065 I::IntoIter: ExactSizeIterator,
1066{
1067 let iter = tuple_data.into_iter();
1068 let mut packer = row.packer();
1069 for data in iter {
1070 let datum = match data {
1071 TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
1072 TupleData::Null => Datum::Null,
1073 TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
1074 TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
1075 };
1076 packer.push(datum);
1077 }
1078 Ok(row.clone())
1079}
1080
1081/// Ensures the publication exists on the server. It returns an outer transient error in case of
1082/// connection issues and an inner definite error if the publication is dropped.
1083async fn ensure_publication_exists(
1084 client: &Client,
1085 publication: &str,
1086) -> Result<Result<(), DefiniteError>, TransientError> {
1087 // Figure out the last written LSN and then add one to convert it into an upper.
1088 let result = client
1089 .query_opt(
1090 "SELECT 1 FROM pg_publication WHERE pubname = $1;",
1091 &[&publication],
1092 )
1093 .await?;
1094 match result {
1095 Some(_) => Ok(Ok(())),
1096 None => Ok(Err(DefiniteError::PublicationDropped(
1097 publication.to_owned(),
1098 ))),
1099 }
1100}
1101
1102/// Ensure the active replication timeline_id matches the one we expect such that we can safely
1103/// resume replication. It returns an outer transient error in case of
1104/// connection issues and an inner definite error if the timeline id does not match.
1105async fn ensure_replication_timeline_id(
1106 replication_client: &Client,
1107 expected_timeline_id: &u64,
1108) -> Result<Result<(), DefiniteError>, TransientError> {
1109 let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
1110 if timeline_id == *expected_timeline_id {
1111 Ok(Ok(()))
1112 } else {
1113 Ok(Err(DefiniteError::InvalidTimelineId {
1114 expected: *expected_timeline_id,
1115 actual: timeline_id,
1116 }))
1117 }
1118}
1119
1120enum SchemaValidationError {
1121 Postgres(PostgresError),
1122 Schema {
1123 oid: u32,
1124 output_index: usize,
1125 error: DefiniteError,
1126 },
1127}
1128
1129fn spawn_schema_validator(
1130 client: Client,
1131 config: &RawSourceCreationConfig,
1132 publication: String,
1133 table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
1134) -> mpsc::UnboundedReceiver<SchemaValidationError> {
1135 let (tx, rx) = mpsc::unbounded_channel();
1136 let source_id = config.id;
1137 let config_set = Arc::clone(config.config.config_set());
1138
1139 mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
1140 while !tx.is_closed() {
1141 trace!(%source_id, "validating schemas");
1142
1143 let validation_start = Instant::now();
1144
1145 let upstream_info = match mz_postgres_util::publication_info(
1146 &*client,
1147 &publication,
1148 Some(&table_info.keys().copied().collect::<Vec<_>>()),
1149 )
1150 .await
1151 {
1152 Ok(info) => info,
1153 Err(error) => {
1154 let _ = tx.send(SchemaValidationError::Postgres(error));
1155 continue;
1156 }
1157 };
1158
1159 for (&oid, outputs) in table_info.iter() {
1160 for (&output_index, output_info) in outputs {
1161 let expected_desc = &output_info.desc;
1162 let casts = &output_info.casts;
1163 if let Err(error) = verify_schema(oid, expected_desc, &upstream_info, casts) {
1164 trace!(
1165 %source_id,
1166 "schema of output index {output_index} for oid {oid} invalid",
1167 );
1168 let _ = tx.send(SchemaValidationError::Schema {
1169 oid,
1170 output_index,
1171 error,
1172 });
1173 } else {
1174 trace!(
1175 %source_id,
1176 "schema of output index {output_index} for oid {oid} valid",
1177 );
1178 }
1179 }
1180 }
1181
1182 let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
1183 let elapsed = validation_start.elapsed();
1184 let wait = interval.saturating_sub(elapsed);
1185 tokio::time::sleep(wait).await;
1186 }
1187 });
1188
1189 rx
1190}