mz_storage/source/postgres/snapshot.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Renders the table snapshot side of the [`PostgresSourceConnection`] ingestion dataflow.
11//!
12//! # Snapshot reading
13//!
14//! Depending on the resumption LSNs, the table reader decides which tables need to be snapshotted
15//! and performs a simple `COPY` query on them in order to get a snapshot. There are a few subtle
16//! points about this operation, described in the following sections.
17//!
18//! ## Consistent LSN point for snapshot transactions
19//!
20//! Given that all our ingestion is based on correctly timestamping updates with the LSN they
21//! happened at, it is important that we run the `COPY` query at a specific LSN point that can be
22//! related to the LSN numbers we receive from the replication stream. Such a point does not
23//! necessarily exist for a normal SQL transaction. To achieve this we must force postgres to
24//! produce a consistent point and tell us its LSN, which we do by creating a replication
25//! slot as the first statement of a transaction.
26//!
27//! This is a temporary dummy slot that is only used to put our snapshot transaction on a
28//! consistent LSN point. Unfortunately no lighter-weight method exists for doing this. See this
29//! [postgres thread] for more details.
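//!
//! As a rough sketch (the precise statements are issued by [`export_snapshot`] further below, and
//! `mzsnapshot_<uuid>` is just an illustrative slot name), the transaction that anchors the
//! snapshot begins like this, with the `consistent_point` returned by the slot creation being the
//! LSN that relates the transaction to the replication stream:
//!
//! ```text
//! BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;
//! CREATE_REPLICATION_SLOT mzsnapshot_<uuid> TEMPORARY LOGICAL "pgoutput" USE_SNAPSHOT;
//! -- returns a `consistent_point` LSN for the transaction we are now in
//! ```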
30//!
31//! One might wonder why we don't use the real replication slot to provide us with the snapshot
32//! point, which would automatically be at the correct LSN. The answer is that it's possible that
33//! we crash and restart after having already created the slot but before having finished the
34//! snapshot. In that case the restarting process will have lost its opportunity to run queries at
35//! the slot's consistent point, as that opportunity only exists in the ephemeral transaction that
36//! created the slot and that is long gone. Additionally, there are good reasons why we'd like to
37//! move the slot creation much earlier, e.g. during purification, in which case the slot will
38//! always be pre-created.
39//!
40//! [postgres thread]: https://www.postgresql.org/message-id/flat/CAMN0T-vzzNy6TV1Jvh4xzNQdAvCLBQK_kh6_U7kAXgGU3ZFg-Q%40mail.gmail.com
41//!
42//! ## Reusing the consistent point among all workers
43//!
44//! Creating replication slots is potentially expensive, so the code is arranged such that all
45//! workers cooperate and reuse one consistent snapshot among them. In order to do so we make use
46//! of the snapshot export feature of postgres. This feature allows one SQL session to create a
47//! string identifier for the snapshot of the transaction it is currently in, which can be used by
48//! other sessions to enter the same snapshot.
49//!
50//! We accomplish this by designating one worker as the transaction leader. The transaction
51//! leader is responsible for starting a SQL session, creating a temporary replication slot in a
52//! transaction, exporting the snapshot id of that transaction, and broadcasting the snapshot
53//! information to all other workers via a broadcasted feedback edge.
54//!
55//! During this phase the follower workers are simply waiting to hear on the feedback edge,
56//! effectively synchronizing with the leader. Once all workers have received the snapshot
57//! information they can all start to perform their assigned COPY queries.
58//!
59//! The leader and follower steps described above are accomplished by the [`export_snapshot`] and
60//! [`use_snapshot`] functions respectively.
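//!
//! A sketch of the SQL involved, assuming an exported snapshot id of the form
//! `00000003-0000001B-1` (the id is an opaque string chosen by postgres):
//!
//! ```text
//! -- leader, inside the transaction that created the temporary slot
//! SELECT pg_export_snapshot();       -- e.g. '00000003-0000001B-1', broadcast to all workers
//!
//! -- each follower, after receiving the id on the feedback edge
//! BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;
//! SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
//! ```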
61//!
62//! ## Coordinated transaction COMMIT
63//!
64//! When follower workers are done with snapshotting, they commit their transaction, close their
65//! session, and then drop their snapshot feedback capability. When the leader worker is done with
66//! snapshotting, it drops its snapshot feedback capability and waits until it observes the
67//! snapshot input advancing to the empty frontier. This allows the leader to COMMIT its
68//! transaction last, which is the transaction that exported the snapshot.
69//!
70//! It's unclear if this is strictly necessary, but having the frontiers made it easy enough that I
71//! added the synchronization.
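//!
//! Roughly, the intended ordering is (a sketch of the sequence described above, not extra
//! machinery):
//!
//! ```text
//! followers: COMMIT; close session; drop snapshot feedback capability
//! leader:    observe the snapshot input frontier become empty
//! leader:    COMMIT;   -- the transaction that exported the snapshot commits last
//! ```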
72//!
73//! ## Snapshot rewinding
74//!
75//! Ingestion dataflows must produce definite data, including the snapshot. What this means in
76//! practice is that whenever we deem it necessary to snapshot a table we must always do so at the
77//! same LSN. However, the method for running a transaction described above doesn't let us choose
78//! the LSN; it could be an LSN in the future chosen by PostgreSQL while it creates the temporary
79//! replication slot.
80//!
81//! The definition of differential collections states that a collection at some time `t_snapshot`
82//! is defined to be the accumulation of all updates that happen at `t <= t_snapshot`, where `<=`
83//! is the partial order. In this case we are faced with the problem of knowing the state of a
84//! table at `t_snapshot` but actually wanting to know the snapshot at `t_slot <= t_snapshot`.
85//!
86//! From the definition we can see that the snapshot at `t_slot` is related to the snapshot at
87//! `t_snapshot` with the following equations:
88//!
89//! ```text
90//! sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot < t <= t_snapshot)
91//!                              |
92//!                              V
93//! sum(update: t <= t_slot) = sum(update: t <= t_snapshot) - sum(update: t_slot < t <= t_snapshot)
94//! ```
95//!
96//! Therefore, if we manage to recover the `sum(update: t_slot <= t <= t_snapshot)` term we will be
97//! able to "rewind" the snapshot we obtained at `t_snapshot` to `t_slot` by emitting all updates
98//! that happen between these two points with their diffs negated.
99//!
100//! It turns out that this term is exactly what the main replication slot provides us with, so we
101//! can rewind snapshots taken at arbitrary points! In order to do this the snapshot dataflow
102//! emits rewind requests to the replication reader, informing it that a certain range of updates
103//! must be emitted at LSN 0 (by convention) with their diffs negated. These negated diffs are
104//! consolidated with the diffs taken at `t_snapshot`, which were also emitted at LSN 0 (by
105//! convention), and we end up with a TVC that at LSN 0 contains the snapshot at `t_slot`.
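//!
//! As a toy example of the rewind, assume `t_slot = 10`, the temporary slot pins the `COPY`
//! transaction at `t_snapshot = 15`, and a single row `r` is inserted at LSN 12:
//!
//! ```text
//! snapshot (COPY at t_snapshot):   (r,  0, +1)   -- emitted at LSN 0 by convention
//! rewind of (t_slot, t_snapshot]:  (r,  0, -1)   -- negated by the replication reader
//! replication stream:              (r, 12, +1)
//! --------------------------------------------
//! consolidated at LSN 0:           nothing       -- the table as of t_slot = 10
//! accumulated as of LSN 12:        {r}           -- the insert reappears at its true LSN
//! ```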
106//!
107//! # Snapshot decoding
108//!
109//! The expectation is that tables will most likely be skewed in the number of rows they contain,
110//! so while a `COPY` query for any given table runs on a single worker, the decoding of the COPY
111//! stream is distributed to all workers.
112//!
113//!
114//! ```text
115//!                  ╭──────────────────╮
116//!     ┏━━━━━━━━━━━━v━┓                │ exported
117//!     ┃    table     ┃   ╭─────────╮  │ snapshot id
118//!     ┃    reader    ┠─>─┤broadcast├──╯
119//!     ┗━┯━━━━━━━━━━┯━┛   ╰─────────╯
120//!    raw│          │
121//!   COPY│          │
122//!   data│          │
123//!  ╭────┴─────╮    │
124//!  │distribute│    │
125//!  ╰────┬─────╯    │
126//!  ┏━━━━┷━━━━┓     │
127//!  ┃  COPY   ┃     │
128//!  ┃ decoder ┃     │
129//!  ┗━━━━┯━━━━┛     │
130//!       │ snapshot │rewind
131//!       │ updates  │requests
132//!       v          v
133//! ```
134
135use std::collections::BTreeMap;
136use std::convert::Infallible;
137use std::pin::pin;
138use std::rc::Rc;
139use std::sync::Arc;
140use std::time::Duration;
141
142use anyhow::bail;
143use differential_dataflow::AsCollection;
144use futures::{StreamExt as _, TryStreamExt};
145use mz_ore::cast::CastFrom;
146use mz_ore::future::InTask;
147use mz_postgres_util::{Client, PostgresError, simple_query_opt};
148use mz_repr::{Datum, DatumVec, Diff, Row};
149use mz_sql_parser::ast::{Ident, display::AstDisplay};
150use mz_storage_types::errors::DataflowError;
151use mz_storage_types::parameters::PgSourceSnapshotConfig;
152use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
153use mz_timely_util::builder_async::{
154 Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
155};
156use timely::container::CapacityContainerBuilder;
157use timely::dataflow::channels::pact::{Exchange, Pipeline};
158use timely::dataflow::operators::core::Map;
159use timely::dataflow::operators::{
160 Broadcast, CapabilitySet, Concat, ConnectLoop, Feedback, Operator,
161};
162use timely::dataflow::{Scope, Stream};
163use timely::progress::Timestamp;
164use tokio_postgres::error::SqlState;
165use tokio_postgres::types::{Oid, PgLsn};
166use tracing::trace;
167
168use crate::metrics::source::postgres::PgSnapshotMetrics;
169use crate::source::RawSourceCreationConfig;
170use crate::source::postgres::replication::RewindRequest;
171use crate::source::postgres::{
172 DefiniteError, ReplicationError, SourceOutputInfo, TransientError, verify_schema,
173};
174use crate::source::types::{SignaledFuture, SourceMessage, StackedCollection};
175use crate::statistics::SourceStatistics;
176
177/// Renders the snapshot dataflow. See the module documentation for more information.
178pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
179 mut scope: G,
180 config: RawSourceCreationConfig,
181 connection: PostgresSourceConnection,
182 table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
183 metrics: PgSnapshotMetrics,
184) -> (
185 StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
186 Stream<G, RewindRequest>,
187 Stream<G, Infallible>,
188 Stream<G, ReplicationError>,
189 PressOnDropButton,
190) {
191 let op_name = format!("TableReader({})", config.id);
192 let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());
193
194 let (feedback_handle, feedback_data) = scope.feedback(Default::default());
195
196 let (raw_handle, raw_data) = builder.new_output();
197 let (rewinds_handle, rewinds) = builder.new_output();
198 // This output is used to signal to the replication operator that the replication slot has been
199 // created. With the current state of execution serialization there isn't a lot of benefit
200 // in splitting the snapshot and replication phases into two operators.
201 // TODO(petrosagg): merge the two operators in one (while still maintaining separation as
202 // functions/modules)
203 let (_, slot_ready) = builder.new_output::<CapacityContainerBuilder<_>>();
204 let (snapshot_handle, snapshot) = builder.new_output();
205 let (definite_error_handle, definite_errors) = builder.new_output();
206
207 // This operator needs to broadcast data to itself in order to synchronize the transaction
208 // snapshot. However, none of the feedback capabilities result in output messages, and for the
209 // feedback edge specifically, having a default connection would result in a loop.
210 let mut snapshot_input = builder.new_disconnected_input(&feedback_data, Pipeline);
211
212 // The export id must be sent to all workers, so we broadcast the feedback connection
213 snapshot.broadcast().connect_loop(feedback_handle);
214
215 let is_snapshot_leader = config.responsible_for("snapshot_leader");
216
217 // A global view of all outputs that will be snapshotted by all workers.
218 let mut all_outputs = vec![];
219 // A filtered table info containing only the tables that this worker should snapshot.
220 let mut reader_table_info = BTreeMap::new();
221 // A collection of `SourceStatistics` to update for a given Oid. The same info exists in
222 // reader_table_info, but this avoids having to iterate + map each time the statistics are needed.
223 let mut export_statistics = BTreeMap::new();
224 for (table, outputs) in table_info.iter() {
225 for (&output_index, output) in outputs {
226 if *output.resume_upper != [MzOffset::minimum()] {
227 // Already has been snapshotted.
228 continue;
229 }
230 all_outputs.push(output_index);
231 if config.responsible_for(*table) {
232 reader_table_info
233 .entry(*table)
234 .or_insert_with(BTreeMap::new)
235 .insert(output_index, (output.desc.clone(), output.casts.clone()));
236
237 let statistics = config
238 .statistics
239 .get(&output.export_id)
240 .expect("statistics are initialized")
241 .clone();
242 export_statistics
243 .entry(*table)
244 .or_insert_with(Vec::new)
245 .push(statistics);
246 }
247 }
248 }
249
250 let (button, transient_errors) = builder.build_fallible(move |caps| {
251 let busy_signal = Arc::clone(&config.busy_signal);
252 Box::pin(SignaledFuture::new(busy_signal, async move {
253 let id = config.id;
254 let worker_id = config.worker_id;
255 let [
256 data_cap_set,
257 rewind_cap_set,
258 slot_ready_cap_set,
259 snapshot_cap_set,
260 definite_error_cap_set,
261 ]: &mut [_; 5] = caps.try_into().unwrap();
262
263 trace!(
264 %id,
265 "timely-{worker_id} initializing table reader \
266 with {} tables to snapshot",
267 reader_table_info.len()
268 );
269
270 let connection_config = connection
271 .connection
272 .config(
273 &config.config.connection_context.secrets_reader,
274 &config.config,
275 InTask::Yes,
276 )
277 .await?;
278
279
280 // The snapshot operator is responsible for creating the replication slot(s).
281 // This first slot is the permanent slot that will be used for reading the replication
282 // stream. A temporary slot is created further on to capture table snapshots.
283 let replication_client = if is_snapshot_leader {
284 let client = connection_config
285 .connect_replication(&config.config.connection_context.ssh_tunnel_manager)
286 .await?;
287 let main_slot = &connection.publication_details.slot;
288
289 tracing::info!(%id, "ensuring replication slot {main_slot} exists");
290 super::ensure_replication_slot(&client, main_slot).await?;
291 Some(client)
292 } else {
293 None
294 };
295 *slot_ready_cap_set = CapabilitySet::new();
296
297 // Nothing needs to be snapshotted.
298 if all_outputs.is_empty() {
299 trace!(%id, "no exports to snapshot");
300 // Note we do not emit a `ProgressStatisticsUpdate::Snapshot` update here,
301 // as we do not want to attempt to override the current value with 0. We
302 // just leave it null.
303 return Ok(());
304 }
305
306 // A worker *must* emit a count even if not responsible for snapshotting a table
307 // as statistics summarization will return null if any worker hasn't set a value.
308 // This will also reset snapshot stats for any exports that are not snapshotting.
309 // If no workers need to snapshot, then avoid emitting these as they will clear
310 // previous stats.
311 for statistics in config.statistics.values() {
312 statistics.set_snapshot_records_known(0);
313 statistics.set_snapshot_records_staged(0);
314 }
315
316 // replication client is only set if this worker is the snapshot leader
317 let client = match replication_client {
318 Some(client) => {
319 let tmp_slot = format!("mzsnapshot_{}", uuid::Uuid::new_v4()).replace('-', "");
320 let snapshot_info = export_snapshot(&client, &tmp_slot, true).await?;
321 trace!(
322 %id,
323 "timely-{worker_id} exporting snapshot info {snapshot_info:?}");
324 snapshot_handle.give(&snapshot_cap_set[0], snapshot_info);
325
326 client
327 }
328 None => {
329 // Only the snapshot leader needs a replication connection.
330 let task_name = format!("timely-{worker_id} PG snapshotter");
331 connection_config
332 .connect(
333 &task_name,
334 &config.config.connection_context.ssh_tunnel_manager,
335 )
336 .await?
337 }
338 };
339
340 // Configure statement_timeout based on param. We want to be able to
341 // override the server value here in case it's set too low,
342 // relative to the size of the data we need to copy.
343 set_statement_timeout(
344 &client,
345 config
346 .config
347 .parameters
348 .pg_source_snapshot_statement_timeout,
349 )
350 .await?;
351
352 let (snapshot, snapshot_lsn) = loop {
353 match snapshot_input.next().await {
354 Some(AsyncEvent::Data(_, mut data)) => {
355 break data.pop().expect("snapshot sent above")
356 }
357 Some(AsyncEvent::Progress(_)) => continue,
358 None => panic!(
359 "feedback closed \
360 before sending snapshot info"
361 ),
362 }
363 };
364 // The snapshot leader is already inside the exported transaction, but all other workers need to enter it.
365 if !is_snapshot_leader {
366 trace!(%id, "timely-{worker_id} using snapshot id {snapshot:?}");
367 use_snapshot(&client, &snapshot).await?;
368 }
369
370 let upstream_info = {
371 let schema_client = connection_config
372 .connect(
373 "snapshot schema info",
374 &config.config.connection_context.ssh_tunnel_manager,
375 )
376 .await?;
377 match mz_postgres_util::publication_info(&schema_client, &connection.publication, Some(&reader_table_info.keys().copied().collect::<Vec<_>>()))
378 .await
379 {
380 // If the replication stream cannot be obtained in a definite way there is
381 // nothing else to do. These errors are not retractable.
382 Err(PostgresError::PublicationMissing(publication)) => {
383 let err = DefiniteError::PublicationDropped(publication);
384 for (oid, outputs) in reader_table_info.iter() {
385 // Produce a definite error here and then exit to ensure
386 // a missing publication doesn't generate a transient
387 // error and restart this dataflow indefinitely.
388 //
389 // We pick `u64::MAX` as the LSN which will (in
390 // practice) never conflict with any previously revealed
391 // portions of the TVC.
392 for output_index in outputs.keys() {
393 let update = (
394 (*oid, *output_index, Err(err.clone().into())),
395 MzOffset::from(u64::MAX),
396 Diff::ONE,
397 );
398 raw_handle.give_fueled(&data_cap_set[0], update).await;
399 }
400 }
401
402 definite_error_handle.give(
403 &definite_error_cap_set[0],
404 ReplicationError::Definite(Rc::new(err)),
405 );
406 return Ok(());
407 }
408 Err(e) => Err(TransientError::from(e))?,
409 Ok(i) => i,
410 }
411 };
412
413 let worker_tables = reader_table_info
414 .iter()
415 .map(|(_, outputs)| {
416 // just use the first output's desc since the fields accessed here should
417 // be the same for all outputs
418 let desc = &outputs.values().next().expect("at least 1").0;
419 (
420 format!(
421 "{}.{}",
422 Ident::new_unchecked(desc.namespace.clone()).to_ast_string_simple(),
423 Ident::new_unchecked(desc.name.clone()).to_ast_string_simple()
424 ),
425 desc.oid.clone(),
426 outputs.len(),
427 export_statistics.get(&desc.oid).unwrap(),
428 )
429 })
430 .collect();
431
432 report_snapshot_size(&client, worker_tables, metrics, &config).await?;
433
434 for (&oid, outputs) in reader_table_info.iter() {
435 let mut table_name = None;
436 let mut output_indexes = vec![];
437 for (output_index, (expected_desc, casts)) in outputs.iter() {
438 match verify_schema(oid, expected_desc, &upstream_info, casts) {
439 Ok(()) => {
440 if table_name.is_none() {
441 table_name = Some((
442 expected_desc.namespace.clone(),
443 expected_desc.name.clone(),
444 ));
445 }
446 output_indexes.push(output_index);
447 }
448 Err(err) => {
449 raw_handle
450 .give_fueled(
451 &data_cap_set[0],
452 (
453 (oid, *output_index, Err(err.into())),
454 MzOffset::minimum(),
455 Diff::ONE,
456 ),
457 )
458 .await;
459 continue;
460 }
461 };
462 }
463
464 let (namespace, table) = match table_name {
465 Some(t) => t,
466 None => {
467 // all outputs errored for this table
468 continue;
469 }
470 };
471
472 trace!(
473 %id,
474 "timely-{worker_id} snapshotting table {:?}({oid}) @ {snapshot_lsn}",
475 table
476 );
477
478 // To handle quoted/keyword names, we can use `Ident`'s AST printing, which
479 // emulates PG's rules for name formatting.
480 let query = format!(
481 "COPY {}.{} TO STDOUT (FORMAT TEXT, DELIMITER '\t')",
482 Ident::new_unchecked(namespace).to_ast_string_simple(),
483 Ident::new_unchecked(table).to_ast_string_simple(),
484 );
485 let mut stream = pin!(client.copy_out_simple(&query).await?);
486
487 let mut snapshot_staged = 0;
488 let mut update = ((oid, 0, Ok(vec![])), MzOffset::minimum(), Diff::ONE);
489 while let Some(bytes) = stream.try_next().await? {
490 let data = update.0 .2.as_mut().unwrap();
491 data.clear();
492 data.extend_from_slice(&bytes);
493 for output_index in &output_indexes {
494 update.0 .1 = **output_index;
495 raw_handle.give_fueled(&data_cap_set[0], &update).await;
496 }
497 snapshot_staged += 1;
498 if snapshot_staged % 1000 == 0 {
499 for export_stat in export_statistics.get(&oid).unwrap() {
500 export_stat.set_snapshot_records_staged(snapshot_staged);
501 }
502 }
503 }
504 // Final update for snapshot_staged. We also use the staged count as the known total, since the earlier total was only an estimate.
505 for export_stat in export_statistics.get(&oid).unwrap() {
506 export_stat.set_snapshot_records_staged(snapshot_staged);
507 export_stat.set_snapshot_records_known(snapshot_staged);
508 }
509 }
510
511 // We are done with the snapshot so now we will emit rewind requests. It is important
512 // that this happens after the snapshot has finished because this is what unblocks the
513 // replication operator and we want this to happen serially. It might seem like a good
514 // idea to read the replication stream concurrently with the snapshot but it actually
515 // leads to a lot of data being staged for the future, which needlessly consumes memory
516 // in the cluster.
517 for output in reader_table_info.values() {
518 for (output_index, (desc, _)) in output {
519 trace!(%id, "timely-{worker_id} producing rewind request for table {} output {output_index}", desc.name);
520 let req = RewindRequest { output_index: *output_index, snapshot_lsn };
521 rewinds_handle.give(&rewind_cap_set[0], req);
522 }
523 }
524 *rewind_cap_set = CapabilitySet::new();
525
526 // Failure scenario after we have produced the snapshot, but before a successful COMMIT
527 fail::fail_point!("pg_snapshot_failure", |_| Err(
528 TransientError::SyntheticError
529 ));
530
531 // The exporting worker should wait for all the other workers to commit before dropping
532 // its client since this is what holds the exported transaction alive.
533 if is_snapshot_leader {
534 trace!(%id, "timely-{worker_id} waiting for all workers to finish");
535 *snapshot_cap_set = CapabilitySet::new();
536 while snapshot_input.next().await.is_some() {}
537 trace!(%id, "timely-{worker_id} (leader) committing COPY transaction");
538 client.simple_query("COMMIT").await?;
539 } else {
540 trace!(%id, "timely-{worker_id} committing COPY transaction");
541 client.simple_query("COMMIT").await?;
542 *snapshot_cap_set = CapabilitySet::new();
543 }
544 drop(client);
545 Ok(())
546 }))
547 });
548
549 // We now decode the COPY protocol and apply the cast expressions
550 let mut text_row = Row::default();
551 let mut final_row = Row::default();
552 let mut datum_vec = DatumVec::new();
553 let mut next_worker = (0..u64::cast_from(scope.peers()))
554 // Round robin on a 1000-record basis to avoid creating tiny containers when there are a
555 // small number of updates and a large number of workers.
556 .flat_map(|w| std::iter::repeat_n(w, 1000))
557 .cycle();
558 let round_robin = Exchange::new(move |_| next_worker.next().unwrap());
559 let snapshot_updates = raw_data
560 .map::<Vec<_>, _, _>(Clone::clone)
561 .unary(round_robin, "PgCastSnapshotRows", |_, _| {
562 move |input, output| {
563 while let Some((time, data)) = input.next() {
564 let mut session = output.session(&time);
565 for ((oid, output_index, event), time, diff) in data.drain(..) {
566 let output = &table_info
567 .get(&oid)
568 .and_then(|outputs| outputs.get(&output_index))
569 .expect("table_info contains all outputs");
570
571 let event = event
572 .as_ref()
573 .map_err(|e: &DataflowError| e.clone())
574 .and_then(|bytes| {
575 decode_copy_row(bytes, output.casts.len(), &mut text_row)?;
576 let datums = datum_vec.borrow_with(&text_row);
577 super::cast_row(&output.casts, &datums, &mut final_row)?;
578 Ok(SourceMessage {
579 key: Row::default(),
580 value: final_row.clone(),
581 metadata: Row::default(),
582 })
583 });
584
585 session.give(((output_index, event), time, diff));
586 }
587 }
588 }
589 })
590 .as_collection();
591
592 let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));
593
594 (
595 snapshot_updates,
596 rewinds,
597 slot_ready,
598 errors,
599 button.press_on_drop(),
600 )
601}
602
603/// Starts a read-only transaction on the SQL session of `client` at a consistent LSN point by
604/// creating a replication slot. Returns a snapshot identifier that can be imported in
605/// other SQL sessions and the LSN of the consistent point.
606async fn export_snapshot(
607 client: &Client,
608 slot: &str,
609 temporary: bool,
610) -> Result<(String, MzOffset), TransientError> {
611 match export_snapshot_inner(client, slot, temporary).await {
612 Ok(ok) => Ok(ok),
613 Err(err) => {
614 // We don't want to leave the client inside a failed tx
615 client.simple_query("ROLLBACK;").await?;
616 Err(err)
617 }
618 }
619}
620
621async fn export_snapshot_inner(
622 client: &Client,
623 slot: &str,
624 temporary: bool,
625) -> Result<(String, MzOffset), TransientError> {
626 client
627 .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
628 .await?;
629
630 // Note: Using unchecked here is okay because we're using it in a SQL query.
631 let slot = Ident::new_unchecked(slot).to_ast_string_simple();
632 let temporary_str = if temporary { " TEMPORARY" } else { "" };
633 let query =
634 format!("CREATE_REPLICATION_SLOT {slot}{temporary_str} LOGICAL \"pgoutput\" USE_SNAPSHOT");
635 let row = match simple_query_opt(client, &query).await {
636 Ok(row) => Ok(row.unwrap()),
637 Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
638 return Err(TransientError::ReplicationSlotAlreadyExists);
639 }
640 Err(err) => Err(err),
641 }?;
642
643 // When creating a replication slot postgres returns the LSN of its consistent point, which is
644 // the LSN that must be passed to `START_REPLICATION` to cleanly transition from the snapshot
645 // phase to the replication phase. `START_REPLICATION` includes all transactions that commit at
646 // LSNs *greater than or equal* to the passed LSN. Therefore the snapshot phase must happen at
647 // the greatest LSN that is not beyond the consistent point. That LSN is `consistent_point - 1`
648 let consistent_point: PgLsn = row.get("consistent_point").unwrap().parse().unwrap();
649 let consistent_point = u64::from(consistent_point)
650 .checked_sub(1)
651 .expect("consistent point is always non-zero");
652
653 let row = simple_query_opt(client, "SELECT pg_export_snapshot();")
654 .await?
655 .unwrap();
656 let snapshot = row.get("pg_export_snapshot").unwrap().to_owned();
657
658 Ok((snapshot, MzOffset::from(consistent_point)))
659}
660
661/// Starts a read-only transaction on the SQL session of `client` at the consistent LSN point of
662/// `snapshot`.
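///
/// A sketch of the statements this issues, assuming an exported snapshot id of the form
/// `00000003-0000001B-1` (the exact format is an opaque postgres detail):
///
/// ```text
/// BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;
/// SET TRANSACTION SNAPSHOT '00000003-0000001B-1';
/// ```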
663async fn use_snapshot(client: &Client, snapshot: &str) -> Result<(), TransientError> {
664 client
665 .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
666 .await?;
667 let query = format!("SET TRANSACTION SNAPSHOT '{snapshot}';");
668 client.simple_query(&query).await?;
669 Ok(())
670}
671
672async fn set_statement_timeout(client: &Client, timeout: Duration) -> Result<(), TransientError> {
673 // Value is known to accept milliseconds w/o units.
674 // https://www.postgresql.org/docs/current/runtime-config-client.html
675 client
676 .simple_query(&format!("SET statement_timeout = {}", timeout.as_millis()))
677 .await?;
678 Ok(())
679}
680
681/// Decodes a row of `col_len` columns obtained from a text encoded COPY query into `row`.
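///
/// As an illustration (a sketch, not an exhaustive description of the COPY text format), a
/// three-column row `(1, NULL, 'a<tab>b')` copied with `FORMAT TEXT, DELIMITER '\t'` arrives as
///
/// ```text
/// 1<TAB>\N<TAB>a\tb<NEWLINE>
/// ```
///
/// where `<TAB>` and `<NEWLINE>` stand for the literal delimiter and row terminator bytes, `\N`
/// marks SQL NULL, and the tab inside the third value arrives backslash-escaped as the two
/// characters `\t`; `CopyTextFormatParser` undoes that escaping.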
682fn decode_copy_row(data: &[u8], col_len: usize, row: &mut Row) -> Result<(), DefiniteError> {
683 let mut packer = row.packer();
684 let row_parser = mz_pgcopy::CopyTextFormatParser::new(data, b'\t', "\\N");
685 let mut column_iter = row_parser.iter_raw_truncating(col_len);
686 for _ in 0..col_len {
687 let value = match column_iter.next() {
688 Some(Ok(value)) => value,
689 Some(Err(_)) => return Err(DefiniteError::InvalidCopyInput),
690 None => return Err(DefiniteError::MissingColumn),
691 };
692 let datum = value.map(super::decode_utf8_text).transpose()?;
693 packer.push(datum.unwrap_or(Datum::Null));
694 }
695 Ok(())
696}
697
698/// Record the sizes of the tables being snapshotted in `PgSnapshotMetrics` and emit snapshot statistics for each export.
699async fn report_snapshot_size(
700 client: &Client,
701 // The table names, oids, number of outputs, and statistics handles for the tables owned by this worker.
702 tables: Vec<(String, Oid, usize, &Vec<SourceStatistics>)>,
703 metrics: PgSnapshotMetrics,
704 config: &RawSourceCreationConfig,
705) -> Result<(), anyhow::Error> {
706 // TODO(guswynn): delete unused configs
707 let snapshot_config = config.config.parameters.pg_snapshot_config;
708
709 for (table, oid, _, export_stats) in tables {
710 let stats = collect_table_statistics(client, snapshot_config, &table, oid).await?;
711 metrics.record_table_count_latency(table, stats.count_latency);
712 for export_stat in export_stats {
713 export_stat.set_snapshot_records_known(stats.count);
714 export_stat.set_snapshot_records_staged(0);
715 }
716 }
717 Ok(())
718}
719
720#[derive(Default)]
721struct TableStatistics {
722 count: u64,
723 count_latency: f64,
724}
725
726async fn collect_table_statistics(
727 client: &Client,
728 config: PgSourceSnapshotConfig,
729 table: &str,
730 oid: u32,
731) -> Result<TableStatistics, anyhow::Error> {
732 use mz_ore::metrics::MetricsFutureExt;
733 let mut stats = TableStatistics::default();
734
735 let estimate_row = simple_query_opt(
736 client,
737 &format!("SELECT reltuples::bigint AS estimate_count FROM pg_class WHERE oid = '{oid}'"),
738 )
739 .wall_time()
740 .set_at(&mut stats.count_latency)
741 .await?;
742 stats.count = match estimate_row {
743 Some(row) => row.get("estimate_count").unwrap().parse().unwrap_or(0),
744 None => bail!("failed to get estimate count for {table}"),
745 };
746
747 // If the estimate is low enough we can attempt to get an exact count. Note that not yet
748 // vacuumed tables will report zero rows here, and there is a possibility that they are very
749 // large. We accept this risk and we offer the feature flag as an escape hatch if it becomes
750 // problematic.
751 if config.collect_strict_count && stats.count < 1_000_000 {
752 let count_row = simple_query_opt(client, &format!("SELECT count(*) as count from {table}"))
753 .wall_time()
754 .set_at(&mut stats.count_latency)
755 .await?;
756 stats.count = match count_row {
757 Some(row) => row.get("count").unwrap().parse().unwrap(),
758 None => bail!("failed to get count for {table}"),
759 }
760 }
761
762 Ok(stats)
763}