mz_storage/source/postgres/snapshot.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the table snapshot side of the [`PostgresSourceConnection`] ingestion dataflow.
11//!
12//! # Snapshot reading
13//!
14//! Depending on the resumption LSNs the table reader decides which tables need to be snapshotted
15//! and performs a simple `COPY` query on them in order to get a snapshot. There are a few subtle
16//! points about this operation, described in the following sections.
17//!
18//! ## Consistent LSN point for snapshot transactions
19//!
20//! Given that all our ingestion is based on correctly timestamping updates with the LSN they
21//! happened at it is important that we run the `COPY` query at a specific LSN point that is
22//! relatable with the LSN numbers we receive from the replication stream. Such point does not
23//! necessarily exist for a normal SQL transaction. To achieve this we must force postgres to
24//! produce a consistent point and let us know of the LSN number of that by creating a replication
25//! slot as the first statement in a transaction.
26//!
27//! This is a temporary dummy slot that is only used to put our snapshot transaction on a
28//! consistent LSN point. Unfortunately no lighterweight method exists for doing this. See this
29//! [postgres thread] for more details.
30//!
31//! One might wonder why we don't use the actual real slot to provide us with the snapshot point
32//! which would automatically be at the correct LSN. The answer is that it's possible that we crash
33//! and restart after having already created the slot but before having finished the snapshot. In
34//! that case the restarting process will have lost its opportunity to run queries at the slot's
35//! consistent point as that opportunity only exists in the ephemeral transaction that created the
36//! slot and that is long gone. Additionally there are good reasons of why we'd like to move the
37//! slot creation much earlier, e.g during purification, in which case the slot will always be
38//! pre-created.
39//!
40//! [postgres thread]: https://www.postgresql.org/message-id/flat/CAMN0T-vzzNy6TV1Jvh4xzNQdAvCLBQK_kh6_U7kAXgGU3ZFg-Q%40mail.gmail.com
41//!
42//! ## Reusing the consistent point among all workers
43//!
44//! Creating replication slots is potentially expensive so the code makes is such that all workers
45//! cooperate and reuse one consistent snapshot among them. In order to do so we make use the
46//! "export transaction" feature of postgres. This feature allows one SQL session to create an
47//! identifier for the transaction (a string identifier) it is currently in, which can be used by
48//! other sessions to enter the same "snapshot".
49//!
50//! We accomplish this by picking one worker at random to function as the transaction leader. The
51//! transaction leader is responsible for starting a SQL session, creating a temporary replication
52//! slot in a transaction, exporting the transaction id, and broadcasting the transaction
53//! information to all other workers via a broadcasted feedback edge.
54//!
55//! During this phase the follower workers are simply waiting to hear on the feedback edge,
56//! effectively synchronizing with the leader. Once all workers have received the snapshot
57//! information they can all start to perform their assigned COPY queries.
58//!
59//! The leader and follower steps described above are accomplished by the [`export_snapshot`] and
60//! [`use_snapshot`] functions respectively.
61//!
62//! ## Coordinated transaction COMMIT
63//!
64//! When follower workers are done with snapshotting they commit their transaction, close their
65//! session, and then drop their snapshot feedback capability. When the leader worker is done with
66//! snapshotting it drops its snapshot feedback capability and waits until it observes the
67//! snapshot input advancing to the empty frontier. This allows the leader to COMMIT its
68//! transaction last, which is the transaction that exported the snapshot.
69//!
70//! It's unclear if this is strictly necessary, but having the frontiers made it easy enough that I
71//! added the synchronization.
72//!
73//! ## Snapshot rewinding
74//!
75//! Ingestion dataflows must produce definite data, including the snapshot. What this means
76//! practically is that whenever we deem it necessary to snapshot a table we must do so at the same
77//! LSN. However, the method for running a transaction described above doesn't let us choose the
78//! LSN, it could be an LSN in the future chosen by PostgresSQL while it creates the temporary
79//! replication slot.
80//!
81//! The definition of differential collections states that a collection at some time `t_snapshot`
82//! is defined to be the accumulation of all updates that happen at `t <= t_snapshot`, where `<=`
83//! is the partial order. In this case we are faced with the problem of knowing the state of a
84//! table at `t_snapshot` but actually wanting to know the snapshot at `t_slot <= t_snapshot`.
85//!
86//! From the definition we can see that the snapshot at `t_slot` is related to the snapshot at
87//! `t_snapshot` with the following equations:
88//!
89//!```text
90//! sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot <= t <= t_snapshot)
91//!                                         |
92//!                                         V
93//! sum(update: t <= t_slot) = sum(update: t <= snapshot) - sum(update: t_slot <= t <= t_snapshot)
94//! ```
95//!
96//! Therefore, if we manage to recover the `sum(update: t_slot <= t <= t_snapshot)` term we will be
97//! able to "rewind" the snapshot we obtained at `t_snapshot` to `t_slot` by emitting all updates
98//! that happen between these two points with their diffs negated.
99//!
100//! It turns out that this term is exactly what the main replication slot provides us with and we
101//! can rewind snapshot at arbitrary points! In order to do this the snapshot dataflow emits rewind
102//! requests to the replication reader which informs it that a certain range of updates must be
103//! emitted at LSN 0 (by convention) with their diffs negated. These negated diffs are consolidated
104//! with the diffs taken at `t_snapshot` that were also emitted at LSN 0 (by convention) and we end
105//! up with a TVC that at LSN 0 contains the snapshot at `t_slot`.
106//!
107//! # Snapshot decoding
108//!
109//! The expectation is that tables will most likely be skewed on the number of rows they contain so
110//! while a `COPY` query for any given table runs on a single worker the decoding of the COPY
111//! stream is distributed to all workers.
112//!
113//!
114//! ```text
115//!                 ╭──────────────────╮
116//!    ┏━━━━━━━━━━━━v━┓                │ exported
117//!    ┃    table     ┃   ╭─────────╮  │ snapshot id
118//!    ┃    reader    ┠─>─┤broadcast├──╯
119//!    ┗━┯━━━━━━━━━━┯━┛   ╰─────────╯
120//!   raw│          │
121//!  COPY│          │
122//!  data│          │
123//! ╭────┴─────╮    │
124//! │distribute│    │
125//! ╰────┬─────╯    │
126//! ┏━━━━┷━━━━┓     │
127//! ┃  COPY   ┃     │
128//! ┃ decoder ┃     │
129//! ┗━━━━┯━━━━┛     │
130//!      │ snapshot │rewind
131//!      │ updates  │requests
132//!      v          v
133//! ```
134
135use std::collections::BTreeMap;
136use std::convert::Infallible;
137use std::pin::pin;
138use std::rc::Rc;
139use std::sync::Arc;
140use std::time::Duration;
141
142use anyhow::bail;
143use differential_dataflow::AsCollection;
144use futures::{StreamExt as _, TryStreamExt};
145use itertools::Itertools;
146use mz_ore::cast::CastFrom;
147use mz_ore::future::InTask;
148use mz_postgres_util::desc::PostgresTableDesc;
149use mz_postgres_util::{Client, Config, PostgresError, simple_query_opt};
150use mz_repr::{Datum, DatumVec, Diff, Row};
151use mz_sql_parser::ast::{Ident, display::AstDisplay};
152use mz_storage_types::connections::ConnectionContext;
153use mz_storage_types::errors::DataflowError;
154use mz_storage_types::parameters::PgSourceSnapshotConfig;
155use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
156use mz_timely_util::builder_async::{
157    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
158};
159use timely::container::CapacityContainerBuilder;
160use timely::dataflow::channels::pact::{Exchange, Pipeline};
161use timely::dataflow::operators::core::Map;
162use timely::dataflow::operators::{
163    Broadcast, CapabilitySet, Concat, ConnectLoop, Feedback, Operator,
164};
165use timely::dataflow::{Scope, Stream};
166use timely::progress::Timestamp;
167use tokio_postgres::error::SqlState;
168use tokio_postgres::types::{Oid, PgLsn};
169use tracing::trace;
170
171use crate::metrics::source::postgres::PgSnapshotMetrics;
172use crate::source::RawSourceCreationConfig;
173use crate::source::postgres::replication::RewindRequest;
174use crate::source::postgres::{
175    DefiniteError, ReplicationError, SourceOutputInfo, TransientError, verify_schema,
176};
177use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
178use crate::statistics::SourceStatistics;
179
180/// Renders the snapshot dataflow. See the module documentation for more information.
181pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
182    mut scope: G,
183    config: RawSourceCreationConfig,
184    connection: PostgresSourceConnection,
185    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
186    metrics: PgSnapshotMetrics,
187) -> (
188    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
189    Stream<G, RewindRequest>,
190    Stream<G, Infallible>,
191    Stream<G, ReplicationError>,
192    PressOnDropButton,
193) {
194    let op_name = format!("TableReader({})", config.id);
195    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());
196
197    let (feedback_handle, feedback_data) = scope.feedback(Default::default());
198
199    let (raw_handle, raw_data) = builder.new_output();
200    let (rewinds_handle, rewinds) = builder.new_output();
201    // This output is used to signal to the replication operator that the replication slot has been
202    // created. With the current state of execution serialization there isn't a lot of benefit
203    // of splitting the snapshot and replication phases into two operators.
204    // TODO(petrosagg): merge the two operators in one (while still maintaining separation as
205    // functions/modules)
206    let (_, slot_ready) = builder.new_output::<CapacityContainerBuilder<_>>();
207    let (snapshot_handle, snapshot) = builder.new_output();
208    let (definite_error_handle, definite_errors) = builder.new_output();
209
210    // This operator needs to broadcast data to itself in order to synchronize the transaction
211    // snapshot. However, none of the feedback capabilities result in output messages and for the
212    // feedback edge specifically having a default conncetion would result in a loop.
213    let mut snapshot_input = builder.new_disconnected_input(&feedback_data, Pipeline);
214
215    // The export id must be sent to all workers, so we broadcast the feedback connection
216    snapshot.broadcast().connect_loop(feedback_handle);
217
218    let is_snapshot_leader = config.responsible_for("snapshot_leader");
219
220    // A global view of all outputs that will be snapshot by all workers.
221    let mut all_outputs = vec![];
222    // A filtered table info containing only the tables that this worker should snapshot.
223    let mut worker_table_info = BTreeMap::new();
224    // A collecction of `SourceStatistics` to update for a given Oid. Same info exists in reader_table_info,
225    // but this avoids having to iterate + map each time the statistics are needed.
226    let mut export_statistics = BTreeMap::new();
227    for (table, outputs) in table_info.iter() {
228        for (&output_index, output) in outputs {
229            if *output.resume_upper != [MzOffset::minimum()] {
230                // Already has been snapshotted.
231                continue;
232            }
233            all_outputs.push(output_index);
234            if config.responsible_for(*table) {
235                worker_table_info
236                    .entry(*table)
237                    .or_insert_with(BTreeMap::new)
238                    .insert(output_index, output.clone());
239            }
240            let statistics = config
241                .statistics
242                .get(&output.export_id)
243                .expect("statistics are initialized")
244                .clone();
245            export_statistics.insert((*table, output_index), statistics);
246        }
247    }
248
249    let (button, transient_errors) = builder.build_fallible(move |caps| {
250        let busy_signal = Arc::clone(&config.busy_signal);
251        Box::pin(SignaledFuture::new(busy_signal, async move {
252            let id = config.id;
253            let worker_id = config.worker_id;
254            let [
255                data_cap_set,
256                rewind_cap_set,
257                slot_ready_cap_set,
258                snapshot_cap_set,
259                definite_error_cap_set,
260            ]: &mut [_; 5] = caps.try_into().unwrap();
261
262            trace!(
263                %id,
264                "timely-{worker_id} initializing table reader \
265                    with {} tables to snapshot",
266                    worker_table_info.len()
267            );
268
269            let connection_config = connection
270                .connection
271                .config(
272                    &config.config.connection_context.secrets_reader,
273                    &config.config,
274                    InTask::Yes,
275                )
276                .await?;
277
278
279            // The snapshot operator is responsible for creating the replication slot(s).
280            // This first slot is the permanent slot that will be used for reading the replication
281            // stream.  A temporary slot is created further on to capture table snapshots.
282            let replication_client = if is_snapshot_leader {
283                let client = connection_config
284                    .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
285                    .await?;
286                let main_slot = &connection.publication_details.slot;
287
288                tracing::info!(%id, "ensuring replication slot {main_slot} exists");
289                super::ensure_replication_slot(&client, main_slot).await?;
290                Some(client)
291            } else {
292                None
293            };
294            *slot_ready_cap_set = CapabilitySet::new();
295
296            // Nothing needs to be snapshot.
297            if all_outputs.is_empty() {
298                trace!(%id, "no exports to snapshot");
299                // Note we do not emit a `ProgressStatisticsUpdate::Snapshot` update here,
300                // as we do not want to attempt to override the current value with 0. We
301                // just leave it null.
302                return Ok(());
303            }
304
305            // A worker *must* emit a count even if not responsible for snapshotting a table
306            // as statistic summarization will return null if any worker hasn't set a value.
307            // This will also reset snapshot stats for any exports not snapshotting.
308            // If no workers need to snapshot, then avoid emitting these as they will clear
309            // previous stats.
310            for statistics in config.statistics.values() {
311                statistics.set_snapshot_records_known(0);
312                statistics.set_snapshot_records_staged(0);
313            }
314
315            // replication client is only set if this worker is the snapshot leader
316            let client = match replication_client {
317                Some(client) => {
318                    let tmp_slot = format!("mzsnapshot_{}", uuid::Uuid::new_v4()).replace('-', "");
319                    let snapshot_info = export_snapshot(&client, &tmp_slot, true).await?;
320                    trace!(
321                        %id,
322                        "timely-{worker_id} exporting snapshot info {snapshot_info:?}");
323                    snapshot_handle.give(&snapshot_cap_set[0], snapshot_info);
324
325                    client
326                }
327                None => {
328                    // Only the snapshot leader needs a replication connection.
329                    let task_name = format!("timely-{worker_id} PG snapshotter");
330                    connection_config
331                        .connect(
332                            &task_name,
333                            &config.config.connection_context.ssh_tunnel_manager,
334                        )
335                        .await?
336                }
337            };
338
339            // Configure statement_timeout based on param. We want to be able to
340            // override the server value here in case it's set too low,
341            // respective to the size of the data we need to copy.
342            set_statement_timeout(
343                &client,
344                config
345                    .config
346                    .parameters
347                    .pg_source_snapshot_statement_timeout,
348            )
349            .await?;
350
351            let (snapshot, snapshot_lsn) = loop {
352                match snapshot_input.next().await {
353                    Some(AsyncEvent::Data(_, mut data)) => {
354                        break data.pop().expect("snapshot sent above")
355                    }
356                    Some(AsyncEvent::Progress(_)) => continue,
357                    None => panic!(
358                        "feedback closed \
359                    before sending snapshot info"
360                    ),
361                }
362            };
363            // Snapshot leader is already in identified transaction but all other workers need to enter it.
364            if !is_snapshot_leader {
365                trace!(%id, "timely-{worker_id} using snapshot id {snapshot:?}");
366                use_snapshot(&client, &snapshot).await?;
367            }
368
369
370            let upstream_info = {
371                let table_oids = worker_table_info.keys().copied().collect::<Vec<_>>();
372                // As part of retrieving the schema info, RLS policies are checked to ensure the
373                // snapshot can successfully read the tables. RLS policy errors are treated as
374                // transient, as the customer can simply add the BYPASSRLS to the PG account
375                // used by MZ.
376                match retrieve_schema_info(
377                    &connection_config,
378                    &config.config.connection_context,
379                    &connection.publication,
380                    &table_oids)
381                    .await
382                {
383                    // If the replication stream cannot be obtained in a definite way there is
384                    // nothing else to do. These errors are not retractable.
385                    Err(PostgresError::PublicationMissing(publication)) => {
386                        let err = DefiniteError::PublicationDropped(publication);
387                        for (oid, outputs) in worker_table_info.iter() {
388                            // Produce a definite error here and then exit to ensure
389                            // a missing publication doesn't generate a transient
390                            // error and restart this dataflow indefinitely.
391                            //
392                            // We pick `u64::MAX` as the LSN which will (in
393                            // practice) never conflict any previously revealed
394                            // portions of the TVC.
395                            for output_index in outputs.keys() {
396                                let update = (
397                                    (*oid, *output_index, Err(err.clone().into())),
398                                    MzOffset::from(u64::MAX),
399                                    Diff::ONE,
400                                );
401                                raw_handle.give_fueled(&data_cap_set[0], update).await;
402                            }
403                        }
404
405                        definite_error_handle.give(
406                            &definite_error_cap_set[0],
407                            ReplicationError::Definite(Rc::new(err)),
408                        );
409                        return Ok(());
410                    },
411                    Err(e) => Err(TransientError::from(e))?,
412                    Ok(i) => i,
413                }
414            };
415
416            report_snapshot_size(&client, &worker_table_info, metrics, &config, &export_statistics).await?;
417
418            for (&oid, outputs) in worker_table_info.iter() {
419                for (&output_index, info) in outputs.iter() {
420                    if let Err(err) = verify_schema(oid, info, &upstream_info) {
421                        raw_handle
422                            .give_fueled(
423                                &data_cap_set[0],
424                                (
425                                    (oid, output_index, Err(err.into())),
426                                    MzOffset::minimum(),
427                                    Diff::ONE,
428                                ),
429                            )
430                            .await;
431                        continue;
432                    }
433
434                    trace!(
435                        %id,
436                        "timely-{worker_id} snapshotting table {:?}({oid}) @ {snapshot_lsn}",
437                        info.desc.name
438                    );
439
440                    // To handle quoted/keyword names, we can use `Ident`'s AST printing, which
441                    // emulate's PG's rules for name formatting.
442                    let namespace = Ident::new_unchecked(&info.desc.namespace).to_ast_string_stable();
443                    let table = Ident::new_unchecked(&info.desc.name).to_ast_string_stable();
444                    let column_list = info
445                        .desc
446                        .columns
447                        .iter()
448                        .map(|c| Ident::new_unchecked(&c.name).to_ast_string_stable())
449                        .join(",");
450                    let query = format!("COPY {namespace}.{table} ({column_list}) \
451                        TO STDOUT (FORMAT TEXT, DELIMITER '\t')");
452                    let mut stream = pin!(client.copy_out_simple(&query).await?);
453
454                    let mut snapshot_staged = 0;
455                    let mut update = ((oid, output_index, Ok(vec![])), MzOffset::minimum(), Diff::ONE);
456                    while let Some(bytes) = stream.try_next().await? {
457                        let data = update.0 .2.as_mut().unwrap();
458                        data.clear();
459                        data.extend_from_slice(&bytes);
460                        raw_handle.give_fueled(&data_cap_set[0], &update).await;
461                        snapshot_staged += 1;
462                        if snapshot_staged % 1000 == 0 {
463                            export_statistics[&(oid, output_index)].set_snapshot_records_staged(snapshot_staged);
464                        }
465                    }
466                    // final update for snapshot_staged, using the staged values as the total is an estimate
467                    export_statistics[&(oid, output_index)].set_snapshot_records_staged(snapshot_staged);
468                    export_statistics[&(oid, output_index)].set_snapshot_records_known(snapshot_staged);
469                }
470            }
471
472            // We are done with the snapshot so now we will emit rewind requests. It is important
473            // that this happens after the snapshot has finished because this is what unblocks the
474            // replication operator and we want this to happen serially. It might seem like a good
475            // idea to read the replication stream concurrently with the snapshot but it actually
476            // leads to a lot of data being staged for the future, which needlesly consumed memory
477            // in the cluster.
478            for output in worker_table_info.values() {
479                for (output_index, info) in output {
480                    trace!(%id, "timely-{worker_id} producing rewind request for table {} output {output_index}", info.desc.name);
481                    let req = RewindRequest { output_index: *output_index, snapshot_lsn };
482                    rewinds_handle.give(&rewind_cap_set[0], req);
483                }
484            }
485            *rewind_cap_set = CapabilitySet::new();
486
487            // Failure scenario after we have produced the snapshot, but before a successful COMMIT
488            fail::fail_point!("pg_snapshot_failure", |_| Err(
489                TransientError::SyntheticError
490            ));
491
492            // The exporting worker should wait for all the other workers to commit before dropping
493            // its client since this is what holds the exported transaction alive.
494            if is_snapshot_leader {
495                trace!(%id, "timely-{worker_id} waiting for all workers to finish");
496                *snapshot_cap_set = CapabilitySet::new();
497                while snapshot_input.next().await.is_some() {}
498                trace!(%id, "timely-{worker_id} (leader) comitting COPY transaction");
499                client.simple_query("COMMIT").await?;
500            } else {
501                trace!(%id, "timely-{worker_id} comitting COPY transaction");
502                client.simple_query("COMMIT").await?;
503                *snapshot_cap_set = CapabilitySet::new();
504            }
505            drop(client);
506            Ok(())
507        }))
508    });
509
510    // We now decode the COPY protocol and apply the cast expressions
511    let mut text_row = Row::default();
512    let mut final_row = Row::default();
513    let mut datum_vec = DatumVec::new();
514    let mut next_worker = (0..u64::cast_from(scope.peers()))
515        // Round robin on 1000-records basis to avoid creating tiny containers when there are a
516        // small number of updates and a large number of workers.
517        .flat_map(|w| std::iter::repeat_n(w, 1000))
518        .cycle();
519    let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
520    let snapshot_updates = raw_data
521        .map::<Vec<_>, _, _>(Clone::clone)
522        .unary(round_robin, "PgCastSnapshotRows", |_, _| {
523            move |input, output| {
524                while let Some((time, data)) = input.next() {
525                    let mut session = output.session(&time);
526                    for ((oid, output_index, event), time, diff) in data.drain(..) {
527                        let output = &table_info
528                            .get(&oid)
529                            .and_then(|outputs| outputs.get(&output_index))
530                            .expect("table_info contains all outputs");
531
532                        let event = event
533                            .as_ref()
534                            .map_err(|e: &DataflowError| e.clone())
535                            .and_then(|bytes| {
536                                decode_copy_row(bytes, output.casts.len(), &mut text_row)?;
537                                let datums = datum_vec.borrow_with(&text_row);
538                                super::cast_row(&output.casts, &datums, &mut final_row)?;
539                                Ok(SourceMessage {
540                                    key: Row::default(),
541                                    value: final_row.clone(),
542                                    metadata: Row::default(),
543                                })
544                            });
545
546                        session.give(((output_index, event), time, diff));
547                    }
548                }
549            }
550        })
551        .as_collection();
552
553    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));
554
555    (
556        snapshot_updates,
557        rewinds,
558        slot_ready,
559        errors,
560        button.press_on_drop(),
561    )
562}
563
564/// Starts a read-only transaction on the SQL session of `client` at a consistent LSN point by
565/// creating a replication slot. Returns a snapshot identifier that can be imported in
566/// other SQL session and the LSN of the consistent point.
567async fn export_snapshot(
568    client: &Client,
569    slot: &str,
570    temporary: bool,
571) -> Result<(String, MzOffset), TransientError> {
572    match export_snapshot_inner(client, slot, temporary).await {
573        Ok(ok) => Ok(ok),
574        Err(err) => {
575            // We don't want to leave the client inside a failed tx
576            client.simple_query("ROLLBACK;").await?;
577            Err(err)
578        }
579    }
580}
581
582async fn export_snapshot_inner(
583    client: &Client,
584    slot: &str,
585    temporary: bool,
586) -> Result<(String, MzOffset), TransientError> {
587    client
588        .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
589        .await?;
590
591    // Note: Using unchecked here is okay because we're using it in a SQL query.
592    let slot = Ident::new_unchecked(slot).to_ast_string_simple();
593    let temporary_str = if temporary { " TEMPORARY" } else { "" };
594    let query =
595        format!("CREATE_REPLICATION_SLOT {slot}{temporary_str} LOGICAL \"pgoutput\" USE_SNAPSHOT");
596    let row = match simple_query_opt(client, &query).await {
597        Ok(row) => Ok(row.unwrap()),
598        Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
599            return Err(TransientError::ReplicationSlotAlreadyExists);
600        }
601        Err(err) => Err(err),
602    }?;
603
604    // When creating a replication slot postgres returns the LSN of its consistent point, which is
605    // the LSN that must be passed to `START_REPLICATION` to cleanly transition from the snapshot
606    // phase to the replication phase. `START_REPLICATION` includes all transactions that commit at
607    // LSNs *greater than or equal* to the passed LSN. Therefore the snapshot phase must happen at
608    // the greatest LSN that is not beyond the consistent point. That LSN is `consistent_point - 1`
609    let consistent_point: PgLsn = row.get("consistent_point").unwrap().parse().unwrap();
610    let consistent_point = u64::from(consistent_point)
611        .checked_sub(1)
612        .expect("consistent point is always non-zero");
613
614    let row = simple_query_opt(client, "SELECT pg_export_snapshot();")
615        .await?
616        .unwrap();
617    let snapshot = row.get("pg_export_snapshot").unwrap().to_owned();
618
619    Ok((snapshot, MzOffset::from(consistent_point)))
620}
621
622/// Starts a read-only transaction on the SQL session of `client` at a the consistent LSN point of
623/// `snapshot`.
624async fn use_snapshot(client: &Client, snapshot: &str) -> Result<(), TransientError> {
625    client
626        .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
627        .await?;
628    let query = format!("SET TRANSACTION SNAPSHOT '{snapshot}';");
629    client.simple_query(&query).await?;
630    Ok(())
631}
632
633async fn set_statement_timeout(client: &Client, timeout: Duration) -> Result<(), TransientError> {
634    // Value is known to accept milliseconds w/o units.
635    // https://www.postgresql.org/docs/current/runtime-config-client.html
636    client
637        .simple_query(&format!("SET statement_timeout = {}", timeout.as_millis()))
638        .await?;
639    Ok(())
640}
641
642/// Decodes a row of `col_len` columns obtained from a text encoded COPY query into `row`.
643fn decode_copy_row(data: &[u8], col_len: usize, row: &mut Row) -> Result<(), DefiniteError> {
644    let mut packer = row.packer();
645    let row_parser = mz_pgcopy::CopyTextFormatParser::new(data, b'\t', "\\N");
646    let mut column_iter = row_parser.iter_raw_truncating(col_len);
647    for _ in 0..col_len {
648        let value = match column_iter.next() {
649            Some(Ok(value)) => value,
650            Some(Err(_)) => return Err(DefiniteError::InvalidCopyInput),
651            None => return Err(DefiniteError::MissingColumn),
652        };
653        let datum = value.map(super::decode_utf8_text).transpose()?;
654        packer.push(datum.unwrap_or(Datum::Null));
655    }
656    Ok(())
657}
658
659/// Record the sizes of the tables being snapshotted in `PgSnapshotMetrics` and emit snapshot statistics for each export.
660async fn report_snapshot_size(
661    client: &Client,
662    worker_table_info: &BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
663    metrics: PgSnapshotMetrics,
664    config: &RawSourceCreationConfig,
665    export_statistics: &BTreeMap<(u32, usize), SourceStatistics>,
666) -> Result<(), anyhow::Error> {
667    // TODO(guswynn): delete unused configs
668    let snapshot_config = config.config.parameters.pg_snapshot_config;
669
670    for (&oid, outputs) in worker_table_info {
671        // Use the first output's desc to make the table name since it is the same for all outputs
672        let Some((_, info)) = outputs.first_key_value() else {
673            continue;
674        };
675        let table = format!(
676            "{}.{}",
677            Ident::new_unchecked(info.desc.namespace.clone()).to_ast_string_simple(),
678            Ident::new_unchecked(info.desc.name.clone()).to_ast_string_simple()
679        );
680        let stats =
681            collect_table_statistics(client, snapshot_config, &table, info.desc.oid).await?;
682        metrics.record_table_count_latency(table, stats.count_latency);
683        for &output_index in outputs.keys() {
684            export_statistics[&(oid, output_index)].set_snapshot_records_known(stats.count);
685            export_statistics[&(oid, output_index)].set_snapshot_records_staged(0);
686        }
687    }
688    Ok(())
689}
690
691#[derive(Default)]
692struct TableStatistics {
693    count: u64,
694    count_latency: f64,
695}
696
697async fn collect_table_statistics(
698    client: &Client,
699    config: PgSourceSnapshotConfig,
700    table: &str,
701    oid: u32,
702) -> Result<TableStatistics, anyhow::Error> {
703    use mz_ore::metrics::MetricsFutureExt;
704    let mut stats = TableStatistics::default();
705
706    let estimate_row = simple_query_opt(
707        client,
708        &format!("SELECT reltuples::bigint AS estimate_count FROM pg_class WHERE oid = '{oid}'"),
709    )
710    .wall_time()
711    .set_at(&mut stats.count_latency)
712    .await?;
713    stats.count = match estimate_row {
714        Some(row) => row.get("estimate_count").unwrap().parse().unwrap_or(0),
715        None => bail!("failed to get estimate count for {table}"),
716    };
717
718    // If the estimate is low enough we can attempt to get an exact count. Note that not yet
719    // vacuumed tables will report zero rows here and there is a possibility that they are very
720    // large. We accept this risk and we offer the feature flag as an escape hatch if it becomes
721    // problematic.
722    if config.collect_strict_count && stats.count < 1_000_000 {
723        let count_row = simple_query_opt(client, &format!("SELECT count(*) as count from {table}"))
724            .wall_time()
725            .set_at(&mut stats.count_latency)
726            .await?;
727        stats.count = match count_row {
728            Some(row) => row.get("count").unwrap().parse().unwrap(),
729            None => bail!("failed to get count for {table}"),
730        }
731    }
732
733    Ok(stats)
734}
735
736/// Validates that there are no blocking RLS polcicies on the tables and retrieves table schemas
737/// for the given publication.
738async fn retrieve_schema_info(
739    connection_config: &Config,
740    connection_context: &ConnectionContext,
741    publication: &str,
742    table_oids: &[Oid],
743) -> Result<BTreeMap<u32, PostgresTableDesc>, PostgresError> {
744    let schema_client = connection_config
745        .connect(
746            "snapshot schema info",
747            &connection_context.ssh_tunnel_manager,
748        )
749        .await?;
750    mz_postgres_util::validate_no_rls_policies(&schema_client, table_oids).await?;
751    mz_postgres_util::publication_info(&schema_client, publication, Some(table_oids)).await
752}