// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! A Postgres client that uses deadpool as a connection pool and comes with
//! common/default configuration options.
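//!
//! A minimal usage sketch. The crate path `mz_postgres_client` and the
//! `anyhow` error conversions are assumptions for illustration; the checked-out
//! connection dereferences to a `tokio_postgres` client:
//!
//! ```ignore
//! use mz_postgres_client::{PostgresClient, PostgresClientConfig};
//!
//! async fn select_one(config: PostgresClientConfig) -> anyhow::Result<()> {
//!     // Build the pool; connections are created lazily as they are needed.
//!     let client = PostgresClient::open(config)?;
//!     // Check a connection out of the pool, waiting if none is available.
//!     let conn = client.get_connection().await?;
//!     let _row = conn.query_one("SELECT 1", &[]).await?;
//!     Ok(())
//! }
//! ```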

#![warn(missing_docs, missing_debug_implementations)]
#![warn(
    clippy::cast_possible_truncation,
    clippy::cast_precision_loss,
    clippy::cast_sign_loss,
    clippy::clone_on_ref_ptr
)]

pub mod error;
pub mod metrics;

use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};

use deadpool_postgres::tokio_postgres::Config;
use deadpool_postgres::{
    Hook, HookError, HookErrorCause, Manager, ManagerConfig, Object, Pool, PoolError,
    RecyclingMethod, Runtime, Status,
};
use mz_ore::cast::{CastFrom, CastLossy};
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use tracing::debug;

use crate::error::PostgresError;
use crate::metrics::PostgresClientMetrics;

/// Configuration knobs for [PostgresClient].
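///
/// A sketch of a fixed-value implementation; the specific sizes and durations
/// below are illustrative placeholders, not defaults shipped by this crate:
///
/// ```ignore
/// use std::time::Duration;
///
/// #[derive(Debug)]
/// struct FixedKnobs;
///
/// impl PostgresClientKnobs for FixedKnobs {
///     fn connection_pool_max_size(&self) -> usize { 50 }
///     fn connection_pool_max_wait(&self) -> Option<Duration> { Some(Duration::from_secs(60)) }
///     fn connection_pool_ttl(&self) -> Duration { Duration::from_secs(300) }
///     fn connection_pool_ttl_stagger(&self) -> Duration { Duration::from_secs(6) }
///     fn connect_timeout(&self) -> Duration { Duration::from_secs(5) }
///     fn tcp_user_timeout(&self) -> Duration { Duration::from_secs(30) }
/// }
/// ```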
pub trait PostgresClientKnobs: std::fmt::Debug + Send + Sync {
    /// Maximum number of connections allowed in a pool.
    fn connection_pool_max_size(&self) -> usize;
    /// The maximum time to wait to obtain a connection, if any.
    fn connection_pool_max_wait(&self) -> Option<Duration>;
    /// Minimum TTL of a connection. It is expected that connections are
    /// routinely culled to balance load to the backing store.
    fn connection_pool_ttl(&self) -> Duration;
    /// Minimum time between TTLing connections. Helps stagger reconnections
    /// to avoid stampeding the backing store.
    fn connection_pool_ttl_stagger(&self) -> Duration;
    /// Time to wait for a connection to be made before retrying.
    fn connect_timeout(&self) -> Duration;
    /// TCP user timeout for connection attempts.
    fn tcp_user_timeout(&self) -> Duration;
}

/// Configuration for creating a [PostgresClient].
#[derive(Clone, Debug)]
pub struct PostgresClientConfig {
    url: SensitiveUrl,
    knobs: Arc<dyn PostgresClientKnobs>,
    metrics: PostgresClientMetrics,
}

impl PostgresClientConfig {
    /// Returns a new [PostgresClientConfig] for use in production.
    pub fn new(
        url: SensitiveUrl,
        knobs: Arc<dyn PostgresClientKnobs>,
        metrics: PostgresClientMetrics,
    ) -> Self {
        PostgresClientConfig {
            url,
            knobs,
            metrics,
        }
    }
}

/// A Postgres client wrapper that uses deadpool as a connection pool.
pub struct PostgresClient {
    pool: Pool,
    metrics: PostgresClientMetrics,
}

impl std::fmt::Debug for PostgresClient {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("PostgresClient").finish_non_exhaustive()
    }
}

impl PostgresClient {
    /// Open a [PostgresClient] using the given `config`.
    pub fn open(config: PostgresClientConfig) -> Result<Self, PostgresError> {
        let mut pg_config: Config = config.url.to_string_unredacted().parse()?;
        pg_config.connect_timeout(config.knobs.connect_timeout());
        pg_config.tcp_user_timeout(config.knobs.tcp_user_timeout());

        let tls = mz_tls_util::make_tls(&pg_config).map_err(|tls_err| match tls_err {
            mz_tls_util::TlsError::Generic(e) => PostgresError::Indeterminate(e),
            mz_tls_util::TlsError::OpenSsl(e) => PostgresError::Indeterminate(anyhow::anyhow!(e)),
        })?;

        let manager = Manager::from_config(
            pg_config,
            tls,
            ManagerConfig {
                recycling_method: RecyclingMethod::Fast,
            },
        );

        let last_ttl_connection = AtomicU64::new(0);
        let connections_created = config.metrics.connpool_connections_created.clone();
        let ttl_reconnections = config.metrics.connpool_ttl_reconnections.clone();
        let builder = Pool::builder(manager);
        let builder = match config.knobs.connection_pool_max_wait() {
            None => builder,
            Some(wait) => builder.wait_timeout(Some(wait)).runtime(Runtime::Tokio1),
        };
        let pool = builder
            .max_size(config.knobs.connection_pool_max_size())
            .post_create(Hook::async_fn(move |client, _| {
                connections_created.inc();
                Box::pin(async move {
                    debug!("opened new consensus postgres connection");
                    client.batch_execute(
                        "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE",
                    ).await.map_err(|e| HookError::Abort(HookErrorCause::Backend(e)))
                })
            }))
            .pre_recycle(Hook::sync_fn(move |_client, conn_metrics| {
                // proactively TTL connections to rebalance load to Postgres/CRDB. this helps
                // fix skew when downstream DB operations (e.g. CRDB rolling restart) result
                // in uneven load to each node, and works to reduce the # of connections
                // maintained by the pool after bursty workloads.

                // add a bias towards TTLing older connections first
                if conn_metrics.age() < config.knobs.connection_pool_ttl() {
                    return Ok(());
                }

                let last_ttl = last_ttl_connection.load(Ordering::SeqCst);
                let now = (SYSTEM_TIME)();
                let elapsed_since_last_ttl = Duration::from_millis(now.saturating_sub(last_ttl));

                // stagger out reconnections to avoid stampeding the DB
                if elapsed_since_last_ttl > config.knobs.connection_pool_ttl_stagger()
                    && last_ttl_connection
                        .compare_exchange_weak(last_ttl, now, Ordering::SeqCst, Ordering::SeqCst)
                        .is_ok()
                {
                    ttl_reconnections.inc();
                    return Err(HookError::Continue(Some(HookErrorCause::Message(
                        "connection has been TTLed".to_string(),
                    ))));
                }

                Ok(())
            }))
            .build()
            .expect("postgres connection pool built with incorrect parameters");

        Ok(PostgresClient {
            pool,
            metrics: config.metrics,
        })
    }

    fn status_metrics(&self, status: Status) {
        self.metrics
            .connpool_available
            .set(f64::cast_lossy(status.available));
        self.metrics.connpool_size.set(u64::cast_from(status.size));
        // Don't bother reporting the maximum size of the pool... we know that from config.
    }

    /// Gets a connection from the pool or waits for one to become available.
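    ///
    /// A usage sketch (the `query_one` call is the `tokio_postgres` API reached
    /// through deadpool's `Object` deref, shown here only for illustration):
    ///
    /// ```ignore
    /// let conn = client.get_connection().await?;
    /// let row = conn.query_one("SELECT 1", &[]).await?;
    /// assert_eq!(row.get::<_, i32>(0), 1);
    /// ```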
    pub async fn get_connection(&self) -> Result<Object, PoolError> {
        let start = Instant::now();
        // note that getting the pool size here requires briefly locking the pool
        self.status_metrics(self.pool.status());
        let res = self.pool.get().await;
        if let Err(PoolError::Backend(err)) = &res {
            debug!("error establishing connection: {}", err);
            self.metrics.connpool_connection_errors.inc();
        }
        self.metrics
            .connpool_acquire_seconds
            .inc_by(start.elapsed().as_secs_f64());
        self.metrics.connpool_acquires.inc();
        self.status_metrics(self.pool.status());
        res
    }
}