mz_testdrive/action/sql_server/
execute.rs1use std::time::Duration;
11
12use anyhow::{Context, anyhow};
13use mz_ore::str::StrExt;
14
15use crate::action::{ControlFlow, State};
16use crate::parser::BuiltinCommand;
17
18fn is_retryable_error(err: &anyhow::Error) -> bool {
27 let msg = format!("{:#}", err);
30 (msg.contains("1205") && msg.contains("deadlock"))
31 || msg.contains("SQLServerAgent is starting")
32 || msg.contains("cannot be autostarted during server shutdown or startup")
33}
34
35const MAX_RETRIES: usize = 20;
37
38const RETRY_BACKOFF: Duration = Duration::from_millis(100);
40
41async fn execute_with_retry(
42 client: &mut mz_sql_server_util::Client,
43 query: &str,
44) -> Result<(), anyhow::Error> {
45 for attempt in 0..=MAX_RETRIES {
46 match client
47 .simple_query(query.to_string())
48 .await
49 .context("executing SQL Server query")
50 {
51 Ok(_) => return Ok(()),
52 Err(err) if is_retryable_error(&err) && attempt < MAX_RETRIES => {
53 println!(
54 ">> transient error (attempt {}/{}), retrying after {:?}: {:#}",
55 attempt + 1,
56 MAX_RETRIES,
57 RETRY_BACKOFF,
58 err,
59 );
60 tokio::time::sleep(RETRY_BACKOFF).await;
61 }
62 Err(err) => return Err(err),
63 }
64 }
65 unreachable!()
66}
67
68pub async fn run_execute(
69 mut cmd: BuiltinCommand,
70 state: &mut State,
71) -> Result<ControlFlow, anyhow::Error> {
72 let name = cmd.args.string("name")?;
73 let split_lines = cmd.args.opt_bool("split-lines")?.unwrap_or(true);
74 let abandon_txn = cmd.args.opt_bool("abandon-txn")?.unwrap_or(false);
78 cmd.args.done()?;
79
80 let client = state
81 .sql_server_clients
82 .get_mut(&name)
83 .ok_or_else(|| anyhow!("connection {} not found", name.quoted()))?;
84
85 if abandon_txn {
86 let mut txn = client
87 .transaction()
88 .await
89 .context("begin transaction for abandon-txn")?;
90 if split_lines {
91 for query in &cmd.input {
92 println!(">> (abandon-txn) {}", query);
93 txn.simple_query(query.to_string())
94 .await
95 .context("executing SQL Server query in transaction")?;
96 }
97 } else {
98 let query = cmd.input.join("\n");
99 println!(">> (abandon-txn) {}", query);
100 txn.simple_query(query)
101 .await
102 .context("executing SQL Server query in transaction")?;
103 }
104 } else {
107 if split_lines {
108 for query in &cmd.input {
109 println!(">> {}", query);
110 execute_with_retry(client, query).await?;
111 }
112 } else {
113 let query = cmd.input.join("\n");
114 println!(">> {}", query);
115 execute_with_retry(client, &query).await?;
118 }
119 }
120
121 Ok(ControlFlow::Continue)
122}