use std::collections::BTreeMap;
use anyhow::{bail, Context};
use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;
/// Code pulled in from the build's `OUT_DIR`, namespaced for use by this file.
///
/// NOTE(review): the file names suggest these are protobuf bindings emitted by
/// a build script (Fivetran SDK plus `google.protobuf` types) — confirm against
/// the crate's `build.rs`.
// The included sources are not written by hand in this file, so the clippy
// lints below are silenced for the whole module rather than fixed in place.
#[allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
mod proto {
/// Items from `fivetran_sdk.rs`; public within `proto` so the rest of this
/// file can name its request/response/client types.
pub mod fivetran {
include!(concat!(env!("OUT_DIR"), "/fivetran_sdk.rs"));
}
// NOTE(review): `google` is private to `proto`; presumably it exists only so
// the included fivetran code can resolve `google::protobuf` paths — confirm.
mod google {
/// Items from `google.protobuf.rs`.
pub mod protobuf {
include!(concat!(env!("OUT_DIR"), "/google.protobuf.rs"));
}
}
}
pub async fn run_destination_command(
mut cmd: BuiltinCommand,
state: &State,
) -> Result<ControlFlow, anyhow::Error> {
let action = cmd.args.string("action")?;
let sql_host = url::Url::parse(&state.materialize.sql_addr)
.expect("failed to parse Materialize SQL addr")
.scheme()
.to_string();
let default_config = [
("host", sql_host),
("user", "materialize".into()),
("app_password", "ignored".into()),
("dbname", "materialize".into()),
];
let mut config: BTreeMap<_, _> = cmd.args.into_iter().collect();
for (key, value) in default_config {
config.entry(key.into()).or_insert(value);
}
let config: serde_json::Map<String, serde_json::Value> = config
.into_iter()
.map(|(key, value)| (key, serde_json::Value::String(value)))
.collect();
let body = cmd.input.join("\n");
let objects = serde_json::Deserializer::from_str(&body)
.into_iter::<serde_json::Value>()
.collect::<Result<Vec<_>, _>>()
.context("reading body input")?;
let (mut request, response) = match &objects[..] {
[req, resp] => (req.clone(), resp.clone()),
x => bail!("Expected 2 JSON objects, found {}", x.len()),
};
let Some(request_map) = request.as_object_mut() else {
bail!("Invalid type found for request");
};
if request_map.contains_key("configuration") {
bail!("Request object should not contain key 'configuration'");
}
request_map.insert("configuration".into(), serde_json::Value::Object(config));
println!("{action} @ {}", state.fivetran_destination_url);
let mut fivetran_client = proto::fivetran::destination_client::DestinationClient::connect(
state.fivetran_destination_url.clone(),
)
.await
.context("connecting to fivetran destination")?;
match action.as_str() {
"describe" => {
let request: proto::fivetran::DescribeTableRequest =
serde_json::from_value(request).context("describe request")?;
let expected_response: proto::fivetran::DescribeTableResponse =
serde_json::from_value(response).context("describe response")?;
let response = fivetran_client
.describe_table(request)
.await
.context("describe")?;
let response = response.into_inner();
if response != expected_response {
bail!(
"Describe Table Response did not match expected\n
response: {response:#?}\n
expected: {expected_response:#?}"
);
}
}
"write_batch" => {
let request: proto::fivetran::WriteBatchRequest =
serde_json::from_value(request).context("write batch request")?;
let expected_response: proto::fivetran::WriteBatchResponse =
serde_json::from_value(response).context("write batch response")?;
let response = fivetran_client
.write_batch(request)
.await
.context("write batch")?;
let response = response.into_inner();
if response != expected_response {
bail!(
"Write Batch Response did not match expected\n
response: {response:#?}\n
expected: {expected_response:#?}"
);
}
}
other => bail!("Unsupported command {other}"),
}
Ok(ControlFlow::Continue)
}