tokio_postgres/client.rs
1use crate::codec::{BackendMessages, FrontendMessage};
2use crate::config::SslMode;
3use crate::connection::{Request, RequestMessages};
4use crate::copy_both::{CopyBothDuplex, CopyBothReceiver};
5use crate::copy_out::CopyOutStream;
6#[cfg(feature = "runtime")]
7use crate::keepalive::KeepaliveConfig;
8use crate::query::RowStream;
9use crate::simple_query::SimpleQueryStream;
10#[cfg(feature = "runtime")]
11use crate::tls::MakeTlsConnect;
12use crate::tls::TlsConnect;
13use crate::types::{Oid, ToSql, Type};
14#[cfg(feature = "runtime")]
15use crate::Socket;
16use crate::{
17    copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
18    CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
19    TransactionBuilder,
20};
21use bytes::{Buf, BytesMut};
22use fallible_iterator::FallibleIterator;
23use futures_channel::mpsc;
24use futures_util::{future, pin_mut, ready, Stream, StreamExt, TryStreamExt};
25use parking_lot::Mutex;
26use postgres_protocol::message::backend::Message;
27use postgres_types::BorrowToSql;
28use std::collections::HashMap;
29use std::fmt;
30#[cfg(feature = "runtime")]
31use std::net::IpAddr;
32#[cfg(feature = "runtime")]
33use std::path::PathBuf;
34use std::pin::Pin;
35use std::sync::Arc;
36use std::task::{Context, Poll};
37#[cfg(feature = "runtime")]
38use std::time::Duration;
39use tokio::io::{AsyncRead, AsyncWrite};
40
41pub struct Responses {
42    receiver: mpsc::Receiver<BackendMessages>,
43    cur: BackendMessages,
44}
45
46pub struct CopyBothHandles {
47    pub(crate) stream_receiver: mpsc::Receiver<Result<Message, Error>>,
48    pub(crate) sink_sender: mpsc::Sender<FrontendMessage>,
49}
50
51impl Responses {
52    pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
53        loop {
54            match self.cur.next().map_err(Error::parse)? {
55                Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
56                Some(message) => return Poll::Ready(Ok(message)),
57                None => {}
58            }
59
60            match ready!(self.receiver.poll_next_unpin(cx)) {
61                Some(messages) => self.cur = messages,
62                None => return Poll::Ready(Err(Error::closed())),
63            }
64        }
65    }
66
67    pub async fn next(&mut self) -> Result<Message, Error> {
68        future::poll_fn(|cx| self.poll_next(cx)).await
69    }
70}
71
72impl Stream for Responses {
73    type Item = Result<Message, Error>;
74
75    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
76        match ready!((*self).poll_next(cx)) {
77            Err(err) if err.is_closed() => Poll::Ready(None),
78            msg => Poll::Ready(Some(msg)),
79        }
80    }
81}
82
83/// A cache of type info and prepared statements for fetching type info
84/// (corresponding to the queries in the [prepare](prepare) module).
85#[derive(Default)]
86struct CachedTypeInfo {
87    /// A statement for basic information for a type from its
88    /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
89    /// fallback).
90    typeinfo: Option<Statement>,
91    /// A statement for getting information for a composite type from its OID.
92    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
93    typeinfo_composite: Option<Statement>,
94    /// A statement for getting information for a composite type from its OID.
95    /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
96    /// its fallback).
97    typeinfo_enum: Option<Statement>,
98
99    /// Cache of types already looked up.
100    types: HashMap<Oid, Type>,
101}
102
103pub struct InnerClient {
104    sender: mpsc::UnboundedSender<Request>,
105    cached_typeinfo: Mutex<CachedTypeInfo>,
106
107    /// A buffer to use when writing out postgres commands.
108    buffer: Mutex<BytesMut>,
109}
110
111impl InnerClient {
112    pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
113        let (sender, receiver) = mpsc::channel(1);
114        let request = Request { messages, sender };
115        self.sender
116            .unbounded_send(request)
117            .map_err(|_| Error::closed())?;
118
119        Ok(Responses {
120            receiver,
121            cur: BackendMessages::empty(),
122        })
123    }
124
125    pub fn start_copy_both(&self) -> Result<CopyBothHandles, Error> {
126        let (sender, receiver) = mpsc::channel(16);
127        let (stream_sender, stream_receiver) = mpsc::channel(16);
128        let (sink_sender, sink_receiver) = mpsc::channel(16);
129
130        let responses = Responses {
131            receiver,
132            cur: BackendMessages::empty(),
133        };
134        let messages = RequestMessages::CopyBoth(CopyBothReceiver::new(
135            responses,
136            sink_receiver,
137            stream_sender,
138        ));
139
140        let request = Request { messages, sender };
141        self.sender
142            .unbounded_send(request)
143            .map_err(|_| Error::closed())?;
144
145        Ok(CopyBothHandles {
146            stream_receiver,
147            sink_sender,
148        })
149    }
150
151    pub fn typeinfo(&self) -> Option<Statement> {
152        self.cached_typeinfo.lock().typeinfo.clone()
153    }
154
155    pub fn set_typeinfo(&self, statement: &Statement) {
156        self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
157    }
158
159    pub fn typeinfo_composite(&self) -> Option<Statement> {
160        self.cached_typeinfo.lock().typeinfo_composite.clone()
161    }
162
163    pub fn set_typeinfo_composite(&self, statement: &Statement) {
164        self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
165    }
166
167    pub fn typeinfo_enum(&self) -> Option<Statement> {
168        self.cached_typeinfo.lock().typeinfo_enum.clone()
169    }
170
171    pub fn set_typeinfo_enum(&self, statement: &Statement) {
172        self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
173    }
174
175    pub fn type_(&self, oid: Oid) -> Option<Type> {
176        self.cached_typeinfo.lock().types.get(&oid).cloned()
177    }
178
179    pub fn set_type(&self, oid: Oid, type_: &Type) {
180        self.cached_typeinfo.lock().types.insert(oid, type_.clone());
181    }
182
183    pub fn clear_type_cache(&self) {
184        self.cached_typeinfo.lock().types.clear();
185    }
186
187    /// Call the given function with a buffer to be used when writing out
188    /// postgres commands.
189    pub fn with_buf<F, R>(&self, f: F) -> R
190    where
191        F: FnOnce(&mut BytesMut) -> R,
192    {
193        let mut buffer = self.buffer.lock();
194        let r = f(&mut buffer);
195        buffer.clear();
196        r
197    }
198}
199
200#[cfg(feature = "runtime")]
201#[derive(Clone)]
202pub(crate) struct SocketConfig {
203    pub addr: Addr,
204    pub hostname: Option<String>,
205    pub port: u16,
206    pub connect_timeout: Option<Duration>,
207    pub tcp_user_timeout: Option<Duration>,
208    pub keepalive: Option<KeepaliveConfig>,
209}
210
211#[cfg(feature = "runtime")]
212#[derive(Clone)]
213pub(crate) enum Addr {
214    Tcp(IpAddr),
215    #[cfg(unix)]
216    Unix(PathBuf),
217}
218
219/// An asynchronous PostgreSQL client.
220///
221/// The client is one half of what is returned when a connection is established. Users interact with the database
222/// through this client object.
223pub struct Client {
224    inner: Arc<InnerClient>,
225    #[cfg(feature = "runtime")]
226    socket_config: Option<SocketConfig>,
227    ssl_mode: SslMode,
228    process_id: i32,
229    secret_key: i32,
230}
231
232impl Client {
233    pub(crate) fn new(
234        sender: mpsc::UnboundedSender<Request>,
235        ssl_mode: SslMode,
236        process_id: i32,
237        secret_key: i32,
238    ) -> Client {
239        Client {
240            inner: Arc::new(InnerClient {
241                sender,
242                cached_typeinfo: Default::default(),
243                buffer: Default::default(),
244            }),
245            #[cfg(feature = "runtime")]
246            socket_config: None,
247            ssl_mode,
248            process_id,
249            secret_key,
250        }
251    }
252
253    pub(crate) fn inner(&self) -> &Arc<InnerClient> {
254        &self.inner
255    }
256
257    #[cfg(feature = "runtime")]
258    pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
259        self.socket_config = Some(socket_config);
260    }
261
262    /// Creates a new prepared statement.
263    ///
264    /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
265    /// which are set when executed. Prepared statements can only be used with the connection that created them.
266    pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
267        self.prepare_typed(query, &[]).await
268    }
269
270    /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
271    ///
272    /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
273    /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
274    pub async fn prepare_typed(
275        &self,
276        query: &str,
277        parameter_types: &[Type],
278    ) -> Result<Statement, Error> {
279        prepare::prepare(&self.inner, query, parameter_types).await
280    }
281
282    /// Executes a statement, returning a vector of the resulting rows.
283    ///
284    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
285    /// provided, 1-indexed.
286    ///
287    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
288    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
289    /// with the `prepare` method.
290    pub async fn query<T>(
291        &self,
292        statement: &T,
293        params: &[&(dyn ToSql + Sync)],
294    ) -> Result<Vec<Row>, Error>
295    where
296        T: ?Sized + ToStatement,
297    {
298        self.query_raw(statement, slice_iter(params))
299            .await?
300            .try_collect()
301            .await
302    }
303
304    /// Executes a statement which returns a single row, returning it.
305    ///
306    /// Returns an error if the query does not return exactly one row.
307    ///
308    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
309    /// provided, 1-indexed.
310    ///
311    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
312    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
313    /// with the `prepare` method.
314    pub async fn query_one<T>(
315        &self,
316        statement: &T,
317        params: &[&(dyn ToSql + Sync)],
318    ) -> Result<Row, Error>
319    where
320        T: ?Sized + ToStatement,
321    {
322        self.query_opt(statement, params)
323            .await
324            .and_then(|res| res.ok_or_else(Error::row_count))
325    }
326
327    /// Executes a statements which returns zero or one rows, returning it.
328    ///
329    /// Returns an error if the query returns more than one row.
330    ///
331    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
332    /// provided, 1-indexed.
333    ///
334    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
335    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
336    /// with the `prepare` method.
337    pub async fn query_opt<T>(
338        &self,
339        statement: &T,
340        params: &[&(dyn ToSql + Sync)],
341    ) -> Result<Option<Row>, Error>
342    where
343        T: ?Sized + ToStatement,
344    {
345        let stream = self.query_raw(statement, slice_iter(params)).await?;
346        pin_mut!(stream);
347
348        let mut first = None;
349
350        // Originally this was two calls to `try_next().await?`,
351        // once for the first element, and second to error if more than one.
352        //
353        // However, this new form with only one .await in a loop generates
354        // slightly smaller codegen/stack usage for the resulting future.
355        while let Some(row) = stream.try_next().await? {
356            if first.is_some() {
357                return Err(Error::row_count());
358            }
359
360            first = Some(row);
361        }
362
363        Ok(first)
364    }
365
366    /// The maximally flexible version of [`query`].
367    ///
368    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
369    /// provided, 1-indexed.
370    ///
371    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
372    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
373    /// with the `prepare` method.
374    ///
375    /// [`query`]: #method.query
376    ///
377    /// # Examples
378    ///
379    /// ```no_run
380    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
381    /// use futures_util::{pin_mut, TryStreamExt};
382    ///
383    /// let params: Vec<String> = vec![
384    ///     "first param".into(),
385    ///     "second param".into(),
386    /// ];
387    /// let mut it = client.query_raw(
388    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
389    ///     params,
390    /// ).await?;
391    ///
392    /// pin_mut!(it);
393    /// while let Some(row) = it.try_next().await? {
394    ///     let foo: i32 = row.get("foo");
395    ///     println!("foo: {}", foo);
396    /// }
397    /// # Ok(())
398    /// # }
399    /// ```
400    pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
401    where
402        T: ?Sized + ToStatement,
403        P: BorrowToSql,
404        I: IntoIterator<Item = P>,
405        I::IntoIter: ExactSizeIterator,
406    {
407        let statement = statement.__convert().into_statement(self).await?;
408        query::query(&self.inner, statement, params).await
409    }
410
411    /// Like `query`, but requires the types of query parameters to be explicitly specified.
412    ///
413    /// Compared to `query`, this method allows performing queries without three round trips (for
414    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
415    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
416    /// supported (such as Cloudflare Workers with Hyperdrive).
417    ///
418    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
419    /// parameter of the list provided, 1-indexed.
420    pub async fn query_typed(
421        &self,
422        query: &str,
423        params: &[(&(dyn ToSql + Sync), Type)],
424    ) -> Result<Vec<Row>, Error> {
425        self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone())))
426            .await?
427            .try_collect()
428            .await
429    }
430
431    /// The maximally flexible version of [`query_typed`].
432    ///
433    /// Compared to `query`, this method allows performing queries without three round trips (for
434    /// prepare, execute, and close) by requiring the caller to specify parameter values along with
435    /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
436    /// supported (such as Cloudflare Workers with Hyperdrive).
437    ///
438    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
439    /// parameter of the list provided, 1-indexed.
440    ///
441    /// [`query_typed`]: #method.query_typed
442    ///
443    /// # Examples
444    ///
445    /// ```no_run
446    /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
447    /// use futures_util::{pin_mut, TryStreamExt};
448    /// use tokio_postgres::types::Type;
449    ///
450    /// let params: Vec<(String, Type)> = vec![
451    ///     ("first param".into(), Type::TEXT),
452    ///     ("second param".into(), Type::TEXT),
453    /// ];
454    /// let mut it = client.query_typed_raw(
455    ///     "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
456    ///     params,
457    /// ).await?;
458    ///
459    /// pin_mut!(it);
460    /// while let Some(row) = it.try_next().await? {
461    ///     let foo: i32 = row.get("foo");
462    ///     println!("foo: {}", foo);
463    /// }
464    /// # Ok(())
465    /// # }
466    /// ```
467    pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
468    where
469        P: BorrowToSql,
470        I: IntoIterator<Item = (P, Type)>,
471    {
472        query::query_typed(&self.inner, query, params).await
473    }
474
475    /// Executes a statement, returning the number of rows modified.
476    ///
477    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
478    /// provided, 1-indexed.
479    ///
480    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
481    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
482    /// with the `prepare` method.
483    ///
484    /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
485    pub async fn execute<T>(
486        &self,
487        statement: &T,
488        params: &[&(dyn ToSql + Sync)],
489    ) -> Result<u64, Error>
490    where
491        T: ?Sized + ToStatement,
492    {
493        self.execute_raw(statement, slice_iter(params)).await
494    }
495
496    /// The maximally flexible version of [`execute`].
497    ///
498    /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
499    /// provided, 1-indexed.
500    ///
501    /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
502    /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
503    /// with the `prepare` method.
504    ///
505    /// [`execute`]: #method.execute
506    pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
507    where
508        T: ?Sized + ToStatement,
509        P: BorrowToSql,
510        I: IntoIterator<Item = P>,
511        I::IntoIter: ExactSizeIterator,
512    {
513        let statement = statement.__convert().into_statement(self).await?;
514        query::execute(self.inner(), statement, params).await
515    }
516
517    /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
518    ///
519    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
520    /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
521    pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
522    where
523        T: ?Sized + ToStatement,
524        U: Buf + 'static + Send,
525    {
526        let statement = statement.__convert().into_statement(self).await?;
527        copy_in::copy_in(self.inner(), statement).await
528    }
529
530    /// Executes a `COPY FROM STDIN` query, returning a sink used to write the copy data.
531    pub async fn copy_in_simple<U>(&self, query: &str) -> Result<CopyInSink<U>, Error>
532    where
533        U: Buf + 'static + Send,
534    {
535        copy_in::copy_in_simple(self.inner(), query).await
536    }
537
538    /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
539    ///
540    /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
541    pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
542    where
543        T: ?Sized + ToStatement,
544    {
545        let statement = statement.__convert().into_statement(self).await?;
546        copy_out::copy_out(self.inner(), statement).await
547    }
548
549    /// Executes a `COPY TO STDOUT` query, returning a stream of the resulting data.
550    pub async fn copy_out_simple(&self, query: &str) -> Result<CopyOutStream, Error> {
551        copy_out::copy_out_simple(self.inner(), query).await
552    }
553
554    /// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
555    /// data.
556    pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
557    where
558        T: Buf + 'static + Send,
559    {
560        copy_both::copy_both_simple(self.inner(), query).await
561    }
562
563    /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
564    ///
565    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
566    /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
567    /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
568    /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
569    /// or a row of data. This preserves the framing between the separate statements in the request.
570    ///
571    /// # Warning
572    ///
573    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
574    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
575    /// them to this method!
576    pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
577        self.simple_query_raw(query).await?.try_collect().await
578    }
579
580    pub(crate) async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
581        simple_query::simple_query(self.inner(), query).await
582    }
583
584    /// Executes a sequence of SQL statements using the simple query protocol.
585    ///
586    /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
587    /// point. This is intended for use when, for example, initializing a database schema.
588    ///
589    /// # Warning
590    ///
591    /// Prepared statements should be use for any query which contains user-specified data, as they provided the
592    /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
593    /// them to this method!
594    pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
595        simple_query::batch_execute(self.inner(), query).await
596    }
597
598    /// Begins a new database transaction.
599    ///
600    /// The transaction will roll back by default - use the `commit` method to commit it.
601    pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
602        self.build_transaction().start().await
603    }
604
605    /// Returns a builder for a transaction with custom settings.
606    ///
607    /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
608    /// attributes.
609    pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
610        TransactionBuilder::new(self)
611    }
612
613    /// Returns the server's process ID for the connection.
614    pub fn backend_pid(&self) -> i32 {
615        self.process_id
616    }
617
618    /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
619    /// connection associated with this client.
620    pub fn cancel_token(&self) -> CancelToken {
621        CancelToken {
622            #[cfg(feature = "runtime")]
623            socket_config: self.socket_config.clone(),
624            ssl_mode: self.ssl_mode,
625            process_id: self.process_id,
626            secret_key: self.secret_key,
627        }
628    }
629
630    /// Attempts to cancel an in-progress query.
631    ///
632    /// The server provides no information about whether a cancellation attempt was successful or not. An error will
633    /// only be returned if the client was unable to connect to the database.
634    ///
635    /// Requires the `runtime` Cargo feature (enabled by default).
636    #[cfg(feature = "runtime")]
637    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
638    pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
639    where
640        T: MakeTlsConnect<Socket>,
641    {
642        self.cancel_token().cancel_query(tls).await
643    }
644
645    /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
646    /// connection itself.
647    #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
648    pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
649    where
650        S: AsyncRead + AsyncWrite + Unpin,
651        T: TlsConnect<S>,
652    {
653        self.cancel_token().cancel_query_raw(stream, tls).await
654    }
655
656    /// Clears the client's type information cache.
657    ///
658    /// When user-defined types are used in a query, the client loads their definitions from the database and caches
659    /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
660    /// to flush the local cache and allow the new, updated definitions to be loaded.
661    pub fn clear_type_cache(&self) {
662        self.inner().clear_type_cache();
663    }
664
665    /// Determines if the connection to the server has already closed.
666    ///
667    /// In that case, all future queries will fail.
668    pub fn is_closed(&self) -> bool {
669        self.inner.sender.is_closed()
670    }
671
672    #[doc(hidden)]
673    pub fn __private_api_close(&mut self) {
674        self.inner.sender.close_channel()
675    }
676}
677
678impl fmt::Debug for Client {
679    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
680        f.debug_struct("Client").finish()
681    }
682}