mz_postgres_util/
schemas.rs1use std::collections::{BTreeMap, BTreeSet};
13
14use tokio_postgres::Client;
15use tokio_postgres::types::Oid;
16
17use crate::desc::{PostgresColumnDesc, PostgresKeyDesc, PostgresSchemaDesc, PostgresTableDesc};
18use crate::{PostgresError, query, simple_query_opt};
19
20pub async fn get_schemas(client: &Client) -> Result<Vec<PostgresSchemaDesc>, PostgresError> {
21 Ok(query(
22 client,
23 crate::sql!("SELECT oid, nspname, nspowner FROM pg_namespace"),
24 &[],
25 )
26 .await?
27 .into_iter()
28 .map(|row| {
29 let oid: Oid = row.get("oid");
30 let name: String = row.get("nspname");
31 let owner: Oid = row.get("nspowner");
32 PostgresSchemaDesc { oid, name, owner }
33 })
34 .collect::<Vec<_>>())
35}
36
37pub async fn get_pg_major_version(client: &Client) -> Result<u32, PostgresError> {
39 let row = simple_query_opt(
43 client,
44 crate::sql!(
45 "SELECT pg_catalog.current_setting('server_version_num') AS server_version_num"
46 ),
47 )
48 .await?;
49 let version_num: u32 = row
50 .and_then(|r| r.get("server_version_num").map(|s| s.parse().ok()))
51 .flatten()
52 .ok_or_else(|| {
53 PostgresError::Generic(anyhow::anyhow!("failed to get PostgreSQL version"))
54 })?;
55 Ok(version_num / 10000)
58}
59
60pub async fn publication_info(
73 client: &Client,
74 publication: &str,
75 oids: Option<&[Oid]>,
76) -> Result<BTreeMap<Oid, PostgresTableDesc>, PostgresError> {
77 let server_major_version = get_pg_major_version(client).await?;
78
79 query(
80 client,
81 crate::sql!("SELECT oid FROM pg_publication WHERE pubname = $1"),
82 &[&publication],
83 )
84 .await?
85 .get(0)
86 .ok_or_else(|| PostgresError::PublicationMissing(publication.to_string()))?;
87
88 let tables = if let Some(oids) = oids {
89 query(
90 client,
91 crate::sql!(
92 "SELECT
93 c.oid, p.schemaname, p.tablename
94 FROM
95 pg_catalog.pg_class AS c
96 JOIN pg_namespace AS n ON c.relnamespace = n.oid
97 JOIN pg_publication_tables AS p ON
98 c.relname = p.tablename AND n.nspname = p.schemaname
99 WHERE
100 p.pubname = $1
101 AND c.oid = ANY ($2)"
102 ),
103 &[&publication, &oids],
104 )
105 .await
106 } else {
107 query(
108 client,
109 crate::sql!(
110 "SELECT
111 c.oid, p.schemaname, p.tablename
112 FROM
113 pg_catalog.pg_class AS c
114 JOIN pg_namespace AS n ON c.relnamespace = n.oid
115 JOIN pg_publication_tables AS p ON
116 c.relname = p.tablename AND n.nspname = p.schemaname
117 WHERE
118 p.pubname = $1"
119 ),
120 &[&publication],
121 )
122 .await
123 }?;
124
125 let pg_columns = if server_major_version >= 12 {
129 crate::sql!(
130 "
131 SELECT
132 a.attrelid AS table_oid,
133 a.attname AS name,
134 a.atttypid AS typoid,
135 a.attnum AS colnum,
136 a.atttypmod AS typmod,
137 a.attnotnull AS not_null,
138 b.oid IS NOT NULL AS primary_key
139 FROM pg_catalog.pg_attribute a
140 LEFT JOIN pg_catalog.pg_constraint b
141 ON a.attrelid = b.conrelid
142 AND b.contype = 'p'
143 AND a.attnum = ANY (b.conkey)
144 WHERE a.attnum > 0::pg_catalog.int2
145 AND NOT a.attisdropped
146 AND a.attgenerated = ''
147 AND a.attrelid = ANY ($1)
148 ORDER BY a.attnum"
149 )
150 } else {
151 crate::sql!(
152 "
153 SELECT
154 a.attrelid AS table_oid,
155 a.attname AS name,
156 a.atttypid AS typoid,
157 a.attnum AS colnum,
158 a.atttypmod AS typmod,
159 a.attnotnull AS not_null,
160 b.oid IS NOT NULL AS primary_key
161 FROM pg_catalog.pg_attribute a
162 LEFT JOIN pg_catalog.pg_constraint b
163 ON a.attrelid = b.conrelid
164 AND b.contype = 'p'
165 AND a.attnum = ANY (b.conkey)
166 WHERE a.attnum > 0::pg_catalog.int2
167 AND NOT a.attisdropped
168 AND true
169 AND a.attrelid = ANY ($1)
170 ORDER BY a.attnum"
171 )
172 };
173
174 let table_oids = tables
175 .iter()
176 .map(|row| row.get("oid"))
177 .collect::<Vec<Oid>>();
178
179 let mut columns: BTreeMap<Oid, Vec<_>> = BTreeMap::new();
180 for row in query(client, pg_columns, &[&table_oids]).await? {
181 let table_oid: Oid = row.get("table_oid");
182 let name: String = row.get("name");
183 let type_oid = row.get("typoid");
184 let col_num = row
185 .get::<_, i16>("colnum")
186 .try_into()
187 .expect("non-negative values");
188 let type_mod: i32 = row.get("typmod");
189 let not_null: bool = row.get("not_null");
190 let desc = PostgresColumnDesc {
191 name,
192 col_num,
193 type_oid,
194 type_mod,
195 nullable: !not_null,
196 };
197 columns.entry(table_oid).or_default().push(desc);
198 }
199
200 let pg_keys = if server_major_version >= 15 {
204 crate::sql!(
205 "
206 SELECT
207 pg_constraint.conrelid AS table_oid,
208 pg_constraint.oid,
209 pg_constraint.conkey,
210 pg_constraint.conname,
211 pg_constraint.contype = 'p' AS is_primary,
212 pg_index.indnullsnotdistinct AS nulls_not_distinct
213 FROM
214 pg_constraint
215 JOIN
216 pg_index
217 ON pg_index.indexrelid = pg_constraint.conindid
218 WHERE
219 pg_constraint.conrelid = ANY ($1)
220 AND
221 pg_constraint.contype = ANY (ARRAY['p', 'u']);"
222 )
223 } else {
224 crate::sql!(
225 "
226 SELECT
227 pg_constraint.conrelid AS table_oid,
228 pg_constraint.oid,
229 pg_constraint.conkey,
230 pg_constraint.conname,
231 pg_constraint.contype = 'p' AS is_primary,
232 false AS nulls_not_distinct
233 FROM
234 pg_constraint
235 JOIN
236 pg_index
237 ON pg_index.indexrelid = pg_constraint.conindid
238 WHERE
239 pg_constraint.conrelid = ANY ($1)
240 AND
241 pg_constraint.contype = ANY (ARRAY['p', 'u']);"
242 )
243 };
244
245 let mut keys: BTreeMap<Oid, BTreeSet<_>> = BTreeMap::new();
246 for row in query(client, pg_keys, &[&table_oids]).await? {
247 let table_oid: Oid = row.get("table_oid");
248 let oid: Oid = row.get("oid");
249 let cols: Vec<i16> = row.get("conkey");
250 let name: String = row.get("conname");
251 let is_primary: bool = row.get("is_primary");
252 let nulls_not_distinct: bool = row.get("nulls_not_distinct");
253 let cols = cols
254 .into_iter()
255 .map(|col| u16::try_from(col).expect("non-negative colnums"))
256 .collect();
257 let desc = PostgresKeyDesc {
258 oid,
259 name,
260 cols,
261 is_primary,
262 nulls_not_distinct,
263 };
264 keys.entry(table_oid).or_default().insert(desc);
265 }
266
267 Ok(tables
268 .into_iter()
269 .map(|row| {
270 let oid: Oid = row.get("oid");
271 let columns = columns.remove(&oid).unwrap_or_default();
272 let keys = keys.remove(&oid).unwrap_or_default();
273 let desc = PostgresTableDesc {
274 oid,
275 namespace: row.get("schemaname"),
276 name: row.get("tablename"),
277 columns,
278 keys,
279 };
280 (oid, desc)
281 })
282 .collect())
283}