tokio_postgres/client.rs
1use crate::codec::{BackendMessages, FrontendMessage};
2use crate::config::{SslMode, SslNegotiation};
3use crate::connection::{Request, RequestMessages};
4use crate::copy_both::{CopyBothDuplex, CopyBothReceiver};
5use crate::copy_out::CopyOutStream;
6#[cfg(feature = "runtime")]
7use crate::keepalive::KeepaliveConfig;
8use crate::query::RowStream;
9use crate::simple_query::SimpleQueryStream;
10#[cfg(feature = "runtime")]
11use crate::tls::MakeTlsConnect;
12use crate::tls::TlsConnect;
13use crate::types::{Oid, ToSql, Type};
14#[cfg(feature = "runtime")]
15use crate::Socket;
16use crate::{
17 copy_both, copy_in, copy_out, prepare, query, simple_query, slice_iter, CancelToken,
18 CopyInSink, Error, Row, SimpleQueryMessage, Statement, ToStatement, Transaction,
19 TransactionBuilder,
20};
21use bytes::{Buf, BytesMut};
22use fallible_iterator::FallibleIterator;
23use futures_channel::mpsc;
24use futures_util::{Stream, StreamExt, TryStreamExt};
25use parking_lot::Mutex;
26use postgres_protocol::message::backend::Message;
27use postgres_protocol::message::frontend;
28use postgres_types::BorrowToSql;
29use std::collections::HashMap;
30use std::fmt;
31use std::future;
32#[cfg(feature = "runtime")]
33use std::net::IpAddr;
34#[cfg(feature = "runtime")]
35use std::path::PathBuf;
36use std::pin::pin;
37use std::pin::Pin;
38use std::sync::Arc;
39use std::task::{ready, Context, Poll};
40#[cfg(feature = "runtime")]
41use std::time::Duration;
42use tokio::io::{AsyncRead, AsyncWrite};
43
44pub struct Responses {
45 receiver: mpsc::Receiver<BackendMessages>,
46 cur: BackendMessages,
47}
48
49pub struct CopyBothHandles {
50 pub(crate) stream_receiver: mpsc::Receiver<Result<Message, Error>>,
51 pub(crate) sink_sender: mpsc::Sender<FrontendMessage>,
52}
53
54impl Responses {
55 pub fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll<Result<Message, Error>> {
56 loop {
57 match self.cur.next().map_err(Error::parse)? {
58 Some(Message::ErrorResponse(body)) => return Poll::Ready(Err(Error::db(body))),
59 Some(message) => return Poll::Ready(Ok(message)),
60 None => {}
61 }
62
63 match ready!(self.receiver.poll_next_unpin(cx)) {
64 Some(messages) => self.cur = messages,
65 None => return Poll::Ready(Err(Error::closed())),
66 }
67 }
68 }
69
70 pub async fn next(&mut self) -> Result<Message, Error> {
71 future::poll_fn(|cx| self.poll_next(cx)).await
72 }
73}
74
75impl Stream for Responses {
76 type Item = Result<Message, Error>;
77
78 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
79 match ready!((*self).poll_next(cx)) {
80 Err(err) if err.is_closed() => Poll::Ready(None),
81 msg => Poll::Ready(Some(msg)),
82 }
83 }
84}
85
86/// A cache of type info and prepared statements for fetching type info
87/// (corresponding to the queries in the [prepare](prepare) module).
88#[derive(Default)]
89struct CachedTypeInfo {
90 /// A statement for basic information for a type from its
91 /// OID. Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_QUERY) (or its
92 /// fallback).
93 typeinfo: Option<Statement>,
94 /// A statement for getting information for a composite type from its OID.
95 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY).
96 typeinfo_composite: Option<Statement>,
97 /// A statement for getting information for a composite type from its OID.
98 /// Corresponds to [TYPEINFO_QUERY](prepare::TYPEINFO_COMPOSITE_QUERY) (or
99 /// its fallback).
100 typeinfo_enum: Option<Statement>,
101
102 /// Cache of types already looked up.
103 types: HashMap<Oid, Type>,
104}
105
106pub struct InnerClient {
107 sender: mpsc::UnboundedSender<Request>,
108 cached_typeinfo: Mutex<CachedTypeInfo>,
109
110 /// A buffer to use when writing out postgres commands.
111 buffer: Mutex<BytesMut>,
112}
113
114impl InnerClient {
115 pub fn send(&self, messages: RequestMessages) -> Result<Responses, Error> {
116 let (sender, receiver) = mpsc::channel(1);
117 let request = Request { messages, sender };
118 self.sender
119 .unbounded_send(request)
120 .map_err(|_| Error::closed())?;
121
122 Ok(Responses {
123 receiver,
124 cur: BackendMessages::empty(),
125 })
126 }
127
128 pub fn start_copy_both(&self) -> Result<CopyBothHandles, Error> {
129 let (sender, receiver) = mpsc::channel(16);
130 let (stream_sender, stream_receiver) = mpsc::channel(16);
131 let (sink_sender, sink_receiver) = mpsc::channel(16);
132
133 let responses = Responses {
134 receiver,
135 cur: BackendMessages::empty(),
136 };
137 let messages = RequestMessages::CopyBoth(CopyBothReceiver::new(
138 responses,
139 sink_receiver,
140 stream_sender,
141 ));
142
143 let request = Request { messages, sender };
144 self.sender
145 .unbounded_send(request)
146 .map_err(|_| Error::closed())?;
147
148 Ok(CopyBothHandles {
149 stream_receiver,
150 sink_sender,
151 })
152 }
153
154 pub fn typeinfo(&self) -> Option<Statement> {
155 self.cached_typeinfo.lock().typeinfo.clone()
156 }
157
158 pub fn set_typeinfo(&self, statement: &Statement) {
159 self.cached_typeinfo.lock().typeinfo = Some(statement.clone());
160 }
161
162 pub fn typeinfo_composite(&self) -> Option<Statement> {
163 self.cached_typeinfo.lock().typeinfo_composite.clone()
164 }
165
166 pub fn set_typeinfo_composite(&self, statement: &Statement) {
167 self.cached_typeinfo.lock().typeinfo_composite = Some(statement.clone());
168 }
169
170 pub fn typeinfo_enum(&self) -> Option<Statement> {
171 self.cached_typeinfo.lock().typeinfo_enum.clone()
172 }
173
174 pub fn set_typeinfo_enum(&self, statement: &Statement) {
175 self.cached_typeinfo.lock().typeinfo_enum = Some(statement.clone());
176 }
177
178 pub fn type_(&self, oid: Oid) -> Option<Type> {
179 self.cached_typeinfo.lock().types.get(&oid).cloned()
180 }
181
182 pub fn set_type(&self, oid: Oid, type_: &Type) {
183 self.cached_typeinfo.lock().types.insert(oid, type_.clone());
184 }
185
186 pub fn clear_type_cache(&self) {
187 self.cached_typeinfo.lock().types.clear();
188 }
189
190 /// Call the given function with a buffer to be used when writing out
191 /// postgres commands.
192 pub fn with_buf<F, R>(&self, f: F) -> R
193 where
194 F: FnOnce(&mut BytesMut) -> R,
195 {
196 let mut buffer = self.buffer.lock();
197 let r = f(&mut buffer);
198 buffer.clear();
199 r
200 }
201}
202
203#[cfg(feature = "runtime")]
204#[derive(Clone)]
205pub(crate) struct SocketConfig {
206 pub addr: Addr,
207 pub hostname: Option<String>,
208 pub port: u16,
209 pub connect_timeout: Option<Duration>,
210 pub tcp_user_timeout: Option<Duration>,
211 pub keepalive: Option<KeepaliveConfig>,
212}
213
214#[cfg(feature = "runtime")]
215#[derive(Clone)]
216pub(crate) enum Addr {
217 Tcp(IpAddr),
218 #[cfg(unix)]
219 Unix(PathBuf),
220}
221
222/// An asynchronous PostgreSQL client.
223///
224/// The client is one half of what is returned when a connection is established. Users interact with the database
225/// through this client object.
226pub struct Client {
227 inner: Arc<InnerClient>,
228 #[cfg(feature = "runtime")]
229 socket_config: Option<SocketConfig>,
230 ssl_mode: SslMode,
231 ssl_negotiation: SslNegotiation,
232 process_id: i32,
233 secret_key: i32,
234}
235
236impl Client {
237 pub(crate) fn new(
238 sender: mpsc::UnboundedSender<Request>,
239 ssl_mode: SslMode,
240 ssl_negotiation: SslNegotiation,
241 process_id: i32,
242 secret_key: i32,
243 ) -> Client {
244 Client {
245 inner: Arc::new(InnerClient {
246 sender,
247 cached_typeinfo: Default::default(),
248 buffer: Default::default(),
249 }),
250 #[cfg(feature = "runtime")]
251 socket_config: None,
252 ssl_mode,
253 ssl_negotiation,
254 process_id,
255 secret_key,
256 }
257 }
258
259 pub(crate) fn inner(&self) -> &Arc<InnerClient> {
260 &self.inner
261 }
262
263 #[cfg(feature = "runtime")]
264 pub(crate) fn set_socket_config(&mut self, socket_config: SocketConfig) {
265 self.socket_config = Some(socket_config);
266 }
267
268 /// Creates a new prepared statement.
269 ///
270 /// Prepared statements can be executed repeatedly, and may contain query parameters (indicated by `$1`, `$2`, etc),
271 /// which are set when executed. Prepared statements can only be used with the connection that created them.
272 pub async fn prepare(&self, query: &str) -> Result<Statement, Error> {
273 self.prepare_typed(query, &[]).await
274 }
275
276 /// Like `prepare`, but allows the types of query parameters to be explicitly specified.
277 ///
278 /// The list of types may be smaller than the number of parameters - the types of the remaining parameters will be
279 /// inferred. For example, `client.prepare_typed(query, &[])` is equivalent to `client.prepare(query)`.
280 pub async fn prepare_typed(
281 &self,
282 query: &str,
283 parameter_types: &[Type],
284 ) -> Result<Statement, Error> {
285 prepare::prepare(&self.inner, query, parameter_types).await
286 }
287
288 /// Executes a statement, returning a vector of the resulting rows.
289 ///
290 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
291 /// provided, 1-indexed.
292 ///
293 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
294 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
295 /// with the `prepare` method.
296 pub async fn query<T>(
297 &self,
298 statement: &T,
299 params: &[&(dyn ToSql + Sync)],
300 ) -> Result<Vec<Row>, Error>
301 where
302 T: ?Sized + ToStatement,
303 {
304 self.query_raw(statement, slice_iter(params))
305 .await?
306 .try_collect()
307 .await
308 }
309
310 /// Executes a statement which returns a single row, returning it.
311 ///
312 /// Returns an error if the query does not return exactly one row.
313 ///
314 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
315 /// provided, 1-indexed.
316 ///
317 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
318 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
319 /// with the `prepare` method.
320 pub async fn query_one<T>(
321 &self,
322 statement: &T,
323 params: &[&(dyn ToSql + Sync)],
324 ) -> Result<Row, Error>
325 where
326 T: ?Sized + ToStatement,
327 {
328 self.query_opt(statement, params)
329 .await
330 .and_then(|res| res.ok_or_else(Error::row_count))
331 }
332
333 /// Executes a statements which returns zero or one rows, returning it.
334 ///
335 /// Returns an error if the query returns more than one row.
336 ///
337 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
338 /// provided, 1-indexed.
339 ///
340 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
341 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
342 /// with the `prepare` method.
343 pub async fn query_opt<T>(
344 &self,
345 statement: &T,
346 params: &[&(dyn ToSql + Sync)],
347 ) -> Result<Option<Row>, Error>
348 where
349 T: ?Sized + ToStatement,
350 {
351 let mut stream = pin!(self.query_raw(statement, slice_iter(params)).await?);
352
353 let mut first = None;
354
355 // Originally this was two calls to `try_next().await?`,
356 // once for the first element, and second to error if more than one.
357 //
358 // However, this new form with only one .await in a loop generates
359 // slightly smaller codegen/stack usage for the resulting future.
360 while let Some(row) = stream.try_next().await? {
361 if first.is_some() {
362 return Err(Error::row_count());
363 }
364
365 first = Some(row);
366 }
367
368 Ok(first)
369 }
370
371 /// The maximally flexible version of [`query`].
372 ///
373 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
374 /// provided, 1-indexed.
375 ///
376 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
377 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
378 /// with the `prepare` method.
379 ///
380 /// [`query`]: #method.query
381 ///
382 /// # Examples
383 ///
384 /// ```no_run
385 /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
386 /// use std::pin::pin;
387 /// use futures_util::TryStreamExt;
388 ///
389 /// let params: Vec<String> = vec![
390 /// "first param".into(),
391 /// "second param".into(),
392 /// ];
393 /// let mut it = pin!(client.query_raw(
394 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
395 /// params,
396 /// ).await?);
397 ///
398 /// while let Some(row) = it.try_next().await? {
399 /// let foo: i32 = row.get("foo");
400 /// println!("foo: {}", foo);
401 /// }
402 /// # Ok(())
403 /// # }
404 /// ```
405 pub async fn query_raw<T, P, I>(&self, statement: &T, params: I) -> Result<RowStream, Error>
406 where
407 T: ?Sized + ToStatement,
408 P: BorrowToSql,
409 I: IntoIterator<Item = P>,
410 I::IntoIter: ExactSizeIterator,
411 {
412 let statement = statement.__convert().into_statement(self).await?;
413 query::query(&self.inner, statement, params).await
414 }
415
416 /// Like `query`, but requires the types of query parameters to be explicitly specified.
417 ///
418 /// Compared to `query`, this method allows performing queries without three round trips (for
419 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
420 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
421 /// supported (such as Cloudflare Workers with Hyperdrive).
422 ///
423 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
424 /// parameter of the list provided, 1-indexed.
425 pub async fn query_typed(
426 &self,
427 query: &str,
428 params: &[(&(dyn ToSql + Sync), Type)],
429 ) -> Result<Vec<Row>, Error> {
430 self.query_typed_raw(query, params.iter().map(|(v, t)| (*v, t.clone())))
431 .await?
432 .try_collect()
433 .await
434 }
435
436 /// The maximally flexible version of [`query_typed`].
437 ///
438 /// Compared to `query`, this method allows performing queries without three round trips (for
439 /// prepare, execute, and close) by requiring the caller to specify parameter values along with
440 /// their Postgres type. Thus, this is suitable in environments where prepared statements aren't
441 /// supported (such as Cloudflare Workers with Hyperdrive).
442 ///
443 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the
444 /// parameter of the list provided, 1-indexed.
445 ///
446 /// [`query_typed`]: #method.query_typed
447 ///
448 /// # Examples
449 ///
450 /// ```no_run
451 /// # async fn async_main(client: &tokio_postgres::Client) -> Result<(), tokio_postgres::Error> {
452 /// use std::pin::pin;
453 /// use futures_util::{TryStreamExt};
454 /// use tokio_postgres::types::Type;
455 ///
456 /// let params: Vec<(String, Type)> = vec![
457 /// ("first param".into(), Type::TEXT),
458 /// ("second param".into(), Type::TEXT),
459 /// ];
460 /// let mut it = pin!(client.query_typed_raw(
461 /// "SELECT foo FROM bar WHERE biz = $1 AND baz = $2",
462 /// params,
463 /// ).await?);
464 ///
465 /// while let Some(row) = it.try_next().await? {
466 /// let foo: i32 = row.get("foo");
467 /// println!("foo: {}", foo);
468 /// }
469 /// # Ok(())
470 /// # }
471 /// ```
472 pub async fn query_typed_raw<P, I>(&self, query: &str, params: I) -> Result<RowStream, Error>
473 where
474 P: BorrowToSql,
475 I: IntoIterator<Item = (P, Type)>,
476 {
477 query::query_typed(&self.inner, query, params).await
478 }
479
480 /// Executes a statement, returning the number of rows modified.
481 ///
482 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
483 /// provided, 1-indexed.
484 ///
485 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
486 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
487 /// with the `prepare` method.
488 ///
489 /// If the statement does not modify any rows (e.g. `SELECT`), 0 is returned.
490 pub async fn execute<T>(
491 &self,
492 statement: &T,
493 params: &[&(dyn ToSql + Sync)],
494 ) -> Result<u64, Error>
495 where
496 T: ?Sized + ToStatement,
497 {
498 self.execute_raw(statement, slice_iter(params)).await
499 }
500
501 /// The maximally flexible version of [`execute`].
502 ///
503 /// A statement may contain parameters, specified by `$n`, where `n` is the index of the parameter of the list
504 /// provided, 1-indexed.
505 ///
506 /// The `statement` argument can either be a `Statement`, or a raw query string. If the same statement will be
507 /// repeatedly executed (perhaps with different query parameters), consider preparing the statement up front
508 /// with the `prepare` method.
509 ///
510 /// [`execute`]: #method.execute
511 pub async fn execute_raw<T, P, I>(&self, statement: &T, params: I) -> Result<u64, Error>
512 where
513 T: ?Sized + ToStatement,
514 P: BorrowToSql,
515 I: IntoIterator<Item = P>,
516 I::IntoIter: ExactSizeIterator,
517 {
518 let statement = statement.__convert().into_statement(self).await?;
519 query::execute(self.inner(), statement, params).await
520 }
521
522 /// Executes a `COPY FROM STDIN` statement, returning a sink used to write the copy data.
523 ///
524 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any. The copy *must*
525 /// be explicitly completed via the `Sink::close` or `finish` methods. If it is not, the copy will be aborted.
526 pub async fn copy_in<T, U>(&self, statement: &T) -> Result<CopyInSink<U>, Error>
527 where
528 T: ?Sized + ToStatement,
529 U: Buf + 'static + Send,
530 {
531 let statement = statement.__convert().into_statement(self).await?;
532 copy_in::copy_in(self.inner(), statement).await
533 }
534
535 /// Executes a `COPY FROM STDIN` query, returning a sink used to write the copy data.
536 pub async fn copy_in_simple<U>(&self, query: &str) -> Result<CopyInSink<U>, Error>
537 where
538 U: Buf + 'static + Send,
539 {
540 copy_in::copy_in_simple(self.inner(), query).await
541 }
542
543 /// Executes a `COPY TO STDOUT` statement, returning a stream of the resulting data.
544 ///
545 /// PostgreSQL does not support parameters in `COPY` statements, so this method does not take any.
546 pub async fn copy_out<T>(&self, statement: &T) -> Result<CopyOutStream, Error>
547 where
548 T: ?Sized + ToStatement,
549 {
550 let statement = statement.__convert().into_statement(self).await?;
551 copy_out::copy_out(self.inner(), statement).await
552 }
553
554 /// Executes a `COPY TO STDOUT` query, returning a stream of the resulting data.
555 pub async fn copy_out_simple(&self, query: &str) -> Result<CopyOutStream, Error> {
556 copy_out::copy_out_simple(self.inner(), query).await
557 }
558
559 /// Executes a CopyBoth query, returning a combined Stream+Sink type to read and write copy
560 /// data.
561 pub async fn copy_both_simple<T>(&self, query: &str) -> Result<CopyBothDuplex<T>, Error>
562 where
563 T: Buf + 'static + Send,
564 {
565 copy_both::copy_both_simple(self.inner(), query).await
566 }
567
568 /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows.
569 ///
570 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
571 /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
572 /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
573 /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
574 /// or a row of data. This preserves the framing between the separate statements in the request.
575 ///
576 /// # Warning
577 ///
578 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
579 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
580 /// them to this method!
581 pub async fn simple_query(&self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
582 self.simple_query_raw(query).await?.try_collect().await
583 }
584
585 /// Executes a sequence of SQL statements using the simple query protocol, returning the resulting rows as a stream.
586 ///
587 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
588 /// point. The simple query protocol returns the values in rows as strings rather than in their binary encodings,
589 /// so the associated row type doesn't work with the `FromSql` trait. Rather than simply returning a list of the
590 /// rows, this method returns a list of an enum which indicates either the completion of one of the commands,
591 /// or a row of data. This preserves the framing between the separate statements in the request.
592 ///
593 /// # Warning
594 ///
595 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
596 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
597 /// them to this method!
598 pub async fn simple_query_raw(&self, query: &str) -> Result<SimpleQueryStream, Error> {
599 simple_query::simple_query(self.inner(), query).await
600 }
601
602 /// Executes a sequence of SQL statements using the simple query protocol.
603 ///
604 /// Statements should be separated by semicolons. If an error occurs, execution of the sequence will stop at that
605 /// point. This is intended for use when, for example, initializing a database schema.
606 ///
607 /// # Warning
608 ///
609 /// Prepared statements should be use for any query which contains user-specified data, as they provided the
610 /// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
611 /// them to this method!
612 pub async fn batch_execute(&self, query: &str) -> Result<(), Error> {
613 simple_query::batch_execute(self.inner(), query).await
614 }
615
616 /// Check that the connection is alive and wait for the confirmation.
617 pub async fn check_connection(&self) -> Result<(), Error> {
618 // sync is a very quick message to test the connection health.
619 query::sync(self.inner()).await
620 }
621
622 /// Begins a new database transaction.
623 ///
624 /// The transaction will roll back by default - use the `commit` method to commit it.
625 pub async fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
626 self.build_transaction().start().await
627 }
628
629 /// Returns a builder for a transaction with custom settings.
630 ///
631 /// Unlike the `transaction` method, the builder can be used to control the transaction's isolation level and other
632 /// attributes.
633 pub fn build_transaction(&mut self) -> TransactionBuilder<'_> {
634 TransactionBuilder::new(self)
635 }
636
637 /// Returns the server's process ID for the connection.
638 pub fn backend_pid(&self) -> i32 {
639 self.process_id
640 }
641
642 /// Constructs a cancellation token that can later be used to request cancellation of a query running on the
643 /// connection associated with this client.
644 pub fn cancel_token(&self) -> CancelToken {
645 CancelToken {
646 #[cfg(feature = "runtime")]
647 socket_config: self.socket_config.clone(),
648 ssl_mode: self.ssl_mode,
649 ssl_negotiation: self.ssl_negotiation,
650 process_id: self.process_id,
651 secret_key: self.secret_key,
652 }
653 }
654
655 /// Attempts to cancel an in-progress query.
656 ///
657 /// The server provides no information about whether a cancellation attempt was successful or not. An error will
658 /// only be returned if the client was unable to connect to the database.
659 ///
660 /// Requires the `runtime` Cargo feature (enabled by default).
661 #[cfg(feature = "runtime")]
662 #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
663 pub async fn cancel_query<T>(&self, tls: T) -> Result<(), Error>
664 where
665 T: MakeTlsConnect<Socket>,
666 {
667 self.cancel_token().cancel_query(tls).await
668 }
669
670 /// Like `cancel_query`, but uses a stream which is already connected to the server rather than opening a new
671 /// connection itself.
672 #[deprecated(since = "0.6.0", note = "use Client::cancel_token() instead")]
673 pub async fn cancel_query_raw<S, T>(&self, stream: S, tls: T) -> Result<(), Error>
674 where
675 S: AsyncRead + AsyncWrite + Unpin,
676 T: TlsConnect<S>,
677 {
678 self.cancel_token().cancel_query_raw(stream, tls).await
679 }
680
681 /// Clears the client's type information cache.
682 ///
683 /// When user-defined types are used in a query, the client loads their definitions from the database and caches
684 /// them for the lifetime of the client. If those definitions are changed in the database, this method can be used
685 /// to flush the local cache and allow the new, updated definitions to be loaded.
686 pub fn clear_type_cache(&self) {
687 self.inner().clear_type_cache();
688 }
689
690 /// Determines if the connection to the server has already closed.
691 ///
692 /// In that case, all future queries will fail.
693 pub fn is_closed(&self) -> bool {
694 self.inner.sender.is_closed()
695 }
696
697 #[doc(hidden)]
698 pub fn __private_api_rollback(&self, name: Option<&str>) {
699 let buf = self.inner().with_buf(|buf| {
700 if let Some(name) = name {
701 frontend::query(&format!("ROLLBACK TO {}", name), buf).unwrap();
702 } else {
703 frontend::query("ROLLBACK", buf).unwrap();
704 }
705 buf.split().freeze()
706 });
707 let _ = self
708 .inner()
709 .send(RequestMessages::Single(FrontendMessage::Raw(buf)));
710 }
711
712 #[doc(hidden)]
713 pub fn __private_api_close(&mut self) {
714 self.inner.sender.close_channel()
715 }
716}
717
718impl fmt::Debug for Client {
719 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
720 f.debug_struct("Client").finish()
721 }
722}