Skip to main content

mz_postgres_client/
lib.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A Postgres client that uses deadpool as a connection pool and comes with
11//! common/default configuration options.
12
13#![warn(missing_docs, missing_debug_implementations)]
14#![warn(
15    clippy::cast_possible_truncation,
16    clippy::cast_precision_loss,
17    clippy::cast_sign_loss,
18    clippy::clone_on_ref_ptr
19)]
20
21pub mod error;
22pub mod metrics;
23
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28use deadpool_postgres::tokio_postgres::Config;
29use deadpool_postgres::{
30    Hook, HookError, HookErrorCause, Manager, ManagerConfig, Object, Pool, PoolError,
31    RecyclingMethod, Runtime, Status,
32};
33use mz_ore::cast::{CastFrom, CastLossy};
34use mz_ore::now::SYSTEM_TIME;
35use mz_ore::url::SensitiveUrl;
36use tracing::debug;
37
38use crate::error::PostgresError;
39use crate::metrics::PostgresClientMetrics;
40
/// Tunable settings that [PostgresClient] reads while configuring its
/// connections and its connection pool.
///
/// Implementations are shared behind an `Arc<dyn PostgresClientKnobs>`, hence
/// the `Send + Sync` bounds.
pub trait PostgresClientKnobs: Send + Sync + std::fmt::Debug {
    /// Upper bound on the number of connections held by the pool.
    fn connection_pool_max_size(&self) -> usize;

    /// How long a caller may wait for a pooled connection. `None` means no
    /// wait timeout is configured on the pool.
    fn connection_pool_max_wait(&self) -> Option<Duration>;

    /// Minimum age a connection must reach before it becomes eligible to be
    /// TTLed. Connections are expected to be culled routinely so that load on
    /// the backing store stays balanced.
    fn connection_pool_ttl(&self) -> Duration;

    /// Minimum spacing between two TTL-driven reconnections. Staggering them
    /// avoids stampeding the backing store.
    fn connection_pool_ttl_stagger(&self) -> Duration;

    /// How long to wait for a connection attempt to complete before retrying.
    fn connect_timeout(&self) -> Duration;

    /// The TCP user timeout applied to connections.
    fn tcp_user_timeout(&self) -> Duration;

    /// Idle time on a connection after which a TCP keepalive packet is sent.
    fn keepalives_idle(&self) -> Duration;

    /// Interval between successive TCP keepalive probes.
    fn keepalives_interval(&self) -> Duration;

    /// Number of TCP keepalive probes sent before the connection is dropped.
    fn keepalives_retries(&self) -> u32;
}
64
65/// Configuration for creating a [PostgresClient].
66#[derive(Clone, Debug)]
67pub struct PostgresClientConfig {
68    url: SensitiveUrl,
69    knobs: Arc<dyn PostgresClientKnobs>,
70    metrics: PostgresClientMetrics,
71}
72
73impl PostgresClientConfig {
74    /// Returns a new [PostgresClientConfig] for use in production.
75    pub fn new(
76        url: SensitiveUrl,
77        knobs: Arc<dyn PostgresClientKnobs>,
78        metrics: PostgresClientMetrics,
79    ) -> Self {
80        PostgresClientConfig {
81            url,
82            knobs,
83            metrics,
84        }
85    }
86}
87
88/// A Postgres client wrapper that uses deadpool as a connection pool.
89pub struct PostgresClient {
90    pool: Pool,
91    metrics: PostgresClientMetrics,
92}
93
94impl std::fmt::Debug for PostgresClient {
95    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96        f.debug_struct("PostgresClient").finish_non_exhaustive()
97    }
98}
99
100impl PostgresClient {
101    /// Open a [PostgresClient] using the given `config`.
102    pub fn open(config: PostgresClientConfig) -> Result<Self, PostgresError> {
103        let mut pg_config: Config = config.url.to_string_unredacted().parse()?;
104        pg_config.connect_timeout(config.knobs.connect_timeout());
105        pg_config.tcp_user_timeout(config.knobs.tcp_user_timeout());
106
107        // Configuring keepalives is important to ensure we can detect broken connections quickly.
108        // TCP_USER_TIMEOUT is not sufficient as it only enforces a timeout on ACKs for transmitted
109        // data, which only helps if we... transmit data.
110        pg_config.keepalives(true);
111        pg_config.keepalives_idle(config.knobs.keepalives_idle());
112        pg_config.keepalives_interval(config.knobs.keepalives_interval());
113        pg_config.keepalives_retries(config.knobs.keepalives_retries());
114
115        let tls = mz_tls_util::make_tls(&pg_config).map_err(|tls_err| match tls_err {
116            mz_tls_util::TlsError::Generic(e) => PostgresError::Indeterminate(e),
117            mz_tls_util::TlsError::OpenSsl(e) => PostgresError::Indeterminate(anyhow::anyhow!(e)),
118        })?;
119
120        let manager = Manager::from_config(
121            pg_config,
122            tls,
123            ManagerConfig {
124                recycling_method: RecyclingMethod::Fast,
125            },
126        );
127
128        let last_ttl_connection = AtomicU64::new(0);
129        let connections_created = config.metrics.connpool_connections_created.clone();
130        let ttl_reconnections = config.metrics.connpool_ttl_reconnections.clone();
131        let builder = Pool::builder(manager);
132        let builder = match config.knobs.connection_pool_max_wait() {
133            None => builder,
134            Some(wait) => builder.wait_timeout(Some(wait)).runtime(Runtime::Tokio1),
135        };
136        let pool = builder
137            .max_size(config.knobs.connection_pool_max_size())
138            .post_create(Hook::async_fn(move |client, _| {
139                connections_created.inc();
140                Box::pin(async move {
141                    debug!("opened new consensus postgres connection");
142                    // This hook must return `tokio_postgres::Error`; using
143                    // `mz_postgres_util` wrappers would change the error type.
144                    #[allow(clippy::disallowed_methods)]
145                    client
146                        .batch_execute(
147                        "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE",
148                    )
149                        .await
150                        .map_err(|e| HookError::Abort(HookErrorCause::Backend(e)))
151                })
152            }))
153            .pre_recycle(Hook::sync_fn(move |_client, conn_metrics| {
154                // proactively TTL connections to rebalance load to Postgres/CRDB. this helps
155                // fix skew when downstream DB operations (e.g. CRDB rolling restart) result
156                // in uneven load to each node, and works to reduce the # of connections
157                // maintained by the pool after bursty workloads.
158
159                // add a bias towards TTLing older connections first
160                if conn_metrics.age() < config.knobs.connection_pool_ttl() {
161                    return Ok(());
162                }
163
164                let last_ttl = last_ttl_connection.load(Ordering::SeqCst);
165                let now = (SYSTEM_TIME)();
166                let elapsed_since_last_ttl = Duration::from_millis(now.saturating_sub(last_ttl));
167
168                // stagger out reconnections to avoid stampeding the DB
169                if elapsed_since_last_ttl > config.knobs.connection_pool_ttl_stagger()
170                    && last_ttl_connection
171                        .compare_exchange_weak(last_ttl, now, Ordering::SeqCst, Ordering::SeqCst)
172                        .is_ok()
173                {
174                    ttl_reconnections.inc();
175                    return Err(HookError::Continue(Some(HookErrorCause::Message(
176                        "connection has been TTLed".to_string(),
177                    ))));
178                }
179
180                Ok(())
181            }))
182            .build()
183            .expect("postgres connection pool built with incorrect parameters");
184
185        Ok(PostgresClient {
186            pool,
187            metrics: config.metrics,
188        })
189    }
190
191    fn status_metrics(&self, status: Status) {
192        self.metrics
193            .connpool_available
194            .set(f64::cast_lossy(status.available));
195        self.metrics.connpool_size.set(u64::cast_from(status.size));
196        // Don't bother reporting the maximum size of the pool... we know that from config.
197    }
198
199    /// Gets connection from the pool or waits for one to become available.
200    pub async fn get_connection(&self) -> Result<Object, PoolError> {
201        let start = Instant::now();
202        // note that getting the pool size here requires briefly locking the pool
203        self.status_metrics(self.pool.status());
204        let res = self.pool.get().await;
205        if let Err(PoolError::Backend(err)) = &res {
206            debug!("error establishing connection: {}", err);
207            self.metrics.connpool_connection_errors.inc();
208        }
209        self.metrics
210            .connpool_acquire_seconds
211            .inc_by(start.elapsed().as_secs_f64());
212        self.metrics.connpool_acquires.inc();
213        self.status_metrics(self.pool.status());
214        res
215    }
216}