mz_testdrive/action/
webhook.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::bail;
11
12use crate::action::{ControlFlow, State};
13use crate::parser::BuiltinCommand;
14
15pub async fn run_append(
16    mut cmd: BuiltinCommand,
17    state: &State,
18) -> Result<ControlFlow, anyhow::Error> {
19    let database = cmd
20        .args
21        .opt_string("database")
22        .unwrap_or_else(|| "materialize".to_string());
23    let schema = cmd
24        .args
25        .opt_string("schema")
26        .unwrap_or_else(|| "public".to_string());
27    let name = cmd.args.string("name")?;
28
29    let status_code = cmd.args.opt_parse::<u16>("status")?;
30    // Interpret the remaining arguments as headers.
31    let headers: Vec<(String, String)> = cmd.args.into_iter().collect();
32
33    let body = cmd.input.join("\n");
34
35    println!("$ webhook-append {database}.{schema}.{name}\n{body}\n{headers:?}");
36
37    let client = if state.materialize.use_https {
38        reqwest::Client::builder()
39            .danger_accept_invalid_certs(true)
40            .build()
41            .unwrap()
42    } else {
43        reqwest::Client::new()
44    };
45    let url = format!(
46        "{}://{}/api/webhook/{database}/{schema}/{name}",
47        if state.materialize.use_https {
48            "https"
49        } else {
50            "http"
51        },
52        state.materialize.http_addr
53    );
54    let mut builder = client.post(url).body(body);
55
56    // Append all of our headers.
57    for (name, value) in headers {
58        builder = builder.header(name, value);
59    }
60
61    let response = builder.send().await?;
62    let status = response.status();
63
64    println!("{}\n{}", status, response.text().await?);
65
66    let expected_status = status_code.unwrap_or(200);
67    if status.as_u16() == expected_status {
68        Ok(ControlFlow::Continue)
69    } else {
70        bail!("webhook append returned unexpected status: {}", status);
71    }
72}