tokio_postgres/
simple_query.rs

1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::query::extract_row_affected;
5use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
6use bytes::Bytes;
7use fallible_iterator::FallibleIterator;
8use futures_util::{ready, Stream};
9use log::debug;
10use pin_project_lite::pin_project;
11use postgres_protocol::message::backend::Message;
12use postgres_protocol::message::frontend;
13use std::marker::PhantomPinned;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
/// Metadata describing one column of a row returned by a simple query.
///
/// Only the column name is recorded.
#[derive(Debug)]
pub struct SimpleColumn {
    name: String,
}

impl SimpleColumn {
    /// Builds a column description that takes ownership of `name`.
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }

    /// Returns the name of the column.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}
34
35pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
36    debug!("executing simple query: {}", query);
37
38    let buf = encode(client, query)?;
39    let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
40
41    Ok(SimpleQueryStream {
42        responses,
43        columns: None,
44        _p: PhantomPinned,
45    })
46}
47
48pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
49    debug!("executing statement batch: {}", query);
50
51    let buf = encode(client, query)?;
52    let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
53
54    loop {
55        match responses.next().await? {
56            Message::ReadyForQuery(_) => return Ok(()),
57            Message::CommandComplete(_)
58            | Message::EmptyQueryResponse
59            | Message::RowDescription(_)
60            | Message::DataRow(_) => {}
61            _ => return Err(Error::unexpected_message()),
62        }
63    }
64}
65
66pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
67    client.with_buf(|buf| {
68        frontend::query(query, buf).map_err(Error::encode)?;
69        Ok(buf.split().freeze())
70    })
71}
72
pin_project! {
    /// A stream of simple query results.
    ///
    /// Created by `simple_query`; its `Stream` implementation yields
    /// `Result<SimpleQueryMessage, Error>` items.
    pub struct SimpleQueryStream {
        // Backend responses for the request that produced this stream.
        responses: Responses,
        // Columns from the most recent `RowDescription`; `None` until one has
        // been received. Shared (via `Arc`) with every row built afterwards.
        columns: Option<Arc<[SimpleColumn]>>,
        // `PhantomPinned` does not implement `Unpin`, so neither does this
        // struct. NOTE(review): presumably this keeps the option of adding
        // self-referential state later without an API break; confirm.
        #[pin]
        _p: PhantomPinned,
    }
}
82
83impl Stream for SimpleQueryStream {
84    type Item = Result<SimpleQueryMessage, Error>;
85
86    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87        let this = self.project();
88        match ready!(this.responses.poll_next(cx)?) {
89            Message::CommandComplete(body) => {
90                let rows = extract_row_affected(&body)?;
91                Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))))
92            }
93            Message::EmptyQueryResponse => {
94                Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))))
95            }
96            Message::RowDescription(body) => {
97                let columns: Arc<[SimpleColumn]> = body
98                    .fields()
99                    .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
100                    .collect::<Vec<_>>()
101                    .map_err(Error::parse)?
102                    .into();
103
104                *this.columns = Some(columns.clone());
105                Poll::Ready(Some(Ok(SimpleQueryMessage::RowDescription(columns))))
106            }
107            Message::DataRow(body) => {
108                let row = match &this.columns {
109                    Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
110                    None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
111                };
112                Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))))
113            }
114            Message::ReadyForQuery(_) => Poll::Ready(None),
115            _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
116        }
117    }
118}