mz_testdrive/action/sql_server/
execute.rs1use std::time::Duration;
11
12use anyhow::{Context, anyhow};
13use mz_ore::str::StrExt;
14
15use crate::action::{ControlFlow, State};
16use crate::parser::BuiltinCommand;
17
18fn is_deadlock_error(err: &anyhow::Error) -> bool {
20 let msg = format!("{:#}", err);
23 msg.contains("1205") && msg.contains("deadlock")
25}
26
27const DEADLOCK_MAX_RETRIES: usize = 5;
29
30const DEADLOCK_INITIAL_BACKOFF: Duration = Duration::from_millis(500);
32
33async fn execute_with_deadlock_retry(
34 client: &mut mz_sql_server_util::Client,
35 query: &str,
36) -> Result<(), anyhow::Error> {
37 let mut backoff = DEADLOCK_INITIAL_BACKOFF;
38 for attempt in 0..=DEADLOCK_MAX_RETRIES {
39 match client
40 .simple_query(query.to_string())
41 .await
42 .context("executing SQL Server query")
43 {
44 Ok(_) => return Ok(()),
45 Err(err) if is_deadlock_error(&err) && attempt < DEADLOCK_MAX_RETRIES => {
46 println!(
47 ">> deadlock detected (attempt {}/{}), retrying after {:?}",
48 attempt + 1,
49 DEADLOCK_MAX_RETRIES,
50 backoff,
51 );
52 tokio::time::sleep(backoff).await;
53 backoff *= 2;
54 }
55 Err(err) => return Err(err),
56 }
57 }
58 unreachable!()
59}
60
61pub async fn run_execute(
62 mut cmd: BuiltinCommand,
63 state: &mut State,
64) -> Result<ControlFlow, anyhow::Error> {
65 let name = cmd.args.string("name")?;
66 let split_lines = cmd.args.opt_bool("split-lines")?.unwrap_or(true);
67 cmd.args.done()?;
68
69 let client = state
70 .sql_server_clients
71 .get_mut(&name)
72 .ok_or_else(|| anyhow!("connection {} not found", name.quoted()))?;
73
74 if split_lines {
75 for query in &cmd.input {
76 println!(">> {}", query);
77 execute_with_deadlock_retry(client, query).await?;
78 }
79 } else {
80 let query = cmd.input.join("\n");
81 println!(">> {}", query);
82 execute_with_deadlock_retry(client, &query).await?;
85 }
86
87 Ok(ControlFlow::Continue)
88}