mz_storage/source/postgres/replication.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
//!
//! ```text
//!              o
//!              │rewind
//!              │requests
//!          ╭───┴────╮
//!          │exchange│ (collect all requests to one worker)
//!          ╰───┬────╯
//!           ┏━━v━━━━━━━━━━┓
//!           ┃ replication ┃ (single worker)
//!           ┃   reader    ┃
//!           ┗━┯━━━━━━━━┯━━┛
//!             │raw     │
//!             │data    │
//!        ╭────┴─────╮  │
//!        │distribute│  │ (distribute to all workers)
//!        ╰────┬─────╯  │
//! ┏━━━━━━━━━━━┷━┓      │
//! ┃ replication ┃      │ (parallel decode)
//! ┃   decoder   ┃      │
//! ┗━━━━━┯━━━━━━━┛      │
//!       │ replication  │ progress
//!       │ updates      │ output
//!       v              v
//! ```
//!
//! # Progress tracking
//!
//! In order to avoid causing excessive resource usage in the upstream server it's important to
//! track the LSN that we have successfully committed to persist and communicate that back to
//! PostgreSQL. Under normal operation this gauge of progress is provided by the presence of
//! transactions themselves. Since there can be only a single message at a given LSN offset, when
//! a transaction is received and processed we can infer that we have seen all the messages up to
//! and including its `commit_lsn`, and therefore that the frontier can advance to `commit_lsn + 1`.
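//! For example, after processing a transaction committed at LSN `1000` we may report `1001` back
//! to the server, which is then free to reclaim WAL at or below LSN `1000`.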
//!
//! Things are a bit more complicated in the absence of transactions though, because even though
//! we don't receive any, the server might very well be generating WAL records. This can happen if
//! there is a separate logical database performing writes (which is the case for RDS databases),
//! or, in servers running PostgreSQL version 15 or greater, because the logical replication
//! process includes an optimization that omits empty transactions. Empty transactions occur when
//! only a subset of the tables is replicated and there are writes going to the other ones.
//!
//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
//! will be forced to keep around WAL data, which can eventually lead to disk space exhaustion.
//!
//! In the absence of transactions the only available piece of information in the replication
//! stream is keepalive messages. Keepalive messages are documented[1] to contain the current end
//! of WAL on the server. That is a useless number when it comes to progress tracking because
//! there might be pending messages at LSNs between the last received commit_lsn and the current
//! end of WAL.
//!
//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
//! contains the last *sent* LSN[2]. Here "sent" doesn't necessarily mean sent over the wire, but
//! sent to the upstream process that is producing the logical stream. Therefore, if we receive a
//! keepalive with a particular LSN we can be certain that there are no other replication messages
//! at previous LSNs, because they would have been already generated and received. We therefore
//! connect the keepalive messages directly to our capability.
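//!
//! As a simplified sketch, the frontier maintained by the replication reader evolves as follows:
//!
//! ```text
//! XLogData(Begin   final_lsn=100)   frontier -> 101 (eagerly, before streaming the transaction)
//! XLogData(Commit  lsn=100)         frontier stays at 101
//! PrimaryKeepalive(wal_end=200)     frontier -> 200 (no other messages can exist below it)
//! ```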
//!
//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::pin::pin;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use differential_dataflow::AsCollection;
use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
use mz_dyncfg::ConfigSet;
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_postgres_util::PostgresError;
use mz_postgres_util::{Client, simple_query_opt};
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_sql_parser::ast::{Ident, display::AstDisplay};
use mz_storage_types::dyncfgs::PG_SOURCE_VALIDATE_TIMELINE;
use mz_storage_types::dyncfgs::{PG_OFFSET_KNOWN_INTERVAL, PG_SCHEMA_VALIDATION_INTERVAL};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use postgres_replication::LogicalReplicationStream;
use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::Concat;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::core::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tokio::sync::{mpsc, watch};
use tokio_postgres::error::SqlState;
use tokio_postgres::types::PgLsn;
use tracing::{error, trace};

use crate::metrics::source::postgres::PgSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::postgres::verify_schema;
use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
use crate::source::probe;
use crate::source::types::{Probe, SignaledFuture, SourceMessage, StackedCollection};

/// A logical replication message from the server.
type LogicalReplMsg = ReplicationMessage<LogicalReplicationMessage>;

/// A decoded row from a transaction with source information.
type DecodedRow = (u32, usize, Result<Row, DefiniteError>, Diff);

/// Postgres epoch is 2000-01-01T00:00:00Z,
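/// which is 946,684,800 seconds after the Unix epoch.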
static PG_EPOCH: LazyLock<SystemTime> =
    LazyLock::new(|| UNIX_EPOCH + Duration::from_secs(946_684_800));

/// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
/// slot. This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
/// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
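///
/// For example, if the snapshot was taken at `snapshot_lsn = 1000` and the replication stream
/// later replays a committed update `(row, 900, +1)`, the rewind emits `(row, 0, -1)`: at LSN 0
/// this cancels the copy of the update already contained in the snapshot, so times before LSN 900
/// correctly reflect the state without it.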
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RewindRequest {
    /// The output index that should be rewound.
    pub(crate) output_index: usize,
    /// The LSN that the snapshot was taken at.
    pub(crate) snapshot_lsn: MzOffset,
}

/// Renders the replication dataflow. See the module documentation for more information.
pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: PostgresSourceConnection,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    rewind_stream: &Stream<G, RewindRequest>,
    slot_ready_stream: &Stream<G, Infallible>,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    metrics: PgSourceMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Option<Stream<G, Probe<MzOffset>>>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("ReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());

    let slot_reader = u64::cast_from(config.responsible_worker("slot"));
    let (data_output, data_stream) = builder.new_output();
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut rewind_input =
        builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
    let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
    let output_uppers = table_info
        .iter()
        .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
        .collect::<Vec<_>>();
    metrics.tables.set(u64::cast_from(output_uppers.len()));

    let reader_table_info = table_info.clone();
    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let mut table_info = reader_table_info;
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            let [
                data_cap_set,
                upper_cap_set,
                definite_error_cap_set,
                probe_cap,
            ]: &mut [_; 4] = caps.try_into().unwrap();

            if !config.responsible_for("slot") {
                // Emit 0, to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

            // Determine the slot lsn.
            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let slot = &connection.publication_details.slot;
            let replication_client = connection_config
                .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                .await?;

            let metadata_client = connection_config
                .connect(
                    "replication metadata",
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let metadata_client = Arc::new(metadata_client);

            while let Some(_) = slot_ready_input.next().await {
                // Wait for the slot to be created.
            }
            // The slot is always created by the snapshot operator. If the slot doesn't exist
            // when this check runs, this operator will return an error.
            let slot_metadata = super::fetch_slot_metadata(
                &*metadata_client,
                slot,
                mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
                    .get(config.config.config_set()),
            )
            .await?;

            // We're the only application that should be using this replication
            // slot. The only way that there can be another connection using
            // this slot under normal operation is if there's a stale TCP
            // connection from a prior incarnation of the source holding on to
            // the slot. We don't want to wait for the WAL sender timeout and/or
            // TCP keepalives to time out that connection, because these values
            // are generally under the control of the DBA and may not time out
            // the connection for multiple minutes, or at all. Instead we just
            // force kill the connection that's using the slot.
            //
            // Note that there's a small risk that *we're* the zombie cluster
            // that should not be using the replication slot. Kubernetes cannot
            // 100% guarantee that only one cluster is alive at a time. However,
            // this situation should not last long, and the worst that can
            // happen is a bit of transient thrashing over ownership of the
            // replication slot.
            if let Some(active_pid) = slot_metadata.active_pid {
                tracing::warn!(
                    %id, %active_pid,
                    "replication slot already in use; will attempt to kill existing connection",
                );

                match metadata_client
                    .execute("SELECT pg_terminate_backend($1)", &[&active_pid])
                    .await
                {
                    Ok(_) => {
                        tracing::info!(
                            "successfully killed existing connection; \
                             starting replication is likely to succeed"
                        );
                        // Note that `pg_terminate_backend` does not wait for
                        // the termination of the targeted connection to
                        // complete. We may try to start replication before the
                        // targeted connection has cleaned up its state. That's
                        // okay. If that happens we'll just try again from the
                        // top via the suspend-and-restart flow.
                    }
                    Err(e) => {
                        tracing::warn!(
                            %e,
                            "failed to kill existing replication connection; \
                             replication will likely fail to start"
                        );
                        // Continue on anyway, just in case the replication slot
                        // is actually available. Maybe PostgreSQL has some
                        // staleness when it reports `active_pid`, for example.
                    }
                }
            }

            // The overall resumption point for this source is the minimum of the resumption
            // points contributed by each of the outputs.
            let resume_lsn = output_uppers
                .iter()
                .flat_map(|f| f.elements())
                .map(|&lsn| {
                    // An output is either an output that has never had data committed to it or one
                    // that has and needs to resume. We differentiate between the two by checking
                    // whether an output wishes to "resume" from the minimum timestamp. In that case
                    // its contribution to the overall resumption point is the earliest point
                    // available in the slot. This information would normally be something that the
                    // storage controller figures out in the form of an as-of frontier, but at the
                    // moment the storage controller does not have visibility into what the
                    // replication slot is doing.
                    if lsn == MzOffset::from(0) {
                        slot_metadata.confirmed_flush_lsn
                    } else {
                        lsn
                    }
                })
                .min();
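            // With no outputs there is no resumption point to compute, and nothing for this
            // operator to produce, so park forever instead of completing.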
            let Some(resume_lsn) = resume_lsn else {
                std::future::pending::<()>().await;
                return Ok(());
            };
            upper_cap_set.downgrade([&resume_lsn]);
            trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");

            // Emitting an initial probe before we start waiting for rewinds ensures that we will
            // have a timestamp binding in the remap collection while the snapshot is processed.
            // This is important because otherwise the snapshot updates would need to be buffered
            // in the reclock operator, instead of being spilled to S3 in the persist sink.
            //
            // Note that we need to fetch the probe LSN _after_ having created the replication
            // slot, to make sure the fetched LSN will be included in the replication stream.
            let probe_ts = (config.now_fn)().into();
            let max_lsn = super::fetch_max_lsn(&*metadata_client).await?;
            let probe = Probe {
                probe_ts,
                upstream_frontier: Antichain::from_elem(max_lsn),
            };
            probe_output.give(&probe_cap[0], probe);

            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(_, data) = event {
                    for req in data {
                        if resume_lsn > req.snapshot_lsn + 1 {
                            let err = DefiniteError::SlotCompactedPastResumePoint(
                                req.snapshot_lsn + 1,
                                resume_lsn,
                            );
                            // If the replication stream cannot be obtained from the resume point
                            // there is nothing else to do. These errors are not retractable.
                            for (oid, outputs) in table_info.iter() {
                                for output_index in outputs.keys() {
                                    // We pick `u64::MAX` as the LSN which will (in practice) never
                                    // conflict with any previously revealed portions of the TVC.
                                    let update = (
                                        (
                                            *oid,
                                            *output_index,
                                            Err(DataflowError::from(err.clone())),
                                        ),
                                        MzOffset::from(u64::MAX),
                                        Diff::ONE,
                                    );
                                    data_output.give_fueled(&data_cap_set[0], update).await;
                                }
                            }
                            definite_error_handle.give(
                                &definite_error_cap_set[0],
                                ReplicationError::Definite(Rc::new(err)),
                            );
                            return Ok(());
                        }
                        rewinds.insert(req.output_index, req);
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            let mut committed_uppers = pin!(committed_uppers);

            let stream_result = raw_stream(
                &config,
                replication_client,
                Arc::clone(&metadata_client),
                &connection.publication_details.slot,
                &connection.publication_details.timeline_id,
                &connection.publication,
                resume_lsn,
                committed_uppers.as_mut(),
                &probe_output,
                &probe_cap[0],
            )
            .await?;

            let stream = match stream_result {
                Ok(stream) => stream,
                Err(err) => {
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    for (oid, outputs) in table_info.iter() {
                        for output_index in outputs.keys() {
                            // We pick `u64::MAX` as the LSN which will (in practice) never
                            // conflict with any previously revealed portions of the TVC.
                            let update = (
                                (*oid, *output_index, Err(DataflowError::from(err.clone()))),
                                MzOffset::from(u64::MAX),
                                Diff::ONE,
                            );
                            data_output.give_fueled(&data_cap_set[0], update).await;
                        }
                    }

                    definite_error_handle.give(
                        &definite_error_cap_set[0],
                        ReplicationError::Definite(Rc::new(err)),
                    );
                    return Ok(());
                }
            };
            let mut stream = pin!(stream.peekable());
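            // The stream is made peekable so that, at the bottom of the loop below, we can check
            // whether more data is immediately available and avoid downgrading capabilities until
            // we are about to yield.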

            // Run the periodic schema validation on a separate task using a separate client,
            // to prevent it from blocking the replication reading progress.
            let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
            let client = connection_config
                .connect("schema validation", ssh_tunnel_manager)
                .await?;
            let mut schema_errors = spawn_schema_validator(
                client,
                &config,
                connection.publication.clone(),
                table_info.clone(),
            );

            // Instead of downgrading the capability for every transaction we process we only do it
            // if we're about to yield, which is checked at the bottom of the loop. This avoids
            // creating excessive progress tracking traffic when there are multiple small
            // transactions ready to go.
            let mut data_upper = resume_lsn;
            while let Some(event) = stream.as_mut().next().await {
                use LogicalReplicationMessage::*;
                use ReplicationMessage::*;
                match event {
                    Ok(XLogData(data)) => match data.data() {
                        Begin(begin) => {
                            let commit_lsn = MzOffset::from(begin.final_lsn());

                            let mut tx = pin!(extract_transaction(
                                stream.by_ref(),
                                &*metadata_client,
                                commit_lsn,
                                &mut table_info,
                                &metrics,
                                &connection.publication,
                            ));

                            trace!(
                                %id,
                                "timely-{worker_id} extracting transaction \
                                    at {commit_lsn}"
                            );
                            assert!(
                                data_upper <= commit_lsn,
                                "new_upper={data_upper} tx_lsn={commit_lsn}",
                            );
                            data_upper = commit_lsn + 1;
                            // We are about to ingest a transaction which has the possibility to be
                            // very big and we certainly don't want to hold the data in memory. For
                            // this reason we eagerly downgrade the upper capability in order for
                            // the reclocking machinery to mint a binding that includes this
                            // transaction and therefore be able to pass the data of the
                            // transaction through as we stream it.
                            upper_cap_set.downgrade([&data_upper]);
                            while let Some((oid, output_index, event, diff)) = tx.try_next().await?
                            {
                                let event = event.map_err(Into::into);
                                let mut data = (oid, output_index, event);
                                if let Some(req) = rewinds.get(&output_index) {
                                    if commit_lsn <= req.snapshot_lsn {
                                        let update = (data, MzOffset::from(0), -diff);
                                        data_output.give_fueled(&data_cap_set[0], &update).await;
                                        data = update.0;
                                    }
                                }
                                let update = (data, commit_lsn, diff);
                                data_output.give_fueled(&data_cap_set[0], &update).await;
                            }
                        }
                        _ => return Err(TransientError::BareTransactionEvent),
                    },
                    Ok(PrimaryKeepAlive(keepalive)) => {
                        trace!(
                            %id,
                            "timely-{worker_id} received keepalive lsn={}",
                            keepalive.wal_end()
                        );

                        // Take the opportunity to report any schema validation errors.
                        while let Ok(error) = schema_errors.try_recv() {
                            use SchemaValidationError::*;
                            match error {
                                Postgres(PostgresError::PublicationMissing(publication)) => {
                                    let err = DefiniteError::PublicationDropped(publication);
                                    for (oid, outputs) in table_info.iter() {
                                        for output_index in outputs.keys() {
                                            let update = (
                                                (
                                                    *oid,
                                                    *output_index,
                                                    Err(DataflowError::from(err.clone())),
                                                ),
                                                data_cap_set[0].time().clone(),
                                                Diff::ONE,
                                            );
                                            data_output
                                                .give_fueled(&data_cap_set[0], update)
                                                .await;
                                        }
                                    }
                                    definite_error_handle.give(
                                        &definite_error_cap_set[0],
                                        ReplicationError::Definite(Rc::new(err)),
                                    );
                                    return Ok(());
                                }
                                Postgres(pg_error) => Err(TransientError::from(pg_error))?,
                                Schema {
                                    oid,
                                    output_index,
                                    error,
                                } => {
                                    let table = table_info.get_mut(&oid).unwrap();
                                    if table.remove(&output_index).is_none() {
                                        continue;
                                    }

                                    let update = (
                                        (oid, output_index, Err(error.into())),
                                        data_cap_set[0].time().clone(),
                                        Diff::ONE,
                                    );
                                    data_output.give_fueled(&data_cap_set[0], update).await;
                                }
                            }
                        }
                        data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
                    }
                    Ok(_) => return Err(TransientError::UnknownReplicationMessage),
                    Err(err) => return Err(err),
                }

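                // If the next message is not ready yet this operator is about to yield, so this
                // is the moment to downgrade capabilities and prune fulfilled rewinds.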
                let will_yield = stream.as_mut().peek().now_or_never().is_none();
                if will_yield {
                    trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
                    rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
                    // As long as there are pending rewinds we can't downgrade our data capability
                    // since we must be able to produce data at offset 0.
                    if rewinds.is_empty() {
                        data_cap_set.downgrade([&data_upper]);
                    }
                    upper_cap_set.downgrade([&data_upper]);
                }
            }
            // We never expect the replication stream to gracefully end.
            Err(TransientError::ReplicationEOF)
        }))
    });

    // We now process the slot updates and apply the cast expressions.
    let mut final_row = Row::default();
    let mut datum_vec = DatumVec::new();
    let mut next_worker = (0..u64::cast_from(scope.peers()))
        // Round robin on a 1000-record basis to avoid creating tiny containers when there are a
        // small number of updates and a large number of workers.
        .flat_map(|w| std::iter::repeat_n(w, 1000))
        .cycle();
    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
    let replication_updates = data_stream
        .map::<Vec<_>, _, _>(Clone::clone)
        .unary(round_robin, "PgCastReplicationRows", |_, _| {
            move |input, output| {
                input.for_each_time(|time, data| {
                    let mut session = output.session(&time);
                    for ((oid, output_index, event), time, diff) in
                        data.flat_map(|data| data.drain(..))
                    {
                        let output = &table_info
                            .get(&oid)
                            .and_then(|outputs| outputs.get(&output_index))
                            .expect("table_info contains all outputs");
                        let event = event.and_then(|row| {
                            let datums = datum_vec.borrow_with(&row);
                            super::cast_row(&output.casts, &datums, &mut final_row)?;
                            Ok(SourceMessage {
                                key: Row::default(),
                                value: final_row.clone(),
                                metadata: Row::default(),
                            })
                        });

                        session.give(((output_index, event), time, diff));
                    }
                });
            }
        })
        .as_collection();

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        replication_updates,
        upper_stream,
        Some(probe_stream),
        errors,
        button.press_on_drop(),
    )
}

/// Produces the logical replication stream while taking care of regularly sending standby
/// keepalive messages with the provided `uppers` stream.
///
/// The returned stream will contain all transactions whose commit LSN is at or beyond
/// `resume_lsn`.
async fn raw_stream<'a>(
    config: &'a RawSourceCreationConfig,
    replication_client: Client,
    metadata_client: Arc<Client>,
    slot: &'a str,
    timeline_id: &'a Option<u64>,
    publication: &'a str,
    resume_lsn: MzOffset,
    uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
    probe_output: &'a AsyncOutputHandle<MzOffset, CapacityContainerBuilder<Vec<Probe<MzOffset>>>>,
    probe_cap: &'a Capability<MzOffset>,
) -> Result<
    Result<impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a, DefiniteError>,
    TransientError,
> {
    if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
        // If the publication gets deleted there is nothing else to do. These errors
        // are not retractable.
        return Ok(Err(err));
    }

    // Skip the timeline ID check for sources without a known timeline ID
    // (sources created before the timeline ID was added to the source details).
    if let Some(expected_timeline_id) = timeline_id {
        if let Err(err) = ensure_replication_timeline_id(
            &replication_client,
            expected_timeline_id,
            config.config.config_set(),
        )
        .await?
        {
            return Ok(Err(err));
        }
    }

    // How often a proactive standby status update message should be sent to the server.
    //
    // The upstream will periodically request status updates by setting the keepalive's reply field
    // value to 1. However, we cannot rely on these messages arriving on time. For example, when
    // the upstream is sending a big transaction its keepalive messages are queued and can be
    // delayed arbitrarily.
    //
    // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
    //
    // For this reason we query the server's timeout value and proactively send a keepalive at
    // twice the frequency to have a healthy margin from the deadline.
    //
    // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
    // Postgres versions disallow SHOW commands from within a replication connection.
    // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
    let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
        .await?
        .unwrap();
    let wal_sender_timeout = match row.get("wal_sender_timeout") {
        // When this parameter is zero the timeout mechanism is disabled.
        Some("0") => None,
        Some(value) => Some(
            mz_repr::adt::interval::Interval::from_str(value)
                .unwrap()
                .duration()
                .unwrap(),
        ),
        None => panic!("ubiquitous parameter missing"),
    };

    // This interval controls the cadence at which we send back status updates and, crucially,
    // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
    // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow us
    // down. For this reason the feedback interval is set to one second, or less if the
    // wal_sender_timeout is less than 2 seconds.
    let feedback_interval = match wal_sender_timeout {
        Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
        None => Duration::from_secs(1),
    };
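    // E.g. with the default `wal_sender_timeout` of 60s this yields a 1s interval, while a
    // `wal_sender_timeout` of 1s yields a 500ms interval.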

    let mut feedback_timer = tokio::time::interval(feedback_interval);
    // 'Delay' ensures we always tick at least 'feedback_interval'.
    feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    // Postgres will return all transactions that commit at or after the provided LSN, following
    // the timely upper semantics.
    let lsn = PgLsn::from(resume_lsn.offset);
    let query = format!(
        r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
        Ident::new_unchecked(slot).to_ast_string_simple(),
        lsn,
        publication,
    );
    let copy_stream = match replication_client.copy_both_simple(&query).await {
        Ok(copy_stream) => copy_stream,
        Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
            return Ok(Err(DefiniteError::InvalidReplicationSlot));
        }
        Err(err) => return Err(err.into()),
    };

    // According to the documentation [1] we must check that the slot LSN matches our
    // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
    // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
    // cannot use the replication client to do that because it's already in CopyBoth mode.
    // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
    let slot_metadata = super::fetch_slot_metadata(
        &*metadata_client,
        slot,
        mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
            .get(config.config.config_set()),
    )
    .await?;
    let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
    tracing::info!(
        %config.id,
        "started replication using backend PID={:?}. wal_sender_timeout={:?}",
        slot_metadata.active_pid, wal_sender_timeout
    );

    let (probe_tx, mut probe_rx) = watch::channel(None);
    let config_set = Arc::clone(config.config.config_set());
    let now_fn = config.now_fn.clone();
    let max_lsn_task_handle =
        mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
            let mut probe_ticker =
                probe::Ticker::new(|| PG_OFFSET_KNOWN_INTERVAL.get(&config_set), now_fn);

            while !probe_tx.is_closed() {
                let probe_ts = probe_ticker.tick().await;
                let probe_or_err = super::fetch_max_lsn(&*metadata_client)
                    .await
                    .map(|lsn| Probe {
                        probe_ts,
                        upstream_frontier: Antichain::from_elem(lsn),
                    });
                let _ = probe_tx.send(Some(probe_or_err));
            }
        })
        .abort_on_drop();

    let stream = async_stream::try_stream!({
        // Ensure we don't pre-drop the task.
        let _max_lsn_task_handle = max_lsn_task_handle;

        // Ensure we don't drop the replication client!
        let _replication_client = replication_client;

        let mut uppers = pin!(uppers);
        let mut last_committed_upper = resume_lsn;

        let mut stream = pin!(LogicalReplicationStream::new(copy_stream));

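        // A `resume_lsn` of 0 indicates that no progress has been committed yet, so any slot
        // position is acceptable; otherwise the slot's `confirmed_flush_lsn` must not have
        // advanced past our resume point, or WAL we still need has already been discarded.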
        if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
            let err = TransientError::OvercompactedReplicationSlot {
                available_lsn: min_resume_lsn,
                requested_lsn: resume_lsn,
            };
            error!("timely-{} ({}) {err}", config.worker_id, config.id);
            Err(err)?;
        }

        loop {
            tokio::select! {
                Some(next_message) = stream.next() => match next_message {
                    Ok(ReplicationMessage::XLogData(data)) => {
                        yield ReplicationMessage::XLogData(data);
                        Ok(())
                    }
                    Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
                        yield ReplicationMessage::PrimaryKeepAlive(keepalive);
                        Ok(())
                    }
                    Err(err) => Err(err.into()),
                    _ => Err(TransientError::UnknownReplicationMessage),
                },
                _ = feedback_timer.tick() => {
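                    // Standby status updates report the client's clock as microseconds since the
                    // Postgres epoch (2000-01-01T00:00:00Z).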
                    let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
                    let lsn = PgLsn::from(last_committed_upper.offset);
                    trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
                    // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply, which
                    // happens when our status update is late. Since we send them proactively this
                    // may never happen. It is therefore *crucial* that we set the last parameter
                    // (the reply flag) to 1 here. This will cause the upstream server to send us a
                    // PrimaryKeepAlive message promptly which will give us frontier advancement
                    // information in the absence of data updates.
                    let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
                    res.map_err(|e| e.into())
                },
                Some(upper) = uppers.next() => match upper.into_option() {
                    Some(lsn) => {
                        if last_committed_upper < lsn {
                            last_committed_upper = lsn;
                            for stat in config.statistics.values() {
                                stat.set_offset_committed(last_committed_upper.offset);
                            }
                        }
                        Ok(())
                    }
                    None => Ok(()),
                },
                Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
                    Some(Ok(probe)) => {
                        if let Some(offset_known) = probe.upstream_frontier.as_option() {
                            for stat in config.statistics.values() {
                                stat.set_offset_known(offset_known.offset);
                            }
                        }
                        probe_output.give(probe_cap, probe);
                        Ok(())
                    },
                    Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
                    None => Ok(()),
                },
                else => return
            }?;
        }
    });
    Ok(Ok(stream))
}

/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
/// message. The BEGIN message must have already been consumed from the stream before calling this
/// function.
fn extract_transaction<'a>(
    stream: impl AsyncStream<Item = Result<LogicalReplMsg, TransientError>> + 'a,
    metadata_client: &'a Client,
    commit_lsn: MzOffset,
    table_info: &'a mut BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: &'a PgSourceMetrics,
    publication: &'a str,
) -> impl AsyncStream<Item = Result<DecodedRow, TransientError>> + 'a {
    use LogicalReplicationMessage::*;
    let mut row = Row::default();
    async_stream::try_stream!({
        let mut stream = pin!(stream);
        metrics.transactions.inc();
        metrics.lsn.set(commit_lsn.offset);
        while let Some(event) = stream.try_next().await? {
            // We can ignore keepalive messages while processing a transaction because the
            // commit_lsn will drive progress.
            let message = match event {
                ReplicationMessage::XLogData(data) => data.into_data(),
                ReplicationMessage::PrimaryKeepAlive(_) => {
                    metrics.ignored.inc();
                    continue;
                }
                _ => Err(TransientError::UnknownReplicationMessage)?,
            };
            metrics.total.inc();
            match message {
                Insert(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Update(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Delete(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Relation(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Insert(body) => {
                    metrics.inserts.inc();
                    let rel = body.rel_id();
                    for (output, info) in table_info.get(&rel).into_iter().flatten() {
                        let tuple_data = body.tuple().tuple_data();
                        let Some(ref projection) = info.projection else {
                            panic!("missing projection for {rel}");
                        };
                        let datums = projection.iter().map(|idx| &tuple_data[*idx]);
                        let row = unpack_tuple(datums, &mut row);
                        yield (rel, *output, row, Diff::ONE);
                    }
                }
                Update(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.updates.inc();
                        let new_tuple = body.new_tuple();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let old_tuple =
                                projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            // If the new tuple contains unchanged toast values we reference the
                            // old ones.
                            let new_tuple = std::iter::zip(
                                projection.iter().map(|idx| &new_tuple.tuple_data()[*idx]),
                                old_tuple.clone(),
                            )
                            .map(|(new, old)| match new {
                                TupleData::UnchangedToast => old,
                                _ => new,
                            });
                            let old_row = unpack_tuple(old_tuple, &mut row);
                            let new_row = unpack_tuple(new_tuple, &mut row);

                            yield (rel, *output, old_row, Diff::MINUS_ONE);
                            yield (rel, *output, new_row, Diff::ONE);
                        }
                    }
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Delete(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.deletes.inc();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let datums =
                                projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            let row = unpack_tuple(datums, &mut row);
                            yield (rel, *output, row, Diff::MINUS_ONE);
                        }
                    }
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Relation(body) => {
                    let rel_id = body.rel_id();
                    if let Some(outputs) = table_info.get_mut(&body.rel_id()) {
                        // Because the replication stream doesn't include columns' attnums, we need
                        // to check the current local schema against the current remote schema to
                        // ensure e.g. we haven't received a schema update with the same terminal
                        // column name which is actually a different column.
                        let upstream_info = mz_postgres_util::publication_info(
                            metadata_client,
                            publication,
                            Some(&[rel_id]),
                        )
                        .await?;

                        let mut schema_errors = vec![];

                        outputs.retain(|output_index, info| {
                            match verify_schema(rel_id, info, &upstream_info) {
                                Ok(()) => true,
                                Err(err) => {
                                    schema_errors.push((
                                        rel_id,
                                        *output_index,
                                        Err(err),
                                        Diff::ONE,
                                    ));
                                    false
                                }
                            }
                        });
                        // Recalculate the projection vector for the retained valid outputs. Here
                        // we must use the column names in the RelationBody message and not the
                        // upstream_info obtained above, since that one represents the current
                        // schema upstream, which may be many versions ahead of the one we're about
                        // to receive after this Relation message.
                        let column_positions: BTreeMap<_, _> = body
                            .columns()
                            .iter()
                            .enumerate()
                            .map(|(idx, col)| (col.name().unwrap(), idx))
                            .collect();
                        for info in outputs.values_mut() {
                            let mut projection = vec![];
                            for col in info.desc.columns.iter() {
                                projection.push(column_positions[&*col.name]);
                            }
                            info.projection = Some(projection);
                        }
                        for schema_error in schema_errors {
                            yield schema_error;
                        }
                    }
                }
                Truncate(body) => {
                    for &rel_id in body.rel_ids() {
                        if let Some(outputs) = table_info.get_mut(&rel_id) {
                            for (output, _) in std::mem::take(outputs) {
                                yield (
                                    rel_id,
                                    output,
                                    Err(DefiniteError::TableTruncated),
                                    Diff::ONE,
                                );
                            }
                        }
                    }
                }
                Commit(body) => {
                    if commit_lsn != body.commit_lsn().into() {
                        Err(TransientError::InvalidTransaction)?
                    }
                    return;
                }
                // TODO: We should handle origin messages and emit an error as they indicate that
                // the upstream performed a point in time restore so all bets are off about the
                // continuity of the stream.
                Origin(_) | Type(_) => metrics.ignored.inc(),
                Begin(_) => Err(TransientError::NestedTransaction)?,
                // The enum is marked as non_exhaustive. Better to be conservative.
                _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
            }
        }
        Err(TransientError::ReplicationEOF)?;
    })
}

/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
/// done.
#[inline]
fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
where
    I: IntoIterator<Item = &'a TupleData>,
    I::IntoIter: ExactSizeIterator,
{
    let iter = tuple_data.into_iter();
    let mut packer = row.packer();
    for data in iter {
        let datum = match data {
            TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
            TupleData::Null => Datum::Null,
            TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
            TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
        };
        packer.push(datum);
    }
    Ok(row.clone())
}

/// Ensures the publication exists on the server. It returns an outer transient error in case of
/// connection issues and an inner definite error if the publication is dropped.
async fn ensure_publication_exists(
    client: &Client,
    publication: &str,
) -> Result<Result<(), DefiniteError>, TransientError> {
    let result = client
        .query_opt(
            "SELECT 1 FROM pg_publication WHERE pubname = $1;",
            &[&publication],
        )
        .await?;
    match result {
        Some(_) => Ok(Ok(())),
        None => Ok(Err(DefiniteError::PublicationDropped(
            publication.to_owned(),
        ))),
    }
}

/// Ensures the active replication timeline_id matches the one we expect such that we can safely
/// resume replication. It returns an outer transient error in case of connection issues and an
/// inner definite error if the timeline id does not match.
async fn ensure_replication_timeline_id(
    replication_client: &Client,
    expected_timeline_id: &u64,
    config_set: &ConfigSet,
) -> Result<Result<(), DefiniteError>, TransientError> {
    let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
    if timeline_id == *expected_timeline_id {
        Ok(Ok(()))
    } else if PG_SOURCE_VALIDATE_TIMELINE.get(config_set) {
        Ok(Err(DefiniteError::InvalidTimelineId {
            expected: *expected_timeline_id,
            actual: timeline_id,
        }))
    } else {
        tracing::warn!(
            "Timeline ID mismatch ignored: expected={expected_timeline_id} actual={timeline_id}"
        );
        Ok(Ok(()))
    }
}

enum SchemaValidationError {
    Postgres(PostgresError),
    Schema {
        oid: u32,
        output_index: usize,
        error: DefiniteError,
    },
}

fn spawn_schema_validator(
    client: Client,
    config: &RawSourceCreationConfig,
    publication: String,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
) -> mpsc::UnboundedReceiver<SchemaValidationError> {
    let (tx, rx) = mpsc::unbounded_channel();
    let source_id = config.id;
    let config_set = Arc::clone(config.config.config_set());

    mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
        while !tx.is_closed() {
            trace!(%source_id, "validating schemas");

            let validation_start = Instant::now();

            let upstream_info = match mz_postgres_util::publication_info(
                &*client,
                &publication,
                Some(&table_info.keys().copied().collect::<Vec<_>>()),
            )
            .await
            {
                Ok(info) => info,
                Err(error) => {
                    let _ = tx.send(SchemaValidationError::Postgres(error));
                    continue;
                }
            };

            for (&oid, outputs) in table_info.iter() {
                for (&output_index, info) in outputs {
                    if let Err(error) = verify_schema(oid, info, &upstream_info) {
                        trace!(
                            %source_id,
                            "schema of output index {output_index} for oid {oid} invalid",
                        );
                        let _ = tx.send(SchemaValidationError::Schema {
                            oid,
                            output_index,
                            error,
                        });
                    } else {
                        trace!(
                            %source_id,
                            "schema of output index {output_index} for oid {oid} valid",
                        );
                    }
                }
            }

            let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
            let elapsed = validation_start.elapsed();
            let wait = interval.saturating_sub(elapsed);
            tokio::time::sleep(wait).await;
        }
    });

    rx
}