1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use std::collections::BTreeMap;

use mz_pgrepr::Type;
use mz_sql_parser::ast::{Ident, UnresolvedItemName};
use postgres_protocol::escape;
use tokio_postgres::Client;

use crate::destination::{config, FIVETRAN_SYSTEM_COLUMN_DELETE};
use crate::error::{Context, OpError, OpErrorKind};
use crate::fivetran_sdk::{
    AlterTableRequest, Column, CreateTableRequest, DataType, DecimalParams, DescribeTableRequest,
    Table,
};
use crate::utils;

/// HACK(parkmycar): Materialize has no notion of primary keys, so a column that Fivetran
/// declared as a primary key is tagged by setting its column `COMMENT` to exactly this string.
/// [`handle_create_table`] writes the comment and [`describe_table`] compares against it.
const PRIMARY_KEY_MAGIC_STRING: &str = "mz_is_primary_key";

pub async fn handle_describe_table(
    request: DescribeTableRequest,
) -> Result<Option<Table>, OpError> {
    let (dbname, client) = config::connect(request.configuration).await?;
    describe_table(&client, &dbname, &request.schema_name, &request.table_name).await
}

pub async fn describe_table(
    client: &Client,
    database: &str,
    schema: &str,
    table: &str,
) -> Result<Option<Table>, OpError> {
    let table_id = {
        let rows = client
            .query(
                r#"SELECT t.id
                   FROM mz_tables t
                   JOIN mz_schemas s ON s.id = t.schema_id
                   JOIN mz_databases d ON d.id = s.database_id
                   WHERE d.name = $1 AND s.name = $2 AND t.name = $3
                "#,
                &[&database, &schema, &table],
            )
            .await
            .context("fetching table ID")?;

        match &*rows {
            [] => return Ok(None),
            [row] => row.get::<_, String>("id"),
            _ => {
                let err = OpErrorKind::InvariantViolated(
                    "describe table query returned multiple results".to_string(),
                );
                return Err(err.into());
            }
        }
    };

    let columns = {
        let stmt = r#"SELECT
                   name,
                   type_oid,
                   type_mod,
                   COALESCE(coms.comment, '') = $1 AS primary_key
               FROM mz_columns AS cols
               LEFT JOIN mz_internal.mz_comments AS coms
               ON cols.id = coms.id AND cols.position = coms.object_sub_id
               WHERE cols.id = $2
               ORDER BY cols.position ASC"#;

        let rows = client
            .query(stmt, &[&PRIMARY_KEY_MAGIC_STRING, &table_id])
            .await
            .context("fetching table columns")?;

        let mut columns = vec![];
        for row in rows {
            let name = row.get::<_, String>("name");
            let primary_key = row.get::<_, bool>("primary_key");
            let ty_oid = row.get::<_, u32>("type_oid");
            let ty_mod = row.get::<_, i32>("type_mod");
            let ty = Type::from_oid_and_typmod(ty_oid, ty_mod).with_context(|| {
                format!("looking up type with OID {ty_oid} and modifier {ty_mod}")
            })?;
            let (ty, decimal) = utils::to_fivetran_type(ty)?;

            columns.push(Column {
                name,
                r#type: ty.into(),
                primary_key,
                decimal,
            })
        }
        columns
    };

    Ok(Some(Table {
        name: table.to_string(),
        columns,
    }))
}

/// Handles a Fivetran `CreateTable` request.
///
/// Creates the target schema if it does not already exist, then creates the table with the
/// requested columns (in request order). The Fivetran delete system column
/// (`FIVETRAN_SYSTEM_COLUMN_DELETE`, a boolean) is appended when the request does not already
/// include it. Because Materialize has no primary keys, each column Fivetran marks as a primary
/// key is tagged afterwards with a `COMMENT ON COLUMN` set to [`PRIMARY_KEY_MAGIC_STRING`].
///
/// # Errors
///
/// Fails if the request has no table, if the schema or table name is not a valid identifier,
/// if a column type cannot be mapped to a Materialize type, or if connecting or executing any
/// of the DDL statements fails.
pub async fn handle_create_table(request: CreateTableRequest) -> Result<(), OpError> {
    let table = request.table.ok_or(OpErrorKind::FieldMissing("table"))?;

    let schema = Ident::new(&request.schema_name)?;
    let qualified_table_name =
        UnresolvedItemName::qualified(&[schema.clone(), Ident::new(&table.name)?]);

    let mut total_columns = table.columns;
    // We want to make sure the deleted system column is always provided.
    //
    // Note: Instead of creating a map we check existence in the Vec, to retain original order
    // of the columns from the request.
    //
    // TODO(parkmycar): Use an IndexMap here.
    let contains_delete = total_columns
        .iter()
        .any(|col| col.name == FIVETRAN_SYSTEM_COLUMN_DELETE);
    if !contains_delete {
        total_columns.push(Column {
            name: FIVETRAN_SYSTEM_COLUMN_DELETE.to_string(),
            r#type: DataType::Boolean.into(),
            primary_key: false,
            decimal: None,
        });
    }

    let mut defs = Vec::with_capacity(total_columns.len());
    let mut primary_key_columns = vec![];
    for column in total_columns {
        // Column names come straight from the request, so they are escaped before being
        // spliced into DDL.
        let name = escape::escape_identifier(&column.name);
        let mut ty = utils::to_materialize_type(column.r#type())?.to_string();
        if let Some(d) = column.decimal {
            ty += &format!("({}, {})", d.precision, d.scale);
        }

        defs.push(format!("{name} {ty}"));

        if column.primary_key {
            primary_key_columns.push(name);
        }
    }

    // Schema and table creation are issued as separate transactions in one batch.
    let sql = format!(
        r#"BEGIN; CREATE SCHEMA IF NOT EXISTS {schema}; COMMIT;
        BEGIN; CREATE TABLE {qualified_table_name} ({defs}); COMMIT;"#,
        defs = defs.join(","),
    );

    let (_dbname, client) = config::connect(request.configuration).await?;
    client
        .batch_execute(&sql)
        .await
        .context("creating schema and table")?;

    // TODO(parkmycar): This is an ugly hack!
    //
    // If Fivetran creates a table with primary keys, it expects a DescribeTableRequest to report
    // those columns as primary keys. But Materialize doesn't support primary keys, so we need to
    // store this metadata somewhere else. For now we do it in a COMMENT.
    for column_name in primary_key_columns {
        let stmt = format!(
            "COMMENT ON COLUMN {qualified_table_name}.{column_name} IS {magic_comment}",
            magic_comment = escape::escape_literal(PRIMARY_KEY_MAGIC_STRING),
        );
        client
            .execute(&stmt, &[])
            .await
            .context("setting magic primary key comment")?;
    }

    Ok(())
}

/// Handles a Fivetran `AlterTable` request.
///
/// Altering tables is not implemented yet: the request succeeds only when the requested
/// columns already match the existing table (per `columns_match`), and otherwise fails with
/// [`OpErrorKind::Unsupported`]. A request that carries no table is a no-op and returns
/// `Ok(())` without opening a connection.
///
/// # Errors
///
/// Fails if connecting or describing the table fails, if the table does not exist
/// ([`OpErrorKind::UnknownTable`]), or if the columns differ ([`OpErrorKind::Unsupported`]).
pub async fn handle_alter_table(request: AlterTableRequest) -> Result<(), OpError> {
    // Bail early if there isn't a table to alter, before paying for a connection.
    let Some(request_table) = request.table else {
        return Ok(());
    };

    let (dbname, client) = config::connect(request.configuration).await?;

    let current_table = describe_table(&client, &dbname, &request.schema_name, &request_table.name)
        .await
        .context("alter table")?;
    let Some(current_table) = current_table else {
        return Err(OpErrorKind::UnknownTable {
            database: dbname,
            schema: request.schema_name,
            table: request_table.name,
        }
        .into());
    };

    if columns_match(&request_table, &current_table) {
        Ok(())
    } else {
        let error = format!("alter_table, request: {request_table:?}, current: {current_table:?}");
        Err(OpErrorKind::Unsupported(error).into())
    }
}

/// Reports whether `request` describes the same columns as `current`.
///
/// Columns are matched by name, so column order is ignored. Two same-named columns are equal
/// when their Fivetran type and primary-key flag agree and either both or neither carry decimal
/// parameters (the precision/scale values themselves are not compared yet). Tables with a
/// different number of columns never match, so duplicate names in one table cannot collapse
/// into a false positive.
// TODO(parkmycar): Implement some more complex diffing logic.
fn columns_match(request: &Table, current: &Table) -> bool {
    #[derive(Debug)]
    struct ColumnMetadata<'a> {
        ty: DataType,
        primary_key: bool,
        decimal: Option<&'a DecimalParams>,
    }

    impl PartialEq for ColumnMetadata<'_> {
        fn eq(&self, other: &Self) -> bool {
            self.ty == other.ty
                && self.primary_key == other.primary_key
                // TODO(parkmycar): Better comparison for decimals.
                && self.decimal.is_some() == other.decimal.is_some()
        }
    }

    /// Indexes a table's columns by name, borrowing from the table instead of cloning.
    fn columns_by_name(table: &Table) -> BTreeMap<&str, ColumnMetadata<'_>> {
        table
            .columns
            .iter()
            .map(|col| {
                let metadata = ColumnMetadata {
                    ty: col.r#type(),
                    primary_key: col.primary_key,
                    decimal: col.decimal.as_ref(),
                };
                (col.name.as_str(), metadata)
            })
            .collect()
    }

    // A by-name map silently keeps only the last of any duplicate names, so also require the
    // raw column counts to agree.
    request.columns.len() == current.columns.len()
        && columns_by_name(request) == columns_by_name(current)
}