// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Renders the table snapshot side of the [`PostgresSourceConnection`] ingestion dataflow.
//!
//! # Snapshot reading
//!
//! Depending on the resumption LSNs, the table reader decides which tables need to be snapshotted
//! and performs a simple `COPY` query on them in order to get a snapshot. There are a few subtle
//! points about this operation, described in the following sections.
//!
//! ## Consistent LSN point for snapshot transactions
//!
//! Given that all our ingestion is based on correctly timestamping updates with the LSN they
//! happened at, it is important that we run the `COPY` query at a specific LSN point that is
//! relatable to the LSN numbers we receive from the replication stream. Such a point does not
//! necessarily exist for a normal SQL transaction. To achieve this we must force postgres to
//! produce a consistent point and tell us its LSN, which we do by creating a replication slot
//! as the first statement in a transaction.
//!
//! This is a temporary dummy slot that is only used to put our snapshot transaction on a
//! consistent LSN point. Unfortunately no lighter-weight method exists for doing this. See this
//! [postgres thread] for more details.
//!
//! One might wonder why we don't use the actual real slot to provide us with the snapshot point,
//! which would automatically be at the correct LSN. The answer is that it's possible that we
//! crash and restart after having already created the slot but before having finished the
//! snapshot. In that case the restarting process will have lost its opportunity to run queries
//! at the slot's consistent point, as that opportunity only exists in the ephemeral transaction
//! that created the slot and that is long gone. Additionally, there are good reasons why we'd
//! like to move the slot creation much earlier, e.g. during purification, in which case the slot
//! will always be pre-created.
//!
//! [postgres thread]: https://www.postgresql.org/message-id/flat/CAMN0T-vzzNy6TV1Jvh4xzNQdAvCLBQK_kh6_U7kAXgGU3ZFg-Q%40mail.gmail.com
//!
//! ## Reusing the consistent point among all workers
//!
//! Creating replication slots is potentially expensive so the code is arranged such that all
//! workers cooperate and reuse one consistent snapshot among them. In order to do so we make use
//! of the "exported snapshot" feature of postgres. This feature allows one SQL session to create
//! a string identifier for the snapshot of the transaction it is currently in, which can be used
//! by other sessions to enter the same "snapshot".
//!
//! We accomplish this by picking one worker at random to function as the transaction leader. The
//! transaction leader is responsible for starting a SQL session, creating a temporary replication
//! slot in a transaction, exporting the snapshot id, and broadcasting the snapshot information to
//! all other workers via a broadcasted feedback edge.
//!
//! During this phase the follower workers are simply waiting to hear on the feedback edge,
//! effectively synchronizing with the leader. Once all workers have received the snapshot
//! information they can all start to perform their assigned COPY queries.
//!
//! The leader and follower steps described above are accomplished by the [`export_snapshot`] and
//! [`use_snapshot`] functions respectively.
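//!
//! In terms of SQL, the leader and follower sessions issue roughly the following commands (see
//! the bodies of those functions for the exact statements):
//!
//! ```text
//! -- leader session
//! BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;
//! CREATE_REPLICATION_SLOT <slot> [TEMPORARY] LOGICAL "pgoutput" USE_SNAPSHOT;
//! SELECT pg_export_snapshot();
//!
//! -- follower sessions
//! BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;
//! SET TRANSACTION SNAPSHOT '<exported snapshot id>';
//! ```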
//!
//! ## Coordinated transaction COMMIT
//!
//! When follower workers are done with snapshotting they commit their transaction, close their
//! session, and then drop their snapshot feedback capability. When the leader worker is done with
//! snapshotting it drops its snapshot feedback capability and waits until it observes the
//! snapshot input advancing to the empty frontier. This allows the leader to COMMIT its
//! transaction last, which is the transaction that exported the snapshot.
//!
//! It's unclear if this is strictly necessary, but having the frontiers made it easy enough that I
//! added the synchronization.
//!
//! ## Snapshot rewinding
//!
//! Ingestion dataflows must produce definite data, including the snapshot. What this means in
//! practice is that whenever we deem it necessary to snapshot a table we must do so at the same
//! LSN. However, the method for running a transaction described above doesn't let us choose the
//! LSN; it could be an LSN in the future chosen by PostgreSQL while it creates the temporary
//! replication slot.
//!
//! The definition of differential collections states that a collection at some time `t_snapshot`
//! is defined to be the accumulation of all updates that happen at `t <= t_snapshot`, where `<=`
//! is the partial order. In this case we are faced with the problem of knowing the state of a
//! table at `t_snapshot` but actually wanting to know the snapshot at `t_slot <= t_snapshot`.
//!
//! From the definition we can see that the snapshot at `t_slot` is related to the snapshot at
//! `t_snapshot` with the following equations:
//!
//! ```text
//! sum(update: t <= t_snapshot) = sum(update: t <= t_slot) + sum(update: t_slot <= t <= t_snapshot)
//!                              |
//!                              V
//! sum(update: t <= t_slot) = sum(update: t <= t_snapshot) - sum(update: t_slot <= t <= t_snapshot)
//! ```
//!
//! Therefore, if we manage to recover the `sum(update: t_slot <= t <= t_snapshot)` term we will be
//! able to "rewind" the snapshot we obtained at `t_snapshot` to `t_slot` by emitting all updates
//! that happen between these two points with their diffs negated.
//!
//! It turns out that this term is exactly what the main replication slot provides us with, and we
//! can therefore rewind snapshots taken at arbitrary points! In order to do this the snapshot
//! dataflow emits rewind requests to the replication reader, informing it that a certain range of
//! updates must be emitted at LSN 0 (by convention) with their diffs negated. These negated diffs
//! are consolidated with the diffs taken at `t_snapshot` that were also emitted at LSN 0 (by
//! convention) and we end up with a TVC that at LSN 0 contains the snapshot at `t_slot`.
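//!
//! As a hypothetical example, suppose the slot's consistent point is `t_slot = 40`, the exported
//! snapshot ends up at `t_snapshot = 42`, and a row `A` was inserted at LSN 41. The updates
//! flowing into the consolidated output would then be:
//!
//! ```text
//! (A, 0,  +1)   -- from the snapshot taken at t_snapshot = 42, emitted at LSN 0
//! (A, 0,  -1)   -- from the rewind request covering updates between t_slot and t_snapshot
//! (A, 41, +1)   -- from the replication stream at the update's true LSN
//! ```
//!
//! The two updates at LSN 0 cancel out, so the collection at LSN 0 accumulates to the snapshot at
//! `t_slot = 40`, and the insert of `A` only becomes visible at LSN 41.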
//!
//! # Snapshot decoding
//!
//! The expectation is that tables will most likely be skewed in the number of rows they contain,
//! so while a `COPY` query for any given table runs on a single worker, the decoding of the COPY
//! stream is distributed to all workers.
//!
//!
//! ```text
//!                 ╭──────────────────╮
//!    ┏━━━━━━━━━━━━v━┓                │ exported
//!    ┃    table     ┃   ╭─────────╮  │ snapshot id
//!    ┃    reader    ┠─>─┤broadcast├──╯
//!    ┗━┯━━━━━━━━━━┯━┛   ╰─────────╯
//!   raw│          │
//!  COPY│          │
//!  data│          │
//! ╭────┴─────╮    │
//! │distribute│    │
//! ╰────┬─────╯    │
//! ┏━━━━┷━━━━┓     │
//! ┃  COPY   ┃     │
//! ┃ decoder ┃     │
//! ┗━━━━┯━━━━┛     │
//!      │ snapshot │rewind
//!      │ updates  │requests
//!      v          v
//! ```
use std::collections::BTreeMap;
use std::convert::Infallible;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use differential_dataflow::AsCollection;
use futures::{StreamExt as _, TryStreamExt};
use mz_ore::cast::CastFrom;
use mz_ore::future::InTask;
use mz_postgres_util::tunnel::PostgresFlavor;
use mz_postgres_util::{simple_query_opt, Client, PostgresError};
use mz_repr::{Datum, DatumVec, Row};
use mz_sql_parser::ast::{display::AstDisplay, Ident};
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::{MzOffset, PostgresSourceConnection};
use mz_timely_util::builder_async::{
Event as AsyncEvent, OperatorBuilder as AsyncOperatorBuilder, PressOnDropButton,
};
use mz_timely_util::operator::StreamExt;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::core::Map;
use timely::dataflow::operators::{Broadcast, CapabilitySet, Concat, ConnectLoop, Feedback};
use timely::dataflow::{Scope, Stream};
use timely::progress::Timestamp;
use tokio_postgres::error::SqlState;
use tokio_postgres::types::{Oid, PgLsn};
use tracing::{error, trace};
use crate::metrics::source::postgres::PgSnapshotMetrics;
use crate::source::postgres::replication::RewindRequest;
use crate::source::postgres::{
verify_schema, DefiniteError, ReplicationError, SourceOutputInfo, TransientError,
};
use crate::source::types::{
ProgressStatisticsUpdate, SignaledFuture, SourceMessage, StackedCollection,
};
use crate::source::RawSourceCreationConfig;
/// Renders the snapshot dataflow. See the module documentation for more information.
pub(crate) fn render<G: Scope<Timestamp = MzOffset>>(
mut scope: G,
config: RawSourceCreationConfig,
connection: PostgresSourceConnection,
table_info: BTreeMap<u32, BTreeMap<usize, SourceOutputInfo>>,
metrics: PgSnapshotMetrics,
) -> (
StackedCollection<G, (usize, Result<SourceMessage, DataflowError>)>,
Stream<G, RewindRequest>,
Stream<G, Infallible>,
Stream<G, ProgressStatisticsUpdate>,
Stream<G, ReplicationError>,
PressOnDropButton,
) {
let op_name = format!("TableReader({})", config.id);
let mut builder = AsyncOperatorBuilder::new(op_name, scope.clone());
let (feedback_handle, feedback_data) = scope.feedback(Default::default());
let (raw_handle, raw_data) = builder.new_output();
let (rewinds_handle, rewinds) = builder.new_output();
// This output is used to signal to the replication operator that the replication slot has been
// created. With the current state of execution serialization there isn't a lot of benefit
// in splitting the snapshot and replication phases into two operators.
// TODO(petrosagg): merge the two operators in one (while still maintaining separation as
// functions/modules)
let (_, slot_ready) = builder.new_output::<CapacityContainerBuilder<_>>();
let (snapshot_handle, snapshot) = builder.new_output();
let (definite_error_handle, definite_errors) = builder.new_output();
let (stats_output, stats_stream) = builder.new_output();
// This operator needs to broadcast data to itself in order to synchronize the transaction
// snapshot. However, none of the feedback capabilities result in output messages and for the
// feedback edge specifically having a default connection would result in a loop.
let mut snapshot_input = builder.new_disconnected_input(&feedback_data, Pipeline);
// The export id must be sent to all workers, so we broadcast the feedback connection
snapshot.broadcast().connect_loop(feedback_handle);
let is_snapshot_leader = config.responsible_for("snapshot_leader");
// A global view of all outputs that will be snapshot by all workers.
let mut all_outputs = vec![];
// A filtered table info containing only the tables that this worker should snapshot.
let mut reader_table_info = BTreeMap::new();
for (table, outputs) in table_info.iter() {
for (&output_index, output) in outputs {
if *output.resume_upper != [MzOffset::minimum()] {
// Already has been snapshotted.
continue;
}
all_outputs.push(output_index);
if config.responsible_for(*table) {
reader_table_info
.entry(*table)
.or_insert_with(BTreeMap::new)
.insert(output_index, (output.desc.clone(), output.casts.clone()));
}
}
}
let (button, transient_errors) = builder.build_fallible(move |caps| {
let busy_signal = Arc::clone(&config.busy_signal);
Box::pin(SignaledFuture::new(busy_signal, async move {
let id = config.id;
let worker_id = config.worker_id;
let [
data_cap_set,
rewind_cap_set,
slot_ready_cap_set,
snapshot_cap_set,
definite_error_cap_set,
stats_cap,
]: &mut [_; 6] = caps.try_into().unwrap();
trace!(
%id,
"timely-{worker_id} initializing table reader \
with {} tables to snapshot",
reader_table_info.len()
);
// Nothing needs to be snapshotted.
if all_outputs.is_empty() {
trace!(%id, "no exports to snapshot");
// Note we do not emit a `ProgressStatisticsUpdate::Snapshot` update here,
// as we do not want to attempt to override the current value with 0. We
// just leave it null.
return Ok(());
}
let connection_config = connection
.connection
.config(
&config.config.connection_context.secrets_reader,
&config.config,
InTask::Yes,
)
.await?;
let task_name = format!("timely-{worker_id} PG snapshotter");
let client = if is_snapshot_leader {
let client = connection_config
.connect_replication(&config.config.connection_context.ssh_tunnel_manager)
.await?;
// Attempt to export the snapshot by creating the main replication slot. If that
// succeeds then there is no need for creating additional temporary slots.
let main_slot = &connection.publication_details.slot;
let snapshot_info = match export_snapshot(&client, main_slot, false).await {
Ok(info) => info,
Err(err @ TransientError::ReplicationSlotAlreadyExists) => {
match connection.connection.flavor {
// If we're connecting to a vanilla postgres instance we have the option of
// exporting a snapshot via a temporary slot.
PostgresFlavor::Vanilla => {
let tmp_slot =
format!("mzsnapshot_{}", uuid::Uuid::new_v4()).replace('-', "");
export_snapshot(&client, &tmp_slot, true).await?
}
// No salvation for Yugabyte
PostgresFlavor::Yugabyte => return Err(err),
}
}
Err(err) => return Err(err),
};
trace!(
%id,
"timely-{worker_id} exporting snapshot info {snapshot_info:?}");
snapshot_handle.give(&snapshot_cap_set[0], snapshot_info);
client
} else {
// Only the snapshot leader needs a replication connection.
connection_config
.connect(
&task_name,
&config.config.connection_context.ssh_tunnel_manager,
)
.await?
};
*slot_ready_cap_set = CapabilitySet::new();
// Configure statement_timeout based on param. We want to be able to
// override the server value here in case it's set too low,
// relative to the size of the data we need to copy.
set_statement_timeout(
&client,
config
.config
.parameters
.pg_source_snapshot_statement_timeout,
)
.await?;
let (snapshot, snapshot_lsn) = loop {
match snapshot_input.next().await {
Some(AsyncEvent::Data(_, mut data)) => {
break data.pop().expect("snapshot sent above")
}
Some(AsyncEvent::Progress(_)) => continue,
None => panic!(
"feedback closed \
before sending snapshot info"
),
}
};
// The snapshot leader is already inside the exported transaction but all other workers
// need to enter it.
if !is_snapshot_leader {
trace!(%id, "timely-{worker_id} using snapshot id {snapshot:?}");
use_snapshot(&client, &snapshot).await?;
}
let upstream_info = {
let schema_client = connection_config
.connect(
"snapshot schema info",
&config.config.connection_context.ssh_tunnel_manager,
)
.await?;
match mz_postgres_util::publication_info(&schema_client, &connection.publication)
.await
{
// If the replication stream cannot be obtained in a definite way there is
// nothing else to do. These errors are not retractable.
Err(PostgresError::PublicationMissing(publication)) => {
let err = DefiniteError::PublicationDropped(publication);
for (oid, outputs) in reader_table_info.iter() {
// Produce a definite error here and then exit to ensure
// a missing publication doesn't generate a transient
// error and restart this dataflow indefinitely.
//
// We pick `u64::MAX` as the LSN which will (in
// practice) never conflict with any previously revealed
// portions of the TVC.
for output_index in outputs.keys() {
let update = (
(*oid, *output_index, Err(err.clone().into())),
MzOffset::from(u64::MAX),
1,
);
raw_handle.give_fueled(&data_cap_set[0], update).await;
}
}
definite_error_handle.give(
&definite_error_cap_set[0],
ReplicationError::Definite(Rc::new(err)),
);
return Ok(());
}
Err(e) => Err(TransientError::from(e))?,
Ok(i) => i,
}
};
let upstream_info = upstream_info.into_iter().map(|t| (t.oid, t)).collect();
let worker_tables = reader_table_info
.iter()
.map(|(_, outputs)| {
// just use the first output's desc since the fields accessed here should
// be the same for all outputs
let desc = &outputs.values().next().expect("at least 1").0;
(
format!(
"{}.{}",
Ident::new_unchecked(desc.namespace.clone()).to_ast_string(),
Ident::new_unchecked(desc.name.clone()).to_ast_string()
),
desc.oid.clone(),
outputs.len(),
)
})
.collect();
let snapshot_total =
fetch_snapshot_size(&client, worker_tables, metrics, &config).await?;
stats_output.give(
&stats_cap[0],
ProgressStatisticsUpdate::Snapshot {
records_known: snapshot_total,
records_staged: 0,
},
);
let mut snapshot_staged = 0;
for (&oid, outputs) in reader_table_info.iter() {
let mut table_name = None;
let mut output_indexes = vec![];
for (output_index, (expected_desc, casts)) in outputs.iter() {
match verify_schema(oid, expected_desc, &upstream_info, casts) {
Ok(()) => {
if table_name.is_none() {
table_name = Some((
expected_desc.namespace.clone(),
expected_desc.name.clone(),
));
}
output_indexes.push(output_index);
}
Err(err) => {
raw_handle
.give_fueled(
&data_cap_set[0],
(
(oid, *output_index, Err(err.into())),
MzOffset::minimum(),
1,
),
)
.await;
continue;
}
};
}
let (namespace, table) = match table_name {
Some(t) => t,
None => {
// all outputs errored for this table
continue;
}
};
trace!(
%id,
"timely-{worker_id} snapshotting table {:?}({oid}) @ {snapshot_lsn}",
table
);
// To handle quoted/keyword names, we can use `Ident`'s AST printing, which
// emulates PG's rules for name formatting.
let query = format!(
"COPY {}.{} TO STDOUT (FORMAT TEXT, DELIMITER '\t')",
Ident::new_unchecked(namespace).to_ast_string(),
Ident::new_unchecked(table).to_ast_string(),
);
let mut stream = pin!(client.copy_out_simple(&query).await?);
let mut update = ((oid, 0, Ok(vec![])), MzOffset::minimum(), 1);
while let Some(bytes) = stream.try_next().await? {
let data = update.0 .2.as_mut().unwrap();
data.clear();
data.extend_from_slice(&bytes);
for output_index in &output_indexes {
update.0 .1 = **output_index;
raw_handle.give_fueled(&data_cap_set[0], &update).await;
snapshot_staged += 1;
// TODO(guswynn): does this 1000 need to be configurable?
if snapshot_staged % 1000 == 0 {
stats_output.give(
&stats_cap[0],
ProgressStatisticsUpdate::Snapshot {
records_known: snapshot_total,
records_staged: snapshot_staged,
},
);
}
}
}
}
// We are done with the snapshot so now we will emit rewind requests. It is important
// that this happens after the snapshot has finished because this is what unblocks the
// replication operator and we want this to happen serially. It might seem like a good
// idea to read the replication stream concurrently with the snapshot but it actually
// leads to a lot of data being staged for the future, which needlessly consumes memory
// in the cluster.
for output in reader_table_info.values() {
for (output_index, (desc, _)) in output {
trace!(%id, "timely-{worker_id} producing rewind request for table {} output {output_index}", desc.name);
let req = RewindRequest { output_index: *output_index, snapshot_lsn };
rewinds_handle.give(&rewind_cap_set[0], req);
}
}
*rewind_cap_set = CapabilitySet::new();
if snapshot_staged < snapshot_total {
error!(%id, "timely-{worker_id} snapshot size {snapshot_total} is somehow bigger than records staged {snapshot_staged}");
snapshot_staged = snapshot_total;
}
stats_output.give(
&stats_cap[0],
ProgressStatisticsUpdate::Snapshot {
records_known: snapshot_total,
records_staged: snapshot_staged,
},
);
// Failure scenario after we have produced the snapshot, but before a successful COMMIT
fail::fail_point!("pg_snapshot_failure", |_| Err(
TransientError::SyntheticError
));
// The exporting worker should wait for all the other workers to commit before dropping
// its client since this is what holds the exported transaction alive.
if is_snapshot_leader {
trace!(%id, "timely-{worker_id} waiting for all workers to finish");
*snapshot_cap_set = CapabilitySet::new();
while snapshot_input.next().await.is_some() {}
trace!(%id, "timely-{worker_id} (leader) committing COPY transaction");
client.simple_query("COMMIT").await?;
} else {
trace!(%id, "timely-{worker_id} committing COPY transaction");
client.simple_query("COMMIT").await?;
*snapshot_cap_set = CapabilitySet::new();
}
drop(client);
Ok(())
}))
});
// We now decode the COPY protocol and apply the cast expressions
let mut text_row = Row::default();
let mut final_row = Row::default();
let mut datum_vec = DatumVec::new();
let snapshot_updates = raw_data
.distribute()
.map(move |((oid, output_index, event), time, diff)| {
let output = &table_info
.get(oid)
.and_then(|outputs| outputs.get(output_index))
.expect("table_info contains all outputs");
let event = event
.as_ref()
.map_err(|e: &DataflowError| e.clone())
.and_then(|bytes| {
decode_copy_row(bytes, output.casts.len(), &mut text_row)?;
let datums = datum_vec.borrow_with(&text_row);
super::cast_row(&output.casts, &datums, &mut final_row)?;
Ok(SourceMessage {
key: Row::default(),
value: final_row.clone(),
metadata: Row::default(),
})
});
((*output_index, event), *time, *diff)
})
.as_collection();
let errors = definite_errors.concat(&transient_errors.map(ReplicationError::from));
(
snapshot_updates,
rewinds,
slot_ready,
stats_stream,
errors,
button.press_on_drop(),
)
}
/// Starts a read-only transaction on the SQL session of `client` at a consistent LSN point by
/// creating a replication slot. Returns a snapshot identifier that can be imported in other
/// SQL sessions and the LSN of the consistent point.
async fn export_snapshot(
client: &Client,
slot: &str,
temporary: bool,
) -> Result<(String, MzOffset), TransientError> {
match export_snapshot_inner(client, slot, temporary).await {
Ok(ok) => Ok(ok),
Err(err) => {
// We don't want to leave the client inside a failed tx
client.simple_query("ROLLBACK;").await?;
Err(err)
}
}
}
async fn export_snapshot_inner(
client: &Client,
slot: &str,
temporary: bool,
) -> Result<(String, MzOffset), TransientError> {
client
.simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
.await?;
// Note: Using unchecked here is okay because we're using it in a SQL query.
let slot = Ident::new_unchecked(slot).to_ast_string();
let temporary_str = if temporary { " TEMPORARY" } else { "" };
let query =
format!("CREATE_REPLICATION_SLOT {slot}{temporary_str} LOGICAL \"pgoutput\" USE_SNAPSHOT");
let row = match simple_query_opt(client, &query).await {
Ok(row) => Ok(row.unwrap()),
Err(PostgresError::Postgres(err)) if err.code() == Some(&SqlState::DUPLICATE_OBJECT) => {
return Err(TransientError::ReplicationSlotAlreadyExists)
}
Err(err) => Err(err),
}?;
// When creating a replication slot postgres returns the LSN of its consistent point, which is
// the LSN that must be passed to `START_REPLICATION` to cleanly transition from the snapshot
// phase to the replication phase. `START_REPLICATION` includes all transactions that commit at
// LSNs *greater than or equal* to the passed LSN. Therefore the snapshot phase must happen at
// the greatest LSN that is not beyond the consistent point. That LSN is `consistent_point - 1`
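// For example, if postgres reports a consistent point of 1000, the snapshot contents are
// assigned the offset 999 and the replication stream picks up at LSN 1000 and beyond.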
let consistent_point: PgLsn = row.get("consistent_point").unwrap().parse().unwrap();
let consistent_point = u64::from(consistent_point)
.checked_sub(1)
.expect("consistent point is always non-zero");
let row = simple_query_opt(client, "SELECT pg_export_snapshot();")
.await?
.unwrap();
let snapshot = row.get("pg_export_snapshot").unwrap().to_owned();
Ok((snapshot, MzOffset::from(consistent_point)))
}
/// Starts a read-only transaction on the SQL session of `client` at the consistent LSN point of
/// `snapshot`.
async fn use_snapshot(client: &Client, snapshot: &str) -> Result<(), TransientError> {
client
.simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
.await?;
let query = format!("SET TRANSACTION SNAPSHOT '{snapshot}';");
client.simple_query(&query).await?;
Ok(())
}
async fn set_statement_timeout(client: &Client, timeout: Duration) -> Result<(), TransientError> {
// Value is known to accept milliseconds w/o units.
// https://www.postgresql.org/docs/current/runtime-config-client.html
client
.simple_query(&format!("SET statement_timeout = {}", timeout.as_millis()))
.await?;
Ok(())
}
/// Decodes a row of `col_len` columns obtained from a text encoded COPY query into `row`.
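///
/// As a rough example (assuming a 3-column table), the COPY text line `1\tfoo\t\N` decodes
/// into a row containing the text datums `"1"`, `"foo"`, and a trailing `NULL`.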
fn decode_copy_row(data: &[u8], col_len: usize, row: &mut Row) -> Result<(), DefiniteError> {
let mut packer = row.packer();
let row_parser = mz_pgcopy::CopyTextFormatParser::new(data, b'\t', "\\N");
let mut column_iter = row_parser.iter_raw_truncating(col_len);
for _ in 0..col_len {
let value = match column_iter.next() {
Some(Ok(value)) => value,
Some(Err(_)) => return Err(DefiniteError::InvalidCopyInput),
None => return Err(DefiniteError::MissingColumn),
};
let datum = value.map(super::decode_utf8_text).transpose()?;
packer.push(datum.unwrap_or(Datum::Null));
}
Ok(())
}
/// Record the sizes of the tables being snapshotted in `PgSnapshotMetrics`.
async fn fetch_snapshot_size(
client: &Client,
// The table names, oids, and number of outputs for each table owned by this worker.
tables: Vec<(String, Oid, usize)>,
metrics: PgSnapshotMetrics,
config: &RawSourceCreationConfig,
) -> Result<u64, anyhow::Error> {
// TODO(guswynn): delete unused configs
let snapshot_config = config.config.parameters.pg_snapshot_config;
let mut total = 0;
for (table, oid, output_count) in tables {
let stats =
collect_table_statistics(client, snapshot_config.collect_strict_count, &table, oid)
.await?;
metrics.record_table_count_latency(
table,
stats.count_latency,
snapshot_config.collect_strict_count,
);
total += stats.count * u64::cast_from(output_count);
}
Ok(total)
}
#[derive(Default)]
struct TableStatistics {
count: u64,
count_latency: f64,
}
async fn collect_table_statistics(
client: &Client,
strict: bool,
table: &str,
oid: u32,
) -> Result<TableStatistics, anyhow::Error> {
use mz_ore::metrics::MetricsFutureExt;
let mut stats = TableStatistics::default();
if strict {
let count_row = simple_query_opt(client, &format!("SELECT count(*) as count from {table}"))
.wall_time()
.set_at(&mut stats.count_latency)
.await?;
match count_row {
Some(row) => {
let count: i64 = row.get("count").unwrap().parse().unwrap();
stats.count = count.try_into()?;
}
None => bail!("failed to get count for {table}"),
}
} else {
let estimate_row = simple_query_opt(
client,
&format!(
"SELECT reltuples::bigint AS estimate_count FROM pg_class WHERE oid = '{oid}'"
),
)
.wall_time()
.set_at(&mut stats.count_latency)
.await?;
match estimate_row {
Some(row) => match row.get("estimate_count").unwrap().parse().unwrap() {
-1 => stats.count = 0,
n => stats.count = n.try_into()?,
},
None => bail!("failed to get estimate count for {table}"),
};
}
Ok(stats)
}