Skip to main content

mz_postgres_util/
schemas.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Utilities to fetch schema information for Postgres sources.
11
12use std::collections::{BTreeMap, BTreeSet};
13
14use tokio_postgres::Client;
15use tokio_postgres::types::Oid;
16
17use crate::desc::{PostgresColumnDesc, PostgresKeyDesc, PostgresSchemaDesc, PostgresTableDesc};
18use crate::{PostgresError, query, simple_query_opt};
19
20pub async fn get_schemas(client: &Client) -> Result<Vec<PostgresSchemaDesc>, PostgresError> {
21    Ok(query(
22        client,
23        crate::sql!("SELECT oid, nspname, nspowner FROM pg_namespace"),
24        &[],
25    )
26    .await?
27    .into_iter()
28    .map(|row| {
29        let oid: Oid = row.get("oid");
30        let name: String = row.get("nspname");
31        let owner: Oid = row.get("nspowner");
32        PostgresSchemaDesc { oid, name, owner }
33    })
34    .collect::<Vec<_>>())
35}
36
37/// Get the major version of the PostgreSQL server.
38pub async fn get_pg_major_version(client: &Client) -> Result<u32, PostgresError> {
39    // server_version_num is an integer like 140005 for version 14.5
40    // NOTE: We use the statement SELECT instead of SHOW because older Aurora
41    // versions don't support SHOW via a replication channel.
42    let row = simple_query_opt(
43        client,
44        crate::sql!(
45            "SELECT pg_catalog.current_setting('server_version_num') AS server_version_num"
46        ),
47    )
48    .await?;
49    let version_num: u32 = row
50        .and_then(|r| r.get("server_version_num").map(|s| s.parse().ok()))
51        .flatten()
52        .ok_or_else(|| {
53            PostgresError::Generic(anyhow::anyhow!("failed to get PostgreSQL version"))
54        })?;
55    // server_version_num format: XXYYZZ where XX is major, YY is minor, ZZ is patch
56    // For PG >= 10, it's XXXYYZZ (3 digit major)
57    Ok(version_num / 10000)
58}
59
60/// Fetches table schema information from an upstream Postgres source for tables
61/// that are part of a publication, given a connection string and the
62/// publication name. Returns a map from table OID to table schema information.
63///
64/// The `oids` parameter controls for which tables to fetch schema information. If `None`,
65/// schema information for all tables in the publication is fetched. If `Some`, only
66/// schema information for the tables with the specified OIDs is fetched.
67///
68/// # Errors
69///
70/// - Invalid connection string, user information, or user permissions.
71/// - Upstream publication does not exist or contains invalid values.
72pub async fn publication_info(
73    client: &Client,
74    publication: &str,
75    oids: Option<&[Oid]>,
76) -> Result<BTreeMap<Oid, PostgresTableDesc>, PostgresError> {
77    let server_major_version = get_pg_major_version(client).await?;
78
79    query(
80        client,
81        crate::sql!("SELECT oid FROM pg_publication WHERE pubname = $1"),
82        &[&publication],
83    )
84    .await?
85    .get(0)
86    .ok_or_else(|| PostgresError::PublicationMissing(publication.to_string()))?;
87
88    let tables = if let Some(oids) = oids {
89        query(
90            client,
91            crate::sql!(
92                "SELECT
93                    c.oid, p.schemaname, p.tablename
94                FROM
95                    pg_catalog.pg_class AS c
96                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
97                    JOIN pg_publication_tables AS p ON
98                            c.relname = p.tablename AND n.nspname = p.schemaname
99                WHERE
100                    p.pubname = $1
101                    AND c.oid = ANY ($2)"
102            ),
103            &[&publication, &oids],
104        )
105        .await
106    } else {
107        query(
108            client,
109            crate::sql!(
110                "SELECT
111                    c.oid, p.schemaname, p.tablename
112                FROM
113                    pg_catalog.pg_class AS c
114                    JOIN pg_namespace AS n ON c.relnamespace = n.oid
115                    JOIN pg_publication_tables AS p ON
116                            c.relname = p.tablename AND n.nspname = p.schemaname
117                WHERE
118                    p.pubname = $1"
119            ),
120            &[&publication],
121        )
122        .await
123    }?;
124
125    // The Postgres replication protocol does not support GENERATED columns
126    // so we exclude them from this query. But not all Postgres-like
127    // databases have the `pg_attribute.attgenerated` column.
128    let pg_columns = if server_major_version >= 12 {
129        crate::sql!(
130            "
131        SELECT
132            a.attrelid AS table_oid,
133            a.attname AS name,
134            a.atttypid AS typoid,
135            a.attnum AS colnum,
136            a.atttypmod AS typmod,
137            a.attnotnull AS not_null,
138            b.oid IS NOT NULL AS primary_key
139        FROM pg_catalog.pg_attribute a
140        LEFT JOIN pg_catalog.pg_constraint b
141            ON a.attrelid = b.conrelid
142            AND b.contype = 'p'
143            AND a.attnum = ANY (b.conkey)
144        WHERE a.attnum > 0::pg_catalog.int2
145            AND NOT a.attisdropped
146            AND a.attgenerated = ''
147            AND a.attrelid = ANY ($1)
148        ORDER BY a.attnum"
149        )
150    } else {
151        crate::sql!(
152            "
153        SELECT
154            a.attrelid AS table_oid,
155            a.attname AS name,
156            a.atttypid AS typoid,
157            a.attnum AS colnum,
158            a.atttypmod AS typmod,
159            a.attnotnull AS not_null,
160            b.oid IS NOT NULL AS primary_key
161        FROM pg_catalog.pg_attribute a
162        LEFT JOIN pg_catalog.pg_constraint b
163            ON a.attrelid = b.conrelid
164            AND b.contype = 'p'
165            AND a.attnum = ANY (b.conkey)
166        WHERE a.attnum > 0::pg_catalog.int2
167            AND NOT a.attisdropped
168            AND true
169            AND a.attrelid = ANY ($1)
170        ORDER BY a.attnum"
171        )
172    };
173
174    let table_oids = tables
175        .iter()
176        .map(|row| row.get("oid"))
177        .collect::<Vec<Oid>>();
178
179    let mut columns: BTreeMap<Oid, Vec<_>> = BTreeMap::new();
180    for row in query(client, pg_columns, &[&table_oids]).await? {
181        let table_oid: Oid = row.get("table_oid");
182        let name: String = row.get("name");
183        let type_oid = row.get("typoid");
184        let col_num = row
185            .get::<_, i16>("colnum")
186            .try_into()
187            .expect("non-negative values");
188        let type_mod: i32 = row.get("typmod");
189        let not_null: bool = row.get("not_null");
190        let desc = PostgresColumnDesc {
191            name,
192            col_num,
193            type_oid,
194            type_mod,
195            nullable: !not_null,
196        };
197        columns.entry(table_oid).or_default().push(desc);
198    }
199
200    // PG 15 adds UNIQUE NULLS NOT DISTINCT, which would let us use `UNIQUE` constraints over
201    // nullable columns as keys; i.e. aligns a PG index's NULL handling with an arrangement's
202    // keys. For more info, see https://www.postgresql.org/about/featurematrix/detail/392/
203    let pg_keys = if server_major_version >= 15 {
204        crate::sql!(
205            "
206        SELECT
207            pg_constraint.conrelid AS table_oid,
208            pg_constraint.oid,
209            pg_constraint.conkey,
210            pg_constraint.conname,
211            pg_constraint.contype = 'p' AS is_primary,
212            pg_index.indnullsnotdistinct AS nulls_not_distinct
213        FROM
214            pg_constraint
215                JOIN
216                    pg_index
217                    ON pg_index.indexrelid = pg_constraint.conindid
218        WHERE
219            pg_constraint.conrelid = ANY ($1)
220                AND
221            pg_constraint.contype = ANY (ARRAY['p', 'u']);"
222        )
223    } else {
224        crate::sql!(
225            "
226        SELECT
227            pg_constraint.conrelid AS table_oid,
228            pg_constraint.oid,
229            pg_constraint.conkey,
230            pg_constraint.conname,
231            pg_constraint.contype = 'p' AS is_primary,
232            false AS nulls_not_distinct
233        FROM
234            pg_constraint
235                JOIN
236                    pg_index
237                    ON pg_index.indexrelid = pg_constraint.conindid
238        WHERE
239            pg_constraint.conrelid = ANY ($1)
240                AND
241            pg_constraint.contype = ANY (ARRAY['p', 'u']);"
242        )
243    };
244
245    let mut keys: BTreeMap<Oid, BTreeSet<_>> = BTreeMap::new();
246    for row in query(client, pg_keys, &[&table_oids]).await? {
247        let table_oid: Oid = row.get("table_oid");
248        let oid: Oid = row.get("oid");
249        let cols: Vec<i16> = row.get("conkey");
250        let name: String = row.get("conname");
251        let is_primary: bool = row.get("is_primary");
252        let nulls_not_distinct: bool = row.get("nulls_not_distinct");
253        let cols = cols
254            .into_iter()
255            .map(|col| u16::try_from(col).expect("non-negative colnums"))
256            .collect();
257        let desc = PostgresKeyDesc {
258            oid,
259            name,
260            cols,
261            is_primary,
262            nulls_not_distinct,
263        };
264        keys.entry(table_oid).or_default().insert(desc);
265    }
266
267    Ok(tables
268        .into_iter()
269        .map(|row| {
270            let oid: Oid = row.get("oid");
271            let columns = columns.remove(&oid).unwrap_or_default();
272            let keys = keys.remove(&oid).unwrap_or_default();
273            let desc = PostgresTableDesc {
274                oid,
275                namespace: row.get("schemaname"),
276                name: row.get("tablename"),
277                columns,
278                keys,
279            };
280            (oid, desc)
281        })
282        .collect())
283}