tokio_postgres/
copy_out.rs

1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::{query, simple_query, slice_iter, Error, Statement};
5use bytes::Bytes;
6use futures_util::Stream;
7use log::debug;
8use pin_project_lite::pin_project;
9use postgres_protocol::message::backend::Message;
10use std::pin::Pin;
11use std::task::{ready, Context, Poll};
12
13pub async fn copy_out_simple(client: &InnerClient, query: &str) -> Result<CopyOutStream, Error> {
14    debug!("executing copy out query {}", query);
15
16    let buf = simple_query::encode(client, query)?;
17    let responses = start(client, buf, true).await?;
18    Ok(CopyOutStream { responses })
19}
20
21pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
22    debug!("executing copy out statement {}", statement.name());
23
24    let buf = query::encode(client, &statement, slice_iter(&[]))?;
25    let responses = start(client, buf, false).await?;
26    Ok(CopyOutStream { responses })
27}
28
29async fn start(client: &InnerClient, buf: Bytes, simple: bool) -> Result<Responses, Error> {
30    let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
31
32    if !simple {
33        match responses.next().await? {
34            Message::BindComplete => {}
35            _ => return Err(Error::unexpected_message()),
36        }
37    }
38
39    match responses.next().await? {
40        Message::CopyOutResponse(_) => {}
41        _ => return Err(Error::unexpected_message()),
42    }
43
44    Ok(responses)
45}
46
pin_project! {
    /// A stream of `COPY ... TO STDOUT` query data.
    ///
    /// The `Stream` implementation in this file yields `Result<Bytes, Error>`
    /// items: one per `CopyData` message, ending when `CopyDone` is received.
    // `!Unpin` makes the generated type opt out of `Unpin`.
    #[project(!Unpin)]
    pub struct CopyOutStream {
        // Responses still pending for the request that started the copy.
        responses: Responses,
    }
}
54
55impl Stream for CopyOutStream {
56    type Item = Result<Bytes, Error>;
57
58    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
59        let this = self.project();
60
61        match ready!(this.responses.poll_next(cx)?) {
62            Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
63            Message::CopyDone => Poll::Ready(None),
64            _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
65        }
66    }
67}