Skip to main content

mz_persist/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of [Consensus] backed by Postgres.
11
12use std::fmt::Formatter;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use async_stream::try_stream;
19use async_trait::async_trait;
20use bytes::Bytes;
21use deadpool_postgres::tokio_postgres::Config;
22use deadpool_postgres::tokio_postgres::types::{FromSql, IsNull, ToSql, Type, to_sql_checked};
23use deadpool_postgres::{Object, PoolError};
24use futures_util::StreamExt;
25use mz_dyncfg::ConfigSet;
26use mz_ore::cast::CastFrom;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::url::SensitiveUrl;
29use mz_postgres_client::metrics::PostgresClientMetrics;
30use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
31use postgres_protocol::escape::escape_identifier;
32use tokio_postgres::error::SqlState;
33use tokio_postgres::{Row, Statement};
34use tracing::{info, warn};
35
36use crate::error::Error;
37use crate::location::{CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData};
38
39/// Flag to use concensus queries that are tuned for vanilla Postgres.
40pub const USE_POSTGRES_TUNED_QUERIES: mz_dyncfg::Config<bool> = mz_dyncfg::Config::new(
41    "persist_use_postgres_tuned_queries",
42    false,
43    "Use a set of queries for consensus that have specifically been tuned against
44    Postgres to ensure we acquire a minimal number of locks.",
45);
46
47const SCHEMA: &str = "
48CREATE TABLE IF NOT EXISTS consensus (
49    shard text NOT NULL,
50    sequence_number bigint NOT NULL,
51    data bytea NOT NULL,
52    PRIMARY KEY(shard, sequence_number)
53)
54";
55
56// These `sql_stats_automatic_collection_enabled` are for the cost-based
57// optimizer but all the queries against this table are single-table and very
58// carefully tuned to hit the primary index, so the cost-based optimizer doesn't
59// really get us anything. OTOH, the background jobs that crdb creates to
60// collect these stats fill up the jobs table (slowing down all sorts of
61// things).
62const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
63// The `consensus` table creates and deletes rows at a high frequency, generating many
64// tombstoned rows. If Cockroach's GC interval is set high (the default is 25h) and
65// these tombstones accumulate, scanning over the table will take increasingly and
66// prohibitively long.
67//
68// See: https://github.com/MaterializeInc/database-issues/issues/4001
69// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
70const CRDB_CONFIGURE_ZONE: &str = "ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600";
71
72/// NOTE: `mz-persist` intentionally does not depend on `mz-postgres-util`.
73/// These helpers are the only direct driver-call boundary in this module.
74async fn pg_batch_execute(client: &Object, query: &str) -> Result<(), tokio_postgres::Error> {
75    #[allow(clippy::disallowed_methods)]
76    client.batch_execute(query).await
77}
78
79async fn pg_query_prepared(
80    client: &Object,
81    statement: &Statement,
82    params: &[&(dyn ToSql + Sync)],
83) -> Result<Vec<Row>, tokio_postgres::Error> {
84    #[allow(clippy::disallowed_methods)]
85    client.query(statement, params).await
86}
87
88async fn pg_query_opt_prepared(
89    client: &Object,
90    statement: &Statement,
91    params: &[&(dyn ToSql + Sync)],
92) -> Result<Option<Row>, tokio_postgres::Error> {
93    #[allow(clippy::disallowed_methods)]
94    client.query_opt(statement, params).await
95}
96
97async fn pg_execute_prepared(
98    client: &Object,
99    statement: &Statement,
100    params: &[&(dyn ToSql + Sync)],
101) -> Result<u64, tokio_postgres::Error> {
102    #[allow(clippy::disallowed_methods)]
103    client.execute(statement, params).await
104}
105
106impl ToSql for SeqNo {
107    fn to_sql(
108        &self,
109        ty: &Type,
110        w: &mut bytes::BytesMut,
111    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
112        // We can only represent sequence numbers in the range [0, i64::MAX].
113        let value = i64::try_from(self.0)?;
114        <i64 as ToSql>::to_sql(&value, ty, w)
115    }
116
117    fn accepts(ty: &Type) -> bool {
118        <i64 as ToSql>::accepts(ty)
119    }
120
121    to_sql_checked!();
122}
123
124impl<'a> FromSql<'a> for SeqNo {
125    fn from_sql(
126        ty: &Type,
127        raw: &'a [u8],
128    ) -> Result<SeqNo, Box<dyn std::error::Error + Sync + Send>> {
129        let sequence_number = <i64 as FromSql>::from_sql(ty, raw)?;
130
131        // Sanity check that the sequence number we received falls in the
132        // [0, i64::MAX] range.
133        let sequence_number = u64::try_from(sequence_number)?;
134        Ok(SeqNo(sequence_number))
135    }
136
137    fn accepts(ty: &Type) -> bool {
138        <i64 as FromSql>::accepts(ty)
139    }
140}
141
142/// Configuration to connect to a Postgres backed implementation of [Consensus].
143#[derive(Clone, Debug)]
144pub struct PostgresConsensusConfig {
145    url: SensitiveUrl,
146    knobs: Arc<dyn PostgresClientKnobs>,
147    metrics: PostgresClientMetrics,
148    dyncfg: Arc<ConfigSet>,
149}
150
151impl From<PostgresConsensusConfig> for PostgresClientConfig {
152    fn from(config: PostgresConsensusConfig) -> Self {
153        PostgresClientConfig::new(config.url, config.knobs, config.metrics)
154    }
155}
156
157impl PostgresConsensusConfig {
158    const EXTERNAL_TESTS_POSTGRES_URL: &'static str =
159        "MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL";
160
161    /// Returns a new [PostgresConsensusConfig] for use in production.
162    pub fn new(
163        url: &SensitiveUrl,
164        knobs: Box<dyn PostgresClientKnobs>,
165        metrics: PostgresClientMetrics,
166        dyncfg: Arc<ConfigSet>,
167    ) -> Result<Self, Error> {
168        Ok(PostgresConsensusConfig {
169            url: url.clone(),
170            knobs: Arc::from(knobs),
171            metrics,
172            dyncfg,
173        })
174    }
175
176    /// Returns a new [PostgresConsensusConfig] for use in unit tests.
177    ///
178    /// By default, persist tests that use external storage (like Postgres) are
179    /// no-ops so that `cargo test` works on new environments without any
180    /// configuration. To activate the tests for [PostgresConsensus] set the
181    /// `MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL` environment variable
182    /// with a valid connection url [1].
183    ///
184    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
185    pub fn new_for_test() -> Result<Option<Self>, Error> {
186        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
187            Ok(url) => SensitiveUrl::from_str(&url).map_err(|e| e.to_string())?,
188            Err(_) => {
189                if mz_ore::env::is_var_truthy("CI") {
190                    panic!("CI is supposed to run this test but something has gone wrong!");
191                }
192                return Ok(None);
193            }
194        };
195
196        struct TestConsensusKnobs;
197        impl std::fmt::Debug for TestConsensusKnobs {
198            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199                f.debug_struct("TestConsensusKnobs").finish_non_exhaustive()
200            }
201        }
202        impl PostgresClientKnobs for TestConsensusKnobs {
203            fn connection_pool_max_size(&self) -> usize {
204                2
205            }
206
207            fn connection_pool_max_wait(&self) -> Option<Duration> {
208                Some(Duration::from_secs(1))
209            }
210
211            fn connection_pool_ttl(&self) -> Duration {
212                Duration::MAX
213            }
214            fn connection_pool_ttl_stagger(&self) -> Duration {
215                Duration::MAX
216            }
217            fn connect_timeout(&self) -> Duration {
218                Duration::MAX
219            }
220            fn tcp_user_timeout(&self) -> Duration {
221                Duration::ZERO
222            }
223
224            fn keepalives_idle(&self) -> Duration {
225                Duration::from_secs(10)
226            }
227
228            fn keepalives_interval(&self) -> Duration {
229                Duration::from_secs(5)
230            }
231
232            fn keepalives_retries(&self) -> u32 {
233                5
234            }
235
236            fn statement_timeout(&self) -> Duration {
237                Duration::ZERO
238            }
239        }
240
241        let dyncfg = ConfigSet::default().add(&USE_POSTGRES_TUNED_QUERIES);
242        let config = PostgresConsensusConfig::new(
243            &url,
244            Box::new(TestConsensusKnobs),
245            PostgresClientMetrics::new(&MetricsRegistry::new(), "mz_persist"),
246            Arc::new(dyncfg),
247        )?;
248        Ok(Some(config))
249    }
250}
251
252/// What flavor of Postgres are we connected to for consensus.
253#[derive(Debug, Clone, Copy, PartialEq, Eq)]
254enum PostgresMode {
255    /// CockroachDB, used in our cloud offering.
256    CockroachDB,
257    /// Vanilla Postgres, the default for our self-hosted offering.
258    Postgres,
259}
260
261/// Implementation of [Consensus] over a Postgres database.
262pub struct PostgresConsensus {
263    postgres_client: PostgresClient,
264    dyncfg: Arc<ConfigSet>,
265    mode: PostgresMode,
266}
267
268impl std::fmt::Debug for PostgresConsensus {
269    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270        f.debug_struct("PostgresConsensus").finish_non_exhaustive()
271    }
272}
273
274impl PostgresConsensus {
275    /// Open a Postgres [Consensus] instance with `config`, for the collection
276    /// named `shard`.
277    pub async fn open(config: PostgresConsensusConfig) -> Result<Self, ExternalError> {
278        // don't need to unredact here because we just want to pull out the username
279        let pg_config: Config = config.url.to_string().parse()?;
280        let role = pg_config.get_user().expect("failed to get PostgreSQL user");
281        let create_schema = format!(
282            "CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION {}",
283            escape_identifier(role),
284        );
285
286        let dyncfg = Arc::clone(&config.dyncfg);
287        let postgres_client = PostgresClient::open(config.into())?;
288
289        let client = postgres_client.get_connection().await?;
290
291        let mode = match pg_batch_execute(
292            &client,
293            &format!(
294                "{}; {}{}; {};",
295                create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
296            ),
297        )
298        .await
299        {
300            Ok(()) => PostgresMode::CockroachDB,
301            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
302                warn!(
303                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
304                );
305                PostgresMode::CockroachDB
306            }
307            // Vanilla Postgres doesn't support the Cockroach zone configuration
308            // that we attempted, so we use that to determine what mode we're in.
309            Err(e)
310                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
311                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
312            {
313                info!(
314                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
315                    e
316                );
317                PostgresMode::Postgres
318            }
319            Err(e) => return Err(e.into()),
320        };
321
322        if mode != PostgresMode::CockroachDB {
323            pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
324        }
325
326        Ok(PostgresConsensus {
327            postgres_client,
328            dyncfg,
329            mode,
330        })
331    }
332
333    /// Drops and recreates the `consensus` table in Postgres
334    ///
335    /// ONLY FOR TESTING
336    pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> {
337        // this could be a TRUNCATE if we're confident the db won't reuse any state
338        let client = self.get_connection().await?;
339        pg_batch_execute(&client, "DROP TABLE consensus").await?;
340        let crdb_mode = match pg_batch_execute(
341            &client,
342            &format!("{}{}; {}", SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,),
343        )
344        .await
345        {
346            Ok(()) => true,
347            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
348                warn!(
349                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
350                );
351                true
352            }
353            Err(e)
354                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
355                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
356            {
357                info!(
358                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
359                    e
360                );
361                false
362            }
363            Err(e) => return Err(e.into()),
364        };
365
366        if !crdb_mode {
367            pg_batch_execute(&client, SCHEMA).await?;
368        }
369        Ok(())
370    }
371
372    async fn get_connection(&self) -> Result<Object, PoolError> {
373        self.postgres_client.get_connection().await
374    }
375}
376
377#[async_trait]
378impl Consensus for PostgresConsensus {
379    fn list_keys(&self) -> ResultStream<'_, String> {
380        let q = "SELECT DISTINCT shard FROM consensus";
381
382        Box::pin(try_stream! {
383            // NB: it's important that we hang on to this client for the lifetime of the stream,
384            // to avoid returning it to the pool prematurely.
385            let client = self.get_connection().await?;
386            let statement = client.prepare_cached(q).await?;
387            let params: &[String] = &[];
388            let mut rows = Box::pin(client.query_raw(&statement, params).await?);
389            while let Some(row) = rows.next().await {
390                let shard: String = row?.try_get("shard")?;
391                yield shard;
392            }
393        })
394    }
395
396    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
397        let q = "SELECT sequence_number, data FROM consensus
398             WHERE shard = $1 ORDER BY sequence_number DESC LIMIT 1";
399        let row = {
400            let client = self.get_connection().await?;
401            let statement = client.prepare_cached(q).await?;
402            pg_query_opt_prepared(&client, &statement, &[&key]).await?
403        };
404        let row = match row {
405            None => return Ok(None),
406            Some(row) => row,
407        };
408
409        let seqno: SeqNo = row.try_get("sequence_number")?;
410
411        let data: Vec<u8> = row.try_get("data")?;
412        Ok(Some(VersionedData {
413            seqno,
414            data: Bytes::from(data),
415        }))
416    }
417
418    async fn compare_and_set(
419        &self,
420        key: &str,
421        new: VersionedData,
422    ) -> Result<CaSResult, ExternalError> {
423        let expected = new.seqno.previous();
424
425        let result = if let Some(expected) = expected {
426            /// This query has been written to execute within a single
427            /// network round-trip. The insert performance has been tuned
428            /// against CockroachDB, ensuring it goes through the fast-path
429            /// 1-phase commit of CRDB. Any changes to this query should
430            /// confirm an EXPLAIN ANALYZE (VERBOSE) query plan contains
431            /// `auto commit`
432            static CRDB_CAS_QUERY: &str = "
433                INSERT INTO consensus (shard, sequence_number, data)
434                SELECT $1, $2, $3
435                WHERE (SELECT sequence_number FROM consensus
436                       WHERE shard = $1
437                       ORDER BY sequence_number DESC LIMIT 1) = $4;
438            ";
439
440            /// This query has been written to ensure we only get row level
441            /// locks on the `(shard, seq_no)` we're trying to update. The insert
442            /// performance has been tuned against Postgres 15 to ensure it
443            /// minimizes possible serialization conflicts.
444            static POSTGRES_CAS_QUERY: &str = "
445            WITH last_seq AS (
446                SELECT sequence_number FROM consensus
447                WHERE shard = $1
448                ORDER BY sequence_number DESC
449                LIMIT 1
450                FOR UPDATE
451            )
452            INSERT INTO consensus (shard, sequence_number, data)
453            SELECT $1, $2, $3
454            FROM last_seq
455            WHERE last_seq.sequence_number = $4;
456            ";
457
458            let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
459                && self.mode == PostgresMode::Postgres
460            {
461                POSTGRES_CAS_QUERY
462            } else {
463                CRDB_CAS_QUERY
464            };
465            let client = self.get_connection().await?;
466            let statement = client.prepare_cached(q).await?;
467            pg_execute_prepared(
468                &client,
469                &statement,
470                &[&key, &new.seqno, &new.data.as_ref(), &expected],
471            )
472            .await?
473        } else {
474            // Insert the new row as long as no other row exists for the same shard.
475            let q = "INSERT INTO consensus SELECT $1, $2, $3 WHERE
476                     NOT EXISTS (
477                         SELECT * FROM consensus WHERE shard = $1
478                     )
479                     ON CONFLICT DO NOTHING";
480            let client = self.get_connection().await?;
481            let statement = client.prepare_cached(q).await?;
482            pg_execute_prepared(&client, &statement, &[&key, &new.seqno, &new.data.as_ref()])
483                .await?
484        };
485
486        if result == 1 {
487            Ok(CaSResult::Committed)
488        } else {
489            Ok(CaSResult::ExpectationMismatch)
490        }
491    }
492
493    async fn scan(
494        &self,
495        key: &str,
496        from: SeqNo,
497        limit: usize,
498    ) -> Result<Vec<VersionedData>, ExternalError> {
499        let q = "SELECT sequence_number, data FROM consensus
500             WHERE shard = $1 AND sequence_number >= $2
501             ORDER BY sequence_number ASC LIMIT $3";
502        let Ok(limit) = i64::try_from(limit) else {
503            return Err(ExternalError::from(anyhow!(
504                "limit must be [0, i64::MAX]. was: {:?}",
505                limit
506            )));
507        };
508        let rows = {
509            let client = self.get_connection().await?;
510            let statement = client.prepare_cached(q).await?;
511            pg_query_prepared(&client, &statement, &[&key, &from, &limit]).await?
512        };
513        let mut results = Vec::with_capacity(rows.len());
514
515        for row in rows {
516            let seqno: SeqNo = row.try_get("sequence_number")?;
517            let data: Vec<u8> = row.try_get("data")?;
518            results.push(VersionedData {
519                seqno,
520                data: Bytes::from(data),
521            });
522        }
523        Ok(results)
524    }
525
526    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
527        static CRDB_TRUNCATE_QUERY: &str = "
528        DELETE FROM consensus
529        WHERE shard = $1 AND sequence_number < $2 AND
530        EXISTS (
531            SELECT * FROM consensus WHERE shard = $1 AND sequence_number >= $2
532        )
533        ";
534
535        /// This query has been specifically tuned to ensure we get the minimal
536        /// number of __row__ locks possible, and that it doesn't conflict with
537        /// concurrently running compare and swap operations that are trying to
538        /// evolve the shard.
539        ///
540        /// It's performance has been benchmarked against Postgres 15.
541        ///
542        /// Note: The `ORDER BY` in the newer_exists CTE exists so we obtain a
543        /// row lock on the lowest possible sequence number. This ensures
544        /// minimal conflict between concurrently running truncate and append
545        /// operations.
546        static POSTGRES_TRUNCATE_QUERY: &str = "
547        WITH newer_exists AS (
548            SELECT * FROM consensus
549            WHERE shard = $1
550                AND sequence_number >= $2
551            ORDER BY sequence_number ASC
552            LIMIT 1
553            FOR UPDATE
554        ),
555        to_lock AS (
556            SELECT ctid FROM consensus
557            WHERE shard = $1
558            AND sequence_number < $2
559            AND EXISTS (SELECT * FROM newer_exists)
560            ORDER BY sequence_number DESC
561            FOR UPDATE
562        )
563        DELETE FROM consensus
564        USING to_lock
565        WHERE consensus.ctid = to_lock.ctid;
566        ";
567
568        let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
569            && self.mode == PostgresMode::Postgres
570        {
571            POSTGRES_TRUNCATE_QUERY
572        } else {
573            CRDB_TRUNCATE_QUERY
574        };
575        let result = {
576            let client = self.get_connection().await?;
577            let statement = client.prepare_cached(q).await?;
578            pg_execute_prepared(&client, &statement, &[&key, &seqno]).await?
579        };
580        if result == 0 {
581            // We weren't able to successfully truncate any rows inspect head to
582            // determine whether the request was valid and there were no records in
583            // the provided range, or the request was invalid because it would have
584            // also deleted head.
585
586            // It's safe to call head in a subsequent transaction rather than doing
587            // so directly in the same transaction because, once a given (seqno, data)
588            // pair exists for our shard, we enforce the invariants that
589            // 1. Our shard will always have _some_ data mapped to it.
590            // 2. All operations that modify the (seqno, data) can only increase
591            //    the sequence number.
592            let current = self.head(key).await?;
593            if current.map_or(true, |data| data.seqno < seqno) {
594                return Err(ExternalError::from(anyhow!(
595                    "upper bound too high for truncate: {:?}",
596                    seqno
597                )));
598            }
599        }
600
601        Ok(Some(usize::cast_from(result)))
602    }
603}
604
605#[cfg(test)]
606mod tests {
607    use mz_ore::assert_err;
608    use tracing::info;
609    use uuid::Uuid;
610
611    use crate::location::tests::consensus_impl_test;
612
613    use super::*;
614
615    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
616    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
617    async fn postgres_consensus() -> Result<(), ExternalError> {
618        let config = match PostgresConsensusConfig::new_for_test()? {
619            Some(config) => config,
620            None => {
621                info!(
622                    "{} env not set: skipping test that uses external service",
623                    PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
624                );
625                return Ok(());
626            }
627        };
628
629        consensus_impl_test(|| PostgresConsensus::open(config.clone())).await?;
630
631        // and now verify the implementation-specific `drop_and_recreate` works as intended
632        let consensus = PostgresConsensus::open(config.clone()).await?;
633        let key = Uuid::new_v4().to_string();
634        let state = VersionedData {
635            seqno: SeqNo(0),
636            data: Bytes::from("abc"),
637        };
638
639        assert_eq!(
640            consensus.compare_and_set(&key, state.clone()).await,
641            Ok(CaSResult::Committed),
642        );
643
644        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));
645
646        consensus.drop_and_recreate().await?;
647
648        assert_eq!(consensus.head(&key).await, Ok(None));
649
650        // This should be a separate postgres_consensus_blocking test, but nextest makes it
651        // difficult since we can't specify that both tests touch the consensus table and thus
652        // interfere with each other.
653        let config = match PostgresConsensusConfig::new_for_test()? {
654            Some(config) => config,
655            None => {
656                info!(
657                    "{} env not set: skipping test that uses external service",
658                    PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
659                );
660                return Ok(());
661            }
662        };
663
664        let consensus: PostgresConsensus = PostgresConsensus::open(config.clone()).await?;
665        // Max size in test is 2... let's saturate the pool.
666        let _conn1 = consensus.get_connection().await?;
667        let _conn2 = consensus.get_connection().await?;
668
669        // And finally, we should see the next connect time out.
670        let conn3 = consensus.get_connection().await;
671
672        assert_err!(conn3);
673
674        Ok(())
675    }
676}