mz_testdrive/action/postgres/
verify_slot.rs1use std::cmp;
11use std::time::Duration;
12
13use anyhow::{Context, bail};
14use mz_ore::retry::Retry;
15use mz_postgres_util::{query, sql};
16
17use crate::action::{ControlFlow, State};
18use crate::parser::BuiltinCommand;
19use crate::util::postgres::postgres_client;
20
21pub async fn run_verify_slot(
22 mut cmd: BuiltinCommand,
23 state: &State,
24) -> Result<ControlFlow, anyhow::Error> {
25 let connection = cmd.args.string("connection")?;
26 let slot = cmd.args.string("slot")?;
27 let expect_active: bool = cmd.args.parse("active")?;
28 cmd.args.done()?;
29
30 let (client, conn_handle) = postgres_client(&connection, state.default_timeout).await?;
31
32 Retry::default()
33 .initial_backoff(Duration::from_millis(50))
34 .max_duration(cmp::max(state.default_timeout, Duration::from_secs(60)))
35 .retry_async_canceling(|_| async {
36 println!(">> checking for postgres replication slot {}", &slot);
37 let rows = query(
38 &client,
39 sql!("SELECT active_pid FROM pg_replication_slots WHERE slot_name LIKE $1::TEXT"),
40 &[&slot],
41 )
42 .await
43 .context("querying postgres for replication slot")?;
44
45 if rows.len() != 1 {
46 bail!(
47 "expected entry for slot {} in pg_replication slots, found {}",
48 &slot,
49 rows.len()
50 );
51 }
52 let active_pid: Option<i32> = rows[0].get(0);
53 match (expect_active, active_pid) {
54 (true, None) => bail!("expected slot {slot} to be active, is inactive"),
55 (false, Some(pid)) => {
56 bail!("expected slot {slot} to be inactive, is active for pid {pid}")
57 }
58 _ => {}
59 };
60 Ok(())
61 })
62 .await?;
63
64 drop(client);
65 conn_handle.await.context("postgres connection error")?;
66
67 Ok(ControlFlow::Continue)
68}