1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use anyhow::{bail, Context};

use crate::action::{ControlFlow, State};
use crate::parser::BuiltinCommand;

// Note(parkmycar): We wrap this in a `mod` block solely for the purpose of allowing lints for the
// generated protobuf code.
//
// The `#[allow]` below applies to everything pulled in by the `include!`s, so the generated code
// is not held to the listed clippy lints.
#[allow(clippy::as_conversions, clippy::clone_on_ref_ptr)]
mod proto {
    // Fivetran SDK definitions, textually included from `fivetran_sdk.rs` in Cargo's `OUT_DIR`
    // (presumably written there by this crate's build script -- confirm in `build.rs`).
    pub mod fivetran {
        include!(concat!(env!("OUT_DIR"), "/fivetran_sdk.rs"));
    }

    // We explicitly generate and then include the "well known types" for Protobuf so we can derive
    // `serde` traits.
    //
    // NOTE(review): the generated Fivetran code likely refers to these types by relative module
    // path, so the `google::protobuf` nesting should be kept as is -- confirm before restructuring.
    mod google {
        pub mod protobuf {
            include!(concat!(env!("OUT_DIR"), "/google.protobuf.rs"));
        }
    }
}

/// Sends a gRPC request to the currently running Fivetran Destination.
pub async fn run_destination_command(
    mut cmd: BuiltinCommand,
    state: &State,
) -> Result<ControlFlow, anyhow::Error> {
    let action = cmd.args.string("action")?;

    // Interpret the remaining arguments are part of the connection config.

    let sql_host = url::Url::parse(&state.materialize.sql_addr)
        .expect("failed to parse Materialize SQL addr")
        .scheme()
        .to_string();
    let default_config = [
        ("host", sql_host),
        ("user", "materialize".into()),
        ("app_password", "ignored".into()),
        ("dbname", "materialize".into()),
    ];
    let mut config: BTreeMap<_, _> = cmd.args.into_iter().collect();
    for (key, value) in default_config {
        config.entry(key.into()).or_insert(value);
    }

    let config: serde_json::Map<String, serde_json::Value> = config
        .into_iter()
        .map(|(key, value)| (key, serde_json::Value::String(value)))
        .collect();

    let body = cmd.input.join("\n");
    let objects = serde_json::Deserializer::from_str(&body)
        .into_iter::<serde_json::Value>()
        .collect::<Result<Vec<_>, _>>()
        .context("reading body input")?;
    let (mut request, response) = match &objects[..] {
        [req, resp] => (req.clone(), resp.clone()),
        x => bail!("Expected 2 JSON objects, found {}", x.len()),
    };

    // Splice the configuration into the request.
    let Some(request_map) = request.as_object_mut() else {
        bail!("Invalid type found for request");
    };
    if request_map.contains_key("configuration") {
        bail!("Request object should not contain key 'configuration'");
    }
    request_map.insert("configuration".into(), serde_json::Value::Object(config));

    // Connect to the destination.
    println!("{action} @ {}", state.fivetran_destination_url);
    let mut fivetran_client = proto::fivetran::destination_client::DestinationClient::connect(
        state.fivetran_destination_url.clone(),
    )
    .await
    .context("connecting to fivetran destination")?;

    match action.as_str() {
        "describe" => {
            let request: proto::fivetran::DescribeTableRequest =
                serde_json::from_value(request).context("describe request")?;
            let expected_response: proto::fivetran::DescribeTableResponse =
                serde_json::from_value(response).context("describe response")?;

            let response = fivetran_client
                .describe_table(request)
                .await
                .context("describe")?;
            let response = response.into_inner();

            if response != expected_response {
                bail!(
                    "Describe Table Response did not match expected\n
                    response: {response:#?}\n
                    expected: {expected_response:#?}"
                );
            }
        }
        "write_batch" => {
            let request: proto::fivetran::WriteBatchRequest =
                serde_json::from_value(request).context("write batch request")?;
            let expected_response: proto::fivetran::WriteBatchResponse =
                serde_json::from_value(response).context("write batch response")?;

            let response = fivetran_client
                .write_batch(request)
                .await
                .context("write batch")?;
            let response = response.into_inner();

            if response != expected_response {
                bail!(
                    "Write Batch Response did not match expected\n
                    response: {response:#?}\n
                    expected: {expected_response:#?}"
                );
            }
        }
        other => bail!("Unsupported command {other}"),
    }

    Ok(ControlFlow::Continue)
}