mz_storage/source/postgres/snapshot.rs
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the table snapshot side of the [`PostgresSourceConnection`] ingestion dataflow.
//!
//! # Snapshot reading
//!
//! Depending on the resumption LSNs, the table reader decides which tables need to be snapshotted.
//! Each table is partitioned across all workers using PostgreSQL's `ctid` (tuple identifier)
//! column, which identifies the physical location of each row. This allows parallel snapshotting
//! of large tables across all available workers.
//!
//! There are a few subtle points about this operation, described in the following sections.
//!
//! ## Consistent LSN point for snapshot transactions
//!
//! Given that all our ingestion is based on correctly timestamping updates with the LSN they
//! happened at, it is important that we run the `COPY` query at a specific LSN point that is
//! relatable to the LSNs we receive from the replication stream. Such a point does not
//! necessarily exist for a normal SQL transaction. To achieve this, we must force PostgreSQL to
//! produce a consistent point and tell us its LSN, which we do by creating a replication slot as
//! the first statement in a transaction.
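/*!
For illustration only, the sketch below builds the two statements that open such a transaction
(mirroring the strings used by `export_snapshot_inner` further down; `CREATE_REPLICATION_SLOT` is
a replication-protocol command, so it must be sent over a replication connection) and decodes the
textual LSN that PostgreSQL reports for the slot's consistent point. It assumes the usual `X/Y`
LSN notation, where both halves are hexadecimal and `X` holds the upper 32 bits.
`slot_transaction_statements` and `parse_lsn` are hypothetical helpers, and the identifier quoting
is simplified.

```rust
/// Hypothetical helper: the statements that put a session at a consistent LSN point.
/// Simplified quoting: the slot name is wrapped in double quotes, doubling embedded ones.
fn slot_transaction_statements(slot: &str) -> [String; 2] {
    let quoted = format!("\"{}\"", slot.replace('"', "\"\""));
    [
        "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;".to_string(),
        format!("CREATE_REPLICATION_SLOT {quoted} TEMPORARY LOGICAL \"pgoutput\" USE_SNAPSHOT"),
    ]
}

/// Hypothetical helper: parses a textual LSN such as `16/B374D848` into a `u64`.
fn parse_lsn(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    // Each half is a 32-bit hexadecimal number.
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}
```

Because the slot is created with `USE_SNAPSHOT` inside the `REPEATABLE READ` transaction, the
queries that follow in that transaction see the database as of the returned LSN.
*/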
//!
//! This is a temporary dummy slot that is only used to put our snapshot transaction on a
//! consistent LSN point. Unfortunately, no lighter-weight method exists for doing this. See this
//! [postgres thread] for more details.
//!
//! One might wonder why we don't use the actual, permanent slot to provide us with the snapshot
//! point, which would automatically be at the correct LSN. The answer is that we might crash and
//! restart after having already created the slot but before having finished the snapshot. In
//! that case the restarting process will have lost its opportunity to run queries at the slot's
//! consistent point, as that opportunity only exists in the ephemeral transaction that created
//! the slot, and that transaction is long gone. Additionally, there are good reasons why we'd like
//! to move slot creation much earlier, e.g. during purification, in which case the slot will
//! always be pre-created.
//!
//! [postgres thread]: https://www.postgresql.org/message-id/flat/CAMN0T-vzzNy6TV1Jvh4xzNQdAvCLBQK_kh6_U7kAXgGU3ZFg-Q%40mail.gmail.com
//!
//! ## Reusing the consistent point among all workers
//!
//! Creating replication slots is potentially expensive, so the code makes it such that all workers
//! cooperate and reuse one consistent snapshot among them. To do so we make use of the
//! "export transaction" feature of PostgreSQL. This feature allows one SQL session to create an
//! identifier (a string) for the transaction it is currently in, which other sessions can use to
//! enter the same "snapshot".
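/*!
As a sketch, assuming a plain SQL session per worker, the export and import can be expressed with
standard PostgreSQL statements: the leader calls `pg_export_snapshot()` inside its open
transaction, and each follower starts a `REPEATABLE READ` transaction whose first statement is
`SET TRANSACTION SNAPSHOT`. The constant and helper below are hypothetical illustrations, not the
body of `use_snapshot`.

```rust
/// Hypothetical: run by the leader inside its open transaction; returns the snapshot id.
const EXPORT_SNAPSHOT: &str = "SELECT pg_export_snapshot();";

/// Hypothetical helper: statements a follower runs to enter the leader's snapshot.
/// `SET TRANSACTION SNAPSHOT` must come before any query in the follower's transaction.
fn import_snapshot_statements(snapshot_id: &str) -> Vec<String> {
    // Escape the id as a SQL string literal by doubling single quotes.
    let literal = format!("'{}'", snapshot_id.replace('\'', "''"));
    vec![
        "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;".to_string(),
        format!("SET TRANSACTION SNAPSHOT {literal};"),
    ]
}
```

An exported snapshot can only be imported while the exporting transaction is still open, which
is why the leader's transaction has to outlive the followers' imports.
*/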
//!
//! We accomplish this by picking one worker at random to function as the transaction leader. The
//! transaction leader is responsible for starting a SQL session, creating a temporary replication
//! slot in a transaction, exporting the transaction id, and broadcasting the transaction
//! information to all other workers via a broadcast feedback edge.
//!
//! During this phase the follower workers simply wait to hear on the feedback edge, effectively
//! synchronizing with the leader. Once all workers have received the snapshot information, they
//! can all start to perform their assigned `COPY` queries.
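/*!
The rendezvous can be modelled outside of timely dataflow. In the sketch below each worker is an
OS thread, and one `std::sync::mpsc` channel per follower stands in for the broadcast feedback
edge. The cut-down `SnapshotInfo` and the `rendezvous` helper are hypothetical and only mirror the
shape of the real exchange.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical, cut-down stand-in for the broadcast snapshot information.
#[derive(Clone, Debug, PartialEq)]
struct SnapshotInfo {
    snapshot_id: String,
    snapshot_lsn: u64,
}

/// Runs a leader and `followers` follower threads and returns what each follower received.
fn rendezvous(followers: usize) -> Vec<SnapshotInfo> {
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..followers).map(|_| mpsc::channel::<SnapshotInfo>()).unzip();

    // Followers block until the leader broadcasts the snapshot information.
    let handles: Vec<_> = receivers
        .into_iter()
        .map(|rx| thread::spawn(move || rx.recv().expect("leader broadcasts before exiting")))
        .collect();

    // The leader would create the temporary slot and export the snapshot here.
    let info = SnapshotInfo {
        snapshot_id: "00000003-0000001B-1".to_string(),
        snapshot_lsn: 42,
    };
    for tx in &senders {
        tx.send(info.clone()).expect("follower is still waiting");
    }

    handles.into_iter().map(|h| h.join().unwrap()).collect()
}
```
*/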
//!
//! The leader and follower steps described above are accomplished by the [`export_snapshot`] and
//! [`use_snapshot`] functions, respectively.
//!
//! ## Coordinated transaction COMMIT
//!
//! When follower workers are done with snapshotting, they commit their transaction, close their
//! session, and then drop their snapshot feedback capability. When the leader worker is done with
//! snapshotting, it drops its snapshot feedback capability and waits until it observes the
//! snapshot input advancing to the empty frontier. This allows the leader, whose transaction is
//! the one that exported the snapshot, to COMMIT last.
//!
//! It's unclear if this is strictly necessary, but having the frontiers made it easy enough that I
//! added the synchronization.
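/*!
The same thread-based model can express the commit ordering. Here dropping a channel `Sender`
plays the role of dropping a snapshot feedback capability, and the leader's receive loop ending
(every sender gone) plays the role of observing the empty frontier. `commit_order` is a
hypothetical helper that only records the order of the simulated commits.

```rust
use std::sync::mpsc;
use std::sync::{Arc, Mutex};
use std::thread;

/// Returns the order in which the simulated sessions committed.
fn commit_order(followers: usize) -> Vec<String> {
    let log = Arc::new(Mutex::new(Vec::<String>::new()));
    let (tx, rx) = mpsc::channel::<()>();

    let handles: Vec<_> = (0..followers)
        .map(|i| {
            let tx = tx.clone();
            let log = Arc::clone(&log);
            thread::spawn(move || {
                // A follower commits first and only then releases its "capability".
                log.lock().unwrap().push(format!("follower-{i}"));
                drop(tx);
            })
        })
        .collect();

    // The leader releases its own "capability", then waits until every follower has done so.
    drop(tx);
    while rx.recv().is_ok() {}
    // The transaction that exported the snapshot commits last.
    log.lock().unwrap().push("leader".to_string());

    for handle in handles {
        handle.join().unwrap();
    }
    Arc::try_unwrap(log).unwrap().into_inner().unwrap()
}
```
*/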
//!
//! ## Snapshot rewinding
//!
//! Ingestion dataflows must produce definite data, including the snapshot. What this means
//! practically is that whenever we deem it necessary to snapshot a table, we must do so at the same
//! LSN. However, the method for running a transaction described above doesn't let us choose the
//! LSN; it could be an LSN in the future, chosen by PostgreSQL while it creates the temporary
//! replication slot.
//!
//! The definition of differential collections states that a collection at some time `t_snapshot`
//! is defined to be the accumulation of all updates that happen at `t <= t_snapshot`, where `<=`
//! is the partial order. In this case we are faced with the problem of knowing the state of a
//! table at `t_snapshot` but actually wanting to know the snapshot at `t_slot <= t_snapshot`.
//!
//! From the definition we can see that the snapshot at `t_slot` is related to the snapshot at
//! `t_snapshot` with the following equations:
//!
//! ```text
//! sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot < t <= t_snapshot)
//! |
//! V
//! sum(update: t <= t_slot) = sum(update: t <= t_snapshot) - sum(update: t_slot < t <= t_snapshot)
//! ```
//!
//! Therefore, if we manage to recover the `sum(update: t_slot < t <= t_snapshot)` term, we will be
//! able to "rewind" the snapshot we obtained at `t_snapshot` to `t_slot` by emitting all updates
//! that happen between these two points with their diffs negated.
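/*!
A small worked check of this identity, as a sketch that tracks the multiplicity of a single row
with updates modelled as `(lsn, diff)` pairs. Suppose the row is inserted at LSN 3 and deleted at
LSN 7, the slot's point is `t_slot = 5`, and the snapshot is taken at `t_snapshot = 8`. The
snapshot contains no copy of the row, the updates after LSN 5 and up to LSN 8 sum to -1, and
rewinding gives 0 - (-1) = 1, which is the row's multiplicity at LSN 5. `accumulate` and `rewind`
are hypothetical helpers.

```rust
/// Sum of the diffs of all updates whose LSN satisfies `keep`.
fn accumulate(updates: &[(u64, i64)], keep: impl Fn(u64) -> bool) -> i64 {
    updates.iter().filter(|(lsn, _)| keep(*lsn)).map(|(_, diff)| diff).sum()
}

/// Rewinds the accumulation observed at `t_snapshot` back to `t_slot` by subtracting the
/// updates that happened strictly after `t_slot` and no later than `t_snapshot`.
fn rewind(updates: &[(u64, i64)], t_slot: u64, t_snapshot: u64) -> i64 {
    let at_snapshot = accumulate(updates, |t| t <= t_snapshot);
    let in_between = accumulate(updates, |t| t_slot < t && t <= t_snapshot);
    at_snapshot - in_between
}
```
*/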
//!
//! It turns out that this term is exactly what the main replication slot provides us with, so we
//! can rewind snapshots taken at arbitrary points! To do this, the snapshot dataflow emits rewind
//! requests to the replication reader, informing it that a certain range of updates must be
//! emitted at LSN 0 (by convention) with their diffs negated. These negated diffs are consolidated
//! with the diffs taken at `t_snapshot` that were also emitted at LSN 0 (by convention), and we end
//! up with a TVC that at LSN 0 contains the snapshot at `t_slot`.
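/*!
The sketch below models these mechanics for one output under simplifying assumptions: rows are
plain strings, the replication reader only sees updates after `t_slot`, and a rewind request
carries the `snapshot_lsn`. Updates at or before `snapshot_lsn` are re-emitted at LSN 0 with
negated diffs, later ones pass through unchanged, and consolidation sums diffs per `(time, row)`.
`route` and `consolidate` are hypothetical and are not the replication operator's code.

```rust
use std::collections::BTreeMap;

/// Routes one replication update `(row, lsn, diff)` given the snapshot LSN of a rewind
/// request. Updates already reflected in the snapshot are emitted negated at LSN 0.
fn route(update: (&'static str, u64, i64), snapshot_lsn: u64) -> (&'static str, u64, i64) {
    let (row, lsn, diff) = update;
    if lsn <= snapshot_lsn {
        (row, 0, -diff)
    } else {
        (row, lsn, diff)
    }
}

/// Sums diffs per `(time, row)` and drops entries that cancel out.
fn consolidate(updates: &[(&'static str, u64, i64)]) -> BTreeMap<(u64, &'static str), i64> {
    let mut acc = BTreeMap::new();
    for &(row, time, diff) in updates {
        *acc.entry((time, row)).or_insert(0) += diff;
    }
    acc.retain(|_, diff| *diff != 0);
    acc
}
```

For example, with `t_slot = 5` and `snapshot_lsn = 8`, a snapshot holding `b` at LSN 0 and
replication updates `("b", 6, +1)`, `("a", 7, -1)` and `("c", 9, +1)` consolidate to `a` at LSN 0
and `c` at LSN 9: LSN 0 now holds exactly the table as of LSN 5.
*/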
//!
//! # Parallel table snapshotting with ctid ranges
//!
//! Each table is partitioned across workers using PostgreSQL's `ctid` column. The `ctid` is a
//! tuple identifier of the form `(block_number, tuple_index)` that represents the physical
//! location of a row on disk. By partitioning the ctid range, each worker can independently
//! fetch a portion of the table.
//!
//! The partitioning works as follows:
//! 1. The snapshot leader queries `pg_class.relpages` to estimate the number of blocks for each
//!    table. This is much faster than querying `max(ctid)`, which would require a sequential scan.
//! 2. The leader broadcasts the block count estimates along with the snapshot transaction ID
//!    to all workers, ensuring all workers use consistent estimates for partitioning.
//! 3. Each worker calculates its assigned block range and fetches rows using a `COPY` query
//!    with a `SELECT` that filters by `ctid >= start AND ctid < end`.
//! 4. The last worker uses an open-ended range (`ctid >= start`) to capture any rows beyond
//!    the estimated block count (this handles stale statistics or a table that has grown).
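/*!
The arithmetic of steps 3 and 4 can be sketched as pure functions over integers, assuming the
worker count and block estimate are already known. `partition` and `ctid_filter` are hypothetical
helpers that mirror, rather than reuse, `worker_ctid_range` and the filter built in `render`. A
zero estimate yields no ranges here, because the real code handles that case with its
single-worker fallback.

```rust
/// Hypothetical mirror of the partitioning arithmetic: the `[start, end)` block range of
/// every worker that receives work, where an `end` of `None` means open-ended.
fn partition(estimated_blocks: u64, worker_count: u64) -> Vec<(u64, Option<u64>)> {
    // Never hand out more ranges than there are blocks.
    let effective = worker_count.min(estimated_blocks);
    (0..effective)
        .map(|w| {
            let start = w * estimated_blocks / effective;
            // The last worker also picks up rows beyond the (possibly stale) estimate.
            let end = (w + 1 < effective).then(|| (w + 1) * estimated_blocks / effective);
            (start, end)
        })
        .collect()
}

/// Hypothetical helper: the `WHERE` clause restricting the snapshot query to one range.
fn ctid_filter(start: u64, end: Option<u64>) -> String {
    match end {
        Some(end) => format!("WHERE ctid >= '({start},0)'::tid AND ctid < '({end},0)'::tid"),
        None => format!("WHERE ctid >= '({start},0)'::tid"),
    }
}
```

For example, 10 estimated blocks over 4 workers give the ranges `[0, 2)`, `[2, 5)`, `[5, 7)` and
an open-ended range starting at block 7; with 3 blocks and 8 workers only the first three workers
receive a range.
*/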
//!
//! This approach efficiently parallelizes large table snapshots while maintaining the benefits
//! of the `COPY` protocol for bulk data transfer.
//!
//! ## PostgreSQL version requirements
//!
//! Ctid range scans are only efficient on PostgreSQL >= 14 due to the TID range scan
//! optimizations introduced in that version. For older PostgreSQL versions, the snapshot falls
//! back to single-worker-per-table mode, where each table is assigned to one worker based on
//! consistent hashing. This is implemented by having the leader broadcast all-zero block counts
//! when the PostgreSQL version is < 14.
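/*!
A sketch of the leader-side decision, assuming the major version is already known as an integer.
The fallback worker is chosen with a plain `oid % worker_count`, which is only a hypothetical
stand-in for `config.responsible_for(oid)`; the real hashing scheme may differ. Both helpers are
hypothetical.

```rust
use std::collections::BTreeMap;

/// Block counts the leader would broadcast: the estimates on PostgreSQL >= 14,
/// otherwise all zeros so that every table is scanned by a single worker.
fn broadcast_block_counts(pg_major: u32, estimates: &BTreeMap<u32, u64>) -> BTreeMap<u32, u64> {
    if pg_major >= 14 {
        estimates.clone()
    } else {
        estimates.keys().map(|&oid| (oid, 0)).collect()
    }
}

/// Hypothetical stand-in for `responsible_for(oid)`: picks exactly one worker per table.
fn fallback_worker(oid: u32, worker_count: u32) -> u32 {
    oid % worker_count
}
```
*/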
//!
//! # Snapshot decoding
//!
//! Each worker fetches its ctid range directly and decodes the `COPY` stream locally.
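/*!
The per-row decoding is delegated to `decode_copy_row`. As a rough guide to what it has to
handle, the sketch below splits one line of PostgreSQL's `COPY ... TO STDOUT (FORMAT TEXT)`
output into fields: columns are separated by tabs, a field consisting of `\N` is NULL, and
backslash escapes such as `\t`, `\n` and `\\` must be undone. `decode_text_line` is hypothetical
and simplified (it does not decode octal or hexadecimal escapes), and it is not the
implementation of `decode_copy_row`.

```rust
/// Decodes one line of `COPY ... (FORMAT TEXT)` output into fields, where `None` is SQL NULL.
/// Simplified: octal and hexadecimal escapes are not decoded.
fn decode_text_line(line: &str) -> Vec<Option<String>> {
    let line = line.strip_suffix('\n').unwrap_or(line);
    line.split('\t')
        .map(|field| {
            if field == "\\N" {
                return None;
            }
            let mut out = String::with_capacity(field.len());
            let mut chars = field.chars();
            while let Some(c) = chars.next() {
                if c != '\\' {
                    out.push(c);
                    continue;
                }
                // A backslash escapes the next character.
                match chars.next() {
                    Some('b') => out.push('\u{8}'),
                    Some('f') => out.push('\u{c}'),
                    Some('n') => out.push('\n'),
                    Some('r') => out.push('\r'),
                    Some('t') => out.push('\t'),
                    Some('v') => out.push('\u{b}'),
                    // Any other character, including a second backslash, stands for itself.
                    Some(other) => out.push(other),
                    None => out.push('\\'),
                }
            }
            Some(out)
        })
        .collect()
}
```
*/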
//!
//! ```text
//!                 ╭──────────────────╮
//!    ┏━━━━━━━━━━━━v━┓                │ exported
//!    ┃    table     ┃   ╭─────────╮  │ snapshot id
//!    ┃   readers    ┠─>─┤broadcast├──╯
//!    ┃  (parallel)  ┃   ╰─────────╯
//!    ┗━┯━━━━━━━━━━┯━┛
//!   raw│          │
//!  COPY│          │
//!  data│          │
//! ┏━━━━┷━━━━┓     │
//! ┃  COPY   ┃     │
//! ┃ decoder ┃     │
//! ┗━━━━┯━━━━┛     │
//!      │ snapshot │rewind
//!      │ updates  │requests
//!      v          v
//! ```

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use differential_dataflow::AsCollection;
use futures::{StreamExt as _, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_postgres_util::desc::PostgresTableDesc;
use mz_postgres_util::schemas::get_pg_major_version;
use mz_postgres_util::{Client, Config, PostgresError, Sql, simple_query, simple_query_opt, sql};
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_storage_types::connections::ConnectionContext;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::parameters::PgSourceSnapshotConfig;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::vec::Broadcast;
use timely::dataflow::operators::{CapabilitySet, Concat, ConnectLoop, Feedback, Operator};
use timely::dataflow::{Scope, StreamVec};
use timely::progress::Timestamp;
use tokio_postgres::error::SqlState;
use tokio_postgres::types::{Oid, PgLsn};
use tracing::trace;

use crate::metrics::source::postgres::PgSnapshotMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::postgres::replication::RewindRequest;
use crate::source::postgres::{
    DefiniteError, ReplicationError, SourceOutputInfo, TransientError, verify_schema,
};
use crate::source::types::{FuelSize, SignaledFuture, SourceMessage, StackedCollection};
use crate::statistics::SourceStatistics;

/// Information broadcast from the snapshot leader to all workers.
/// This includes the transaction snapshot ID, LSN, and estimated block counts for each table.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct SnapshotInfo {
    /// The exported transaction snapshot identifier.
    snapshot_id: String,
    /// The LSN at which the snapshot was taken.
    snapshot_lsn: MzOffset,
    /// Estimated number of blocks (pages) for each table, keyed by OID.
    /// This is derived from `pg_class.relpages` and used to partition ctid ranges.
    table_block_counts: BTreeMap<u32, u64>,
    /// The current upstream schema of each table.
    upstream_info: BTreeMap<u32, PostgresTableDesc>,
}

/// Represents a ctid range that a worker should snapshot.
/// The range is `[start_block, end_block)`, where `end_block` is optional (`None` means unbounded).
#[derive(Debug)]
struct CtidRange {
    /// The starting block number (inclusive).
    start_block: u64,
    /// The ending block number (exclusive). `None` means unbounded (open-ended range).
    end_block: Option<u64>,
}

/// Calculates the ctid range for a given worker based on the estimated block count.
///
/// The table is partitioned by block number across all workers. Each worker gets a contiguous
/// range of blocks. The last worker gets an open-ended range to handle any rows beyond the
/// estimated block count.
///
/// When `estimated_blocks` is 0 (because statistics are unavailable, the table appears empty,
/// or the PostgreSQL version is < 14 and doesn't support efficient ctid range scans), the table
/// is assigned to a single worker determined by `config.responsible_for(oid)`, and that worker
/// scans the full table.
///
/// Returns `None` if this worker has no work to do.
fn worker_ctid_range(
    config: &RawSourceCreationConfig,
    estimated_blocks: u64,
    oid: u32,
) -> Option<CtidRange> {
    // If estimated_blocks is 0, fall back to single-worker mode for this table.
    // This handles:
    // - PostgreSQL < 14 (ctid range scans not supported)
    // - Tables that appear empty in statistics
    // - Tables with stale/missing statistics
    // The responsible worker scans the full table with an open-ended range.
    if estimated_blocks == 0 {
        let fallback = if config.responsible_for(oid) {
            Some(CtidRange {
                start_block: 0,
                end_block: None,
            })
        } else {
            None
        };
        return fallback;
    }

    let worker_id = u64::cast_from(config.worker_id);
    let worker_count = u64::cast_from(config.worker_count);

    // If there are more workers than blocks, only assign work to workers with
    // id < estimated_blocks. The last assigned worker still gets an open range.
    let effective_worker_count = std::cmp::min(worker_count, estimated_blocks);

    if worker_id >= effective_worker_count {
        // This worker has no work to do.
        return None;
    }

    // Calculate the start block for this worker (integer division distributes blocks evenly).
    let start_block = worker_id * estimated_blocks / effective_worker_count;

    // The last effective worker gets an open-ended range.
    let is_last_effective_worker = worker_id == effective_worker_count - 1;
    if is_last_effective_worker {
        Some(CtidRange {
            start_block,
            end_block: None,
        })
    } else {
        let end_block = (worker_id + 1) * estimated_blocks / effective_worker_count;
        Some(CtidRange {
            start_block,
            end_block: Some(end_block),
        })
    }
}

/// Estimates the number of blocks for each table from `pg_class` statistics.
/// This is used to partition ctid ranges across workers.
async fn estimate_table_block_counts(
    client: &Client,
    table_oids: &[u32],
) -> Result<BTreeMap<u32, u64>, TransientError> {
    if table_oids.is_empty() {
        return Ok(BTreeMap::new());
    }

    // Query relpages for all tables at once.
    let oid_list = Sql::join(table_oids.iter().copied().map(Sql::from), ",");
    let query = sql!(
        "SELECT oid, relpages FROM pg_class WHERE oid IN ({})",
        oid_list
    );

    let mut block_counts = BTreeMap::new();
    // Initialize all tables with 0 blocks (in case they're not in pg_class).
    for &oid in table_oids {
        block_counts.insert(oid, 0);
    }

    // Execute the query and collect the results.
    let rows = simple_query(client, query).await?;
    for msg in rows {
        if let tokio_postgres::SimpleQueryMessage::Row(row) = msg {
            let oid: u32 = row.get("oid").unwrap().parse().unwrap();
            let relpages: i64 = row.get("relpages").unwrap().parse().unwrap_or(0);
            // relpages can be -1 if never analyzed; treat it as 0.
            let relpages = std::cmp::max(0, relpages).try_into().unwrap();
            block_counts.insert(oid, relpages);
        }
    }

    Ok(block_counts)
}

/// Renders the snapshot dataflow. See the module documentation for more information.
pub(crate) fn render<'scope>(
    scope: Scope<'scope, MzOffset>,
    config: RawSourceCreationConfig,
    connection: PostgresSourceConnection,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: PgSnapshotMetrics,
) -> (
    StackedCollection<'scope, MzOffset, (usize, Result<SourceMessage, DataflowError>)>,
    StreamVec<'scope, MzOffset, RewindRequest>,
    StreamVec<'scope, MzOffset, Infallible>,
    StreamVec<'scope, MzOffset, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("TableReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());

    let (feedback_handle, feedback_data) = scope.feedback(Default::default());

    let (raw_handle, raw_data) = builder.new_output();
    let (rewinds_handle, rewinds) = builder.new_output::<CapacityContainerBuilder<_>>();
    // This output is used to signal to the replication operator that the replication slot has been
    // created. With the current state of execution serialization there isn't a lot of benefit
    // in splitting the snapshot and replication phases into two operators.
    // TODO(petrosagg): merge the two operators into one (while still maintaining separation as
    // functions/modules)
    let (_, slot_ready) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (snapshot_handle, snapshot) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (definite_error_handle, definite_errors) =
        builder.new_output::<CapacityContainerBuilder<_>>();

    // This operator needs to broadcast data to itself in order to synchronize the transaction
    // snapshot. However, none of the feedback capabilities result in output messages, and for the
    // feedback edge specifically, having a default connection would result in a loop.
    let mut snapshot_input = builder.new_disconnected_input(feedback_data, Pipeline);

    // The export id must be sent to all workers, so we broadcast the feedback connection.
    snapshot.broadcast().connect_loop(feedback_handle);

    let is_snapshot_leader = config.responsible_for("snapshot_leader");

    // A global view of all outputs that will be snapshot by all workers.
    let mut all_outputs = vec![];
    // Table info for tables that need snapshotting. All workers will snapshot all tables,
    // but each worker will handle a different ctid range within each table.
    let mut tables_to_snapshot = BTreeMap::new();
    // A collection of `SourceStatistics` to update for a given Oid. Same info exists in table_info,
    // but this avoids having to iterate + map each time the statistics are needed.
    let mut export_statistics = BTreeMap::new();
    for (table, outputs) in table_info.iter() {
        for (&output_index, output) in outputs {
            if *output.resume_upper != [MzOffset::minimum()] {
                // Has already been snapshotted.
                continue;
            }
            all_outputs.push(output_index);
            tables_to_snapshot
                .entry(*table)
                .or_insert_with(BTreeMap::new)
                .insert(output_index, output.clone());
            let statistics = config
                .statistics
                .get(&output.export_id)
                .expect("statistics are initialized")
                .clone();
            export_statistics.insert((*table, output_index), statistics);
        }
    }

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let id = config.id;
            let worker_id = config.worker_id;
            let [
                data_cap_set,
                rewind_cap_set,
                slot_ready_cap_set,
                snapshot_cap_set,
                definite_error_cap_set,
            ]: &mut [_; 5] = caps.try_into().unwrap();

            trace!(
                %id,
                "timely-{worker_id} initializing table reader \
                    with {} tables to snapshot",
                tables_to_snapshot.len()
            );

            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;

            // The snapshot operator is responsible for creating the replication slot(s).
            // This first slot is the permanent slot that will be used for reading the replication
            // stream. A temporary slot is created further on to capture table snapshots.
            let replication_client = if is_snapshot_leader {
                let client = connection_config
                    .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                    .await?;
                let main_slot = &connection.publication_details.slot;

                tracing::info!(%id, "ensuring replication slot {main_slot} exists");
                super::ensure_replication_slot(&client, main_slot).await?;
                Some(client)
            } else {
                None
            };
            *slot_ready_cap_set = CapabilitySet::new();

            // Nothing needs to be snapshotted.
            if all_outputs.is_empty() {
                trace!(%id, "no exports to snapshot");
                // Note we do not emit a `ProgressStatisticsUpdate::Snapshot` update here,
                // as we do not want to attempt to override the current value with 0. We
                // just leave it null.
                return Ok(());
            }

            // A worker *must* emit a count even if not responsible for snapshotting a table
            // as statistic summarization will return null if any worker hasn't set a value.
            // This will also reset snapshot stats for any exports not snapshotting.
            // If no workers need to snapshot, then avoid emitting these as they will clear
            // previous stats.
            for statistics in config.statistics.values() {
                statistics.set_snapshot_records_known(0);
                statistics.set_snapshot_records_staged(0);
            }

            // Collect table OIDs for block count estimation.
            let table_oids: Vec<u32> = tables_to_snapshot.keys().copied().collect();

            // The replication client is only set if this worker is the snapshot leader.
            let client = match replication_client {
                Some(client) => {
                    let tmp_slot = format!("mzsnapshot_{}", uuid::Uuid::new_v4()).replace('-', "");
                    let (snapshot_id, snapshot_lsn) =
                        export_snapshot(&client, &tmp_slot, true).await?;

                    // Check the PostgreSQL version. Ctid range scans are only efficient on
                    // PG >= 14 due to improvements in TID range scan support.
                    let pg_version = get_pg_major_version(&client).await?;

                    // Estimate block counts for all tables from pg_class statistics.
                    // This must be done by the leader and broadcast to ensure all workers
                    // use the same estimates for ctid range partitioning.
                    //
                    // For PostgreSQL < 14, we set all block counts to 0 to fall back to
                    // single-worker-per-table mode, as ctid range scans are not well supported.
                    let table_block_counts = if pg_version >= 14 {
                        estimate_table_block_counts(&client, &table_oids).await?
                    } else {
                        trace!(
                            %id,
                            "timely-{worker_id} PostgreSQL version {pg_version} < 14, \
                                falling back to single-worker-per-table snapshot mode"
                        );
                        // Return all zeros to trigger fallback mode.
                        table_oids.iter().map(|&oid| (oid, 0u64)).collect()
                    };

                    report_snapshot_size(
                        &client,
                        &tables_to_snapshot,
                        metrics,
                        &config,
                        &export_statistics,
                    )
                    .await?;

                    let upstream_info = {
                        // As part of retrieving the schema info, RLS policies are checked to
                        // ensure the snapshot can successfully read the tables. RLS policy
                        // errors are treated as transient, as the customer can simply grant the
                        // BYPASSRLS attribute to the PG role used by MZ.
                        match retrieve_schema_info(
                            &connection_config,
                            &config.config.connection_context,
                            &connection.publication,
                            &table_oids,
                        )
                        .await
                        {
                            // If the replication stream cannot be obtained in a definite way there
                            // is nothing else to do. These errors are not retractable.
                            Err(PostgresError::PublicationMissing(publication)) => {
                                let err = DefiniteError::PublicationDropped(publication);
                                for (oid, outputs) in tables_to_snapshot.iter() {
                                    // Produce a definite error here and then exit to ensure
                                    // a missing publication doesn't generate a transient
                                    // error and restart this dataflow indefinitely.
                                    //
                                    // We pick `u64::MAX` as the LSN which will (in
                                    // practice) never conflict with any previously revealed
                                    // portions of the TVC.
                                    for output_index in outputs.keys() {
                                        let update = (
                                            (*oid, *output_index, Err(err.clone().into())),
                                            MzOffset::from(u64::MAX),
                                            Diff::ONE,
                                        );
                                        let size = update.fuel_size();
                                        raw_handle
                                            .give_fueled(&data_cap_set[0], update, size)
                                            .await;
                                    }
                                }

                                definite_error_handle.give(
                                    &definite_error_cap_set[0],
                                    ReplicationError::Definite(Rc::new(err)),
                                );
                                return Ok(());
                            }
                            Err(e) => Err(TransientError::from(e))?,
                            Ok(i) => i,
                        }
                    };

                    let snapshot_info = SnapshotInfo {
                        snapshot_id,
                        snapshot_lsn,
                        upstream_info,
                        table_block_counts,
                    };
                    trace!(
                        %id,
                        "timely-{worker_id} exporting snapshot info {snapshot_info:?}"
                    );
                    snapshot_handle.give(&snapshot_cap_set[0], snapshot_info);

                    client
                }
                None => {
                    // Only the snapshot leader needs a replication connection.
                    let task_name = format!("timely-{worker_id} PG snapshotter");
                    connection_config
                        .connect(
                            &task_name,
                            &config.config.connection_context.ssh_tunnel_manager,
                        )
                        .await?
                }
            };

            // Configure statement_timeout based on param. We want to be able to
            // override the server value here in case it's set too low,
            // respective to the size of the data we need to copy.
            set_statement_timeout(
                &client,
                config
                    .config
                    .parameters
                    .pg_source_snapshot_statement_timeout,
            )
            .await?;

            let snapshot_info = loop {
                match snapshot_input.next().await {
                    Some(AsyncEvent::Data(_, mut data)) => {
                        break data.pop().expect("snapshot sent above")
                    }
                    Some(AsyncEvent::Progress(_)) => continue,
                    None => panic!(
                        "feedback closed \
                        before sending snapshot info"
                    ),
                }
            };
            let SnapshotInfo {
                snapshot_id,
                snapshot_lsn,
                table_block_counts,
                upstream_info,
            } = snapshot_info;

            // The snapshot leader is already in the exported transaction, but all other workers
            // need to enter it.
            if !is_snapshot_leader {
                trace!(%id, "timely-{worker_id} using snapshot id {snapshot_id:?}");
                use_snapshot(&client, &snapshot_id).await?;
            }

            for (&oid, outputs) in tables_to_snapshot.iter() {
                for (&output_index, info) in outputs.iter() {
                    if let Err(err) = verify_schema(oid, info, &upstream_info) {
                        let update = (
                            (oid, output_index, Err(err.into())),
                            MzOffset::minimum(),
                            Diff::ONE,
                        );
                        let size = update.fuel_size();
                        raw_handle
                            .give_fueled(&data_cap_set[0], update, size)
                            .await;
                        continue;
                    }

                    // Get the estimated block count from the broadcast table statistics.
                    let block_count = table_block_counts.get(&oid).copied().unwrap_or(0);

                    // Calculate this worker's ctid range based on estimated blocks.
                    // When estimated_blocks is 0 (PG < 14 or empty table), fall back to
                    // single-worker mode using responsible_for to pick the worker.
                    let Some(ctid_range) = worker_ctid_range(&config, block_count, oid) else {
                        // This worker has no work for this table (more workers than blocks).
                        trace!(
                            %id,
                            "timely-{worker_id} no ctid range assigned for table {:?}({oid})",
                            info.desc.name
                        );
                        continue;
                    };

                    trace!(
                        %id,
                        "timely-{worker_id} snapshotting table {:?}({oid}) output {output_index} \
                            @ {snapshot_lsn} with ctid range {:?}",
                        info.desc.name,
                        ctid_range
                    );

                    let namespace = Sql::ident(&info.desc.namespace);
                    let table = Sql::ident(&info.desc.name);
                    let column_list =
                        Sql::join(info.desc.columns.iter().map(|c| Sql::ident(&c.name)), ",");

                    let ctid_filter = match ctid_range.end_block {
                        Some(end) => sql!(
                            "WHERE ctid >= '({},0)'::tid AND ctid < '({},0)'::tid",
                            ctid_range.start_block,
                            end
                        ),
                        None => sql!(
                            "WHERE ctid >= '({},0)'::tid",
                            ctid_range.start_block
                        ),
                    };
                    let query = sql!(
                        "COPY (SELECT {} FROM {}.{} {}) TO STDOUT (FORMAT TEXT, DELIMITER '\t')",
                        column_list,
                        namespace,
                        table,
                        ctid_filter
                    );
                    let mut stream = pin!(client.copy_out_simple(query.as_str()).await?);

                    let mut snapshot_staged = 0;
                    while let Some(bytes) = stream.try_next().await? {
                        let update = (
                            (oid, output_index, Ok(bytes)),
                            MzOffset::minimum(),
                            Diff::ONE,
                        );
                        let size = update.fuel_size();
                        raw_handle
                            .give_fueled(&data_cap_set[0], update, size)
                            .await;
                        snapshot_staged += 1;
                        if snapshot_staged % 1000 == 0 {
                            let stat = &export_statistics[&(oid, output_index)];
                            stat.set_snapshot_records_staged(snapshot_staged);
                        }
                    }
                    // Final update for snapshot_staged, using the staged
                    // values as the total is an estimate.
                    let stat = &export_statistics[&(oid, output_index)];
                    stat.set_snapshot_records_staged(snapshot_staged);
                }
            }

            // We are done with the snapshot, so now we will emit rewind requests. It is important
            // that this happens after the snapshot has finished because this is what unblocks the
            // replication operator and we want this to happen serially. It might seem like a good
            // idea to read the replication stream concurrently with the snapshot, but it actually
            // leads to a lot of data being staged for the future, which needlessly consumes memory
            // in the cluster.
            //
            // Since all workers now snapshot all tables (each with different ctid ranges), we only
            // emit rewind requests from the worker responsible for each output to avoid duplicates.
716 for (&oid, output) in tables_to_snapshot.iter() {
717 for (output_index, info) in output {
718 // Only emit rewind request from one worker per output
719 if !config.responsible_for((oid, *output_index)) {
720 continue;
721 }
722 trace!(%id, "timely-{worker_id} producing rewind request for table {} output {output_index}", info.desc.name);
723 let req = RewindRequest { output_index: *output_index, snapshot_lsn };
724 rewinds_handle.give(&rewind_cap_set[0], req);
725 }
726 }
727 *rewind_cap_set = CapabilitySet::new();
728
729 // Failure scenario after we have produced the snapshot, but before a successful COMMIT
730 fail::fail_point!("pg_snapshot_failure", |_| Err(
731 TransientError::SyntheticError
732 ));
733
734 // The exporting worker should wait for all the other workers to commit before dropping
735 // its client since this is what holds the exported transaction alive.
736 if is_snapshot_leader {
737 trace!(%id, "timely-{worker_id} waiting for all workers to finish");
738 *snapshot_cap_set = CapabilitySet::new();
739 while snapshot_input.next().await.is_some() {}
740 trace!(%id, "timely-{worker_id} (leader) comitting COPY transaction");
741 simple_query(&client, sql!("COMMIT")).await?;
742 } else {
743 trace!(%id, "timely-{worker_id} comitting COPY transaction");
744 simple_query(&client, sql!("COMMIT")).await?;
745 *snapshot_cap_set = CapabilitySet::new();
746 }
747 drop(client);
748 Ok(())
749 }))
750 });
751
    // We now decode the COPY protocol and apply the cast expressions.
    let mut text_row = Row::default();
    let mut final_row = Row::default();
    let mut datum_vec = DatumVec::new();
    let snapshot_updates = raw_data
        .unary(Pipeline, "PgCastSnapshotRows", |_, _| {
            move |input, output| {
                input.for_each_time(|time, data| {
                    let mut session = output.session(&time);
                    for ((oid, output_index, event), time, diff) in
                        data.flat_map(|data| data.drain(..))
                    {
                        let output = &table_info
                            .get(&oid)
                            .and_then(|outputs| outputs.get(&output_index))
                            .expect("table_info contains all outputs");

                        let event = event
                            .as_ref()
                            .map_err(|e: &DataflowError| e.clone())
                            .and_then(|bytes| {
                                decode_copy_row(bytes, output.casts.len(), &mut text_row)?;
                                let datums = datum_vec.borrow_with(&text_row);
                                super::cast_row(&output.casts, &datums, &mut final_row)?;
                                Ok(SourceMessage {
                                    key: Row::default(),
                                    value: final_row.clone(),
                                    metadata: Row::default(),
                                })
                            });

                        session.give(((output_index, event), time, diff));
                    }
                });
            }
        })
        .as_collection();

    let errors = definite_errors.concat(transient_errors.map(ReplicationError::from));

    (
        snapshot_updates,
        rewinds,
        slot_ready,
        errors,
        button.press_on_drop(),
    )
}
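// Illustrative sketch (hypothetical; not used by the ingestion dataflow): the COMMIT ordering
// between the snapshot leader and the other workers in the snapshot operator above, modelled
// with std threads and an mpsc channel in place of timely capabilities. Each follower commits
// and then signals, which stands in for dropping `snapshot_cap_set`; the leader, whose session
// exported the snapshot, drains the channel until every follower has hung up and only then
// commits, so the exported snapshot stays importable while followers may still need it. The
// module and function names are assumptions made for this example.
#[cfg(test)]
mod snapshot_commit_order_sketch {
    use std::sync::mpsc;
    use std::sync::{Arc, Mutex};
    use std::thread;

    /// Runs `followers` follower threads plus a leader and returns the order of COMMITs.
    pub fn run(followers: usize) -> Vec<String> {
        let log = Arc::new(Mutex::new(Vec::new()));
        let (done_tx, done_rx) = mpsc::channel::<usize>();

        let handles: Vec<_> = (0..followers)
            .map(|i| {
                let log = Arc::clone(&log);
                let done_tx = done_tx.clone();
                thread::spawn(move || {
                    // Follower: COMMIT its own transaction first, then signal the leader.
                    log.lock().unwrap().push(format!("follower-{i} COMMIT"));
                    done_tx.send(i).unwrap();
                })
            })
            .collect();
        // Drop the leader's own sender so the channel disconnects once every follower is done.
        drop(done_tx);

        // Leader: wait until all followers have signalled and hung up, then COMMIT.
        while done_rx.recv().is_ok() {}
        log.lock().unwrap().push("leader COMMIT".to_string());

        for handle in handles {
            handle.join().unwrap();
        }
        let order = log.lock().unwrap().clone();
        order
    }

    #[test]
    fn leader_commits_last() {
        let order = run(4);
        assert_eq!(order.len(), 5);
        assert_eq!(order.last().map(String::as_str), Some("leader COMMIT"));
    }
}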

/// Starts a read-only transaction on the SQL session of `client` at a consistent LSN point by
/// creating a replication slot. Returns a snapshot identifier that can be imported in other
/// SQL sessions, along with the snapshot LSN, which is one less than the slot's consistent
/// point.
async fn export_snapshot(
    client: &Client,
    slot: &str,
    temporary: bool,
) -> Result<(String, MzOffset), TransientError> {
    match export_snapshot_inner(client, slot, temporary).await {
        Ok(ok) => Ok(ok),
        Err(err) => {
            // Roll back so that the client is not left inside a failed transaction.
            simple_query(client, sql!("ROLLBACK;")).await?;
            Err(err)
        }
    }
}

async fn export_snapshot_inner(
    client: &Client,
    slot: &str,
    temporary: bool,
) -> Result<(String, MzOffset), TransientError> {
    simple_query(
        client,
        sql!("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;"),
    )
    .await?;

    let query = if temporary {
        sql!(
            "CREATE_REPLICATION_SLOT {} TEMPORARY LOGICAL \"pgoutput\" USE_SNAPSHOT",
            Sql::ident(slot)
        )
    } else {
        sql!(
            "CREATE_REPLICATION_SLOT {} LOGICAL \"pgoutput\" USE_SNAPSHOT",
            Sql::ident(slot)
        )
    };
    let row = match simple_query_opt(client, query).await {
        Ok(row) => Ok(row.unwrap()),
        Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
            return Err(TransientError::ReplicationSlotAlreadyExists);
        }
        Err(err) => Err(err),
    }?;

    // When creating a replication slot, postgres returns the LSN of its consistent point, which
    // is the LSN that must be passed to `START_REPLICATION` to cleanly transition from the
    // snapshot phase to the replication phase. `START_REPLICATION` includes all transactions
    // that commit at LSNs *greater than or equal* to the passed LSN. Therefore the snapshot
    // phase must happen at the greatest LSN strictly before the consistent point, which is
    // `consistent_point - 1`.
    let consistent_point: PgLsn = row.get("consistent_point").unwrap().parse().unwrap();
    let consistent_point = u64::from(consistent_point)
        .checked_sub(1)
        .expect("consistent point is always non-zero");

    let row = simple_query_opt(client, sql!("SELECT pg_export_snapshot();"))
        .await?
        .unwrap();
    let snapshot = row.get("pg_export_snapshot").unwrap().to_owned();

    Ok((snapshot, MzOffset::from(consistent_point)))
}
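// Illustrative sketch (hypothetical helper; the code above parses with `PgLsn` instead):
// PostgreSQL prints an LSN as two hexadecimal halves `X/Y`, the high and low 32 bits of the
// 64-bit WAL position. The sketch parses the `consistent_point` text returned by
// `CREATE_REPLICATION_SLOT` and derives the snapshot LSN as `consistent_point - 1`, the
// greatest LSN strictly before the point from which `START_REPLICATION` streams.
#[cfg(test)]
mod snapshot_lsn_sketch {
    /// Parses an LSN in PostgreSQL's `X/Y` text form into its 64-bit value.
    pub fn parse_lsn(text: &str) -> Option<u64> {
        let (hi, lo) = text.split_once('/')?;
        let hi = u32::from_str_radix(hi, 16).ok()?;
        let lo = u32::from_str_radix(lo, 16).ok()?;
        Some((u64::from(hi) << 32) | u64::from(lo))
    }

    /// The LSN at which the snapshot is timestamped: one below the consistent point, so that
    /// replication starting at the consistent point (inclusive) picks up right after it.
    pub fn snapshot_lsn(consistent_point: &str) -> Option<u64> {
        parse_lsn(consistent_point)?.checked_sub(1)
    }

    #[test]
    fn consistent_point_minus_one() {
        assert_eq!(parse_lsn("16/B374D848"), Some(0x16_B374_D848));
        assert_eq!(snapshot_lsn("0/1000000"), Some(0xFF_FFFF));
        // A zero consistent point has no predecessor.
        assert_eq!(snapshot_lsn("0/0"), None);
    }
}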

/// Starts a read-only transaction on the SQL session of `client` that imports the exported
/// `snapshot`, so that it observes the same consistent LSN point as the exporting session.
async fn use_snapshot(client: &Client, snapshot: &str) -> Result<(), TransientError> {
    simple_query(
        client,
        sql!("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;"),
    )
    .await?;
    let query = sql!("SET TRANSACTION SNAPSHOT {};", Sql::literal(snapshot));
    simple_query(client, query).await?;
    Ok(())
}
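// Illustrative sketch (hypothetical helper): the two statements `use_snapshot` issues, built
// as plain strings. PostgreSQL only accepts `SET TRANSACTION SNAPSHOT` before the first query
// of a REPEATABLE READ or SERIALIZABLE transaction, which is why it directly follows `BEGIN`.
// The snapshot identifier is quoted as a SQL string literal with embedded quotes doubled; this
// is an assumption standing in for `Sql::literal`, whose exact escaping is not shown here.
#[cfg(test)]
mod use_snapshot_sketch {
    pub fn import_statements(snapshot: &str) -> [String; 2] {
        let literal = format!("'{}'", snapshot.replace('\'', "''"));
        [
            "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;".to_string(),
            format!("SET TRANSACTION SNAPSHOT {literal};"),
        ]
    }

    #[test]
    fn builds_import_statements() {
        let [begin, set] = import_statements("00000003-0000001B-1");
        assert_eq!(begin, "BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;");
        assert_eq!(set, "SET TRANSACTION SNAPSHOT '00000003-0000001B-1';");
        // An embedded single quote is doubled inside the literal.
        assert_eq!(import_statements("a'b")[1], "SET TRANSACTION SNAPSHOT 'a''b';");
    }
}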

async fn set_statement_timeout(client: &Client, timeout: Duration) -> Result<(), TransientError> {
    // A value without units is interpreted as milliseconds.
    // https://www.postgresql.org/docs/current/runtime-config-client.html
    let query = sql!(
        "SET statement_timeout = {}",
        Sql::literal(&timeout.as_millis().to_string())
    );
    simple_query(client, query).await?;
    Ok(())
}

/// Decodes a row of `col_len` columns obtained from a text-encoded COPY query into `row`.
fn decode_copy_row(data: &[u8], col_len: usize, row: &mut Row) -> Result<(), DefiniteError> {
    let mut packer = row.packer();
    let row_parser = mz_pgcopy::CopyTextFormatParser::new(data, b'\t', "\\N");
    let mut column_iter = row_parser.iter_raw_truncating(col_len);
    for _ in 0..col_len {
        let value = match column_iter.next() {
            Some(Ok(value)) => value,
            Some(Err(_)) => return Err(DefiniteError::InvalidCopyInput),
            None => return Err(DefiniteError::MissingColumn),
        };
        let datum = value.map(super::decode_utf8_text).transpose()?;
        packer.push(datum.unwrap_or(Datum::Null));
    }
    Ok(())
}
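// Illustrative sketch (hypothetical and simplified): how one line of text-format COPY output
// maps onto `col_len` nullable columns. Columns are tab-separated and `\N` marks NULL; extra
// trailing columns are ignored (assumed to match `iter_raw_truncating`), and a short line
// fails like `DefiniteError::MissingColumn`. Unlike `mz_pgcopy`, the sketch does not undo
// backslash escapes such as `\t` or `\\` inside values.
#[cfg(test)]
mod copy_row_sketch {
    #[derive(Debug, PartialEq)]
    pub enum DecodeError {
        MissingColumn,
    }

    pub fn decode_line(line: &str, col_len: usize) -> Result<Vec<Option<&str>>, DecodeError> {
        let mut fields = line.split('\t');
        let mut row = Vec::with_capacity(col_len);
        for _ in 0..col_len {
            match fields.next() {
                // `\N` is the NULL marker in COPY's text format.
                Some("\\N") => row.push(None),
                Some(value) => row.push(Some(value)),
                None => return Err(DecodeError::MissingColumn),
            }
        }
        Ok(row)
    }

    #[test]
    fn decodes_copy_text_lines() {
        assert_eq!(decode_line("1\t\\N\tfoo", 3), Ok(vec![Some("1"), None, Some("foo")]));
        assert_eq!(decode_line("1\t2\t3", 2), Ok(vec![Some("1"), Some("2")]));
        assert_eq!(decode_line("1", 2), Err(DecodeError::MissingColumn));
    }
}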

/// Counts the rows of each table being snapshotted, recording the latency of the count in
/// `PgSnapshotMetrics` and the (estimated or exact) row count in the snapshot statistics of
/// each export.
async fn report_snapshot_size(
    client: &Client,
    tables_to_snapshot: &BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: PgSnapshotMetrics,
    config: &RawSourceCreationConfig,
    export_statistics: &BTreeMap<(u32, usize), SourceStatistics>,
) -> Result<(), anyhow::Error> {
    // TODO(guswynn): delete unused configs
    let snapshot_config = config.config.parameters.pg_snapshot_config;

    for (&oid, outputs) in tables_to_snapshot {
        // Use the first output's desc to make the table name since it is the same for all outputs.
        let Some((_, info)) = outputs.first_key_value() else {
            continue;
        };
        let table = sql!(
            "{}.{}",
            Sql::ident(&info.desc.namespace),
            Sql::ident(&info.desc.name)
        )
        .into_string();
        let stats = collect_table_statistics(
            client,
            snapshot_config,
            &info.desc.namespace,
            &info.desc.name,
            info.desc.oid,
        )
        .await?;
        metrics.record_table_count_latency(table, stats.count_latency);
        for &output_index in outputs.keys() {
            export_statistics[&(oid, output_index)].set_snapshot_records_known(stats.count);
            export_statistics[&(oid, output_index)].set_snapshot_records_staged(0);
        }
    }
    Ok(())
}

#[derive(Default)]
struct TableStatistics {
    count: u64,
    count_latency: f64,
}

async fn collect_table_statistics(
    client: &Client,
    config: PgSourceSnapshotConfig,
    schema: &str,
    table: &str,
    oid: u32,
) -> Result<TableStatistics, anyhow::Error> {
    use mz_ore::metrics::MetricsFutureExt;
    let mut stats = TableStatistics::default();

    let estimate_query = sql!(
        "SELECT reltuples::bigint AS estimate_count FROM pg_class WHERE oid = {}",
        Sql::literal(&oid.to_string())
    );
    let estimate_row = simple_query_opt(client, estimate_query)
        .wall_time()
        .set_at(&mut stats.count_latency)
        .await?;
    stats.count = match estimate_row {
        Some(row) => row.get("estimate_count").unwrap().parse().unwrap_or(0),
        None => bail!("failed to get estimate count for {schema}.{table}"),
    };

    // If the estimate is low enough we can attempt to get an exact count. Note that tables whose
    // statistics have not yet been collected by VACUUM or ANALYZE report an estimate of zero
    // here (PostgreSQL 14 and later report -1, which fails to parse as `u64` and also becomes
    // zero), even though they may be very large. We accept this risk and offer
    // `collect_strict_count` as an escape hatch if it becomes problematic.
    if config.collect_strict_count && stats.count < 1_000_000 {
        let count_query = sql!(
            "SELECT count(*) as count from {}.{}",
            Sql::ident(schema),
            Sql::ident(table)
        );
        let count_row = simple_query_opt(client, count_query)
            .wall_time()
            .set_at(&mut stats.count_latency)
            .await?;
        stats.count = match count_row {
            Some(row) => row.get("count").unwrap().parse().unwrap(),
            None => bail!("failed to get count for {schema}.{table}"),
        }
    }

    Ok(stats)
}
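// Illustrative sketch (hypothetical helper): the choice `collect_table_statistics` makes
// between the `reltuples` estimate and an exact `count(*)`, with the queries replaced by the
// estimate's text and a closure. On PostgreSQL 14 and later `reltuples` is -1 for tables that
// were never vacuumed or analyzed; "-1" does not parse as `u64`, so such tables fall back to
// an estimate of 0 and get an exact count when `collect_strict_count` is set.
#[cfg(test)]
mod table_count_sketch {
    pub fn snapshot_count(
        estimate_text: &str,
        collect_strict_count: bool,
        exact_count: impl FnOnce() -> u64,
    ) -> u64 {
        // Unparseable estimates (such as "-1") are treated as zero rows.
        let estimate: u64 = estimate_text.parse().unwrap_or(0);
        if collect_strict_count && estimate < 1_000_000 {
            exact_count()
        } else {
            estimate
        }
    }

    #[test]
    fn picks_estimate_or_exact_count() {
        assert_eq!(snapshot_count("-1", true, || 42), 42);
        assert_eq!(snapshot_count("250", false, || 42), 250);
        assert_eq!(snapshot_count("5000000", true, || 42), 5_000_000);
    }
}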

/// Validates that there are no blocking row-level security (RLS) policies on the tables and
/// retrieves the table schemas for the given publication.
async fn retrieve_schema_info(
    connection_config: &Config,
    connection_context: &ConnectionContext,
    publication: &str,
    table_oids: &[Oid],
) -> Result<BTreeMap<u32, PostgresTableDesc>, PostgresError> {
    let schema_client = connection_config
        .connect(
            "snapshot schema info",
            &connection_context.ssh_tunnel_manager,
        )
        .await?;
    mz_postgres_util::validate_no_rls_policies(&schema_client, table_oids).await?;
    mz_postgres_util::publication_info(&schema_client, publication, Some(table_oids)).await
}