mz_postgres_util/
query.rs1#![allow(clippy::disallowed_methods)]
11
12pub use mz_ore::sql::{Sql, SqlFormatError, SqlTemplateError, sql_template_placeholder_count};
13
14use tokio_postgres::types::ToSql;
15use tokio_postgres::{Client, GenericClient, Row, SimpleQueryMessage, SimpleQueryRow, Statement};
16
17use crate::PostgresError;
18
19pub async fn simple_query_opt(
21 client: &Client,
22 query: Sql,
23) -> Result<Option<SimpleQueryRow>, PostgresError> {
24 let result = simple_query(client, query).await?;
25 let mut rows = result.into_iter().filter_map(|msg| match msg {
26 SimpleQueryMessage::Row(row) => Some(row),
27 _ => None,
28 });
29 match (rows.next(), rows.next()) {
30 (Some(row), None) => Ok(Some(row)),
31 (None, None) => Ok(None),
32 _ => Err(PostgresError::UnexpectedRow),
33 }
34}
35
36pub async fn simple_query(
38 client: &Client,
39 query: Sql,
40) -> Result<Vec<SimpleQueryMessage>, PostgresError> {
41 Ok(client.simple_query(query.as_str()).await?)
42}
43
44pub async fn query<C: GenericClient + Sync>(
46 client: &C,
47 query: Sql,
48 params: &[&(dyn ToSql + Sync)],
49) -> Result<Vec<Row>, PostgresError> {
50 Ok(client.query(query.as_str(), params).await?)
51}
52
53pub async fn query_prepared<C: GenericClient + Sync>(
55 client: &C,
56 statement: &Statement,
57 params: &[&(dyn ToSql + Sync)],
58) -> Result<Vec<Row>, PostgresError> {
59 Ok(client.query(statement, params).await?)
60}
61
62pub async fn query_one<C: GenericClient + Sync>(
64 client: &C,
65 query: Sql,
66 params: &[&(dyn ToSql + Sync)],
67) -> Result<Row, PostgresError> {
68 Ok(client.query_one(query.as_str(), params).await?)
69}
70
71pub async fn query_one_prepared<C: GenericClient + Sync>(
73 client: &C,
74 statement: &Statement,
75 params: &[&(dyn ToSql + Sync)],
76) -> Result<Row, PostgresError> {
77 Ok(client.query_one(statement, params).await?)
78}
79
80pub async fn query_opt<C: GenericClient + Sync>(
82 client: &C,
83 query: Sql,
84 params: &[&(dyn ToSql + Sync)],
85) -> Result<Option<Row>, PostgresError> {
86 Ok(client.query_opt(query.as_str(), params).await?)
87}
88
89pub async fn query_opt_prepared<C: GenericClient + Sync>(
91 client: &C,
92 statement: &Statement,
93 params: &[&(dyn ToSql + Sync)],
94) -> Result<Option<Row>, PostgresError> {
95 Ok(client.query_opt(statement, params).await?)
96}
97
98pub async fn execute<C: GenericClient + Sync>(
100 client: &C,
101 query: Sql,
102 params: &[&(dyn ToSql + Sync)],
103) -> Result<u64, PostgresError> {
104 Ok(client.execute(query.as_str(), params).await?)
105}
106
107pub async fn execute_prepared<C: GenericClient + Sync>(
109 client: &C,
110 statement: &Statement,
111 params: &[&(dyn ToSql + Sync)],
112) -> Result<u64, PostgresError> {
113 Ok(client.execute(statement, params).await?)
114}
115
116pub async fn batch_execute<C: GenericClient + Sync>(
118 client: &C,
119 query: Sql,
120) -> Result<(), PostgresError> {
121 Ok(client.batch_execute(query.as_str()).await?)
122}