mz_testdrive/action/
fivetran.rs
1use std::collections::BTreeMap;
11
12use anyhow::{Context, bail};
13
14use crate::action::{ControlFlow, State};
15use crate::parser::BuiltinCommand;
16
17#[allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
20mod proto {
21 pub mod fivetran {
22 include!(concat!(env!("OUT_DIR"), "/fivetran_sdk.v2.rs"));
23 }
24}
25
26#[allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
29mod google {
30 pub mod protobuf {
31 include!(concat!(env!("OUT_DIR"), "/google.protobuf.rs"));
32 }
33}
34
35pub async fn run_destination_command(
37 mut cmd: BuiltinCommand,
38 state: &State,
39) -> Result<ControlFlow, anyhow::Error> {
40 let action = cmd.args.string("action")?;
41
42 let sql_host = url::Url::parse(&state.materialize.sql_addr)
45 .expect("failed to parse Materialize SQL addr")
46 .scheme()
47 .to_string();
48 let default_config = [
49 ("host", sql_host),
50 ("user", "materialize".into()),
51 ("app_password", "ignored".into()),
52 ("dbname", "materialize".into()),
53 ];
54 let mut config: BTreeMap<_, _> = cmd.args.into_iter().collect();
55 for (key, value) in default_config {
56 config.entry(key.into()).or_insert(value);
57 }
58
59 let config: serde_json::Map<String, serde_json::Value> = config
60 .into_iter()
61 .map(|(key, value)| (key, serde_json::Value::String(value)))
62 .collect();
63
64 let body = cmd.input.join("\n");
65 let objects = serde_json::Deserializer::from_str(&body)
66 .into_iter::<serde_json::Value>()
67 .collect::<Result<Vec<_>, _>>()
68 .context("reading body input")?;
69 let (mut request, response) = match &objects[..] {
70 [req, resp] => (req.clone(), resp.clone()),
71 x => bail!("Expected 2 JSON objects, found {}", x.len()),
72 };
73
74 let Some(request_map) = request.as_object_mut() else {
76 bail!("Invalid type found for request");
77 };
78 if request_map.contains_key("configuration") {
79 bail!("Request object should not contain key 'configuration'");
80 }
81 request_map.insert("configuration".into(), serde_json::Value::Object(config));
82
83 println!("{action} @ {}", state.fivetran_destination_url);
85 let mut fivetran_client =
86 proto::fivetran::destination_connector_client::DestinationConnectorClient::connect(
87 state.fivetran_destination_url.clone(),
88 )
89 .await
90 .context("connecting to fivetran destination")?;
91
92 match action.as_str() {
93 "describe" => {
94 let request: proto::fivetran::DescribeTableRequest =
95 serde_json::from_value(request).context("describe request")?;
96 let expected_response: proto::fivetran::DescribeTableResponse =
97 serde_json::from_value(response).context("describe response")?;
98
99 let response = fivetran_client
100 .describe_table(request)
101 .await
102 .context("describe")?;
103 let response = response.into_inner();
104
105 if response != expected_response {
106 bail!(
107 "Describe Table Response did not match expected\n
108 response: {response:#?}\n
109 expected: {expected_response:#?}"
110 );
111 }
112 }
113 "write_batch" => {
114 let request: proto::fivetran::WriteBatchRequest =
115 serde_json::from_value(request).context("write batch request")?;
116 let expected_response: proto::fivetran::WriteBatchResponse =
117 serde_json::from_value(response).context("write batch response")?;
118
119 let response = fivetran_client
120 .write_batch(request)
121 .await
122 .context("write batch")?;
123 let response = response.into_inner();
124
125 if response != expected_response {
126 bail!(
127 "Write Batch Response did not match expected\n
128 response: {response:#?}\n
129 expected: {expected_response:#?}"
130 );
131 }
132 }
133 other => bail!("Unsupported command {other}"),
134 }
135
136 Ok(ControlFlow::Continue)
137}