mz_postgres_util/
desc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Descriptions of PostgreSQL objects.
11
12use std::collections::BTreeSet;
13
14use anyhow::bail;
15use itertools::Itertools;
16use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
17use proptest::prelude::any;
18use proptest_derive::Arbitrary;
19use serde::{Deserialize, Serialize};
20use tokio_postgres::types::Oid;
21use tracing::warn;
22
23include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));
24
/// Describes a schema (namespace) in a PostgreSQL database.
///
/// The fields correspond to the `pg_namespace` system catalog; see
/// <https://www.postgresql.org/docs/current/catalog-pg-namespace.html>
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
pub struct PostgresSchemaDesc {
    /// The OID of the schema.
    pub oid: Oid,
    /// The name of the schema.
    pub name: String,
    /// The OID of the owner of the schema (namespace).
    pub owner: Oid,
}
37
/// Describes a table in a PostgreSQL database.
///
/// Two descriptions of the same table can be checked against each other with
/// [`PostgresTableDesc::determine_compatibility`].
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresTableDesc {
    /// The OID of the table.
    pub oid: Oid,
    /// The name of the schema that the table belongs to.
    pub namespace: String,
    /// The name of the table.
    pub name: String,
    /// The description of each column, in order of their position in the table.
    // Property tests generate between one and three columns per table.
    #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
    pub columns: Vec<PostgresColumnDesc>,
    /// Applicable keys for this table (i.e. primary key and unique
    /// constraints).
    // Property tests generate between one and three keys per table.
    #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
    pub keys: BTreeSet<PostgresKeyDesc>,
}
55
56impl PostgresTableDesc {
57    /// Determines if two `PostgresTableDesc` are compatible with one another in
58    /// a way that Materialize can handle.
59    ///
60    /// Currently this means that the values are equal except for the following
61    /// exceptions:
62    /// - `self`'s columns are a compatible prefix of `other`'s columns.
63    ///   Compatibility is defined as returning `true` for
64    ///   `PostgresColumnDesc::is_compatible`.
65    /// - `self`'s keys are all present in `other`
66    pub fn determine_compatibility(
67        &self,
68        other: &PostgresTableDesc,
69        allow_type_to_change_by_col_num: &BTreeSet<u16>,
70    ) -> Result<(), anyhow::Error> {
71        if self == other {
72            return Ok(());
73        }
74
75        let PostgresTableDesc {
76            oid: other_oid,
77            namespace: other_namespace,
78            name: other_name,
79            columns: other_cols,
80            keys: other_keys,
81        } = other;
82
83        // Table columns cannot change position, so only need to ensure that
84        // `self.columns` is a prefix of `other_cols`.
85        if let Some(prefix) = other_cols.get(0..self.columns.len())
86            && self.columns.iter().zip_eq(prefix.iter()).all(|(s, o)| s.is_compatible(o, allow_type_to_change_by_col_num))
87            && &self.name == other_name
88            && &self.oid == other_oid
89            && &self.namespace == other_namespace
90            // Our keys are all still present in exactly the same shape.
91            && self.keys.difference(other_keys).next().is_none()
92        {
93            Ok(())
94        } else {
95            warn!(
96                "Error validating table in publication. Expected: {:?} Actual: {:?}",
97                &self, other
98            );
99            bail!(
100                "source table {} with oid {} has been altered",
101                self.name,
102                self.oid
103            )
104        }
105    }
106}
107
108impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
109    fn into_proto(&self) -> ProtoPostgresTableDesc {
110        ProtoPostgresTableDesc {
111            oid: self.oid,
112            namespace: self.namespace.clone(),
113            name: self.name.clone(),
114            columns: self.columns.iter().map(|c| c.into_proto()).collect(),
115            keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
116        }
117    }
118
119    fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
120        Ok(PostgresTableDesc {
121            oid: proto.oid,
122            namespace: proto.namespace.clone(),
123            name: proto.name.clone(),
124            columns: proto
125                .columns
126                .into_iter()
127                .map(PostgresColumnDesc::from_proto)
128                .collect::<Result<_, _>>()?,
129            keys: proto
130                .keys
131                .into_iter()
132                .map(PostgresKeyDesc::from_proto)
133                .collect::<Result<_, _>>()?,
134        })
135    }
136}
137
/// Describes a column in a [`PostgresTableDesc`].
///
/// Two column descriptions are compared for compatibility with the private
/// `is_compatible` method on this type.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
pub struct PostgresColumnDesc {
    /// The name of the column.
    pub name: String,
    /// The column's monotonic position in its table, i.e. "this was the _i_th
    /// column created" irrespective of the current number of columns.
    pub col_num: u16,
    /// The OID of the column's type.
    pub type_oid: Oid,
    /// The modifier for the column's type.
    pub type_mod: i32,
    /// True if the column lacks a `NOT NULL` constraint.
    pub nullable: bool,
}
153
154impl PostgresColumnDesc {
155    /// Determines if data a relation with a structure of `other` can be treated
156    /// the same as `self`.
157    ///
158    /// Note that this function somewhat unnecessarily errors if the names
159    /// differ; this is negotiable but we want users to understand the fixedness
160    /// of names in our schemas.
161    fn is_compatible(
162        &self,
163        other: &PostgresColumnDesc,
164        allow_type_to_change_by_col_num: &BTreeSet<u16>,
165    ) -> bool {
166        let allow_type_change = allow_type_to_change_by_col_num.contains(&self.col_num);
167
168        self.name == other.name
169            && self.col_num == other.col_num
170            && (self.type_oid == other.type_oid || allow_type_change)
171            && (self.type_mod == other.type_mod || allow_type_change)
172            // Columns are compatible if:
173            // - self is nullable; introducing a not null constraint doesn't
174            //   change this column's behavior.
175            // - self and other are both not nullable
176            && (self.nullable || self.nullable == other.nullable)
177    }
178}
179
180impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
181    fn into_proto(&self) -> ProtoPostgresColumnDesc {
182        ProtoPostgresColumnDesc {
183            name: self.name.clone(),
184            col_num: Some(self.col_num.into()),
185            type_oid: self.type_oid,
186            type_mod: self.type_mod,
187            nullable: self.nullable,
188        }
189    }
190
191    fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
192        Ok(PostgresColumnDesc {
193            name: proto.name,
194            col_num: {
195                let v: u32 = proto
196                    .col_num
197                    .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
198                u16::try_from(v).expect("u16 must roundtrip")
199            },
200            type_oid: proto.type_oid,
201            type_mod: proto.type_mod,
202            nullable: proto.nullable,
203        })
204    }
205}
206
/// Describes a key in a [`PostgresTableDesc`].
///
/// Keys are stored in a `BTreeSet` on the table description, so this type
/// derives `Ord`; the derived ordering compares fields in declaration order,
/// which makes the order of the fields below significant.
#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
pub struct PostgresKeyDesc {
    /// This key is derived from the `pg_constraint` with this OID.
    pub oid: Oid,
    /// The name of the constraint.
    pub name: String,
    /// The `attnum` of the columns comprising the key. `attnum` is a unique identifier for a column
    /// in a PG table; see <https://www.postgresql.org/docs/current/catalog-pg-attribute.html>
    // Property tests generate between zero and three key columns.
    #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
    pub cols: Vec<u16>,
    /// Whether or not this key is the primary key.
    pub is_primary: bool,
    /// If this constraint was generated with NULLS NOT DISTINCT; see
    /// <https://www.postgresql.org/about/featurematrix/detail/392/>
    pub nulls_not_distinct: bool,
}
224
225impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
226    fn into_proto(&self) -> ProtoPostgresKeyDesc {
227        ProtoPostgresKeyDesc {
228            oid: self.oid,
229            name: self.name.clone(),
230            cols: self.cols.clone().into_iter().map(u32::from).collect(),
231            is_primary: self.is_primary,
232            nulls_not_distinct: self.nulls_not_distinct,
233        }
234    }
235
236    fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
237        Ok(PostgresKeyDesc {
238            oid: proto.oid,
239            name: proto.name,
240            cols: proto
241                .cols
242                .into_iter()
243                .map(|c| c.try_into().expect("values roundtrip"))
244                .collect(),
245            is_primary: proto.is_primary,
246            nulls_not_distinct: proto.nulls_not_distinct,
247        })
248    }
249}