postgres/
copy_out_reader.rs

1use crate::connection::ConnectionRef;
2use crate::lazy_pin::LazyPin;
3use bytes::{Buf, Bytes};
4use futures_util::StreamExt;
5use std::io::{self, BufRead, Read};
6use tokio_postgres::CopyOutStream;
7
8/// The reader returned by the `copy_out` method.
9pub struct CopyOutReader<'a> {
10    pub(crate) connection: ConnectionRef<'a>,
11    pub(crate) stream: LazyPin<CopyOutStream>,
12    cur: Bytes,
13}
14
15impl<'a> CopyOutReader<'a> {
16    pub(crate) fn new(connection: ConnectionRef<'a>, stream: CopyOutStream) -> CopyOutReader<'a> {
17        CopyOutReader {
18            connection,
19            stream: LazyPin::new(stream),
20            cur: Bytes::new(),
21        }
22    }
23}
24
25impl Read for CopyOutReader<'_> {
26    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
27        let b = self.fill_buf()?;
28        let len = usize::min(buf.len(), b.len());
29        buf[..len].copy_from_slice(&b[..len]);
30        self.consume(len);
31        Ok(len)
32    }
33}
34
35impl BufRead for CopyOutReader<'_> {
36    fn fill_buf(&mut self) -> io::Result<&[u8]> {
37        while !self.cur.has_remaining() {
38            let mut stream = self.stream.pinned();
39            match self
40                .connection
41                .block_on(async { stream.next().await.transpose() })
42            {
43                Ok(Some(cur)) => self.cur = cur,
44                Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e)),
45                Ok(None) => break,
46            };
47        }
48
49        Ok(&self.cur)
50    }
51
52    fn consume(&mut self, amt: usize) {
53        self.cur.advance(amt);
54    }
55}