mz_storage/source/postgres/replication.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the logical replication side of the [`PostgresSourceConnection`] ingestion dataflow.
//!
//! ```text
//!              o
//!              │rewind
//!              │requests
//!          ╭───┴────╮
//!          │exchange│ (collect all requests to one worker)
//!          ╰───┬────╯
//!           ┏━━v━━━━━━━━━━┓
//!           ┃ replication ┃ (single worker)
//!           ┃   reader    ┃
//!           ┗━┯━━━━━━━━┯━━┛
//!             │raw     │
//!             │data    │
//!        ╭────┴─────╮  │
//!        │distribute│  │ (distribute to all workers)
//!        ╰────┬─────╯  │
//! ┏━━━━━━━━━━━┷━┓      │
//! ┃ replication ┃      │ (parallel decode)
//! ┃   decoder   ┃      │
//! ┗━━━━━┯━━━━━━━┛      │
//!       │ replication  │ progress
//!       │ updates      │ output
//!       v              v
//! ```
//!
//! # Progress tracking
//!
//! In order to avoid causing excessive resource usage in the upstream server it's important to
//! track the LSN that we have successfully committed to persist and communicate that back to
//! PostgreSQL. Under normal operation this gauge of progress is provided by the transactions
//! themselves. Since at a given LSN offset there can be only a single message, when a
//! transaction is received and processed we can infer that we have seen all the messages at or
//! before its `commit_lsn`, and can therefore advance our frontier to `commit_lsn + 1`.
//!
//! Things are a bit more complicated in the absence of transactions though, because even though
//! we don't receive any, the server might very well be generating WAL records. This can happen
//! if there is a separate logical database performing writes (which is the case for RDS
//! databases), or, in servers running PostgreSQL version 15 or greater, because the logical
//! replication process includes an optimization that omits empty transactions, which can happen
//! if you're only replicating a subset of the tables and there are writes going to the other
//! ones.
//!
//! If we fail to detect this situation and don't send LSN feedback in a timely manner the server
//! will be forced to keep around WAL data that can eventually lead to disk space exhaustion.
//!
//! In the absence of transactions the only pieces of information available in the replication
//! stream are keepalive messages. Keepalive messages are documented[1] to contain the current
//! end of WAL on the server. That is a useless number when it comes to progress tracking because
//! there might be pending messages at LSNs between the last received commit_lsn and the current
//! end of WAL.
//!
//! Fortunately for us, the documentation for PrimaryKeepalive messages is wrong and it actually
//! contains the last *sent* LSN[2]. Here sent doesn't necessarily mean sent over the wire, but
//! sent to the upstream process that is producing the logical stream. Therefore, if we receive a
//! keepalive with a particular LSN we can be certain that there are no other replication
//! messages at previous LSNs, because they would have already been generated and received. We
//! therefore connect the keepalive messages directly to our capability.
//!
//! [1]: https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION
//! [2]: https://www.postgresql.org/message-id/CAFPTHDZS9O9WG02EfayBd6oONzK%2BqfUxS6AbVLJ7W%2BKECza2gg%40mail.gmail.com
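//!
//! To make the frontier rules above concrete, here is an illustrative (made-up) sequence of
//! received messages and the upper we can hold after processing each one:
//!
//! ```text
//! received message                          upper afterwards
//! ----------------------------------------  ----------------
//! COMMIT with commit_lsn = 100              101  (every message at LSN <= 100 has been seen)
//! keepalive with last sent LSN = 250        250  (no messages are pending before LSN 250)
//! COMMIT with commit_lsn = 300              301
//! ```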

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::pin::pin;
use std::rc::Rc;
use std::str::FromStr;
use std::sync::Arc;
use std::sync::LazyLock;
use std::time::Instant;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use differential_dataflow::AsCollection;
use futures::{FutureExt, Stream as AsyncStream, StreamExt, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_postgres_util::PostgresError;
use mz_postgres_util::{Client, simple_query_opt};
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_sql_parser::ast::{Ident, display::AstDisplay};
use mz_storage_types::dyncfgs::{PG_OFFSET_KNOWN_INTERVAL, PG_SCHEMA_VALIDATION_INTERVAL};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
use mz_timely_util::builder_async::{
    AsyncOutputHandle, Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder,
    PressOnDropButton,
};
use postgres_replication::LogicalReplicationStream;
use postgres_replication::protocol::{LogicalReplicationMessage, ReplicationMessage, TupleData};
use serde::{Deserialize, Serialize};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::{Exchange, Pipeline};
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::Capability;
use timely::dataflow::operators::Concat;
use timely::dataflow::operators::Operator;
use timely::dataflow::operators::core::Map;
use timely::dataflow::{Scope, Stream};
use timely::progress::Antichain;
use tokio::sync::{mpsc, watch};
use tokio_postgres::error::SqlState;
use tokio_postgres::types::PgLsn;
use tracing::{error, trace};

use crate::metrics::source::postgres::PgSourceMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::postgres::verify_schema;
use crate::source::postgres::{DefiniteError, ReplicationError, SourceOutputInfo, TransientError};
use crate::source::probe;
use crate::source::types::{Probe, SignaledFuture, SourceMessage, StackedCollection};

/// Postgres epoch is 2000-01-01T00:00:00Z
static PG_EPOCH: LazyLock<SystemTime> =
    LazyLock::new(|| UNIX_EPOCH + Duration::from_secs(946_684_800));

// A request to rewind a snapshot taken at `snapshot_lsn` to the initial LSN of the replication
// slot. This is accomplished by emitting `(data, 0, -diff)` for all updates `(data, lsn, diff)`
// whose `lsn <= snapshot_lsn`. By convention the snapshot is always emitted at LSN 0.
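//
// As a made-up illustration of that arithmetic: with `snapshot_lsn = 10`, an update
// `(row, lsn = 7, +1)` replayed from the slot is already reflected in the snapshot emitted at
// LSN 0, so the rewind additionally emits `(row, 0, -1)`; combined with the regular emission at
// LSN 7 the row then appears exactly once, at its true LSN.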
#[derive(Debug, Clone, Serialize, Deserialize)]
pub(crate) struct RewindRequest {
    /// The output index that should be rewound.
    pub(crate) output_index: usize,
    /// The LSN that the snapshot was taken at.
    pub(crate) snapshot_lsn: MzOffset,
}

/// Renders the replication dataflow. See the module documentation for more information.
pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
    scope: G,
    config: RawSourceCreationConfig,
    connection: PostgresSourceConnection,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    rewind_stream: &Stream<G, RewindRequest>,
    slot_ready_stream: &Stream<G, Infallible>,
    committed_uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'static,
    metrics: PgSourceMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, Infallible>,
    Option<Stream<G, Probe<MzOffset>>>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("ReplicationReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());

    let slot_reader = u64::cast_from(config.responsible_worker("slot"));
    let (data_output, data_stream) = builder.new_output();
    let (_upper_output, upper_stream) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (definite_error_handle, definite_errors) = builder.new_output();
    let (probe_output, probe_stream) = builder.new_output::<CapacityContainerBuilder<_>>();

    let mut rewind_input =
        builder.new_disconnected_input(rewind_stream, Exchange::new(move |_| slot_reader));
    let mut slot_ready_input = builder.new_disconnected_input(slot_ready_stream, Pipeline);
    let output_uppers = table_info
        .iter()
        .flat_map(|(_, outputs)| outputs.values().map(|o| o.resume_upper.clone()))
        .collect::<Vec<_>>();
    metrics.tables.set(u64::cast_from(output_uppers.len()));

    let reader_table_info = table_info.clone();
    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let mut table_info = reader_table_info;
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let (id, worker_id) = (config.id, config.worker_id);
            let [
                data_cap_set,
                upper_cap_set,
                definite_error_cap_set,
                probe_cap,
            ]: &mut [_; 4] = caps.try_into().unwrap();

            if !config.responsible_for("slot") {
                // Emit 0, to mark this worker as having started up correctly.
                for stat in config.statistics.values() {
                    stat.set_offset_known(0);
                    stat.set_offset_committed(0);
                }
                return Ok(());
            }

            // Determine the slot lsn.
            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            let slot = &connection.publication_details.slot;
            let replication_client = connection_config
                .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                .await?;

            let metadata_client = connection_config
                .connect(
                    "replication metadata",
                    &config.config.connection_context.ssh_tunnel_manager,
                )
                .await?;
            let metadata_client = Arc::new(metadata_client);

            while let Some(_) = slot_ready_input.next().await {
                // Wait for the slot to be created
            }

            // The slot is always created by the snapshot operator. If the slot doesn't exist by
            // the time this check runs, this operator will return an error.
            let slot_metadata = super::fetch_slot_metadata(
                &*metadata_client,
                slot,
                mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
                    .get(config.config.config_set()),
            )
            .await?;

            // We're the only application that should be using this replication
            // slot. The only way that there can be another connection using
            // this slot under normal operation is if there's a stale TCP
            // connection from a prior incarnation of the source holding on to
            // the slot. We don't want to wait for the WAL sender timeout and/or
            // TCP keepalives to time out that connection, because these values
            // are generally under the control of the DBA and may not time out
            // the connection for multiple minutes, or at all. Instead we just
            // force kill the connection that's using the slot.
            //
            // Note that there's a small risk that *we're* the zombie cluster
            // that should not be using the replication slot. Kubernetes cannot
            // 100% guarantee that only one cluster is alive at a time. However,
            // this situation should not last long, and the worst that can
            // happen is a bit of transient thrashing over ownership of the
            // replication slot.
            if let Some(active_pid) = slot_metadata.active_pid {
                tracing::warn!(
                    %id, %active_pid,
                    "replication slot already in use; will attempt to kill existing connection",
                );

                match metadata_client
                    .execute("SELECT pg_terminate_backend($1)", &[&active_pid])
                    .await
                {
                    Ok(_) => {
                        tracing::info!(
                            "successfully killed existing connection; \
                            starting replication is likely to succeed"
                        );
                        // Note that `pg_terminate_backend` does not wait for
                        // the termination of the targeted connection to
                        // complete. We may try to start replication before the
                        // targeted connection has cleaned up its state. That's
                        // okay. If that happens we'll just try again from the
                        // top via the suspend-and-restart flow.
                    }
                    Err(e) => {
                        tracing::warn!(
                            %e,
                            "failed to kill existing replication connection; \
                            replication will likely fail to start"
                        );
                        // Continue on anyway, just in case the replication slot
                        // is actually available. Maybe PostgreSQL has some
                        // staleness when it reports `active_pid`, for example.
                    }
                }
            }

            // The overall resumption point for this source is the minimum of the resumption
            // points contributed by each of the outputs.
            let resume_lsn = output_uppers
                .iter()
                .flat_map(|f| f.elements())
                .map(|&lsn| {
                    // An output is either an output that has never had data committed to it or
                    // one that has and needs to resume. We differentiate between the two by
                    // checking whether an output wishes to "resume" from the minimum timestamp.
                    // In that case its contribution to the overall resumption point is the
                    // earliest point available in the slot. This information would normally be
                    // something that the storage controller figures out in the form of an as-of
                    // frontier, but at the moment the storage controller does not have
                    // visibility into what the replication slot is doing.
                    if lsn == MzOffset::from(0) {
                        slot_metadata.confirmed_flush_lsn
                    } else {
                        lsn
                    }
                })
                .min();
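            // If there are no outputs there is no resumption point to compute and nothing for
            // this operator to do, so we park forever rather than returning, which would drop
            // our capabilities.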
            let Some(resume_lsn) = resume_lsn else {
                std::future::pending::<()>().await;
                return Ok(());
            };
            upper_cap_set.downgrade([&resume_lsn]);
            trace!(%id, "timely-{worker_id} replication reader started lsn={resume_lsn}");

            // Emitting an initial probe before we start waiting for rewinds ensures that we will
            // have a timestamp binding in the remap collection while the snapshot is processed.
            // This is important because otherwise the snapshot updates would need to be buffered
            // in the reclock operator, instead of being spilled to S3 in the persist sink.
            //
            // Note that we need to fetch the probe LSN _after_ having created the replication
            // slot, to make sure the fetched LSN will be included in the replication stream.
            let probe_ts = (config.now_fn)().into();
            let max_lsn = super::fetch_max_lsn(&*metadata_client).await?;
            let probe = Probe {
                probe_ts,
                upstream_frontier: Antichain::from_elem(max_lsn),
            };
            probe_output.give(&probe_cap[0], probe);

            let mut rewinds = BTreeMap::new();
            while let Some(event) = rewind_input.next().await {
                if let AsyncEvent::Data(_, data) = event {
                    for req in data {
                        if resume_lsn > req.snapshot_lsn + 1 {
                            let err = DefiniteError::SlotCompactedPastResumePoint(
                                req.snapshot_lsn + 1,
                                resume_lsn,
                            );
                            // If the replication stream cannot be obtained from the resume point
                            // there is nothing else to do. These errors are not retractable.
                            for (oid, outputs) in table_info.iter() {
                                for output_index in outputs.keys() {
                                    // We pick `u64::MAX` as the LSN which will (in practice)
                                    // never conflict with any previously revealed portions of
                                    // the TVC.
                                    let update = (
                                        (
                                            *oid,
                                            *output_index,
                                            Err(DataflowError::from(err.clone())),
                                        ),
                                        MzOffset::from(u64::MAX),
                                        Diff::ONE,
                                    );
                                    data_output.give_fueled(&data_cap_set[0], update).await;
                                }
                            }
                            definite_error_handle.give(
                                &definite_error_cap_set[0],
                                ReplicationError::Definite(Rc::new(err)),
                            );
                            return Ok(());
                        }
                        rewinds.insert(req.output_index, req);
                    }
                }
            }
            trace!(%id, "timely-{worker_id} pending rewinds {rewinds:?}");

            let mut committed_uppers = pin!(committed_uppers);

            let stream_result = raw_stream(
                &config,
                replication_client,
                Arc::clone(&metadata_client),
                &connection.publication_details.slot,
                &connection.publication_details.timeline_id,
                &connection.publication,
                resume_lsn,
                committed_uppers.as_mut(),
                &probe_output,
                &probe_cap[0],
            )
            .await?;

            let stream = match stream_result {
                Ok(stream) => stream,
                Err(err) => {
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    for (oid, outputs) in table_info.iter() {
                        for output_index in outputs.keys() {
                            // We pick `u64::MAX` as the LSN which will (in practice) never
                            // conflict with any previously revealed portions of the TVC.
                            let update = (
                                (*oid, *output_index, Err(DataflowError::from(err.clone()))),
                                MzOffset::from(u64::MAX),
                                Diff::ONE,
                            );
                            data_output.give_fueled(&data_cap_set[0], update).await;
                        }
                    }

                    definite_error_handle.give(
                        &definite_error_cap_set[0],
                        ReplicationError::Definite(Rc::new(err)),
                    );
                    return Ok(());
                }
            };
            let mut stream = pin!(stream.peekable());

            // Run the periodic schema validation on a separate task using a separate client,
            // to prevent it from blocking the replication reading progress.
            let ssh_tunnel_manager = &config.config.connection_context.ssh_tunnel_manager;
            let client = connection_config
                .connect("schema validation", ssh_tunnel_manager)
                .await?;
            let mut schema_errors = spawn_schema_validator(
                client,
                &config,
                connection.publication.clone(),
                table_info.clone(),
            );

            // Instead of downgrading the capability for every transaction we process we only do
            // it if we're about to yield, which is checked at the bottom of the loop. This
            // avoids creating excessive progress tracking traffic when there are multiple small
            // transactions ready to go.
            let mut data_upper = resume_lsn;
            // A stash of reusable vectors to convert from bytes::Bytes based data, which is not
            // compatible with `columnation`, to Vec<u8> data that is.
            while let Some(event) = stream.as_mut().next().await {
                use LogicalReplicationMessage::*;
                use ReplicationMessage::*;
                match event {
                    Ok(XLogData(data)) => match data.data() {
                        Begin(begin) => {
                            let commit_lsn = MzOffset::from(begin.final_lsn());

                            let mut tx = pin!(extract_transaction(
                                stream.by_ref(),
                                &*metadata_client,
                                commit_lsn,
                                &mut table_info,
                                &metrics,
                                &connection.publication,
                            ));

                            trace!(
                                %id,
                                "timely-{worker_id} extracting transaction \
                                at {commit_lsn}"
                            );
                            assert!(
                                data_upper <= commit_lsn,
                                "new_upper={data_upper} tx_lsn={commit_lsn}",
                            );
                            data_upper = commit_lsn + 1;
                            // We are about to ingest a transaction which has the possibility to
                            // be very big and we certainly don't want to hold the data in
                            // memory. For this reason we eagerly downgrade the upper capability
                            // in order for the reclocking machinery to mint a binding that
                            // includes this transaction and therefore be able to pass the data
                            // of the transaction through as we stream it.
                            upper_cap_set.downgrade([&data_upper]);
                            while let Some((oid, output_index, event, diff)) = tx.try_next().await?
                            {
                                let event = event.map_err(Into::into);
                                let mut data = (oid, output_index, event);
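                                // If this output requested a rewind and this transaction is
                                // already reflected in its snapshot (commit_lsn <= snapshot_lsn),
                                // also emit the update at LSN 0 with a negated diff, per the
                                // `RewindRequest` contract, before emitting it at its actual LSN.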
                                if let Some(req) = rewinds.get(&output_index) {
                                    if commit_lsn <= req.snapshot_lsn {
                                        let update = (data, MzOffset::from(0), -diff);
                                        data_output.give_fueled(&data_cap_set[0], &update).await;
                                        data = update.0;
                                    }
                                }
                                let update = (data, commit_lsn, diff);
                                data_output.give_fueled(&data_cap_set[0], &update).await;
                            }
                        }
                        _ => return Err(TransientError::BareTransactionEvent),
                    },
                    Ok(PrimaryKeepAlive(keepalive)) => {
                        trace!(
                            %id,
                            "timely-{worker_id} received keepalive lsn={}",
                            keepalive.wal_end()
                        );

                        // Take the opportunity to report any schema validation errors.
                        while let Ok(error) = schema_errors.try_recv() {
                            use SchemaValidationError::*;
                            match error {
                                Postgres(PostgresError::PublicationMissing(publication)) => {
                                    let err = DefiniteError::PublicationDropped(publication);
                                    for (oid, outputs) in table_info.iter() {
                                        for output_index in outputs.keys() {
                                            let update = (
                                                (
                                                    *oid,
                                                    *output_index,
                                                    Err(DataflowError::from(err.clone())),
                                                ),
                                                data_cap_set[0].time().clone(),
                                                Diff::ONE,
                                            );
                                            data_output.give_fueled(&data_cap_set[0], update).await;
                                        }
                                    }
                                    definite_error_handle.give(
                                        &definite_error_cap_set[0],
                                        ReplicationError::Definite(Rc::new(err)),
                                    );
                                    return Ok(());
                                }
                                Postgres(pg_error) => Err(TransientError::from(pg_error))?,
                                Schema {
                                    oid,
                                    output_index,
                                    error,
                                } => {
                                    let table = table_info.get_mut(&oid).unwrap();
                                    if table.remove(&output_index).is_none() {
                                        continue;
                                    }

                                    let update = (
                                        (oid, output_index, Err(error.into())),
                                        data_cap_set[0].time().clone(),
                                        Diff::ONE,
                                    );
                                    data_output.give_fueled(&data_cap_set[0], update).await;
                                }
                            }
                        }
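                        // Per the module documentation, the LSN reported by a keepalive is the
                        // last *sent* LSN, so no messages can be pending before it and it is
                        // safe to advance our upper to it.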
                        data_upper = std::cmp::max(data_upper, keepalive.wal_end().into());
                    }
                    Ok(_) => return Err(TransientError::UnknownReplicationMessage),
                    Err(err) => return Err(err),
                }

                let will_yield = stream.as_mut().peek().now_or_never().is_none();
                if will_yield {
                    trace!(%id, "timely-{worker_id} yielding at lsn={data_upper}");
                    rewinds.retain(|_, req| data_upper <= req.snapshot_lsn);
                    // As long as there are pending rewinds we can't downgrade our data
                    // capability since we must be able to produce data at offset 0.
                    if rewinds.is_empty() {
                        data_cap_set.downgrade([&data_upper]);
                    }
                    upper_cap_set.downgrade([&data_upper]);
                }
            }
            // We never expect the replication stream to gracefully end
            Err(TransientError::ReplicationEOF)
        }))
    });

    // We now process the slot updates and apply the cast expressions
    let mut final_row = Row::default();
    let mut datum_vec = DatumVec::new();
    let mut next_worker = (0..u64::cast_from(scope.peers()))
        // Round robin on a 1000-record basis to avoid creating tiny containers when there are a
        // small number of updates and a large number of workers.
        .flat_map(|w| std::iter::repeat_n(w, 1000))
        .cycle();
    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
    let replication_updates = data_stream
        .map::<Vec<_>, _, _>(Clone::clone)
        .unary(round_robin, "PgCastReplicationRows", |_, _| {
            move |input, output| {
                while let Some((time, data)) = input.next() {
                    let mut session = output.session(&time);
                    for ((oid, output_index, event), time, diff) in data.drain(..) {
                        let output = &table_info
                            .get(&oid)
                            .and_then(|outputs| outputs.get(&output_index))
                            .expect("table_info contains all outputs");
                        let event = event.and_then(|row| {
                            let datums = datum_vec.borrow_with(&row);
                            super::cast_row(&output.casts, &datums, &mut final_row)?;
                            Ok(SourceMessage {
                                key: Row::default(),
                                value: final_row.clone(),
                                metadata: Row::default(),
                            })
                        });

                        session.give(((output_index, event), time, diff));
                    }
                }
            }
        })
        .as_collection();

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        replication_updates,
        upper_stream,
        Some(probe_stream),
        errors,
        button.press_on_drop(),
    )
}

/// Produces the logical replication stream while taking care of regularly sending standby
/// keepalive messages with the provided `uppers` stream.
///
/// The returned stream will contain all transactions whose commit LSN is at or beyond
/// `resume_lsn`.
async fn raw_stream<'a>(
    config: &'a RawSourceCreationConfig,
    replication_client: Client,
    metadata_client: Arc<Client>,
    slot: &'a str,
    timeline_id: &'a Option<u64>,
    publication: &'a str,
    resume_lsn: MzOffset,
    uppers: impl futures::Stream<Item = Antichain<MzOffset>> + 'a,
    probe_output: &'a AsyncOutputHandle<
        MzOffset,
        CapacityContainerBuilder<Vec<Probe<MzOffset>>>,
        Tee<MzOffset, Vec<Probe<MzOffset>>>,
    >,
    probe_cap: &'a Capability<MzOffset>,
) -> Result<
    Result<
        impl AsyncStream<Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>>
        + 'a,
        DefiniteError,
    >,
    TransientError,
> {
    if let Err(err) = ensure_publication_exists(&*metadata_client, publication).await? {
        // If the publication gets deleted there is nothing else to do. These errors
        // are not retractable.
        return Ok(Err(err));
    }

    // Skip the timeline ID check for sources without a known timeline ID
    // (sources created before the timeline ID was added to the source details)
    if let Some(expected_timeline_id) = timeline_id {
        if let Err(err) =
            ensure_replication_timeline_id(&replication_client, expected_timeline_id).await?
        {
            return Ok(Err(err));
        }
    }

    // How often a proactive standby status update message should be sent to the server.
    //
    // The upstream will periodically request status updates by setting the keepalive's reply
    // field value to 1. However, we cannot rely on these messages arriving on time. For example,
    // when the upstream is sending a big transaction its keepalive messages are queued and can
    // be delayed arbitrarily.
    //
    // See: <https://www.postgresql.org/message-id/CAMsr+YE2dSfHVr7iEv1GSPZihitWX-PMkD9QALEGcTYa+sdsgg@mail.gmail.com>
    //
    // For this reason we query the server's timeout value and proactively send a keepalive at
    // twice the frequency to have a healthy margin from the deadline.
    //
    // Note: We must use the metadata client here which is NOT in replication mode. Some Aurora
    // Postgres versions disallow SHOW commands from within a replication connection.
    // See: https://github.com/readysettech/readyset/discussions/28#discussioncomment-4405671
    let row = simple_query_opt(&*metadata_client, "SHOW wal_sender_timeout;")
        .await?
        .unwrap();
    let wal_sender_timeout = match row.get("wal_sender_timeout") {
        // When this parameter is zero the timeout mechanism is disabled
        Some("0") => None,
        Some(value) => Some(
            mz_repr::adt::interval::Interval::from_str(value)
                .unwrap()
                .duration()
                .unwrap(),
        ),
        None => panic!("ubiquitous parameter missing"),
    };

    // This interval controls the cadence at which we send back status updates and, crucially,
    // request PrimaryKeepAlive messages. PrimaryKeepAlive messages drive the frontier forward in
    // the absence of data updates and we don't want a large `wal_sender_timeout` value to slow
    // us down. For this reason the feedback interval is set to one second, or less if the
    // wal_sender_timeout is less than 2 seconds.
    let feedback_interval = match wal_sender_timeout {
        Some(t) => std::cmp::min(Duration::from_secs(1), t.checked_div(2).unwrap()),
        None => Duration::from_secs(1),
    };
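    // As a worked example: with PostgreSQL's default wal_sender_timeout of 60s this yields
    // min(1s, 30s) = 1s, while a (low) 1s timeout yields min(1s, 500ms) = 500ms, keeping a
    // healthy margin from the deadline in both cases.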

    let mut feedback_timer = tokio::time::interval(feedback_interval);
    // 'Delay' ensures that ticks are always at least 'feedback_interval' apart.
    feedback_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    // Postgres will return all transactions that commit *at or after* the provided LSN,
    // following the timely upper semantics.
    let lsn = PgLsn::from(resume_lsn.offset);
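    // With hypothetical values (slot "materialize_abc", publication "mz_source", resume LSN
    // 0/16B3748) the statement below would render roughly as:
    //
    //   START_REPLICATION SLOT "materialize_abc" LOGICAL 0/16B3748
    //     ("proto_version" '1', "publication_names" 'mz_source')
    //
    // The names are made up for illustration; the real values come from the source's
    // publication details.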
    let query = format!(
        r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
        Ident::new_unchecked(slot).to_ast_string_simple(),
        lsn,
        publication,
    );
    let copy_stream = match replication_client.copy_both_simple(&query).await {
        Ok(copy_stream) => copy_stream,
        Err(err) if err.code() == Some(&SqlState::OBJECT_NOT_IN_PREREQUISITE_STATE) => {
            return Ok(Err(DefiniteError::InvalidReplicationSlot));
        }
        Err(err) => return Err(err.into()),
    };

    // According to the documentation [1] we must check that the slot LSN matches our
    // expectations otherwise we risk getting silently fast-forwarded to a future LSN. In order
    // to avoid a TOCTOU issue we must do this check after starting the replication stream. We
    // cannot use the replication client to do that because it's already in CopyBoth mode.
    // [1] https://www.postgresql.org/docs/15/protocol-replication.html#PROTOCOL-REPLICATION-START-REPLICATION-SLOT-LOGICAL
    let slot_metadata = super::fetch_slot_metadata(
        &*metadata_client,
        slot,
        mz_storage_types::dyncfgs::PG_FETCH_SLOT_RESUME_LSN_INTERVAL
            .get(config.config.config_set()),
    )
    .await?;
    let min_resume_lsn = slot_metadata.confirmed_flush_lsn;
    tracing::info!(
        %config.id,
        "started replication using backend PID={:?}. wal_sender_timeout={:?}",
        slot_metadata.active_pid, wal_sender_timeout
    );

    let (probe_tx, mut probe_rx) = watch::channel(None);
    let config_set = Arc::clone(config.config.config_set());
    let now_fn = config.now_fn.clone();
    let max_lsn_task_handle =
        mz_ore::task::spawn(|| format!("pg_current_wal_lsn:{}", config.id), async move {
            let mut probe_ticker =
                probe::Ticker::new(|| PG_OFFSET_KNOWN_INTERVAL.get(&config_set), now_fn);

            while !probe_tx.is_closed() {
                let probe_ts = probe_ticker.tick().await;
                let probe_or_err = super::fetch_max_lsn(&*metadata_client)
                    .await
                    .map(|lsn| Probe {
                        probe_ts,
                        upstream_frontier: Antichain::from_elem(lsn),
                    });
                let _ = probe_tx.send(Some(probe_or_err));
            }
        })
        .abort_on_drop();

    let stream = async_stream::try_stream!({
        // Ensure we don't pre-drop the task
        let _max_lsn_task_handle = max_lsn_task_handle;

        // ensure we don't drop the replication client!
        let _replication_client = replication_client;

        let mut uppers = pin!(uppers);
        let mut last_committed_upper = resume_lsn;

        let mut stream = pin!(LogicalReplicationStream::new(copy_stream));

        if !(resume_lsn == MzOffset::from(0) || min_resume_lsn <= resume_lsn) {
            let err = TransientError::OvercompactedReplicationSlot {
                available_lsn: min_resume_lsn,
                requested_lsn: resume_lsn,
            };
            error!("timely-{} ({}) {err}", config.worker_id, config.id);
            Err(err)?;
        }

        loop {
            tokio::select! {
                Some(next_message) = stream.next() => match next_message {
                    Ok(ReplicationMessage::XLogData(data)) => {
                        yield ReplicationMessage::XLogData(data);
                        Ok(())
                    }
                    Ok(ReplicationMessage::PrimaryKeepAlive(keepalive)) => {
                        yield ReplicationMessage::PrimaryKeepAlive(keepalive);
                        Ok(())
                    }
                    Err(err) => Err(err.into()),
                    _ => Err(TransientError::UnknownReplicationMessage),
                },
                _ = feedback_timer.tick() => {
                    let ts: i64 = PG_EPOCH.elapsed().unwrap().as_micros().try_into().unwrap();
                    let lsn = PgLsn::from(last_committed_upper.offset);
                    trace!("timely-{} ({}) sending keepalive {lsn:?}", config.worker_id, config.id);
                    // Postgres only sends PrimaryKeepAlive messages when *it* wants a reply,
                    // which happens when our status update is late. Since we send them
                    // proactively this may never happen. It is therefore *crucial* that we set
                    // the last parameter (the reply flag) to 1 here. This will cause the
                    // upstream server to send us a PrimaryKeepAlive message promptly which will
                    // give us frontier advancement information in the absence of data updates.
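                    // The three LSN arguments correspond to the standby's write, flush, and
                    // apply positions; we report our committed upper for all three. The
                    // timestamp is microseconds since the Postgres epoch (`PG_EPOCH`), as the
                    // protocol expects for standby status updates.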
                    let res = stream.as_mut().standby_status_update(lsn, lsn, lsn, ts, 1).await;
                    res.map_err(|e| e.into())
                },
                Some(upper) = uppers.next() => match upper.into_option() {
                    Some(lsn) => {
                        if last_committed_upper < lsn {
                            last_committed_upper = lsn;
                            for stat in config.statistics.values() {
                                stat.set_offset_committed(last_committed_upper.offset);
                            }
                        }
                        Ok(())
                    }
                    None => Ok(()),
                },
                Ok(()) = probe_rx.changed() => match &*probe_rx.borrow() {
                    Some(Ok(probe)) => {
                        if let Some(offset_known) = probe.upstream_frontier.as_option() {
                            for stat in config.statistics.values() {
                                stat.set_offset_known(offset_known.offset);
                            }
                        }
                        probe_output.give(probe_cap, probe);
                        Ok(())
                    },
                    Some(Err(err)) => Err(anyhow::anyhow!("{err}").into()),
                    None => Ok(()),
                },
                else => return
            }?;
        }
    });
    Ok(Ok(stream))
}

/// Extracts a single transaction from the replication stream delimited by a BEGIN and COMMIT
/// message. The BEGIN message must have already been consumed from the stream before calling
/// this function.
fn extract_transaction<'a>(
    stream: impl AsyncStream<
        Item = Result<ReplicationMessage<LogicalReplicationMessage>, TransientError>,
    > + 'a,
    metadata_client: &'a Client,
    commit_lsn: MzOffset,
    table_info: &'a mut BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: &'a PgSourceMetrics,
    publication: &'a str,
) -> impl AsyncStream<Item = Result<(u32, usize, Result<Row, DefiniteError>, Diff), TransientError>> + 'a
{
    use LogicalReplicationMessage::*;
    let mut row = Row::default();
    async_stream::try_stream!({
        let mut stream = pin!(stream);
        metrics.transactions.inc();
        metrics.lsn.set(commit_lsn.offset);
        while let Some(event) = stream.try_next().await? {
            // We can ignore keepalive messages while processing a transaction because the
            // commit_lsn will drive progress.
            let message = match event {
                ReplicationMessage::XLogData(data) => data.into_data(),
                ReplicationMessage::PrimaryKeepAlive(_) => {
                    metrics.ignored.inc();
                    continue;
                }
                _ => Err(TransientError::UnknownReplicationMessage)?,
            };
            metrics.total.inc();
            match message {
                Insert(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Update(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Delete(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Relation(body) if !table_info.contains_key(&body.rel_id()) => metrics.ignored.inc(),
                Insert(body) => {
                    metrics.inserts.inc();
                    let rel = body.rel_id();
                    for (output, info) in table_info.get(&rel).into_iter().flatten() {
                        let tuple_data = body.tuple().tuple_data();
                        let Some(ref projection) = info.projection else {
                            panic!("missing projection for {rel}");
                        };
                        let datums = projection.iter().map(|idx| &tuple_data[*idx]);
                        let row = unpack_tuple(datums, &mut row);
                        yield (rel, *output, row, Diff::ONE);
                    }
                }
                Update(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.updates.inc();
                        let new_tuple = body.new_tuple();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let old_tuple =
                                projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            // If the new tuple contains unchanged toast values we reference the
                            // old ones
                            let new_tuple = std::iter::zip(
                                projection.iter().map(|idx| &new_tuple.tuple_data()[*idx]),
                                old_tuple.clone(),
                            )
                            .map(|(new, old)| match new {
                                TupleData::UnchangedToast => old,
                                _ => new,
                            });
                            let old_row = unpack_tuple(old_tuple, &mut row);
                            let new_row = unpack_tuple(new_tuple, &mut row);

                            yield (rel, *output, old_row, Diff::MINUS_ONE);
                            yield (rel, *output, new_row, Diff::ONE);
                        }
                    }
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Delete(body) => match body.old_tuple() {
                    Some(old_tuple) => {
                        metrics.deletes.inc();
                        let rel = body.rel_id();
                        for (output, info) in table_info.get(&rel).into_iter().flatten() {
                            let Some(ref projection) = info.projection else {
                                panic!("missing projection for {rel}");
                            };
                            let datums = projection.iter().map(|idx| &old_tuple.tuple_data()[*idx]);
                            let row = unpack_tuple(datums, &mut row);
                            yield (rel, *output, row, Diff::MINUS_ONE);
                        }
                    }
                    None => {
                        let rel = body.rel_id();
                        for (output, _) in table_info.get(&rel).into_iter().flatten() {
                            yield (
                                rel,
                                *output,
                                Err(DefiniteError::DefaultReplicaIdentity),
                                Diff::ONE,
                            );
                        }
                    }
                },
                Relation(body) => {
                    let rel_id = body.rel_id();
                    if let Some(outputs) = table_info.get_mut(&body.rel_id()) {
                        // Because the replication stream doesn't include columns' attnums, we
                        // need to check the current local schema against the current remote
                        // schema to ensure e.g. we haven't received a schema update with the
                        // same terminal column name which is actually a different column.
                        let upstream_info = mz_postgres_util::publication_info(
                            metadata_client,
                            publication,
                            Some(&[rel_id]),
                        )
                        .await?;

                        let mut schema_errors = vec![];

                        outputs.retain(|output_index, info| {
                            match verify_schema(rel_id, info, &upstream_info) {
                                Ok(()) => true,
                                Err(err) => {
                                    schema_errors.push((
                                        rel_id,
                                        *output_index,
                                        Err(err),
                                        Diff::ONE,
                                    ));
                                    false
                                }
                            }
                        });
                        // Recalculate the projection vector for the retained valid outputs. Here
                        // we must use the column names in the RelationBody message and not the
                        // upstream_info obtained above, since that one represents the current
                        // schema upstream, which may be many versions ahead of the one we're
                        // about to receive after this Relation message.
                        let column_positions: BTreeMap<_, _> = body
                            .columns()
                            .iter()
                            .enumerate()
                            .map(|(idx, col)| (col.name().unwrap(), idx))
                            .collect();
                        for info in outputs.values_mut() {
                            let mut projection = vec![];
                            for col in info.desc.columns.iter() {
                                projection.push(column_positions[&*col.name]);
                            }
                            info.projection = Some(projection);
                        }
                        for schema_error in schema_errors {
                            yield schema_error;
                        }
                    }
                }
                Truncate(body) => {
                    for &rel_id in body.rel_ids() {
                        if let Some(outputs) = table_info.get_mut(&rel_id) {
                            for (output, _) in std::mem::take(outputs) {
                                yield (
                                    rel_id,
                                    output,
                                    Err(DefiniteError::TableTruncated),
                                    Diff::ONE,
                                );
                            }
                        }
                    }
                }
                Commit(body) => {
                    if commit_lsn != body.commit_lsn().into() {
                        Err(TransientError::InvalidTransaction)?
                    }
                    return;
                }
                // TODO: We should handle origin messages and emit an error as they indicate
                // that the upstream performed a point in time restore so all bets are off about
                // the continuity of the stream.
                Origin(_) | Type(_) => metrics.ignored.inc(),
                Begin(_) => Err(TransientError::NestedTransaction)?,
                // The enum is marked as non_exhaustive. Better to be conservative
                _ => Err(TransientError::UnknownLogicalReplicationMessage)?,
            }
        }
        Err(TransientError::ReplicationEOF)?;
    })
}

/// Unpacks an iterator of TupleData into a list of nullable bytes or an error if this can't be
/// done.
#[inline]
fn unpack_tuple<'a, I>(tuple_data: I, row: &mut Row) -> Result<Row, DefiniteError>
where
    I: IntoIterator<Item = &'a TupleData>,
    I::IntoIter: ExactSizeIterator,
{
    let iter = tuple_data.into_iter();
    let mut packer = row.packer();
    for data in iter {
        let datum = match data {
            TupleData::Text(bytes) => super::decode_utf8_text(bytes)?,
            TupleData::Null => Datum::Null,
            TupleData::UnchangedToast => return Err(DefiniteError::MissingToast),
            TupleData::Binary(_) => return Err(DefiniteError::UnexpectedBinaryData),
        };
        packer.push(datum);
    }
    Ok(row.clone())
}

/// Ensures the publication exists on the server. It returns an outer transient error in case of
/// connection issues and an inner definite error if the publication is dropped.
async fn ensure_publication_exists(
    client: &Client,
    publication: &str,
) -> Result<Result<(), DefiniteError>, TransientError> {
    // Check whether the publication is still present on the upstream server.
    let result = client
        .query_opt(
            "SELECT 1 FROM pg_publication WHERE pubname = $1;",
            &[&publication],
        )
        .await?;
    match result {
        Some(_) => Ok(Ok(())),
        None => Ok(Err(DefiniteError::PublicationDropped(
            publication.to_owned(),
        ))),
    }
}

/// Ensure the active replication timeline_id matches the one we expect such that we can safely
/// resume replication. It returns an outer transient error in case of connection issues and an
/// inner definite error if the timeline id does not match.
async fn ensure_replication_timeline_id(
    replication_client: &Client,
    expected_timeline_id: &u64,
) -> Result<Result<(), DefiniteError>, TransientError> {
    let timeline_id = mz_postgres_util::get_timeline_id(replication_client).await?;
    if timeline_id == *expected_timeline_id {
        Ok(Ok(()))
    } else {
        Ok(Err(DefiniteError::InvalidTimelineId {
            expected: *expected_timeline_id,
            actual: timeline_id,
        }))
    }
}

enum SchemaValidationError {
    Postgres(PostgresError),
    Schema {
        oid: u32,
        output_index: usize,
        error: DefiniteError,
    },
}

fn spawn_schema_validator(
    client: Client,
    config: &RawSourceCreationConfig,
    publication: String,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
) -> mpsc::UnboundedReceiver<SchemaValidationError> {
    let (tx, rx) = mpsc::unbounded_channel();
    let source_id = config.id;
    let config_set = Arc::clone(config.config.config_set());

    mz_ore::task::spawn(|| format!("schema-validator:{}", source_id), async move {
        while !tx.is_closed() {
            trace!(%source_id, "validating schemas");

            let validation_start = Instant::now();

            let upstream_info = match mz_postgres_util::publication_info(
                &*client,
                &publication,
                Some(&table_info.keys().copied().collect::<Vec<_>>()),
            )
            .await
            {
                Ok(info) => info,
                Err(error) => {
                    let _ = tx.send(SchemaValidationError::Postgres(error));
                    continue;
                }
            };

            for (&oid, outputs) in table_info.iter() {
                for (&output_index, info) in outputs {
                    if let Err(error) = verify_schema(oid, info, &upstream_info) {
                        trace!(
                            %source_id,
                            "schema of output index {output_index} for oid {oid} invalid",
                        );
                        let _ = tx.send(SchemaValidationError::Schema {
                            oid,
                            output_index,
                            error,
                        });
                    } else {
                        trace!(
                            %source_id,
                            "schema of output index {output_index} for oid {oid} valid",
                        );
                    }
                }
            }

            let interval = PG_SCHEMA_VALIDATION_INTERVAL.get(&config_set);
            let elapsed = validation_start.elapsed();
            let wait = interval.saturating_sub(elapsed);
            tokio::time::sleep(wait).await;
        }
    });

    rx
}