mz_postgres_client/
lib.rs1#![warn(missing_docs, missing_debug_implementations)]
14#![warn(
15 clippy::cast_possible_truncation,
16 clippy::cast_precision_loss,
17 clippy::cast_sign_loss,
18 clippy::clone_on_ref_ptr
19)]
20
21pub mod error;
22pub mod metrics;
23
24use std::sync::Arc;
25use std::sync::atomic::{AtomicU64, Ordering};
26use std::time::{Duration, Instant};
27
28use deadpool_postgres::tokio_postgres::Config;
29use deadpool_postgres::{
30 Hook, HookError, HookErrorCause, Manager, ManagerConfig, Object, Pool, PoolError,
31 RecyclingMethod, Runtime, Status,
32};
33use mz_ore::cast::{CastFrom, CastLossy};
34use mz_ore::now::SYSTEM_TIME;
35use mz_ore::url::SensitiveUrl;
36use tracing::debug;
37
38use crate::error::PostgresError;
39use crate::metrics::PostgresClientMetrics;
40
/// Tunable settings that [`PostgresClient::open`] reads when it configures
/// individual connections and the connection pool.
///
/// Implementations must be `Debug + Send + Sync` so they can be shared behind
/// an `Arc<dyn PostgresClientKnobs>`.
pub trait PostgresClientKnobs: std::fmt::Debug + Send + Sync {
    /// Maximum number of connections the pool may hold.
    fn connection_pool_max_size(&self) -> usize;

    /// How long a caller may wait for a connection from the pool. `None`
    /// leaves the pool's wait timeout unset.
    fn connection_pool_max_wait(&self) -> Option<Duration>;

    /// Age a pooled connection must reach before it becomes eligible to be
    /// dropped at recycle time instead of being reused.
    fn connection_pool_ttl(&self) -> Duration;

    /// Minimum time that must elapse between two TTL-driven connection
    /// drops, so that old connections are not all replaced at once.
    fn connection_pool_ttl_stagger(&self) -> Duration;

    /// Timeout applied when establishing a new connection.
    fn connect_timeout(&self) -> Duration;

    /// TCP user timeout set on each connection.
    fn tcp_user_timeout(&self) -> Duration;

    /// Idle time before TCP keepalive probes start being sent.
    fn keepalives_idle(&self) -> Duration;

    /// Interval between successive TCP keepalive probes.
    fn keepalives_interval(&self) -> Duration;

    /// Number of TCP keepalive probes sent before the connection is
    /// considered dead.
    fn keepalives_retries(&self) -> u32;
}
64
65#[derive(Clone, Debug)]
67pub struct PostgresClientConfig {
68 url: SensitiveUrl,
69 knobs: Arc<dyn PostgresClientKnobs>,
70 metrics: PostgresClientMetrics,
71}
72
73impl PostgresClientConfig {
74 pub fn new(
76 url: SensitiveUrl,
77 knobs: Arc<dyn PostgresClientKnobs>,
78 metrics: PostgresClientMetrics,
79 ) -> Self {
80 PostgresClientConfig {
81 url,
82 knobs,
83 metrics,
84 }
85 }
86}
87
88pub struct PostgresClient {
90 pool: Pool,
91 metrics: PostgresClientMetrics,
92}
93
94impl std::fmt::Debug for PostgresClient {
95 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
96 f.debug_struct("PostgresClient").finish_non_exhaustive()
97 }
98}
99
100impl PostgresClient {
101 pub fn open(config: PostgresClientConfig) -> Result<Self, PostgresError> {
103 let mut pg_config: Config = config.url.to_string_unredacted().parse()?;
104 pg_config.connect_timeout(config.knobs.connect_timeout());
105 pg_config.tcp_user_timeout(config.knobs.tcp_user_timeout());
106
107 pg_config.keepalives(true);
111 pg_config.keepalives_idle(config.knobs.keepalives_idle());
112 pg_config.keepalives_interval(config.knobs.keepalives_interval());
113 pg_config.keepalives_retries(config.knobs.keepalives_retries());
114
115 let tls = mz_tls_util::make_tls(&pg_config).map_err(|tls_err| match tls_err {
116 mz_tls_util::TlsError::Generic(e) => PostgresError::Indeterminate(e),
117 mz_tls_util::TlsError::OpenSsl(e) => PostgresError::Indeterminate(anyhow::anyhow!(e)),
118 })?;
119
120 let manager = Manager::from_config(
121 pg_config,
122 tls,
123 ManagerConfig {
124 recycling_method: RecyclingMethod::Fast,
125 },
126 );
127
128 let last_ttl_connection = AtomicU64::new(0);
129 let connections_created = config.metrics.connpool_connections_created.clone();
130 let ttl_reconnections = config.metrics.connpool_ttl_reconnections.clone();
131 let builder = Pool::builder(manager);
132 let builder = match config.knobs.connection_pool_max_wait() {
133 None => builder,
134 Some(wait) => builder.wait_timeout(Some(wait)).runtime(Runtime::Tokio1),
135 };
136 let pool = builder
137 .max_size(config.knobs.connection_pool_max_size())
138 .post_create(Hook::async_fn(move |client, _| {
139 connections_created.inc();
140 Box::pin(async move {
141 debug!("opened new consensus postgres connection");
142 #[allow(clippy::disallowed_methods)]
145 client
146 .batch_execute(
147 "SET SESSION CHARACTERISTICS AS TRANSACTION ISOLATION LEVEL SERIALIZABLE",
148 )
149 .await
150 .map_err(|e| HookError::Abort(HookErrorCause::Backend(e)))
151 })
152 }))
153 .pre_recycle(Hook::sync_fn(move |_client, conn_metrics| {
154 if conn_metrics.age() < config.knobs.connection_pool_ttl() {
161 return Ok(());
162 }
163
164 let last_ttl = last_ttl_connection.load(Ordering::SeqCst);
165 let now = (SYSTEM_TIME)();
166 let elapsed_since_last_ttl = Duration::from_millis(now.saturating_sub(last_ttl));
167
168 if elapsed_since_last_ttl > config.knobs.connection_pool_ttl_stagger()
170 && last_ttl_connection
171 .compare_exchange_weak(last_ttl, now, Ordering::SeqCst, Ordering::SeqCst)
172 .is_ok()
173 {
174 ttl_reconnections.inc();
175 return Err(HookError::Continue(Some(HookErrorCause::Message(
176 "connection has been TTLed".to_string(),
177 ))));
178 }
179
180 Ok(())
181 }))
182 .build()
183 .expect("postgres connection pool built with incorrect parameters");
184
185 Ok(PostgresClient {
186 pool,
187 metrics: config.metrics,
188 })
189 }
190
191 fn status_metrics(&self, status: Status) {
192 self.metrics
193 .connpool_available
194 .set(f64::cast_lossy(status.available));
195 self.metrics.connpool_size.set(u64::cast_from(status.size));
196 }
198
199 pub async fn get_connection(&self) -> Result<Object, PoolError> {
201 let start = Instant::now();
202 self.status_metrics(self.pool.status());
204 let res = self.pool.get().await;
205 if let Err(PoolError::Backend(err)) = &res {
206 debug!("error establishing connection: {}", err);
207 self.metrics.connpool_connection_errors.inc();
208 }
209 self.metrics
210 .connpool_acquire_seconds
211 .inc_by(start.elapsed().as_secs_f64());
212 self.metrics.connpool_acquires.inc();
213 self.status_metrics(self.pool.status());
214 res
215 }
216}