Skip to main content

mz_postgres_util/
query.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10#![allow(clippy::disallowed_methods)]
11
12pub use mz_ore::sql::{Sql, SqlFormatError, SqlTemplateError, sql_template_placeholder_count};
13
14use tokio_postgres::types::ToSql;
15use tokio_postgres::{Client, GenericClient, Row, SimpleQueryMessage, SimpleQueryRow, Statement};
16
17use crate::PostgresError;
18
19/// Runs the given query using the client and expects at most a single row to be returned.
20pub async fn simple_query_opt(
21    client: &Client,
22    query: Sql,
23) -> Result<Option<SimpleQueryRow>, PostgresError> {
24    let result = simple_query(client, query).await?;
25    let mut rows = result.into_iter().filter_map(|msg| match msg {
26        SimpleQueryMessage::Row(row) => Some(row),
27        _ => None,
28    });
29    match (rows.next(), rows.next()) {
30        (Some(row), None) => Ok(Some(row)),
31        (None, None) => Ok(None),
32        _ => Err(PostgresError::UnexpectedRow),
33    }
34}
35
36/// Runs a simple query and returns all protocol messages.
37pub async fn simple_query(
38    client: &Client,
39    query: Sql,
40) -> Result<Vec<SimpleQueryMessage>, PostgresError> {
41    Ok(client.simple_query(query.as_str()).await?)
42}
43
44/// Runs a query and returns all resulting rows.
45pub async fn query<C: GenericClient + Sync>(
46    client: &C,
47    query: Sql,
48    params: &[&(dyn ToSql + Sync)],
49) -> Result<Vec<Row>, PostgresError> {
50    Ok(client.query(query.as_str(), params).await?)
51}
52
53/// Runs a prepared query and returns all resulting rows.
54pub async fn query_prepared<C: GenericClient + Sync>(
55    client: &C,
56    statement: &Statement,
57    params: &[&(dyn ToSql + Sync)],
58) -> Result<Vec<Row>, PostgresError> {
59    Ok(client.query(statement, params).await?)
60}
61
62/// Runs a query and returns exactly one row.
63pub async fn query_one<C: GenericClient + Sync>(
64    client: &C,
65    query: Sql,
66    params: &[&(dyn ToSql + Sync)],
67) -> Result<Row, PostgresError> {
68    Ok(client.query_one(query.as_str(), params).await?)
69}
70
71/// Runs a prepared query and returns exactly one row.
72pub async fn query_one_prepared<C: GenericClient + Sync>(
73    client: &C,
74    statement: &Statement,
75    params: &[&(dyn ToSql + Sync)],
76) -> Result<Row, PostgresError> {
77    Ok(client.query_one(statement, params).await?)
78}
79
80/// Runs a query and returns at most one row.
81pub async fn query_opt<C: GenericClient + Sync>(
82    client: &C,
83    query: Sql,
84    params: &[&(dyn ToSql + Sync)],
85) -> Result<Option<Row>, PostgresError> {
86    Ok(client.query_opt(query.as_str(), params).await?)
87}
88
89/// Runs a prepared query and returns at most one row.
90pub async fn query_opt_prepared<C: GenericClient + Sync>(
91    client: &C,
92    statement: &Statement,
93    params: &[&(dyn ToSql + Sync)],
94) -> Result<Option<Row>, PostgresError> {
95    Ok(client.query_opt(statement, params).await?)
96}
97
98/// Runs a query and returns the number of affected rows.
99pub async fn execute<C: GenericClient + Sync>(
100    client: &C,
101    query: Sql,
102    params: &[&(dyn ToSql + Sync)],
103) -> Result<u64, PostgresError> {
104    Ok(client.execute(query.as_str(), params).await?)
105}
106
107/// Runs a prepared query and returns the number of affected rows.
108pub async fn execute_prepared<C: GenericClient + Sync>(
109    client: &C,
110    statement: &Statement,
111    params: &[&(dyn ToSql + Sync)],
112) -> Result<u64, PostgresError> {
113    Ok(client.execute(statement, params).await?)
114}
115
116/// Runs one or more SQL statements with no returned rows.
117pub async fn batch_execute<C: GenericClient + Sync>(
118    client: &C,
119    query: Sql,
120) -> Result<(), PostgresError> {
121    Ok(client.batch_execute(query.as_str()).await?)
122}