tokio_postgres/
client.rs

1use crate::codec::{BackendMessages, FrontendMessage};
2use crate::config::{SslMode, SslNegotiation};
3use crate::connection::{Request, RequestMessages};
4use crate::copy_both::{CopyBothDuplex, CopyBothReceiver};
5use crate::copy_out::CopyOutStream;
6#[cfg(feature = "runtime")]
7use crate::keepalive::KeepaliveConfig;
8use crate::query::RowStream;
9use crate::simple_query::SimpleQueryStream;
10#[cfg(feature = "runtime")]
11use crate::tls::MakeTlsConnect;
12use crate::tls::TlsConnect;
13use crate::types::{Oid, ToSql, Type};
14#[cfg(feature = "runtime")]
15use crate::Socket;
16use crate::{
17    copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
18    CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
19    TransactionBuilder,
20};
21use bytes::{Buf, BytesMut};
22use fallible_iterator::FallibleIterator;
23use futures_channel::mpsc;
24use futures_util::{Stream, StreamExt, TryStreamExt};
25use parking_lot::Mutex;
26use postgres_protocol::message::backend::Message;
27use postgres_protocol::message::frontend;
28use postgres_types::BorrowToSql;
29use std::collections::HashMap;
30use std::fmt;
31use std::future;
32#[cfg(feature = "runtime")]
33use std::net::IpAddr;
34#[cfg(feature = "runtime")]
35use std::path::PathBuf;
36use std::pin::pin;
37use std::pin::Pin;
38use std::sync::Arc;
39use std::task::{ready, Context, Poll};
40#[cfg(feature = "runtime")]
41use std::time::Duration;
42use tokio::io::{AsyncRead, AsyncWrite};
43
/// The receiving side of one request: hands out the backend messages that
/// arrive in reply.
pub struct Responses {
    /// Channel on which batches of backend messages arrive.
    receiver: mpsc::Receiver<BackendMessages>,
    /// The batch currently being drained by `poll_next`; it is replaced by the
    /// next batch from `receiver` once it yields no more messages.
    cur: BackendMessages,
}
48
/// The channel endpoints returned by `InnerClient::start_copy_both`.
///
/// The opposite end of each channel is handed to the `CopyBothReceiver` that
/// is sent along with the request.
pub struct CopyBothHandles {
    /// Receives the messages (or errors) forwarded by the `CopyBothReceiver`.
    pub(crate) stream_receiver: mpsc::Receiver<Result<Message, Error>>,
    /// Sends frontend messages to the `CopyBothReceiver`.
    pub(crate) sink_sender: mpsc::Sender<FrontendMessage>,
}
53
54impl Responses {
55    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
56        loop {
57            match self.cur.next().map_err(Error::parse)? {
58                Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
59                Some(message) => return Poll::Ready(Ok(message)),
60                None => {}
61            }
62
63            match ready!(self.receiver.poll_next_unpin(cx)) {
64                Some(messages) => self.cur = messages,
65                None => return Poll::Ready(Err(Error::closed())),
66            }
67        }
68    }
69
70    pub async fn next(&mut self) -> Result<Message, Error> {
71        future::poll_fn(|cx| self.poll_next(cx)).await
72    }
73}
74
75impl Stream for Responses {
76    type Item = Result<Message, Error>;
77
78    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79        match ready!((*self).poll_next(cx)) {
80            Err(err) if err.is_closed() => Poll::Ready(None),
81            msg => Poll::Ready(Some(msg)),
82        }
83    }
84}
85
/// A cache of type info and prepared statements for fetching type info
/// (corresponding to the queries in the [prepare](prepare) module).
#[derive(Default)]
struct CachedTypeInfo {
    /// A statement for basic information for a type from its
    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
    /// fallback).
    typeinfo: Option<Statement>,
    /// A statement for getting information for a composite type from its OID.
    /// Corresponds to
    /// [TYPEINFO_COMPOSITE_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
    typeinfo_composite: Option<Statement>,
    /// A statement for getting information for an enum type from its OID.
    /// Presumably corresponds to the enum typeinfo query in the
    /// [prepare](prepare) module (or its fallback) - TODO confirm the exact
    /// constant name; the previous text pointed at the composite query.
    typeinfo_enum: Option<Statement>,

    /// Cache of types already looked up.
    types: HashMap<Oid, Type>,
}
105
/// The state behind a `Client`, held by it through an `Arc`: the request
/// queue, the type-info cache and a reusable encoding buffer.
pub struct InnerClient {
    /// Unbounded queue on which requests are submitted.
    sender: mpsc::UnboundedSender<Request>,
    /// Cached type-info statements and already resolved types.
    cached_typeinfo: Mutex<CachedTypeInfo>,

    /// A buffer to use when writing out postgres commands.
    buffer: Mutex<BytesMut>,
}
113
114impl InnerClient {
115    pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
116        let (sender, receiver) = mpsc::channel(1);
117        let request = Request { messages, sender };
118        self.sender
119            .unbounded_send(request)
120            .map_err(|_| Error::closed())?;
121
122        Ok(Responses {
123            receiver,
124            cur: BackendMessages::empty(),
125        })
126    }
127
128    pub fn start_copy_both(&self) -> Result<CopyBothHandles, Error> {
129        let (sender, receiver) = mpsc::channel(16);
130        let (stream_sender, stream_receiver) = mpsc::channel(16);
131        let (sink_sender, sink_receiver) = mpsc::channel(16);
132
133        let responses = Responses {
134            receiver,
135            cur: BackendMessages::empty(),
136        };
137        let messages = RequestMessages::CopyBoth(CopyBothReceiver::new(
138            responses,
139            sink_receiver,
140            stream_sender,
141        ));
142
143        let request = Request { messages, sender };
144        self.sender
145            .unbounded_send(request)
146            .map_err(|_| Error::closed())?;
147
148        Ok(CopyBothHandles {
149            stream_receiver,
150            sink_sender,
151        })
152    }
153
154    pub fn typeinfo(&self) -> Option<Statement> {
155        self.cached_typeinfo.lock().typeinfo.clone()
156    }
157
158    pub fn set_typeinfo(&self, statement: &Statement) {
159        self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
160    }
161
162    pub fn typeinfo_composite(&self) -> Option<Statement> {
163        self.cached_typeinfo.lock().typeinfo_composite.clone()
164    }
165
166    pub fn set_typeinfo_composite(&self, statement: &Statement) {
167        self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
168    }
169
170    pub fn typeinfo_enum(&self) -> Option<Statement> {
171        self.cached_typeinfo.lock().typeinfo_enum.clone()
172    }
173
174    pub fn set_typeinfo_enum(&self, statement: &Statement) {
175        self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
176    }
177
178    pub fn type_(&self, oid: Oid) -> Option<Type> {
179        self.cached_typeinfo.lock().types.get(&oid).cloned()
180    }
181
182    pub fn set_type(&self, oid: Oid, type_: &Type) {
183        self.cached_typeinfo.lock().types.insert(oid, type_.clone());
184    }
185
186    pub fn clear_type_cache(&self) {
187        self.cached_typeinfo.lock().types.clear();
188    }
189
190    /// Call the given function with a buffer to be used when writing out
191    /// postgres commands.
192    pub fn with_buf<F, R>(&self, f: F) -> R
193    where
194        F: FnOnce(&mut BytesMut) -> R,
195    {
196        let mut buffer = self.buffer.lock();
197        let r = f(&mut buffer);
198        buffer.clear();
199        r
200    }
201}
202
/// Socket-level connection parameters kept on the `Client` and cloned into
/// every `CancelToken` it creates.
// NOTE(review): presumably these mirror the settings used to open the original
// connection so a cancel request can reach the same server - confirm against
// the connect code.
#[cfg(feature = "runtime")]
#[derive(Clone)]
pub(crate) struct SocketConfig {
    pub addr: Addr,
    pub hostname: Option<String>,
    pub port: u16,
    pub connect_timeout: Option<Duration>,
    pub tcp_user_timeout: Option<Duration>,
    pub keepalive: Option<KeepaliveConfig>,
}
213
/// The address stored in a `SocketConfig`.
#[cfg(feature = "runtime")]
#[derive(Clone)]
pub(crate) enum Addr {
    /// An IP address (the port is stored separately in `SocketConfig`).
    Tcp(IpAddr),
    /// A filesystem path; only available on Unix targets.
    #[cfg(unix)]
    Unix(PathBuf),
}
221
/// An asynchronous PostgreSQL client.
///
/// The client is one half of what is returned when a connection is established. Users interact with the database
/// through this client object.
pub struct Client {
    /// Request queue, type cache and encoding buffer, held behind an `Arc`.
    inner: Arc<InnerClient>,
    /// Socket parameters; `None` until `set_socket_config` is called. Cloned
    /// into every cancel token.
    #[cfg(feature = "runtime")]
    socket_config: Option<SocketConfig>,
    /// Copied into every cancel token.
    ssl_mode: SslMode,
    /// Copied into every cancel token.
    ssl_negotiation: SslNegotiation,
    /// Reported by `backend_pid` and copied into every cancel token.
    process_id: i32,
    /// Copied into every cancel token alongside `process_id`.
    secret_key: i32,
}
235
236impl Client {
237    pub(crate) fn new(
238        sender: mpsc::UnboundedSender<Request>,
239        ssl_mode: SslMode,
240        ssl_negotiation: SslNegotiation,
241        process_id: i32,
242        secret_key: i32,
243    ) -> Client {
244        Client {
245            inner: Arc::new(InnerClient {
246                sender,
247                cached_typeinfo: Default::default(),
248                buffer: Default::default(),
249            }),
250            #[cfg(feature = "runtime")]
251            socket_config: None,
252            ssl_mode,
253            ssl_negotiation,
254            process_id,
255            secret_key,
256        }
257    }
258
259    pub(crate) fn inner(&self) -> &Arc<InnerClient> {
260        &self.inner
261    }
262
263    #[cfg(feature = "runtime")]
264    pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
265        self.socket_config = Some(socket_config);
266    }
267
268    /// Creates a new prepared statement.
269    ///
270    /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
271    /// which are set when executed. Prepared statements can only be used with the connection that created them.
272    pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
273        self.prepare_typed(query, &[]).await
274    }
275
276    /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
277    ///
278    /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
279    /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
280    pub async fn prepare_typed(
281        &self,
282        query: &str,
283        parameter_types: &[Type],
284    ) -> Result<Statement, Error> {
285        prepare::prepare(&self.inner, query, parameter_types).await
286    }
287
288    /// Executes a statement, returning a vector of the resulting rows.
289    ///
290    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
291    /// provided, 1-indexed.
292    ///
293    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
294    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
295    /// with the `prepare` method.
296    pub async fn query<T>(
297        &self,
298        statement: &T,
299        params: &[&(dyn ToSql + Sync)],
300    ) -> Result<Vec<Row>, Error>
301    where
302        T: ?Sized + ToStatement,
303    {
304        self.query_raw(statement, slice_iter(params))
305            .await?
306            .try_collect()
307            .await
308    }
309
310    /// Executes a statement which returns a single row, returning it.
311    ///
312    /// Returns an error if the query does not return exactly one row.
313    ///
314    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
315    /// provided, 1-indexed.
316    ///
317    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
318    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
319    /// with the `prepare` method.
320    pub async fn query_one<T>(
321        &self,
322        statement: &T,
323        params: &[&(dyn ToSql + Sync)],
324    ) -> Result<Row, Error>
325    where
326        T: ?Sized + ToStatement,
327    {
328        self.query_opt(statement, params)
329            .await
330            .and_then(|res| res.ok_or_else(Error::row_count))
331    }
332
333    /// Executes a statements which returns zero or one rows, returning it.
334    ///
335    /// Returns an error if the query returns more than one row.
336    ///
337    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
338    /// provided, 1-indexed.
339    ///
340    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
341    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
342    /// with the `prepare` method.
343    pub async fn query_opt<T>(
344        &self,
345        statement: &T,
346        params: &[&(dyn ToSql + Sync)],
347    ) -> Result<Option<Row>, Error>
348    where
349        T: ?Sized + ToStatement,
350    {
351        let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);
352
353        let mut first = None;
354
355        // Originally this was two calls to `try_next().await?`,
356        // once for the first element, and second to error if more than one.
357        //
358        // However, this new form with only one .await in a loop generates
359        // slightly smaller codegen/stack usage for the resulting future.
360        while let Some(row) = stream.try_next().await? {
361            if first.is_some() {
362                return Err(Error::row_count());
363            }
364
365            first = Some(row);
366        }
367
368        Ok(first)
369    }
370
371    /// The maximally flexible version of [`query`].
372    ///
373    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
374    /// provided, 1-indexed.
375    ///
376    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
377    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
378    /// with the `prepare` method.
379    ///
380    /// [`query`]: #method.query
381    ///
382    /// # Examples
383    ///
384    /// ```no_run
385    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
386    /// use std::pin::pin;
387    /// use futures_util::TryStreamExt;
388    ///
389    /// let params: Vec<String> = vec![
390    ///     "first param".into(),
391    ///     "second param".into(),
392    /// ];
393    /// let mut it = pin!(client.query_raw(
394    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
395    ///     params,
396    /// ).await?);
397    ///
398    /// while let Some(row) = it.try_next().await? {
399    ///     let foo: i32 = row.get("foo");
400    ///     println!("foo: {}", foo);
401    /// }
402    /// # Ok(())
403    /// # }
404    /// ```
405    pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
406    where
407        T: ?Sized + ToStatement,
408        P: BorrowToSql,
409        I: IntoIterator<Item = P>,
410        I::IntoIter: ExactSizeIterator,
411    {
412        let statement = statement.__convert().into_statement(self).await?;
413        query::query(&self.inner, statement, params).await
414    }
415
416    /// Like `query`, but requires the types of query parameters to be explicitly specified.
417    ///
418    /// Compared to `query`, this method allows performing queries without three round trips (for
419    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
420    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
421    /// supported (such as Cloudflare Workers with Hyperdrive).
422    ///
423    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
424    /// parameter of the list provided, 1-indexed.
425    pub async fn query_typed(
426        &self,
427        query: &str,
428        params: &[(&(dyn ToSql + Sync), Type)],
429    ) -> Result<Vec<Row>, Error> {
430        self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone())))
431            .await?
432            .try_collect()
433            .await
434    }
435
436    /// The maximally flexible version of [`query_typed`].
437    ///
438    /// Compared to `query`, this method allows performing queries without three round trips (for
439    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
440    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
441    /// supported (such as Cloudflare Workers with Hyperdrive).
442    ///
443    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
444    /// parameter of the list provided, 1-indexed.
445    ///
446    /// [`query_typed`]: #method.query_typed
447    ///
448    /// # Examples
449    ///
450    /// ```no_run
451    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
452    /// use std::pin::pin;
453    /// use futures_util::{TryStreamExt};
454    /// use tokio_postgres::types::Type;
455    ///
456    /// let params: Vec<(String, Type)> = vec![
457    ///     ("first param".into(), Type::TEXT),
458    ///     ("second param".into(), Type::TEXT),
459    /// ];
460    /// let mut it = pin!(client.query_typed_raw(
461    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
462    ///     params,
463    /// ).await?);
464    ///
465    /// while let Some(row) = it.try_next().await? {
466    ///     let foo: i32 = row.get("foo");
467    ///     println!("foo: {}", foo);
468    /// }
469    /// # Ok(())
470    /// # }
471    /// ```
472    pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
473    where
474        P: BorrowToSql,
475        I: IntoIterator<Item = (P, Type)>,
476    {
477        query::query_typed(&self.inner, query, params).await
478    }
479
480    /// Executes a statement, returning the number of rows modified.
481    ///
482    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
483    /// provided, 1-indexed.
484    ///
485    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
486    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
487    /// with the `prepare` method.
488    ///
489    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
490    pub async fn execute<T>(
491        &self,
492        statement: &T,
493        params: &[&(dyn ToSql + Sync)],
494    ) -> Result<u64, Error>
495    where
496        T: ?Sized + ToStatement,
497    {
498        self.execute_raw(statement, slice_iter(params)).await
499    }
500
501    /// The maximally flexible version of [`execute`].
502    ///
503    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
504    /// provided, 1-indexed.
505    ///
506    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
507    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
508    /// with the `prepare` method.
509    ///
510    /// [`execute`]: #method.execute
511    pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
512    where
513        T: ?Sized + ToStatement,
514        P: BorrowToSql,
515        I: IntoIterator<Item = P>,
516        I::IntoIter: ExactSizeIterator,
517    {
518        let statement = statement.__convert().into_statement(self).await?;
519        query::execute(self.inner(), statement, params).await
520    }
521
522    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
523    ///
524    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
525    /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
526    pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
527    where
528        T: ?Sized + ToStatement,
529        U: Buf + 'static + Send,
530    {
531        let statement = statement.__convert().into_statement(self).await?;
532        copy_in::copy_in(self.inner(), statement).await
533    }
534
535    /// Executes a `COPY FROM STDIN` query, returning a sink used to write the copy data.
536    pub async fn copy_in_simple<U>(&self, query: &str) -> Result<CopyInSink<U>, Error>
537    where
538        U: Buf + 'static + Send,
539    {
540        copy_in::copy_in_simple(self.inner(), query).await
541    }
542
543    /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
544    ///
545    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
546    pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
547    where
548        T: ?Sized + ToStatement,
549    {
550        let statement = statement.__convert().into_statement(self).await?;
551        copy_out::copy_out(self.inner(), statement).await
552    }
553
554    /// Executes a `COPY TO STDOUT` query, returning a stream of the resulting data.
555    pub async fn copy_out_simple(&self, query: &str) -> Result<CopyOutStream, Error> {
556        copy_out::copy_out_simple(self.inner(), query).await
557    }
558
559    /// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
560    /// data.
561    pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
562    where
563        T: Buf + 'static + Send,
564    {
565        copy_both::copy_both_simple(self.inner(), query).await
566    }
567
568    /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
569    ///
570    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
571    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
572    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
573    /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
574    /// or a row of data. This preserves the framing between the separate statements in the request.
575    ///
576    /// # Warning
577    ///
578    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
579    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
580    /// them to this method!
581    pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
582        self.simple_query_raw(query).await?.try_collect().await
583    }
584
585    /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows as a stream.
586    ///
587    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
588    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
589    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
590    /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
591    /// or a row of data. This preserves the framing between the separate statements in the request.
592    ///
593    /// # Warning
594    ///
595    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
596    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
597    /// them to this method!
598    pub async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
599        simple_query::simple_query(self.inner(), query).await
600    }
601
602    /// Executes a sequence of SQL statements using the simple query protocol.
603    ///
604    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
605    /// point. This is intended for use when, for example, initializing a database schema.
606    ///
607    /// # Warning
608    ///
609    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
610    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
611    /// them to this method!
612    pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
613        simple_query::batch_execute(self.inner(), query).await
614    }
615
616    /// Check that the connection is alive and wait for the confirmation.
617    pub async fn check_connection(&self) -> Result<(), Error> {
618        // sync is a very quick message to test the connection health.
619        query::sync(self.inner()).await
620    }
621
622    /// Begins a new database transaction.
623    ///
624    /// The transaction will roll back by default - use the `commit` method to commit it.
625    pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
626        self.build_transaction().start().await
627    }
628
629    /// Returns a builder for a transaction with custom settings.
630    ///
631    /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
632    /// attributes.
633    pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
634        TransactionBuilder::new(self)
635    }
636
637    /// Returns the server's process ID for the connection.
638    pub fn backend_pid(&self) -> i32 {
639        self.process_id
640    }
641
642    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
643    /// connection associated with this client.
644    pub fn cancel_token(&self) -> CancelToken {
645        CancelToken {
646            #[cfg(feature = "runtime")]
647            socket_config: self.socket_config.clone(),
648            ssl_mode: self.ssl_mode,
649            ssl_negotiation: self.ssl_negotiation,
650            process_id: self.process_id,
651            secret_key: self.secret_key,
652        }
653    }
654
655    /// Attempts to cancel an in-progress query.
656    ///
657    /// The server provides no information about whether a cancellation attempt was successful or not. An error will
658    /// only be returned if the client was unable to connect to the database.
659    ///
660    /// Requires the `runtime` Cargo feature (enabled by default).
661    #[cfg(feature = "runtime")]
662    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
663    pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
664    where
665        T: MakeTlsConnect<Socket>,
666    {
667        self.cancel_token().cancel_query(tls).await
668    }
669
670    /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
671    /// connection itself.
672    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
673    pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
674    where
675        S: AsyncRead + AsyncWrite + Unpin,
676        T: TlsConnect<S>,
677    {
678        self.cancel_token().cancel_query_raw(stream, tls).await
679    }
680
681    /// Clears the client's type information cache.
682    ///
683    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
684    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
685    /// to flush the local cache and allow the new, updated definitions to be loaded.
686    pub fn clear_type_cache(&self) {
687        self.inner().clear_type_cache();
688    }
689
690    /// Determines if the connection to the server has already closed.
691    ///
692    /// In that case, all future queries will fail.
693    pub fn is_closed(&self) -> bool {
694        self.inner.sender.is_closed()
695    }
696
697    #[doc(hidden)]
698    pub fn __private_api_rollback(&self, name: Option<&str>) {
699        let buf = self.inner().with_buf(|buf| {
700            if let Some(name) = name {
701                frontend::query(&format!("ROLLBACK TO {}", name), buf).unwrap();
702            } else {
703                frontend::query("ROLLBACK", buf).unwrap();
704            }
705            buf.split().freeze()
706        });
707        let _ = self
708            .inner()
709            .send(RequestMessages::Single(FrontendMessage::Raw(buf)));
710    }
711
712    #[doc(hidden)]
713    pub fn __private_api_close(&mut self) {
714        self.inner.sender.close_channel()
715    }
716}
717
718impl fmt::Debug for Client {
719    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720        f.debug_struct("Client").finish()
721    }
722}