mz_testdrive/action/
fivetran.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeMap;
11
12use anyhow::{Context, bail};
13
14use crate::action::{ControlFlow, State};
15use crate::parser::BuiltinCommand;
16
17// Note(parkmycar): We wrap this in a `mod` block soley for the purpose of allowing lints for the
18// generated protobuf code.
19#[allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
20mod proto {
21    pub mod fivetran {
22        include!(concat!(env!("OUT_DIR"), "/fivetran_sdk.v2.rs"));
23    }
24}
25
26// We explicitly generate and then include the "well known types" for Protobuf so we can derive
27// `serde` traits.
28#[allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
29mod google {
30    pub mod protobuf {
31        include!(concat!(env!("OUT_DIR"), "/google.protobuf.rs"));
32    }
33}
34
35/// Sends a gRPC request to the currently running Fivetran Destination.
36pub async fn run_destination_command(
37    mut cmd: BuiltinCommand,
38    state: &State,
39) -> Result<ControlFlow, anyhow::Error> {
40    let action = cmd.args.string("action")?;
41
42    // Interpret the remaining arguments are part of the connection config.
43
44    let sql_host = url::Url::parse(&state.materialize.sql_addr)
45        .expect("failed to parse Materialize SQL addr")
46        .scheme()
47        .to_string();
48    let default_config = [
49        ("host", sql_host),
50        ("user", "materialize".into()),
51        ("app_password", "ignored".into()),
52        ("dbname", "materialize".into()),
53    ];
54    let mut config: BTreeMap<_, _> = cmd.args.into_iter().collect();
55    for (key, value) in default_config {
56        config.entry(key.into()).or_insert(value);
57    }
58
59    let config: serde_json::Map<String, serde_json::Value> = config
60        .into_iter()
61        .map(|(key, value)| (key, serde_json::Value::String(value)))
62        .collect();
63
64    let body = cmd.input.join("\n");
65    let objects = serde_json::Deserializer::from_str(&body)
66        .into_iter::<serde_json::Value>()
67        .collect::<Result<Vec<_>, _>>()
68        .context("reading body input")?;
69    let (mut request, response) = match &objects[..] {
70        [req, resp] => (req.clone(), resp.clone()),
71        x => bail!("Expected 2 JSON objects, found {}", x.len()),
72    };
73
74    // Splice the configuration into the request.
75    let Some(request_map) = request.as_object_mut() else {
76        bail!("Invalid type found for request");
77    };
78    if request_map.contains_key("configuration") {
79        bail!("Request object should not contain key 'configuration'");
80    }
81    request_map.insert("configuration".into(), serde_json::Value::Object(config));
82
83    // Connect to the destination.
84    println!("{action} @ {}", state.fivetran_destination_url);
85    let mut fivetran_client =
86        proto::fivetran::destination_connector_client::DestinationConnectorClient::connect(
87            state.fivetran_destination_url.clone(),
88        )
89        .await
90        .context("connecting to fivetran destination")?;
91
92    match action.as_str() {
93        "describe" => {
94            let request: proto::fivetran::DescribeTableRequest =
95                serde_json::from_value(request).context("describe request")?;
96            let expected_response: proto::fivetran::DescribeTableResponse =
97                serde_json::from_value(response).context("describe response")?;
98
99            let response = fivetran_client
100                .describe_table(request)
101                .await
102                .context("describe")?;
103            let response = response.into_inner();
104
105            if response != expected_response {
106                bail!(
107                    "Describe Table Response did not match expected\n
108                    response: {response:#?}\n
109                    expected: {expected_response:#?}"
110                );
111            }
112        }
113        "write_batch" => {
114            let request: proto::fivetran::WriteBatchRequest =
115                serde_json::from_value(request).context("write batch request")?;
116            let expected_response: proto::fivetran::WriteBatchResponse =
117                serde_json::from_value(response).context("write batch response")?;
118
119            let response = fivetran_client
120                .write_batch(request)
121                .await
122                .context("write batch")?;
123            let response = response.into_inner();
124
125            if response != expected_response {
126                bail!(
127                    "Write Batch Response did not match expected\n
128                    response: {response:#?}\n
129                    expected: {expected_response:#?}"
130                );
131            }
132        }
133        other => bail!("Unsupported command {other}"),
134    }
135
136    Ok(ControlFlow::Continue)
137}