tokio_postgres/
simple_query.rs
1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::query::extract_row_affected;
5use crate::{Error, SimpleQueryMessage, SimpleQueryRow};
6use bytes::Bytes;
7use fallible_iterator::FallibleIterator;
8use futures_util::{ready, Stream};
9use log::debug;
10use pin_project_lite::pin_project;
11use postgres_protocol::message::backend::Message;
12use postgres_protocol::message::frontend;
13use std::marker::PhantomPinned;
14use std::pin::Pin;
15use std::sync::Arc;
16use std::task::{Context, Poll};
17
/// Metadata describing one column of a simple-query result set.
///
/// Only the column name is stored.
#[derive(Debug)]
pub struct SimpleColumn {
    // Column name, stored as an owned string.
    name: String,
}

impl SimpleColumn {
    /// Builds a column descriptor that takes ownership of `name`.
    pub(crate) fn new(name: String) -> Self {
        Self { name }
    }

    /// Returns the name of the column.
    pub fn name(&self) -> &str {
        self.name.as_str()
    }
}
34
35pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
36 debug!("executing simple query: {}", query);
37
38 let buf = encode(client, query)?;
39 let responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
40
41 Ok(SimpleQueryStream {
42 responses,
43 columns: None,
44 _p: PhantomPinned,
45 })
46}
47
48pub async fn batch_execute(client: &InnerClient, query: &str) -> Result<(), Error> {
49 debug!("executing statement batch: {}", query);
50
51 let buf = encode(client, query)?;
52 let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
53
54 loop {
55 match responses.next().await? {
56 Message::ReadyForQuery(_) => return Ok(()),
57 Message::CommandComplete(_)
58 | Message::EmptyQueryResponse
59 | Message::RowDescription(_)
60 | Message::DataRow(_) => {}
61 _ => return Err(Error::unexpected_message()),
62 }
63 }
64}
65
66pub(crate) fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
67 client.with_buf(|buf| {
68 frontend::query(query, buf).map_err(Error::encode)?;
69 Ok(buf.split().freeze())
70 })
71}
72
73pin_project! {
74 pub struct SimpleQueryStream {
76 responses: Responses,
77 columns: Option<Arc<[SimpleColumn]>>,
78 #[pin]
79 _p: PhantomPinned,
80 }
81}
82
83impl Stream for SimpleQueryStream {
84 type Item = Result<SimpleQueryMessage, Error>;
85
86 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
87 let this = self.project();
88 match ready!(this.responses.poll_next(cx)?) {
89 Message::CommandComplete(body) => {
90 let rows = extract_row_affected(&body)?;
91 Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(rows))))
92 }
93 Message::EmptyQueryResponse => {
94 Poll::Ready(Some(Ok(SimpleQueryMessage::CommandComplete(0))))
95 }
96 Message::RowDescription(body) => {
97 let columns: Arc<[SimpleColumn]> = body
98 .fields()
99 .map(|f| Ok(SimpleColumn::new(f.name().to_string())))
100 .collect::<Vec<_>>()
101 .map_err(Error::parse)?
102 .into();
103
104 *this.columns = Some(columns.clone());
105 Poll::Ready(Some(Ok(SimpleQueryMessage::RowDescription(columns))))
106 }
107 Message::DataRow(body) => {
108 let row = match &this.columns {
109 Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
110 None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
111 };
112 Poll::Ready(Some(Ok(SimpleQueryMessage::Row(row))))
113 }
114 Message::ReadyForQuery(_) => Poll::Ready(None),
115 _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
116 }
117 }
118}