1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use openssl::ssl::{SslConnector, SslMethod};
use openssl::x509::store::X509StoreBuilder;
use openssl::x509::X509;
use postgres_openssl::MakeTlsConnector;
use std::collections::BTreeMap;

use crate::error::{Context, OpError, OpErrorKind};
use crate::fivetran_sdk::form_field::Type;
use crate::fivetran_sdk::{
    ConfigurationFormResponse, ConfigurationTest, FormField, TestRequest, TextField,
};

/// The application name reported on connections opened by [`connect`].
pub const FIVETRAN_DESTINATION_APPLICATION_NAME: &str = "materialize_fivetran_destination";

pub fn handle_configuration_form_request() -> ConfigurationFormResponse {
    ConfigurationFormResponse {
        schema_selection_supported: true,
        table_selection_supported: true,
        fields: vec![
            FormField {
                name: "host".into(),
                label: "Host".into(),
                description: Some("The hostname of your Materialize region".into()),
                required: true,
                r#type: Some(Type::TextField(TextField::PlainText.into())),
            },
            FormField {
                name: "user".into(),
                label: "User".into(),
                description: Some("The user to connect as".into()),
                required: true,
                r#type: Some(Type::TextField(TextField::PlainText.into())),
            },
            FormField {
                name: "app_password".into(),
                label: "App password".into(),
                description: Some("The app password to authenticate with".into()),
                required: true,
                r#type: Some(Type::TextField(TextField::Password.into())),
            },
            FormField {
                name: "dbname".into(),
                label: "Database".into(),
                description: Some("The name of the database to connect to".into()),
                required: true,
                r#type: Some(Type::TextField(TextField::PlainText.into())),
            },
        ],
        tests: vec![
            ConfigurationTest {
                name: "connect".into(),
                label: "Connecting to Materialize region".into(),
            },
            ConfigurationTest {
                name: "permissions".into(),
                label: "Checking permissions".into(),
            },
        ],
    }
}

pub async fn handle_test_request(request: TestRequest) -> Result<(), OpError> {
    match request.name.as_str() {
        "connect" => test_connect(request.configuration)
            .await
            .context("test_connect"),
        "permissions" => test_permissions(request.configuration)
            .await
            .context("test_permissions"),
        "ping" => Ok(()),
        name => {
            let error = OpErrorKind::UnknownRequest(name.to_string());
            Err(error.into())
        }
    }
}

/// Checks that a connection can be established with `config`.
///
/// The connection produced by [`connect`] is discarded; only success or
/// failure is reported.
async fn test_connect(config: BTreeMap<String, String>) -> Result<(), OpError> {
    connect(config).await.map(|_| ())
}

/// Checks that the configured user may create objects in the configured
/// database.
///
/// Connects with `config`, then asks the server whether the current user has
/// the `CREATE` privilege on the database or is a superuser.
///
/// # Errors
///
/// Returns [`OpErrorKind::MissingPrivilege`] when neither condition holds, and
/// propagates connection and query failures.
async fn test_permissions(config: BTreeMap<String, String>) -> Result<(), OpError> {
    let (dbname, client) = connect(config).await?;

    let privilege_query =
        "SELECT has_database_privilege($1, 'CREATE') OR mz_is_superuser() AS has_create";
    let has_create: bool = client
        .query_one(privilege_query, &[&dbname])
        .await
        .context("querying privileges")?
        .get("has_create");

    if has_create {
        return Ok(());
    }

    Err(OpErrorKind::MissingPrivilege {
        privilege: "CREATE",
        object: dbname,
    }
    .into())
}

/// Opens a TLS-protected connection using the settings in `config`.
///
/// The `host`, `user`, `app_password`, and `dbname` entries are required and
/// are removed from `config`. The connection is made on port 6875 and
/// identifies itself as [`FIVETRAN_DESTINATION_APPLICATION_NAME`].
///
/// On success, returns the database name together with the client. The
/// connection itself is driven by a separately spawned task.
///
/// # Errors
///
/// Returns [`OpErrorKind::FieldMissing`] for the first required entry that is
/// absent (checked in the order listed above), and propagates failures from
/// building the TLS connector or establishing the connection.
pub async fn connect(
    mut config: BTreeMap<String, String>,
) -> Result<(String, tokio_postgres::Client), OpError> {
    // Pull a required entry out of the configuration, naming the key in the
    // error if it is absent.
    let mut take_required =
        |key: &'static str| config.remove(key).ok_or(OpErrorKind::FieldMissing(key));

    let host = take_required("host")?;
    let user = take_required("user")?;
    let app_password = take_required("app_password")?;
    let dbname = take_required("dbname")?;

    // The CA certificate bundle fetched by the build script is embedded in the
    // binary, and the TLS connector verifies against that embedded bundle
    // instead of the system's CA bundle. This is needed for Fivetran's
    // environment, where no system CA bundle will be available. The trade-off
    // is a small amount of risk: the embedded bundle only gets refreshed when a
    // new release of the Fivetran destination is issued.
    //
    // TODO: depend on the system's certificate bundle instead, once Fivetran
    // supports running destinations in a containerized environment.
    let ca_bundle = include_bytes!(concat!(env!("OUT_DIR"), "/ca-certificate.crt"));
    let mut store_builder = X509StoreBuilder::new()?;
    X509::stack_from_pem(ca_bundle)?
        .into_iter()
        .try_for_each(|cert| store_builder.add_cert(cert))?;

    let mut ssl_builder = SslConnector::builder(SslMethod::tls_client())?;
    ssl_builder.set_verify_cert_store(store_builder.build())?;
    let tls_connector = MakeTlsConnector::new(ssl_builder.build());

    let mut pg_config = tokio_postgres::Config::new();
    pg_config
        .host(&host)
        .user(&user)
        .port(6875)
        .password(app_password)
        .dbname(&dbname)
        .application_name(FIVETRAN_DESTINATION_APPLICATION_NAME);
    let (client, conn) = pg_config.connect(tls_connector).await?;

    // The connection future must be polled for the client to make progress, so
    // it is handed to its own task.
    // NOTE(review): a connection error panics inside that task rather than
    // being reported to the holder of the client; confirm this is intended.
    mz_ore::task::spawn(|| "postgres_connection", async move {
        if let Err(e) = conn.await {
            panic!("tokio-postgres connection error: {}", e);
        }
    });

    Ok((dbname, client))
}