mz_postgres_util/
desc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Descriptions of PostgreSQL objects.
11
12use std::collections::BTreeSet;
13
14use anyhow::bail;
15use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
16use proptest::prelude::any;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19use tokio_postgres::types::Oid;
20use tracing::warn;
21
22include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));
23
/// Describes a schema in a PostgreSQL database.
///
/// See the documentation of the `pg_namespace` catalog:
/// <https://www.postgresql.org/docs/current/catalog-pg-namespace.html>
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct PostgresSchemaDesc {
    /// The OID of the schema.
    pub oid: Oid,
    /// The name of the schema.
    pub name: String,
    /// The OID of the owner of the schema (namespace).
    pub owner: Oid,
}
36
/// Describes a table in a PostgreSQL database.
///
/// The type derives `Arbitrary` so that proptest can generate values of it;
/// the `#[proptest(strategy = ...)]` attributes on the collection fields bound
/// the sizes of the generated collections.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresTableDesc {
    /// The OID of the table.
    pub oid: Oid,
    /// The name of the schema that the table belongs to.
    pub namespace: String,
    /// The name of the table.
    pub name: String,
    /// The description of each column, in order of their position in the table.
    // Generated test values contain 1 to 3 columns (`1..4` is end-exclusive).
    #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
    pub columns: Vec<PostgresColumnDesc>,
    /// Applicable keys for this table (i.e. primary key and unique
    /// constraints).
    // Generated test values contain 1 to 3 keys (`1..4` is end-exclusive).
    #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
    pub keys: BTreeSet<PostgresKeyDesc>,
}
54
55impl PostgresTableDesc {
56    /// Determines if two `PostgresTableDesc` are compatible with one another in
57    /// a way that Materialize can handle.
58    ///
59    /// Currently this means that the values are equal except for the following
60    /// exceptions:
61    /// - `self`'s columns are a compatible prefix of `other`'s columns.
62    ///   Compatibility is defined as returning `true` for
63    ///   `PostgresColumnDesc::is_compatible`.
64    /// - `self`'s keys are all present in `other`
65    pub fn determine_compatibility(
66        &self,
67        other: &PostgresTableDesc,
68        allow_type_to_change_by_col_num: &BTreeSet<u16>,
69    ) -> Result<(), anyhow::Error> {
70        if self == other {
71            return Ok(());
72        }
73
74        let PostgresTableDesc {
75            oid: other_oid,
76            namespace: other_namespace,
77            name: other_name,
78            columns: other_cols,
79            keys: other_keys,
80        } = other;
81
82        // Table columns cannot change position, so only need to ensure that
83        // `self.columns` is a prefix of `other_cols`.
84        if self.columns.len() <= other_cols.len()
85            && self.columns.iter().zip(other_cols.iter()).all(|(s, o)| s.is_compatible(o, allow_type_to_change_by_col_num))
86            && &self.name == other_name
87            && &self.oid == other_oid
88            && &self.namespace == other_namespace
89            // Our keys are all still present in exactly the same shape.
90            && self.keys.difference(other_keys).next().is_none()
91        {
92            Ok(())
93        } else {
94            warn!(
95                "Error validating table in publication. Expected: {:?} Actual: {:?}",
96                &self, other
97            );
98            bail!(
99                "source table {} with oid {} has been altered",
100                self.name,
101                self.oid
102            )
103        }
104    }
105}
106
107impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
108    fn into_proto(&self) -> ProtoPostgresTableDesc {
109        ProtoPostgresTableDesc {
110            oid: self.oid,
111            namespace: self.namespace.clone(),
112            name: self.name.clone(),
113            columns: self.columns.iter().map(|c| c.into_proto()).collect(),
114            keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
115        }
116    }
117
118    fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
119        Ok(PostgresTableDesc {
120            oid: proto.oid,
121            namespace: proto.namespace.clone(),
122            name: proto.name.clone(),
123            columns: proto
124                .columns
125                .into_iter()
126                .map(PostgresColumnDesc::from_proto)
127                .collect::<Result<_, _>>()?,
128            keys: proto
129                .keys
130                .into_iter()
131                .map(PostgresKeyDesc::from_proto)
132                .collect::<Result<_, _>>()?,
133        })
134    }
135}
136
/// Describes a column in a [`PostgresTableDesc`].
///
/// The type derives `Arbitrary` so that proptest can generate values of it.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresColumnDesc {
    /// The name of the column.
    pub name: String,
    /// The column's monotonic position in its table, i.e. "this was the _i_th
    /// column created" irrespective of the current number of columns.
    pub col_num: u16,
    /// The OID of the column's type.
    pub type_oid: Oid,
    /// The modifier for the column's type.
    pub type_mod: i32,
    /// True if the column lacks a `NOT NULL` constraint.
    pub nullable: bool,
}
152
153impl PostgresColumnDesc {
154    /// Determines if data a relation with a structure of `other` can be treated
155    /// the same as `self`.
156    ///
157    /// Note that this function somewhat unnecessarily errors if the names
158    /// differ; this is negotiable but we want users to understand the fixedness
159    /// of names in our schemas.
160    fn is_compatible(
161        &self,
162        other: &PostgresColumnDesc,
163        allow_type_to_change_by_col_num: &BTreeSet<u16>,
164    ) -> bool {
165        let allow_type_change = allow_type_to_change_by_col_num.contains(&self.col_num);
166
167        self.name == other.name
168            && self.col_num == other.col_num
169            && (self.type_oid == other.type_oid || allow_type_change)
170            && (self.type_mod == other.type_mod || allow_type_change)
171            // Columns are compatible if:
172            // - self is nullable; introducing a not null constraint doesn't
173            //   change this column's behavior.
174            // - self and other are both not nullable
175            && (self.nullable || self.nullable == other.nullable)
176    }
177}
178
179impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
180    fn into_proto(&self) -> ProtoPostgresColumnDesc {
181        ProtoPostgresColumnDesc {
182            name: self.name.clone(),
183            col_num: Some(self.col_num.into()),
184            type_oid: self.type_oid,
185            type_mod: self.type_mod,
186            nullable: self.nullable,
187        }
188    }
189
190    fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
191        Ok(PostgresColumnDesc {
192            name: proto.name,
193            col_num: {
194                let v: u32 = proto
195                    .col_num
196                    .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
197                u16::try_from(v).expect("u16 must roundtrip")
198            },
199            type_oid: proto.type_oid,
200            type_mod: proto.type_mod,
201            nullable: proto.nullable,
202        })
203    }
204}
205
/// Describes a key in a [`PostgresTableDesc`].
///
/// The type derives `PartialOrd` and `Ord`, which lets it be stored in the
/// `BTreeSet` held by [`PostgresTableDesc`], and `Arbitrary` so that proptest
/// can generate values of it.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
pub struct PostgresKeyDesc {
    /// This key is derived from the `pg_constraint` with this OID.
    pub oid: Oid,
    /// The name of the constraint.
    pub name: String,
    /// The `attnum` of the columns comprising the key. `attnum` is a unique identifier for a column
    /// in a PG table; see <https://www.postgresql.org/docs/current/catalog-pg-attribute.html>
    // Generated test values contain 0 to 3 column numbers (`0..4` is end-exclusive).
    #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
    pub cols: Vec<u16>,
    /// Whether or not this key is the primary key.
    pub is_primary: bool,
    /// If this constraint was generated with NULLS NOT DISTINCT; see
    /// <https://www.postgresql.org/about/featurematrix/detail/392/>
    pub nulls_not_distinct: bool,
}
223
224impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
225    fn into_proto(&self) -> ProtoPostgresKeyDesc {
226        ProtoPostgresKeyDesc {
227            oid: self.oid,
228            name: self.name.clone(),
229            cols: self.cols.clone().into_iter().map(u32::from).collect(),
230            is_primary: self.is_primary,
231            nulls_not_distinct: self.nulls_not_distinct,
232        }
233    }
234
235    fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
236        Ok(PostgresKeyDesc {
237            oid: proto.oid,
238            name: proto.name,
239            cols: proto
240                .cols
241                .into_iter()
242                .map(|c| c.try_into().expect("values roundtrip"))
243                .collect(),
244            is_primary: proto.is_primary,
245            nulls_not_distinct: proto.nulls_not_distinct,
246        })
247    }
248}