// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Renders the table snapshot side of the [`PostgresSourceConnection`] ingestion dataflow.
//!
//! # Snapshot reading
//!
//! Depending on the resumption LSNs the table reader decides which tables need to be snapshotted
//! and performs a simple `COPY` query on them in order to get a snapshot. There are a few subtle
//! points about this operation, described in the following sections.
//!
//! ## Consistent LSN point for snapshot transactions
//!
//! Given that all our ingestion is based on correctly timestamping updates with the LSN they
//! happened at, it is important that we run the `COPY` query at a specific LSN point that is
//! relatable with the LSN numbers we receive from the replication stream. Such a point does not
//! necessarily exist for a normal SQL transaction. To achieve this we must force postgres to
//! produce a consistent point and let us know the LSN of that point by creating a replication
//! slot as the first statement in a transaction.
//!
//! This is a temporary dummy slot that is only used to put our snapshot transaction on a
//! consistent LSN point. Unfortunately no lighter-weight method exists for doing this. See this
//! [postgres thread] for more details.
//!
//! One might wonder why we don't use the actual real slot to provide us with the snapshot point
//! which would automatically be at the correct LSN. The answer is that it's possible that we crash
//! and restart after having already created the slot but before having finished the snapshot. In
//! that case the restarting process will have lost its opportunity to run queries at the slot's
//! consistent point as that opportunity only exists in the ephemeral transaction that created the
//! slot and that is long gone. Additionally there are good reasons why we'd like to move the
//! slot creation much earlier, e.g. during purification, in which case the slot will always be
//! pre-created.
//!
//! [postgres thread]: https://www.postgresql.org/message-id/flat/CAMN0T-vzzNy6TV1Jvh4xzNQdAvCLBQK_kh6_U7kAXgGU3ZFg-Q%40mail.gmail.com
//!
//! ## Reusing the consistent point among all workers
//!
//! Creating replication slots is potentially expensive so the code makes it such that all workers
//! cooperate and reuse one consistent snapshot among them. In order to do so we make use of the
//! "export transaction" feature of postgres. This feature allows one SQL session to create an
//! identifier for the transaction (a string identifier) it is currently in, which can be used by
//! other sessions to enter the same "snapshot".
//!
//! We accomplish this by picking one worker at random to function as the transaction leader. The
//! transaction leader is responsible for starting a SQL session, creating a temporary replication
//! slot in a transaction, exporting the transaction id, and broadcasting the transaction
//! information to all other workers via a broadcast feedback edge.
//!
//! During this phase the follower workers are simply waiting to hear on the feedback edge,
//! effectively synchronizing with the leader. Once all workers have received the snapshot
//! information they can all start to perform their assigned `COPY` queries.
//!
//! The leader and follower steps described above are accomplished by the [`export_snapshot`] and
//! [`use_snapshot`] functions respectively.
//!
//! ## Coordinated transaction COMMIT
//!
//! When follower workers are done with snapshotting they commit their transaction, close their
//! session, and then drop their snapshot feedback capability. When the leader worker is done with
//! snapshotting it drops its snapshot feedback capability and waits until it observes the
//! snapshot input advancing to the empty frontier. This allows the leader to COMMIT its
//! transaction last, which is the transaction that exported the snapshot.
//!
//! It's unclear if this is strictly necessary, but having the frontiers made it easy enough that I
//! added the synchronization.
//!
//! ## Snapshot rewinding
//!
//! Ingestion dataflows must produce definite data, including the snapshot. What this means
//! practically is that whenever we deem it necessary to snapshot a table we must do so at the same
//! LSN. However, the method for running a transaction described above doesn't let us choose the
//! LSN; it could be an LSN in the future chosen by PostgreSQL while it creates the temporary
//! replication slot.
//!
//! The definition of differential collections states that a collection at some time `t_snapshot`
//! is defined to be the accumulation of all updates that happen at `t <= t_snapshot`, where `<=`
//! is the partial order. In this case we are faced with the problem of knowing the state of a
//! table at `t_snapshot` but actually wanting to know the snapshot at `t_slot <= t_snapshot`.
//!
//! From the definition we can see that the snapshot at `t_slot` is related to the snapshot at
//! `t_snapshot` with the following equations:
//!
//! ```text
//! sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot < t <= t_snapshot)
//!                                         |
//!                                         V
//! sum(update: t <= t_slot) = sum(update: t <= t_snapshot) - sum(update: t_slot < t <= t_snapshot)
//! ```
//!
//! Therefore, if we manage to recover the `sum(update: t_slot < t <= t_snapshot)` term we will be
//! able to "rewind" the snapshot we obtained at `t_snapshot` to `t_slot` by emitting all updates
//! that happen between these two points with their diffs negated.
//!
//! It turns out that this term is exactly what the main replication slot provides us with, so we
//! can rewind snapshots taken at arbitrary points! In order to do this the snapshot dataflow emits
//! rewind requests to the replication reader, which inform it that a certain range of updates must
//! be emitted at LSN 0 (by convention) with their diffs negated. These negated diffs are
//! consolidated with the diffs taken at `t_snapshot` that were also emitted at LSN 0 (by
//! convention) and we end up with a TVC that at LSN 0 contains the snapshot at `t_slot`.
//!
//! # Snapshot decoding
//!
//! The expectation is that tables will most likely be skewed on the number of rows they contain so
//! while a `COPY` query for any given table runs on a single worker the decoding of the `COPY`
//! stream is distributed to all workers.
//!
//! ```text
//!                 ╭──────────────────╮
//!    ┏━━━━━━━━━━━━v━┓                │ exported
//!    ┃    table     ┃   ╭─────────╮  │ snapshot id
//!    ┃    reader    ┠─>─┤broadcast├──╯
//!    ┗━┯━━━━━━━━━━┯━┛   ╰─────────╯
//!   raw│          │
//!  COPY│          │
//!  data│          │
//! ╭────┴─────╮    │
//! │distribute│    │
//! ╰────┬─────╯    │
//! ┏━━━━┷━━━━┓     │
//! ┃  COPY   ┃     │
//! ┃ decoder ┃     │
//! ┗━━━━┯━━━━┛     │
//!      │ snapshot │rewind
//!      │ updates  │requests
//!      v          v
//! ```

use std::collections::BTreeMap;
use std::convert::Infallible;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;

use anyhow::bail;
use differential_dataflow::AsCollection;
use futures::{StreamExt as _, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_postgres_util::{Client, PostgresError, simple_query_opt};
use mz_repr::{Datum, DatumVec, Diff, Row};
use mz_sql_parser::ast::{Ident, display::AstDisplay};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
use mz_timely_util::builder_async::{
    Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::operator::StreamExt;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::{Broadcast, CapabilitySet, Concat, ConnectLoop, Feedback};
use timely::dataflow::{Scope, Stream};
use timely::progress::Timestamp;
use tokio_postgres::error::SqlState;
use tokio_postgres::types::{Oid, PgLsn};
use tracing::{error, trace};

use crate::metrics::source::postgres::PgSnapshotMetrics;
use crate::source::RawSourceCreationConfig;
use crate::source::postgres::replication::RewindRequest;
use crate::source::postgres::{
    DefiniteError, ReplicationError, SourceOutputInfo, TransientError, verify_schema,
};
use crate::source::types::{
    ProgressStatisticsUpdate, SignaledFuture, SourceMessage, StackedCollection,
};

/// Renders the snapshot dataflow. See the module documentation for more information.
pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
    mut scope: G,
    config: RawSourceCreationConfig,
    connection: PostgresSourceConnection,
    table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
    metrics: PgSnapshotMetrics,
) -> (
    StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
    Stream<G, RewindRequest>,
    Stream<G, Infallible>,
    Stream<G, ProgressStatisticsUpdate>,
    Stream<G, ReplicationError>,
    PressOnDropButton,
) {
    let op_name = format!("TableReader({})", config.id);
    let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());

    let (feedback_handle, feedback_data) = scope.feedback(Default::default());

    let (raw_handle, raw_data) = builder.new_output();
    let (rewinds_handle, rewinds) = builder.new_output();
    // This output is used to signal to the replication operator that the replication slot has been
    // created. With the current state of execution serialization there isn't a lot of benefit
    // in splitting the snapshot and replication phases into two operators.
    // TODO(petrosagg): merge the two operators into one (while still maintaining separation as
    // functions/modules)
    let (_, slot_ready) = builder.new_output::<CapacityContainerBuilder<_>>();
    let (snapshot_handle, snapshot) = builder.new_output();
    let (definite_error_handle, definite_errors) = builder.new_output();

    let (stats_output, stats_stream) = builder.new_output();

    // This operator needs to broadcast data to itself in order to synchronize the transaction
    // snapshot. However, none of the feedback capabilities result in output messages and for the
    // feedback edge specifically having a default connection would result in a loop.
    let mut snapshot_input = builder.new_disconnected_input(&feedback_data, Pipeline);

    // The export id must be sent to all workers, so we broadcast the feedback connection
    snapshot.broadcast().connect_loop(feedback_handle);

    let is_snapshot_leader = config.responsible_for("snapshot_leader");

    // A global view of all outputs that will be snapshotted by all workers.
    let mut all_outputs = vec![];
    // A filtered table info containing only the tables that this worker should snapshot.
    let mut reader_table_info = BTreeMap::new();
    for (table, outputs) in table_info.iter() {
        for (&output_index, output) in outputs {
            if *output.resume_upper != [MzOffset::minimum()] {
                // Has already been snapshotted.
                continue;
            }
            all_outputs.push(output_index);
            if config.responsible_for(*table) {
                reader_table_info
                    .entry(*table)
                    .or_insert_with(BTreeMap::new)
                    .insert(output_index, (output.desc.clone(), output.casts.clone()));
            }
        }
    }

    let (button, transient_errors) = builder.build_fallible(move |caps| {
        let busy_signal = Arc::clone(&config.busy_signal);
        Box::pin(SignaledFuture::new(busy_signal, async move {
            let id = config.id;
            let worker_id = config.worker_id;

            let [
                data_cap_set,
                rewind_cap_set,
                slot_ready_cap_set,
                snapshot_cap_set,
                definite_error_cap_set,
                stats_cap,
            ]: &mut [_; 6] = caps.try_into().unwrap();

            trace!(
                %id,
                "timely-{worker_id} initializing table reader \
                    with {} tables to snapshot",
                    reader_table_info.len()
            );

            // Nothing needs to be snapshotted.
            if all_outputs.is_empty() {
                trace!(%id, "no exports to snapshot");
                // Note we do not emit a `ProgressStatisticsUpdate::Snapshot` update here,
                // as we do not want to attempt to override the current value with 0. We
                // just leave it null.
                return Ok(());
            }

            let connection_config = connection
                .connection
                .config(
                    &config.config.connection_context.secrets_reader,
                    &config.config,
                    InTask::Yes,
                )
                .await?;
            let task_name = format!("timely-{worker_id} PG snapshotter");

            let client = if is_snapshot_leader {
                let client = connection_config
                    .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
                    .await?;

                // Attempt to export the snapshot by creating the main replication slot. If that
                // succeeds then there is no need for creating additional temporary slots.
                let main_slot = &connection.publication_details.slot;
                let snapshot_info = match export_snapshot(&client, main_slot, false).await {
                    Ok(info) => info,
                    Err(err @ TransientError::ReplicationSlotAlreadyExists) => {
                        match connection.connection.flavor {
                            // If we're connecting to vanilla Postgres we have the option of
                            // exporting a snapshot via a temporary slot
                            PostgresFlavor::Vanilla => {
                                let tmp_slot = format!("mzsnapshot_{}", uuid::Uuid::new_v4())
                                    .replace('-', "");
                                export_snapshot(&client, &tmp_slot, true).await?
                            }
                            // No salvation for Yugabyte
                            PostgresFlavor::Yugabyte => return Err(err),
                        }
                    }
                    Err(err) => return Err(err),
                };
                trace!(
                    %id,
                    "timely-{worker_id} exporting snapshot info {snapshot_info:?}");
                snapshot_handle.give(&snapshot_cap_set[0], snapshot_info);

                client
            } else {
                // Only the snapshot leader needs a replication connection.
                connection_config
                    .connect(
                        &task_name,
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?
            };
            *slot_ready_cap_set = CapabilitySet::new();

            // Configure statement_timeout based on param. We want to be able to
            // override the server value here in case it's set too low
            // relative to the size of the data we need to copy.
            set_statement_timeout(
                &client,
                config
                    .config
                    .parameters
                    .pg_source_snapshot_statement_timeout,
            )
            .await?;

            let (snapshot, snapshot_lsn) = loop {
                match snapshot_input.next().await {
                    Some(AsyncEvent::Data(_, mut data)) => {
                        break data.pop().expect("snapshot sent above")
                    }
                    Some(AsyncEvent::Progress(_)) => continue,
                    None => panic!("feedback closed before sending snapshot info"),
                }
            };
            // The snapshot leader is already in the identified transaction, but all other workers
            // need to enter it.
            if !is_snapshot_leader {
                trace!(%id, "timely-{worker_id} using snapshot id {snapshot:?}");
                use_snapshot(&client, &snapshot).await?;
            }

            let upstream_info = {
                let schema_client = connection_config
                    .connect(
                        "snapshot schema info",
                        &config.config.connection_context.ssh_tunnel_manager,
                    )
                    .await?;
                let table_oids = reader_table_info.keys().copied().collect::<Vec<_>>();
                match mz_postgres_util::publication_info(
                    &schema_client,
                    &connection.publication,
                    Some(&table_oids),
                )
                .await
                {
                    // If the replication stream cannot be obtained in a definite way there is
                    // nothing else to do. These errors are not retractable.
                    Err(PostgresError::PublicationMissing(publication)) => {
                        let err = DefiniteError::PublicationDropped(publication);
                        for (oid, outputs) in reader_table_info.iter() {
                            // Produce a definite error here and then exit to ensure
                            // a missing publication doesn't generate a transient
                            // error and restart this dataflow indefinitely.
                            //
                            // We pick `u64::MAX` as the LSN which will (in
                            // practice) never conflict with any previously revealed
                            // portions of the TVC.
                            for output_index in outputs.keys() {
                                let update = (
                                    (*oid, *output_index, Err(err.clone().into())),
                                    MzOffset::from(u64::MAX),
                                    Diff::ONE,
                                );
                                raw_handle.give_fueled(&data_cap_set[0], update).await;
                            }
                        }

                        definite_error_handle.give(
                            &definite_error_cap_set[0],
                            ReplicationError::Definite(Rc::new(err)),
                        );
                        return Ok(());
                    }
                    Err(e) => Err(TransientError::from(e))?,
                    Ok(i) => i,
                }
            };

            let worker_tables = reader_table_info
                .iter()
                .map(|(_, outputs)| {
                    // just use the first output's desc since the fields accessed here should
                    // be the same for all outputs
                    let desc = &outputs.values().next().expect("at least 1").0;
                    (
                        format!(
                            "{}.{}",
                            Ident::new_unchecked(desc.namespace.clone()).to_ast_string_simple(),
                            Ident::new_unchecked(desc.name.clone()).to_ast_string_simple()
                        ),
                        desc.oid.clone(),
                        outputs.len(),
                    )
                })
                .collect();

            let snapshot_total =
                fetch_snapshot_size(&client, worker_tables, metrics, &config).await?;

            stats_output.give(
                &stats_cap[0],
                ProgressStatisticsUpdate::Snapshot {
                    records_known: snapshot_total,
                    records_staged: 0,
                },
            );

            let mut snapshot_staged = 0;
            for (&oid, outputs) in reader_table_info.iter() {
                let mut table_name = None;
                let mut output_indexes = vec![];
                for (output_index, (expected_desc, casts)) in outputs.iter() {
                    match verify_schema(oid, expected_desc, &upstream_info, casts) {
                        Ok(()) => {
                            if table_name.is_none() {
                                table_name = Some((
                                    expected_desc.namespace.clone(),
                                    expected_desc.name.clone(),
                                ));
                            }
                            output_indexes.push(output_index);
                        }
                        Err(err) => {
                            raw_handle
                                .give_fueled(
                                    &data_cap_set[0],
                                    (
                                        (oid, *output_index, Err(err.into())),
                                        MzOffset::minimum(),
                                        Diff::ONE,
                                    ),
                                )
                                .await;
                            continue;
                        }
                    };
                }

                let (namespace, table) = match table_name {
                    Some(t) => t,
                    None => {
                        // all outputs errored for this table
                        continue;
                    }
                };

                trace!(
                    %id,
                    "timely-{worker_id} snapshotting table {:?}({oid}) @ {snapshot_lsn}",
                    table
                );

                // To handle quoted/keyword names, we can use `Ident`'s AST printing, which
                // emulates PG's rules for name formatting.
                let query = format!(
                    "COPY {}.{} TO STDOUT (FORMAT TEXT, DELIMITER '\t')",
                    Ident::new_unchecked(namespace).to_ast_string_simple(),
                    Ident::new_unchecked(table).to_ast_string_simple(),
                );
                let mut stream = pin!(client.copy_out_simple(&query).await?);

                let mut update = ((oid, 0, Ok(vec![])), MzOffset::minimum(), Diff::ONE);
                while let Some(bytes) = stream.try_next().await? {
                    let data = update.0 .2.as_mut().unwrap();
                    data.clear();
                    data.extend_from_slice(&bytes);
                    for output_index in &output_indexes {
                        update.0 .1 = **output_index;
                        raw_handle.give_fueled(&data_cap_set[0], &update).await;
                        snapshot_staged += 1;
                        // TODO(guswynn): does this 1000 need to be configurable?
                        if snapshot_staged % 1000 == 0 {
                            stats_output.give(
                                &stats_cap[0],
                                ProgressStatisticsUpdate::Snapshot {
                                    records_known: snapshot_total,
                                    records_staged: snapshot_staged,
                                },
                            );
                        }
                    }
                }
            }

            // We are done with the snapshot so now we will emit rewind requests. It is important
            // that this happens after the snapshot has finished because this is what unblocks the
            // replication operator and we want this to happen serially. It might seem like a good
            // idea to read the replication stream concurrently with the snapshot but it actually
            // leads to a lot of data being staged for the future, which needlessly consumes memory
            // in the cluster.
            for output in reader_table_info.values() {
                for (output_index, (desc, _)) in output {
                    trace!(%id, "timely-{worker_id} producing rewind request for table {} output {output_index}", desc.name);
                    let req = RewindRequest { output_index: *output_index, snapshot_lsn };
                    rewinds_handle.give(&rewind_cap_set[0], req);
                }
            }
            *rewind_cap_set = CapabilitySet::new();

            if snapshot_staged < snapshot_total {
                error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow \
                                 bigger than records staged {snapshot_staged}");
                snapshot_staged = snapshot_total;
            }
            stats_output.give(
                &stats_cap[0],
                ProgressStatisticsUpdate::Snapshot {
                    records_known: snapshot_total,
                    records_staged: snapshot_staged,
                },
            );

            // Failure scenario after we have produced the snapshot, but before a successful COMMIT
            fail::fail_point!("pg_snapshot_failure", |_| Err(
                TransientError::SyntheticError
            ));

            // The exporting worker should wait for all the other workers to commit before dropping
            // its client since this is what keeps the exported transaction alive.
            if is_snapshot_leader {
                trace!(%id, "timely-{worker_id} waiting for all workers to finish");
                *snapshot_cap_set = CapabilitySet::new();
                while snapshot_input.next().await.is_some() {}
                trace!(%id, "timely-{worker_id} (leader) committing COPY transaction");
                client.simple_query("COMMIT").await?;
            } else {
                trace!(%id, "timely-{worker_id} committing COPY transaction");
                client.simple_query("COMMIT").await?;
                *snapshot_cap_set = CapabilitySet::new();
            }
            drop(client);
            Ok(())
        }))
    });

    // We now decode the COPY protocol and apply the cast expressions
    let mut text_row = Row::default();
    let mut final_row = Row::default();
    let mut datum_vec = DatumVec::new();
    let snapshot_updates = raw_data
        .map::<Vec<_>, _, _>(Clone::clone)
        .distribute()
        .map(move |((oid, output_index, event), time, diff)| {
            let output = &table_info
                .get(&oid)
                .and_then(|outputs| outputs.get(&output_index))
                .expect("table_info contains all outputs");

            let event = event
                .as_ref()
                .map_err(|e: &DataflowError| e.clone())
                .and_then(|bytes| {
                    decode_copy_row(bytes, output.casts.len(), &mut text_row)?;
                    let datums = datum_vec.borrow_with(&text_row);
                    super::cast_row(&output.casts, &datums, &mut final_row)?;
                    Ok(SourceMessage {
                        key: Row::default(),
                        value: final_row.clone(),
                        metadata: Row::default(),
                    })
                });

            ((output_index, event), time, diff)
        })
        .as_collection();

    let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));

    (
        snapshot_updates,
        rewinds,
        slot_ready,
        stats_stream,
        errors,
        button.press_on_drop(),
    )
}

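// Illustrative sketch only, not used by the operator above: the "snapshot rewinding"
// arithmetic from the module documentation, written with plain std types. The name
// `rewind_to_slot` and the tuple shapes are hypothetical, and a `BTreeMap` stands in for
// differential's consolidation. It assumes totally ordered `u64` LSNs and retracts every
// replicated update that falls after `t_slot` and up to and including `t_snapshot`, which
// leaves the contents of the table as of `t_slot`.
#[allow(dead_code)]
fn rewind_to_slot<'a>(
    // Rows observed by the `COPY` at `t_snapshot`, as `(row, diff)`.
    snapshot: &[(&'a str, i64)],
    // Updates read from the replication stream, as `(row, lsn, diff)`.
    replicated: &[(&'a str, u64, i64)],
    t_slot: u64,
    t_snapshot: u64,
) -> std::collections::BTreeMap<&'a str, i64> {
    let mut acc = std::collections::BTreeMap::new();
    // Start from the snapshot taken at `t_snapshot`.
    for &(row, diff) in snapshot {
        *acc.entry(row).or_insert(0) += diff;
    }
    // Subtract the updates that happened between the two points (negated diffs).
    for &(row, lsn, diff) in replicated {
        if t_slot < lsn && lsn <= t_snapshot {
            *acc.entry(row).or_insert(0) -= diff;
        }
    }
    // Consolidate: rows whose diffs cancel out did not exist at `t_slot`.
    acc.retain(|_, diff| *diff != 0);
    acc
}
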
598/// Starts a read-only transaction on the SQL session of `client` at a consistent LSN point by
599/// creating a replication slot. Returns a snapshot identifier that can be imported in
600/// other SQL session and the LSN of the consistent point.
601async fn export_snapshot(
602    client: &Client,
603    slot: &str,
604    temporary: bool,
605) -> Result<(String, MzOffset), TransientError> {
606    match export_snapshot_inner(client, slot, temporary).await {
607        Ok(ok) => Ok(ok),
608        Err(err) => {
609            // We don't want to leave the client inside a failed tx
610            client.simple_query("ROLLBACK;").await?;
611            Err(err)
612        }
613    }
614}

/// Creates the replication slot `slot` as the first statement of a new `REPEATABLE READ`
/// transaction and exports that transaction's snapshot.
///
/// Returns the exported snapshot identifier together with the LSN of the snapshot, which is
/// `consistent_point - 1`.
async fn export_snapshot_inner(
    client: &Client,
    slot: &str,
    temporary: bool,
) -> Result<(String, MzOffset), TransientError> {
    client
        .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
        .await?;

    // Note: Using `new_unchecked` here is okay because the identifier is only used in a SQL query.
    let slot = Ident::new_unchecked(slot).to_ast_string_simple();
    let temporary_str = if temporary { " TEMPORARY" } else { "" };
    let query =
        format!("CREATE_REPLICATION_SLOT {slot}{temporary_str} LOGICAL \"pgoutput\" USE_SNAPSHOT");
    let row = match simple_query_opt(client, &query).await {
        Ok(row) => Ok(row.unwrap()),
        Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
            return Err(TransientError::ReplicationSlotAlreadyExists);
        }
        Err(err) => Err(err),
    }?;

    // When creating a replication slot postgres returns the LSN of its consistent point, which is
    // the LSN that must be passed to `START_REPLICATION` to cleanly transition from the snapshot
    // phase to the replication phase. `START_REPLICATION` includes all transactions that commit at
    // LSNs *greater than or equal* to the passed LSN. Therefore the snapshot phase must happen at
    // the greatest LSN that is not beyond the consistent point. That LSN is `consistent_point - 1`.
    let consistent_point: PgLsn = row.get("consistent_point").unwrap().parse().unwrap();
    let consistent_point = u64::from(consistent_point)
        .checked_sub(1)
        .expect("consistent point is always non-zero");

    let row = simple_query_opt(client, "SELECT pg_export_snapshot();")
        .await?
        .unwrap();
    let snapshot = row.get("pg_export_snapshot").unwrap().to_owned();

    Ok((snapshot, MzOffset::from(consistent_point)))
}
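
// Illustrative sketch, not part of the original source: the `consistent_point - 1` arithmetic
// from `export_snapshot_inner`, on plain integers. It assumes the textual `pg_lsn` form `X/Y`
// (two hexadecimal halves, high 32 bits first). `parse_lsn_sketch` and `snapshot_lsn_sketch` are
// hypothetical helpers; the real code relies on `PgLsn`'s own parser.
#[allow(dead_code)]
fn parse_lsn_sketch(text: &str) -> Option<u64> {
    let (hi, lo) = text.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

// Returns the greatest LSN strictly before `consistent_point`, or `None` if the consistent point
// is zero. The real code treats the zero case as an invariant violation and panics instead.
#[allow(dead_code)]
fn snapshot_lsn_sketch(consistent_point: u64) -> Option<u64> {
    consistent_point.checked_sub(1)
}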

/// Starts a read-only transaction on the SQL session of `client` at the consistent LSN point of
/// `snapshot`.
async fn use_snapshot(client: &Client, snapshot: &str) -> Result<(), TransientError> {
    client
        .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
        .await?;
    let query = format!("SET TRANSACTION SNAPSHOT '{snapshot}';");
    client.simple_query(&query).await?;
    Ok(())
}

/// Sets `statement_timeout` for the SQL session of `client`.
async fn set_statement_timeout(client: &Client, timeout: Duration) -> Result<(), TransientError> {
    // `statement_timeout` is interpreted as milliseconds when specified without units.
    // https://www.postgresql.org/docs/current/runtime-config-client.html
    client
        .simple_query(&format!("SET statement_timeout = {}", timeout.as_millis()))
        .await?;
    Ok(())
}
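
// Illustrative sketch, not part of the original source: the statement text that
// `set_statement_timeout` builds for a given duration. `statement_timeout_query_sketch` is a
// hypothetical helper. Note that `as_millis` truncates, so a sub-millisecond duration renders as
// `0`, which Postgres interprets as "no timeout".
#[allow(dead_code)]
fn statement_timeout_query_sketch(timeout: std::time::Duration) -> String {
    format!("SET statement_timeout = {}", timeout.as_millis())
}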

/// Decodes a row of `col_len` columns obtained from a text encoded COPY query into `row`.
fn decode_copy_row(data: &[u8], col_len: usize, row: &mut Row) -> Result<(), DefiniteError> {
    let mut packer = row.packer();
    let row_parser = mz_pgcopy::CopyTextFormatParser::new(data, b'\t', "\\N");
    let mut column_iter = row_parser.iter_raw_truncating(col_len);
    for _ in 0..col_len {
        let value = match column_iter.next() {
            Some(Ok(value)) => value,
            Some(Err(_)) => return Err(DefiniteError::InvalidCopyInput),
            None => return Err(DefiniteError::MissingColumn),
        };
        let datum = value.map(super::decode_utf8_text).transpose()?;
        packer.push(datum.unwrap_or(Datum::Null));
    }
    Ok(())
}
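
// Illustrative sketch, not part of the original source: how one line of text-format `COPY`
// output breaks into nullable columns, using the same tab delimiter and `\N` null marker that
// `decode_copy_row` passes to the parser. The hypothetical `split_copy_line_sketch` deliberately
// ignores backslash escapes inside values, which a full COPY text parser must also handle.
#[allow(dead_code)]
fn split_copy_line_sketch(line: &str) -> Vec<Option<&str>> {
    line.split('\t')
        .map(|field| if field == "\\N" { None } else { Some(field) })
        .collect()
}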

/// Computes the total snapshot size (row count times number of outputs, summed over `tables`)
/// and records the latency of each table's count query in `PgSnapshotMetrics`.
async fn fetch_snapshot_size(
    client: &Client,
    // The table name, oid, and number of outputs for each table owned by this worker.
    tables: Vec<(String, Oid, usize)>,
    metrics: PgSnapshotMetrics,
    config: &RawSourceCreationConfig,
) -> Result<u64, anyhow::Error> {
    // TODO(guswynn): delete unused configs
    let snapshot_config = config.config.parameters.pg_snapshot_config;

    let mut total = 0;
    for (table, oid, output_count) in tables {
        let stats =
            collect_table_statistics(client, snapshot_config.collect_strict_count, &table, oid)
                .await?;
        metrics.record_table_count_latency(
            table,
            stats.count_latency,
            snapshot_config.collect_strict_count,
        );
        total += stats.count * u64::cast_from(output_count);
    }
    Ok(total)
}
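
/// The row count of a single table and the time it took to obtain it.
#[derive(Default)]
struct TableStatistics {
    /// The exact or estimated number of rows in the table.
    count: u64,
    /// The wall-clock time taken by the count query.
    count_latency: f64,
}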

/// Collects the row count of `table`. When `strict` is set this runs an exact `count(*)`,
/// otherwise it reads the planner's estimate from `pg_class.reltuples`.
async fn collect_table_statistics(
    client: &Client,
    strict: bool,
    table: &str,
    oid: u32,
) -> Result<TableStatistics, anyhow::Error> {
    use mz_ore::metrics::MetricsFutureExt;
    let mut stats = TableStatistics::default();

    if strict {
        let count_row = simple_query_opt(client, &format!("SELECT count(*) AS count FROM {table}"))
            .wall_time()
            .set_at(&mut stats.count_latency)
            .await?;
        match count_row {
            Some(row) => {
                let count: i64 = row.get("count").unwrap().parse().unwrap();
                stats.count = count.try_into()?;
            }
            None => bail!("failed to get count for {table}"),
        }
    } else {
        let estimate_row = simple_query_opt(
            client,
            &format!(
                "SELECT reltuples::bigint AS estimate_count FROM pg_class WHERE oid = '{oid}'"
            ),
        )
        .wall_time()
        .set_at(&mut stats.count_latency)
        .await?;
        match estimate_row {
            // The estimate is a `bigint`, so parse it as an `i64`.
            Some(row) => match row.get("estimate_count").unwrap().parse::<i64>().unwrap() {
                // `reltuples` is -1 when the table has not yet been vacuumed or analyzed, in
                // which case no estimate is available.
                -1 => stats.count = 0,
                n => stats.count = n.try_into()?,
            },
            None => bail!("failed to get estimate count for {table}"),
        };
    }

    Ok(stats)
}
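
// Illustrative sketch, not part of the original source: how per-table estimates combine into the
// snapshot total computed by `fetch_snapshot_size`. Each input pair is a hypothetical
// `(reltuples estimate, output count)`. Unlike the code above, this sketch uses checked
// arithmetic and returns `None` on overflow or on a negative estimate other than -1.
#[allow(dead_code)]
fn snapshot_total_sketch(tables: &[(i64, u64)]) -> Option<u64> {
    let mut total = 0u64;
    for &(estimate, outputs) in tables {
        // An estimate of -1 means "unknown" and contributes nothing to the total.
        let count = if estimate == -1 {
            0
        } else {
            u64::try_from(estimate).ok()?
        };
        total = total.checked_add(count.checked_mul(outputs)?)?;
    }
    Some(total)
}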