tokio_postgres/
copy_out.rs
1use crate::client::{InnerClient, Responses};
2use crate::codec::FrontendMessage;
3use crate::connection::RequestMessages;
4use crate::{query, simple_query, slice_iter, Error, Statement};
5use bytes::Bytes;
6use futures_util::{ready, Stream};
7use log::debug;
8use pin_project_lite::pin_project;
9use postgres_protocol::message::backend::Message;
10use std::marker::PhantomPinned;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13
14pub async fn copy_out_simple(client: &InnerClient, query: &str) -> Result<CopyOutStream, Error> {
15 debug!("executing copy out query {}", query);
16
17 let buf = simple_query::encode(client, query)?;
18 let responses = start(client, buf, true).await?;
19 Ok(CopyOutStream {
20 responses,
21 _p: PhantomPinned,
22 })
23}
24
25pub async fn copy_out(client: &InnerClient, statement: Statement) -> Result<CopyOutStream, Error> {
26 debug!("executing copy out statement {}", statement.name());
27
28 let buf = query::encode(client, &statement, slice_iter(&[]))?;
29 let responses = start(client, buf, false).await?;
30 Ok(CopyOutStream {
31 responses,
32 _p: PhantomPinned,
33 })
34}
35
36async fn start(client: &InnerClient, buf: Bytes, simple: bool) -> Result<Responses, Error> {
37 let mut responses = client.send(RequestMessages::Single(FrontendMessage::Raw(buf)))?;
38
39 if !simple {
40 match responses.next().await? {
41 Message::BindComplete => {}
42 _ => return Err(Error::unexpected_message()),
43 }
44 }
45
46 match responses.next().await? {
47 Message::CopyOutResponse(_) => {}
48 _ => return Err(Error::unexpected_message()),
49 }
50
51 Ok(responses)
52}
53
54pin_project! {
55 pub struct CopyOutStream {
57 responses: Responses,
58 #[pin]
59 _p: PhantomPinned,
60 }
61}
62
63impl Stream for CopyOutStream {
64 type Item = Result<Bytes, Error>;
65
66 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67 let this = self.project();
68
69 match ready!(this.responses.poll_next(cx)?) {
70 Message::CopyData(body) => Poll::Ready(Some(Ok(body.into_bytes()))),
71 Message::CopyDone => Poll::Ready(None),
72 _ => Poll::Ready(Some(Err(Error::unexpected_message()))),
73 }
74 }
75}