1//! Utilities for working with the PostgreSQL binary copy format.
23use crate::connection::ConnectionRef;
4use crate::types::{BorrowToSql, ToSql, Type};
5use crate::{CopyInWriter, CopyOutReader, Error};
6use fallible_iterator::FallibleIterator;
7use futures_util::StreamExt;
8use std::pin::Pin;
9#[doc(inline)]
10pub use tokio_postgres::binary_copy::BinaryCopyOutRow;
11use tokio_postgres::binary_copy::{self, BinaryCopyOutStream};
1213/// A type which serializes rows into the PostgreSQL binary copy format.
14///
15/// The copy *must* be explicitly completed via the `finish` method. If it is not, the copy will be aborted.
16pub struct BinaryCopyInWriter<'a> {
17 connection: ConnectionRef<'a>,
18 sink: Pin<Box<binary_copy::BinaryCopyInWriter>>,
19}
2021impl<'a> BinaryCopyInWriter<'a> {
22/// Creates a new writer which will write rows of the provided types.
23pub fn new(writer: CopyInWriter<'a>, types: &[Type]) -> BinaryCopyInWriter<'a> {
24let stream = writer
25 .sink
26 .into_unpinned()
27 .expect("writer has already been written to");
2829 BinaryCopyInWriter {
30 connection: writer.connection,
31 sink: Box::pin(binary_copy::BinaryCopyInWriter::new(stream, types)),
32 }
33 }
3435/// Writes a single row.
36 ///
37 /// # Panics
38 ///
39 /// Panics if the number of values provided does not match the number expected.
40pub fn write(&mut self, values: &[&(dyn ToSql + Sync)]) -> Result<(), Error> {
41self.connection.block_on(self.sink.as_mut().write(values))
42 }
4344/// A maximally-flexible version of `write`.
45 ///
46 /// # Panics
47 ///
48 /// Panics if the number of values provided does not match the number expected.
49pub fn write_raw<P, I>(&mut self, values: I) -> Result<(), Error>
50where
51P: BorrowToSql,
52 I: IntoIterator<Item = P>,
53 I::IntoIter: ExactSizeIterator,
54 {
55self.connection
56 .block_on(self.sink.as_mut().write_raw(values))
57 }
5859/// Completes the copy, returning the number of rows added.
60 ///
61 /// This method *must* be used to complete the copy process. If it is not, the copy will be aborted.
62pub fn finish(mut self) -> Result<u64, Error> {
63self.connection.block_on(self.sink.as_mut().finish())
64 }
65}
6667/// An iterator of rows deserialized from the PostgreSQL binary copy format.
68pub struct BinaryCopyOutIter<'a> {
69 connection: ConnectionRef<'a>,
70 stream: Pin<Box<BinaryCopyOutStream>>,
71}
7273impl<'a> BinaryCopyOutIter<'a> {
74/// Creates a new iterator from a raw copy out reader and the types of the columns being returned.
75pub fn new(reader: CopyOutReader<'a>, types: &[Type]) -> BinaryCopyOutIter<'a> {
76let stream = reader
77 .stream
78 .into_unpinned()
79 .expect("reader has already been read from");
8081 BinaryCopyOutIter {
82 connection: reader.connection,
83 stream: Box::pin(BinaryCopyOutStream::new(stream, types)),
84 }
85 }
86}
8788impl FallibleIterator for BinaryCopyOutIter<'_> {
89type Item = BinaryCopyOutRow;
90type Error = Error;
9192fn next(&mut self) -> Result<Option<BinaryCopyOutRow>, Error> {
93let stream = &mut self.stream;
94self.connection
95 .block_on(async { stream.next().await.transpose() })
96 }
97}