mz_postgres_util/
desc.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Descriptions of PostgreSQL objects.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use anyhow::bail;
15use mz_proto::{IntoRustIfSome, RustType, TryFromProtoError};
16use proptest::prelude::any;
17use proptest_derive::Arbitrary;
18use serde::{Deserialize, Serialize};
19use tokio_postgres::types::Oid;
20use tracing::warn;
21
22include!(concat!(env!("OUT_DIR"), "/mz_postgres_util.desc.rs"));
23
24/// Describes a schema in a PostgreSQL database.
25///
26/// <https://www.postgresql.org/docs/current/catalog-pg-namespace.html>
27#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)]
28pub struct PostgresSchemaDesc {
29    /// The OID of the schema.
30    pub oid: Oid,
31    /// The name of the schema.
32    pub name: String,
33    /// Owner of the namespace
34    pub owner: Oid,
35}
36
37/// Describes a table in a PostgreSQL database.
38#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, Arbitrary)]
39pub struct PostgresTableDesc {
40    /// The OID of the table.
41    pub oid: Oid,
42    /// The name of the schema that the table belongs to.
43    pub namespace: String,
44    /// The name of the table.
45    pub name: String,
46    /// The description of each column, in order of their position in the table.
47    #[proptest(strategy = "proptest::collection::vec(any::<PostgresColumnDesc>(), 1..4)")]
48    pub columns: Vec<PostgresColumnDesc>,
49    /// Applicable keys for this table (i.e. primary key and unique
50    /// constraints).
51    #[proptest(strategy = "proptest::collection::btree_set(any::<PostgresKeyDesc>(), 1..4)")]
52    pub keys: BTreeSet<PostgresKeyDesc>,
53}
54
55impl PostgresTableDesc {
56    /// Determines if two `PostgresTableDesc` are compatible with one another in
57    /// a way that Materialize can handle.
58    ///
59    /// Currently this means that the values are equal except for the following
60    /// exceptions:
61    /// - `self`'s columns are a compatible prefix of `other`'s columns.
62    ///   Compatibility is defined as returning `true` for
63    ///   `PostgresColumnDesc::is_compatible`.
64    /// - `self`'s keys are all present in `other`
65    pub fn determine_compatibility(
66        &self,
67        other: &PostgresTableDesc,
68        allow_type_to_change_by_col_num: &BTreeSet<u16>,
69    ) -> Result<(), anyhow::Error> {
70        if self == other {
71            return Ok(());
72        }
73
74        let PostgresTableDesc {
75            oid: other_oid,
76            namespace: other_namespace,
77            name: other_name,
78            columns: other_cols,
79            keys: other_keys,
80        } = other;
81
82        let other_cols_by_name = BTreeMap::from_iter(other_cols.iter().map(|c| (&c.name, c)));
83        let columns_compatible =
84            self.columns
85                .iter()
86                .all(|info| match other_cols_by_name.get(&info.name) {
87                    Some(other_info) => {
88                        let allow_type_change =
89                            allow_type_to_change_by_col_num.contains(&info.col_num);
90                        info.is_compatible(other_info, allow_type_change)
91                    }
92                    None => false,
93                });
94
95        if columns_compatible
96            && &self.name == other_name
97            && &self.oid == other_oid
98            && &self.namespace == other_namespace
99            // Our keys are all still present in exactly the same shape.
100            && self.keys.difference(other_keys).next().is_none()
101        {
102            Ok(())
103        } else {
104            warn!(
105                "Error validating table in publication. Expected: {:?} Actual: {:?}",
106                &self, other
107            );
108            bail!(
109                "source table {} with oid {} has been altered",
110                self.name,
111                self.oid
112            )
113        }
114    }
115}
116
117impl RustType<ProtoPostgresTableDesc> for PostgresTableDesc {
118    fn into_proto(&self) -> ProtoPostgresTableDesc {
119        ProtoPostgresTableDesc {
120            oid: self.oid,
121            namespace: self.namespace.clone(),
122            name: self.name.clone(),
123            columns: self.columns.iter().map(|c| c.into_proto()).collect(),
124            keys: self.keys.iter().map(PostgresKeyDesc::into_proto).collect(),
125        }
126    }
127
128    fn from_proto(proto: ProtoPostgresTableDesc) -> Result<Self, TryFromProtoError> {
129        Ok(PostgresTableDesc {
130            oid: proto.oid,
131            namespace: proto.namespace.clone(),
132            name: proto.name.clone(),
133            columns: proto
134                .columns
135                .into_iter()
136                .map(PostgresColumnDesc::from_proto)
137                .collect::<Result<_, _>>()?,
138            keys: proto
139                .keys
140                .into_iter()
141                .map(PostgresKeyDesc::from_proto)
142                .collect::<Result<_, _>>()?,
143        })
144    }
145}
146
147/// Describes a column in a [`PostgresTableDesc`].
148#[derive(Debug, Clone, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Arbitrary)]
149pub struct PostgresColumnDesc {
150    /// The name of the column.
151    pub name: String,
152    /// The column's monotonic position in its table, i.e. "this was the _i_th
153    /// column created" irrespective of the current number of columns.
154    pub col_num: u16,
155    /// The OID of the column's type.
156    pub type_oid: Oid,
157    /// The modifier for the column's type.
158    pub type_mod: i32,
159    /// True if the column lacks a `NOT NULL` constraint.
160    pub nullable: bool,
161}
162
163impl PostgresColumnDesc {
164    /// Determines if data a relation with a structure of `other` can be treated
165    /// the same as `self`.
166    ///
167    /// Note that this function somewhat unnecessarily errors if the names
168    /// differ; this is negotiable but we want users to understand the fixedness
169    /// of names in our schemas.
170    fn is_compatible(&self, other: &PostgresColumnDesc, allow_type_change: bool) -> bool {
171        self.name == other.name
172            && self.col_num == other.col_num
173            && (self.type_oid == other.type_oid || allow_type_change)
174            && (self.type_mod == other.type_mod || allow_type_change)
175            // Columns are compatible if:
176            // - self is nullable; introducing a not null constraint doesn't
177            //   change this column's behavior.
178            // - self and other are both not nullable
179            && (self.nullable || self.nullable == other.nullable)
180    }
181}
182
183impl RustType<ProtoPostgresColumnDesc> for PostgresColumnDesc {
184    fn into_proto(&self) -> ProtoPostgresColumnDesc {
185        ProtoPostgresColumnDesc {
186            name: self.name.clone(),
187            col_num: Some(self.col_num.into()),
188            type_oid: self.type_oid,
189            type_mod: self.type_mod,
190            nullable: self.nullable,
191        }
192    }
193
194    fn from_proto(proto: ProtoPostgresColumnDesc) -> Result<Self, TryFromProtoError> {
195        Ok(PostgresColumnDesc {
196            name: proto.name,
197            col_num: {
198                let v: u32 = proto
199                    .col_num
200                    .into_rust_if_some("ProtoPostgresColumnDesc::col_num")?;
201                u16::try_from(v).expect("u16 must roundtrip")
202            },
203            type_oid: proto.type_oid,
204            type_mod: proto.type_mod,
205            nullable: proto.nullable,
206        })
207    }
208}
209
210/// Describes a key in a [`PostgresTableDesc`].
211#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize, PartialOrd, Ord, Arbitrary)]
212pub struct PostgresKeyDesc {
213    /// This key is derived from the `pg_constraint` with this OID.
214    pub oid: Oid,
215    /// The name of the constraints.
216    pub name: String,
217    /// The `attnum` of the columns comprising the key. `attnum` is a unique identifier for a column
218    /// in a PG table; see <https://www.postgresql.org/docs/current/catalog-pg-attribute.html>
219    #[proptest(strategy = "proptest::collection::vec(any::<u16>(), 0..4)")]
220    pub cols: Vec<u16>,
221    /// Whether or not this key is the primary key.
222    pub is_primary: bool,
223    /// If this constraint was generated with NULLS NOT DISTINCT; see
224    /// <https://www.postgresql.org/about/featurematrix/detail/392/>
225    pub nulls_not_distinct: bool,
226}
227
228impl RustType<ProtoPostgresKeyDesc> for PostgresKeyDesc {
229    fn into_proto(&self) -> ProtoPostgresKeyDesc {
230        ProtoPostgresKeyDesc {
231            oid: self.oid,
232            name: self.name.clone(),
233            cols: self.cols.clone().into_iter().map(u32::from).collect(),
234            is_primary: self.is_primary,
235            nulls_not_distinct: self.nulls_not_distinct,
236        }
237    }
238
239    fn from_proto(proto: ProtoPostgresKeyDesc) -> Result<Self, TryFromProtoError> {
240        Ok(PostgresKeyDesc {
241            oid: proto.oid,
242            name: proto.name,
243            cols: proto
244                .cols
245                .into_iter()
246                .map(|c| c.try_into().expect("values roundtrip"))
247                .collect(),
248            is_primary: proto.is_primary,
249            nulls_not_distinct: proto.nulls_not_distinct,
250        })
251    }
252}