1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Descriptions of PostgreSQL objects.

use std::collections::BTreeSet;

use anyhow::bail;
use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
use proptest::prelude::any;
use proptest_derive::Arbitrary;
use serde::{Deserialize, Serialize};
use tokio_postgres::types::Oid;
use tracing::warn;

include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));

/// Describes a schema in a PostgreSQL database.
///
/// See the `pg_namespace` system catalog:
/// <https://www.postgresql.org/docs/current/catalog-pg-namespace.html>
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct PostgresSchemaDesc {
    /// The OID of the schema.
    pub oid: Oid,
    /// The name of the schema.
    pub name: String,
    /// The OID of the schema's owner.
    pub owner: Oid,
}

/// Describes a table in a PostgreSQL database.
///
/// The derived `PartialEq` compares every field exactly; use
/// [`PostgresTableDesc::determine_compatibility`] for the looser check that
/// tolerates certain upstream changes.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresTableDesc {
    /// The OID of the table.
    pub oid: Oid,
    /// The name of the schema that the table belongs to.
    pub namespace: String,
    /// The name of the table.
    pub name: String,
    /// The description of each column, in order of their position in the table.
    // Generated (proptest) values have between 1 and 3 columns.
    #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
    pub columns: Vec<PostgresColumnDesc>,
    /// Applicable keys for this table (i.e. primary key and unique
    /// constraints).
    // Generated (proptest) values have between 1 and 3 keys.
    #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
    pub keys: BTreeSet<PostgresKeyDesc>,
}

impl PostgresTableDesc {
    /// Determines if two `PostgresTableDesc` are compatible with one another in
    /// a way that Materialize can handle.
    ///
    /// Currently this means that the values are equal except for the following
    /// exceptions:
    /// - `self`'s columns are a compatible prefix of `other`'s columns.
    ///   Compatibility is defined as returning `true` for
    ///   `PostgresColumnDesc::is_compatible`.
    /// - `self`'s keys are all present in `other`.
    ///
    /// The table's OID, namespace and name must be unchanged.
    ///
    /// `allow_type_to_change_by_col_num` lists the `col_num`s of columns whose
    /// type OID and type modifier may differ between `self` and `other`.
    ///
    /// # Errors
    ///
    /// Returns an error naming the table and its OID if `other` is not
    /// compatible with `self`. The full expected and actual descriptions are
    /// logged at `warn` level before returning.
    pub fn determine_compatibility(
        &self,
        other: &PostgresTableDesc,
        allow_type_to_change_by_col_num: &BTreeSet<u16>,
    ) -> Result<(), anyhow::Error> {
        // Fast path: identical descriptions are trivially compatible.
        if self == other {
            return Ok(());
        }

        // Exhaustive destructuring so adding a field forces this check to be
        // revisited.
        let PostgresTableDesc {
            oid: other_oid,
            namespace: other_namespace,
            name: other_name,
            columns: other_cols,
            keys: other_keys,
        } = other;

        // Table columns cannot change position, so only need to ensure that
        // `self.columns` is a prefix of `other_cols`.
        let columns_compatible = self.columns.len() <= other_cols.len()
            && self
                .columns
                .iter()
                .zip(other_cols)
                .all(|(s, o)| s.is_compatible(o, allow_type_to_change_by_col_num));
        let identity_unchanged = self.name == *other_name
            && self.oid == *other_oid
            && self.namespace == *other_namespace;
        // Our keys are all still present in exactly the same shape.
        let keys_retained = self.keys.is_subset(other_keys);

        if columns_compatible && identity_unchanged && keys_retained {
            Ok(())
        } else {
            warn!(
                "Error validating table in publication. Expected: {:?} Actual: {:?}",
                &self, other
            );
            bail!(
                "source table {} with oid {} has been altered",
                self.name,
                self.oid
            )
        }
    }
}

/// Round-trips a [`PostgresTableDesc`] through its protobuf representation.
impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
    fn into_proto(&self) -> ProtoPostgresTableDesc {
        ProtoPostgresTableDesc {
            oid: self.oid,
            namespace: self.namespace.clone(),
            name: self.name.clone(),
            columns: self
                .columns
                .iter()
                .map(PostgresColumnDesc::into_proto)
                .collect(),
            keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
        }
    }

    /// Decodes a table description, failing if any column or key fails to
    /// decode.
    fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
        // `proto` is owned, so its strings can be moved rather than cloned.
        Ok(PostgresTableDesc {
            oid: proto.oid,
            namespace: proto.namespace,
            name: proto.name,
            columns: proto
                .columns
                .into_iter()
                .map(PostgresColumnDesc::from_proto)
                .collect::<Result<_, _>>()?,
            keys: proto
                .keys
                .into_iter()
                .map(PostgresKeyDesc::from_proto)
                .collect::<Result<_, _>>()?,
        })
    }
}

/// Describes a column in a [`PostgresTableDesc`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresColumnDesc {
    /// The name of the column.
    pub name: String,
    /// The column's monotonic position in its table, i.e. "this was the _i_th
    /// column created" irrespective of the current number of columns.
    // NOTE(review): presumably the same `attnum` numbering used by
    // `PostgresKeyDesc::cols`; confirm against the catalog query that
    // populates it.
    pub col_num: u16,
    /// The OID of the column's type.
    pub type_oid: Oid,
    /// The modifier for the column's type.
    pub type_mod: i32,
    /// True if the column lacks a `NOT NULL` constraint.
    pub nullable: bool,
}

impl PostgresColumnDesc {
    /// Determines whether data from a relation with the structure of `other`
    /// can be treated the same as data described by `self`.
    ///
    /// The columns are compatible when all of the following hold:
    /// - their names and `col_num`s are equal;
    /// - their type OIDs and type modifiers are equal, unless `self.col_num` is
    ///   in `allow_type_to_change_by_col_num`, in which case both may differ;
    /// - `self` is nullable, or neither column is nullable. Introducing a
    ///   `NOT NULL` constraint doesn't change the column's behavior, but
    ///   dropping one does.
    ///
    /// Note that this function somewhat unnecessarily reports incompatibility
    /// if the names differ; this is negotiable but we want users to understand
    /// the fixedness of names in our schemas.
    fn is_compatible(
        &self,
        other: &PostgresColumnDesc,
        allow_type_to_change_by_col_num: &BTreeSet<u16>,
    ) -> bool {
        let same_identity = self.name == other.name && self.col_num == other.col_num;
        let type_ok = allow_type_to_change_by_col_num.contains(&self.col_num)
            || (self.type_oid == other.type_oid && self.type_mod == other.type_mod);
        let nullability_ok = self.nullable || !other.nullable;
        same_identity && type_ok && nullability_ok
    }
}

/// Round-trips a [`PostgresColumnDesc`] through its protobuf representation.
impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
    fn into_proto(&self) -> ProtoPostgresColumnDesc {
        ProtoPostgresColumnDesc {
            name: self.name.clone(),
            col_num: Some(self.col_num.into()),
            type_oid: self.type_oid,
            type_mod: self.type_mod,
            nullable: self.nullable,
        }
    }

    /// Decodes a column description.
    ///
    /// # Errors
    ///
    /// Fails if `col_num` is unset, or if its value does not fit in a `u16`
    /// (e.g. corrupted or foreign-produced data). The latter is reported as a
    /// decoding error rather than a panic.
    fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
        let col_num: u32 = proto
            .col_num
            .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
        Ok(PostgresColumnDesc {
            name: proto.name,
            col_num: u16::try_from(col_num)?,
            type_oid: proto.type_oid,
            type_mod: proto.type_mod,
            nullable: proto.nullable,
        })
    }
}

/// Describes a key in a [`PostgresTableDesc`].
///
/// The derived `PartialOrd`/`Ord` compare fields lexicographically in
/// declaration order. That ordering determines how keys are ordered in
/// [`PostgresTableDesc::keys`], so reordering these fields changes it.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
pub struct PostgresKeyDesc {
    /// This key is derived from the `pg_constraint` with this OID.
    pub oid: Oid,
    /// The name of the constraint.
    pub name: String,
    /// The `attnum` of the columns comprising the key. `attnum` is a unique identifier for a column
    /// in a PG table; see <https://www.postgresql.org/docs/current/catalog-pg-attribute.html>
    // Generated (proptest) values have between 0 and 3 columns.
    #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
    pub cols: Vec<u16>,
    /// Whether or not this key is the primary key.
    pub is_primary: bool,
    /// Whether this constraint was created with `NULLS NOT DISTINCT`; see
    /// <https://www.postgresql.org/about/featurematrix/detail/392/>
    pub nulls_not_distinct: bool,
}

/// Round-trips a [`PostgresKeyDesc`] through its protobuf representation.
impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
    fn into_proto(&self) -> ProtoPostgresKeyDesc {
        ProtoPostgresKeyDesc {
            oid: self.oid,
            name: self.name.clone(),
            // Widen each column number; no need to clone the whole `Vec` first.
            cols: self.cols.iter().copied().map(u32::from).collect(),
            is_primary: self.is_primary,
            nulls_not_distinct: self.nulls_not_distinct,
        }
    }

    /// Decodes a key description.
    ///
    /// # Errors
    ///
    /// Fails if any column number does not fit in a `u16`. This is reported as
    /// a decoding error rather than a panic.
    fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
        Ok(PostgresKeyDesc {
            oid: proto.oid,
            name: proto.name,
            cols: proto
                .cols
                .into_iter()
                .map(u16::try_from)
                .collect::<Result<_, _>>()?,
            is_primary: proto.is_primary,
            nulls_not_distinct: proto.nulls_not_distinct,
        })
    }
}