Skip to main content

mz_persist/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of [Consensus] backed by Postgres.
11
12use std::fmt::Formatter;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use async_stream::try_stream;
19use async_trait::async_trait;
20use bytes::Bytes;
21use deadpool_postgres::tokio_postgres::Config;
22use deadpool_postgres::tokio_postgres::types::{FromSql, IsNull, ToSql, Type, to_sql_checked};
23use deadpool_postgres::{Object, PoolError};
24use futures_util::StreamExt;
25use mz_dyncfg::ConfigSet;
26use mz_ore::cast::CastFrom;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::url::SensitiveUrl;
29use mz_postgres_client::metrics::PostgresClientMetrics;
30use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
31use postgres_protocol::escape::escape_identifier;
32use tokio_postgres::error::SqlState;
33use tokio_postgres::{Row, Statement};
34use tracing::{info, warn};
35
36use crate::error::Error;
37use crate::location::{CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData};
38
/// Flag to use consensus queries that are tuned for vanilla Postgres.
///
/// Defaults to `false`. The tuned queries are only selected when this flag is
/// set and the connected database was detected to be vanilla Postgres.
pub const USE_POSTGRES_TUNED_QUERIES: mz_dyncfg::Config<bool> = mz_dyncfg::Config::new(
    "persist_use_postgres_tuned_queries",
    false,
    "Use a set of queries for consensus that have specifically been tuned against
    Postgres to ensure we acquire a minimal number of locks.",
);
46
// DDL that creates the `consensus` table if it does not already exist.
//
// NB: there is no trailing semicolon; callers in this file append
// `CRDB_SCHEMA_OPTIONS` directly after it when targeting CockroachDB.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS consensus (
    shard text NOT NULL,
    sequence_number bigint NOT NULL,
    data bytea NOT NULL,
    PRIMARY KEY(shard, sequence_number)
)
";

// The stats controlled by `sql_stats_automatic_collection_enabled` are for the
// cost-based optimizer, but all the queries against this table are
// single-table and very carefully tuned to hit the primary index, so the
// cost-based optimizer doesn't really get us anything. OTOH, the background
// jobs that crdb creates to collect these stats fill up the jobs table
// (slowing down all sorts of things).
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// The `consensus` table creates and deletes rows at a high frequency, generating many
// tombstoned rows. If Cockroach's GC interval is set high (the default is 25h) and
// these tombstones accumulate, scanning over the table will take increasingly and
// prohibitively long.
//
// See: https://github.com/MaterializeInc/database-issues/issues/4001
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
const CRDB_CONFIGURE_ZONE: &str = "ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600";
71
72/// NOTE: `mz-persist` intentionally does not depend on `mz-postgres-util`.
73/// These helpers are the only direct driver-call boundary in this module.
74async fn pg_batch_execute(client: &Object, query: &str) -> Result<(), tokio_postgres::Error> {
75    #[allow(clippy::disallowed_methods)]
76    client.batch_execute(query).await
77}
78
79async fn pg_query_prepared(
80    client: &Object,
81    statement: &Statement,
82    params: &[&(dyn ToSql + Sync)],
83) -> Result<Vec<Row>, tokio_postgres::Error> {
84    #[allow(clippy::disallowed_methods)]
85    client.query(statement, params).await
86}
87
88async fn pg_query_opt_prepared(
89    client: &Object,
90    statement: &Statement,
91    params: &[&(dyn ToSql + Sync)],
92) -> Result<Option<Row>, tokio_postgres::Error> {
93    #[allow(clippy::disallowed_methods)]
94    client.query_opt(statement, params).await
95}
96
97async fn pg_execute_prepared(
98    client: &Object,
99    statement: &Statement,
100    params: &[&(dyn ToSql + Sync)],
101) -> Result<u64, tokio_postgres::Error> {
102    #[allow(clippy::disallowed_methods)]
103    client.execute(statement, params).await
104}
105
106impl ToSql for SeqNo {
107    fn to_sql(
108        &self,
109        ty: &Type,
110        w: &mut bytes::BytesMut,
111    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
112        // We can only represent sequence numbers in the range [0, i64::MAX].
113        let value = i64::try_from(self.0)?;
114        <i64 as ToSql>::to_sql(&value, ty, w)
115    }
116
117    fn accepts(ty: &Type) -> bool {
118        <i64 as ToSql>::accepts(ty)
119    }
120
121    to_sql_checked!();
122}
123
124impl<'a> FromSql<'a> for SeqNo {
125    fn from_sql(
126        ty: &Type,
127        raw: &'a [u8],
128    ) -> Result<SeqNo, Box<dyn std::error::Error + Sync + Send>> {
129        let sequence_number = <i64 as FromSql>::from_sql(ty, raw)?;
130
131        // Sanity check that the sequence number we received falls in the
132        // [0, i64::MAX] range.
133        let sequence_number = u64::try_from(sequence_number)?;
134        Ok(SeqNo(sequence_number))
135    }
136
137    fn accepts(ty: &Type) -> bool {
138        <i64 as FromSql>::accepts(ty)
139    }
140}
141
142/// Configuration to connect to a Postgres backed implementation of [Consensus].
143#[derive(Clone, Debug)]
144pub struct PostgresConsensusConfig {
145    url: SensitiveUrl,
146    knobs: Arc<dyn PostgresClientKnobs>,
147    metrics: PostgresClientMetrics,
148    dyncfg: Arc<ConfigSet>,
149}
150
151impl From<PostgresConsensusConfig> for PostgresClientConfig {
152    fn from(config: PostgresConsensusConfig) -> Self {
153        PostgresClientConfig::new(config.url, config.knobs, config.metrics)
154    }
155}
156
157impl PostgresConsensusConfig {
158    const EXTERNAL_TESTS_POSTGRES_URL: &'static str =
159        "MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL";
160
161    /// Returns a new [PostgresConsensusConfig] for use in production.
162    pub fn new(
163        url: &SensitiveUrl,
164        knobs: Box<dyn PostgresClientKnobs>,
165        metrics: PostgresClientMetrics,
166        dyncfg: Arc<ConfigSet>,
167    ) -> Result<Self, Error> {
168        Ok(PostgresConsensusConfig {
169            url: url.clone(),
170            knobs: Arc::from(knobs),
171            metrics,
172            dyncfg,
173        })
174    }
175
176    /// Returns a new [PostgresConsensusConfig] for use in unit tests.
177    ///
178    /// By default, persist tests that use external storage (like Postgres) are
179    /// no-ops so that `cargo test` works on new environments without any
180    /// configuration. To activate the tests for [PostgresConsensus] set the
181    /// `MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL` environment variable
182    /// with a valid connection url [1].
183    ///
184    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
185    pub fn new_for_test() -> Result<Option<Self>, Error> {
186        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
187            Ok(url) => SensitiveUrl::from_str(&url).map_err(|e| e.to_string())?,
188            Err(_) => {
189                if mz_ore::env::is_var_truthy("CI") {
190                    panic!("CI is supposed to run this test but something has gone wrong!");
191                }
192                return Ok(None);
193            }
194        };
195
196        struct TestConsensusKnobs;
197        impl std::fmt::Debug for TestConsensusKnobs {
198            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199                f.debug_struct("TestConsensusKnobs").finish_non_exhaustive()
200            }
201        }
202        impl PostgresClientKnobs for TestConsensusKnobs {
203            fn connection_pool_max_size(&self) -> usize {
204                2
205            }
206
207            fn connection_pool_max_wait(&self) -> Option<Duration> {
208                Some(Duration::from_secs(1))
209            }
210
211            fn connection_pool_ttl(&self) -> Duration {
212                Duration::MAX
213            }
214            fn connection_pool_ttl_stagger(&self) -> Duration {
215                Duration::MAX
216            }
217            fn connect_timeout(&self) -> Duration {
218                Duration::MAX
219            }
220            fn tcp_user_timeout(&self) -> Duration {
221                Duration::ZERO
222            }
223
224            fn keepalives_idle(&self) -> Duration {
225                Duration::from_secs(10)
226            }
227
228            fn keepalives_interval(&self) -> Duration {
229                Duration::from_secs(5)
230            }
231
232            fn keepalives_retries(&self) -> u32 {
233                5
234            }
235        }
236
237        let dyncfg = ConfigSet::default().add(&USE_POSTGRES_TUNED_QUERIES);
238        let config = PostgresConsensusConfig::new(
239            &url,
240            Box::new(TestConsensusKnobs),
241            PostgresClientMetrics::new(&MetricsRegistry::new(), "mz_persist"),
242            Arc::new(dyncfg),
243        )?;
244        Ok(Some(config))
245    }
246}
247
/// The flavor of Postgres-compatible database that consensus is connected to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum PostgresMode {
    /// CockroachDB, used in our cloud offering.
    CockroachDB,
    /// Vanilla Postgres, the default for our self-hosted offering.
    Postgres,
}
256
257/// Implementation of [Consensus] over a Postgres database.
258pub struct PostgresConsensus {
259    postgres_client: PostgresClient,
260    dyncfg: Arc<ConfigSet>,
261    mode: PostgresMode,
262}
263
264impl std::fmt::Debug for PostgresConsensus {
265    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266        f.debug_struct("PostgresConsensus").finish_non_exhaustive()
267    }
268}
269
270impl PostgresConsensus {
271    /// Open a Postgres [Consensus] instance with `config`, for the collection
272    /// named `shard`.
273    pub async fn open(config: PostgresConsensusConfig) -> Result<Self, ExternalError> {
274        // don't need to unredact here because we just want to pull out the username
275        let pg_config: Config = config.url.to_string().parse()?;
276        let role = pg_config.get_user().expect("failed to get PostgreSQL user");
277        let create_schema = format!(
278            "CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION {}",
279            escape_identifier(role),
280        );
281
282        let dyncfg = Arc::clone(&config.dyncfg);
283        let postgres_client = PostgresClient::open(config.into())?;
284
285        let client = postgres_client.get_connection().await?;
286
287        let mode = match pg_batch_execute(
288            &client,
289            &format!(
290                "{}; {}{}; {};",
291                create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
292            ),
293        )
294        .await
295        {
296            Ok(()) => PostgresMode::CockroachDB,
297            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
298                warn!(
299                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
300                );
301                PostgresMode::CockroachDB
302            }
303            // Vanilla Postgres doesn't support the Cockroach zone configuration
304            // that we attempted, so we use that to determine what mode we're in.
305            Err(e)
306                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
307                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
308            {
309                info!(
310                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
311                    e
312                );
313                PostgresMode::Postgres
314            }
315            Err(e) => return Err(e.into()),
316        };
317
318        if mode != PostgresMode::CockroachDB {
319            pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
320        }
321
322        Ok(PostgresConsensus {
323            postgres_client,
324            dyncfg,
325            mode,
326        })
327    }
328
329    /// Drops and recreates the `consensus` table in Postgres
330    ///
331    /// ONLY FOR TESTING
332    pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> {
333        // this could be a TRUNCATE if we're confident the db won't reuse any state
334        let client = self.get_connection().await?;
335        pg_batch_execute(&client, "DROP TABLE consensus").await?;
336        let crdb_mode = match pg_batch_execute(
337            &client,
338            &format!("{}{}; {}", SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,),
339        )
340        .await
341        {
342            Ok(()) => true,
343            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
344                warn!(
345                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
346                );
347                true
348            }
349            Err(e)
350                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
351                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
352            {
353                info!(
354                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
355                    e
356                );
357                false
358            }
359            Err(e) => return Err(e.into()),
360        };
361
362        if !crdb_mode {
363            pg_batch_execute(&client, SCHEMA).await?;
364        }
365        Ok(())
366    }
367
368    async fn get_connection(&self) -> Result<Object, PoolError> {
369        self.postgres_client.get_connection().await
370    }
371}
372
373#[async_trait]
374impl Consensus for PostgresConsensus {
375    fn list_keys(&self) -> ResultStream<'_, String> {
376        let q = "SELECT DISTINCT shard FROM consensus";
377
378        Box::pin(try_stream! {
379            // NB: it's important that we hang on to this client for the lifetime of the stream,
380            // to avoid returning it to the pool prematurely.
381            let client = self.get_connection().await?;
382            let statement = client.prepare_cached(q).await?;
383            let params: &[String] = &[];
384            let mut rows = Box::pin(client.query_raw(&statement, params).await?);
385            while let Some(row) = rows.next().await {
386                let shard: String = row?.try_get("shard")?;
387                yield shard;
388            }
389        })
390    }
391
392    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
393        let q = "SELECT sequence_number, data FROM consensus
394             WHERE shard = $1 ORDER BY sequence_number DESC LIMIT 1";
395        let row = {
396            let client = self.get_connection().await?;
397            let statement = client.prepare_cached(q).await?;
398            pg_query_opt_prepared(&client, &statement, &[&key]).await?
399        };
400        let row = match row {
401            None => return Ok(None),
402            Some(row) => row,
403        };
404
405        let seqno: SeqNo = row.try_get("sequence_number")?;
406
407        let data: Vec<u8> = row.try_get("data")?;
408        Ok(Some(VersionedData {
409            seqno,
410            data: Bytes::from(data),
411        }))
412    }
413
414    async fn compare_and_set(
415        &self,
416        key: &str,
417        new: VersionedData,
418    ) -> Result<CaSResult, ExternalError> {
419        let expected = new.seqno.previous();
420
421        let result = if let Some(expected) = expected {
422            /// This query has been written to execute within a single
423            /// network round-trip. The insert performance has been tuned
424            /// against CockroachDB, ensuring it goes through the fast-path
425            /// 1-phase commit of CRDB. Any changes to this query should
426            /// confirm an EXPLAIN ANALYZE (VERBOSE) query plan contains
427            /// `auto commit`
428            static CRDB_CAS_QUERY: &str = "
429                INSERT INTO consensus (shard, sequence_number, data)
430                SELECT $1, $2, $3
431                WHERE (SELECT sequence_number FROM consensus
432                       WHERE shard = $1
433                       ORDER BY sequence_number DESC LIMIT 1) = $4;
434            ";
435
436            /// This query has been written to ensure we only get row level
437            /// locks on the `(shard, seq_no)` we're trying to update. The insert
438            /// performance has been tuned against Postgres 15 to ensure it
439            /// minimizes possible serialization conflicts.
440            static POSTGRES_CAS_QUERY: &str = "
441            WITH last_seq AS (
442                SELECT sequence_number FROM consensus
443                WHERE shard = $1
444                ORDER BY sequence_number DESC
445                LIMIT 1
446                FOR UPDATE
447            )
448            INSERT INTO consensus (shard, sequence_number, data)
449            SELECT $1, $2, $3
450            FROM last_seq
451            WHERE last_seq.sequence_number = $4;
452            ";
453
454            let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
455                && self.mode == PostgresMode::Postgres
456            {
457                POSTGRES_CAS_QUERY
458            } else {
459                CRDB_CAS_QUERY
460            };
461            let client = self.get_connection().await?;
462            let statement = client.prepare_cached(q).await?;
463            pg_execute_prepared(
464                &client,
465                &statement,
466                &[&key, &new.seqno, &new.data.as_ref(), &expected],
467            )
468            .await?
469        } else {
470            // Insert the new row as long as no other row exists for the same shard.
471            let q = "INSERT INTO consensus SELECT $1, $2, $3 WHERE
472                     NOT EXISTS (
473                         SELECT * FROM consensus WHERE shard = $1
474                     )
475                     ON CONFLICT DO NOTHING";
476            let client = self.get_connection().await?;
477            let statement = client.prepare_cached(q).await?;
478            pg_execute_prepared(&client, &statement, &[&key, &new.seqno, &new.data.as_ref()])
479                .await?
480        };
481
482        if result == 1 {
483            Ok(CaSResult::Committed)
484        } else {
485            Ok(CaSResult::ExpectationMismatch)
486        }
487    }
488
489    async fn scan(
490        &self,
491        key: &str,
492        from: SeqNo,
493        limit: usize,
494    ) -> Result<Vec<VersionedData>, ExternalError> {
495        let q = "SELECT sequence_number, data FROM consensus
496             WHERE shard = $1 AND sequence_number >= $2
497             ORDER BY sequence_number ASC LIMIT $3";
498        let Ok(limit) = i64::try_from(limit) else {
499            return Err(ExternalError::from(anyhow!(
500                "limit must be [0, i64::MAX]. was: {:?}",
501                limit
502            )));
503        };
504        let rows = {
505            let client = self.get_connection().await?;
506            let statement = client.prepare_cached(q).await?;
507            pg_query_prepared(&client, &statement, &[&key, &from, &limit]).await?
508        };
509        let mut results = Vec::with_capacity(rows.len());
510
511        for row in rows {
512            let seqno: SeqNo = row.try_get("sequence_number")?;
513            let data: Vec<u8> = row.try_get("data")?;
514            results.push(VersionedData {
515                seqno,
516                data: Bytes::from(data),
517            });
518        }
519        Ok(results)
520    }
521
522    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
523        static CRDB_TRUNCATE_QUERY: &str = "
524        DELETE FROM consensus
525        WHERE shard = $1 AND sequence_number < $2 AND
526        EXISTS (
527            SELECT * FROM consensus WHERE shard = $1 AND sequence_number >= $2
528        )
529        ";
530
531        /// This query has been specifically tuned to ensure we get the minimal
532        /// number of __row__ locks possible, and that it doesn't conflict with
533        /// concurrently running compare and swap operations that are trying to
534        /// evolve the shard.
535        ///
536        /// It's performance has been benchmarked against Postgres 15.
537        ///
538        /// Note: The `ORDER BY` in the newer_exists CTE exists so we obtain a
539        /// row lock on the lowest possible sequence number. This ensures
540        /// minimal conflict between concurrently running truncate and append
541        /// operations.
542        static POSTGRES_TRUNCATE_QUERY: &str = "
543        WITH newer_exists AS (
544            SELECT * FROM consensus
545            WHERE shard = $1
546                AND sequence_number >= $2
547            ORDER BY sequence_number ASC
548            LIMIT 1
549            FOR UPDATE
550        ),
551        to_lock AS (
552            SELECT ctid FROM consensus
553            WHERE shard = $1
554            AND sequence_number < $2
555            AND EXISTS (SELECT * FROM newer_exists)
556            ORDER BY sequence_number DESC
557            FOR UPDATE
558        )
559        DELETE FROM consensus
560        USING to_lock
561        WHERE consensus.ctid = to_lock.ctid;
562        ";
563
564        let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
565            && self.mode == PostgresMode::Postgres
566        {
567            POSTGRES_TRUNCATE_QUERY
568        } else {
569            CRDB_TRUNCATE_QUERY
570        };
571        let result = {
572            let client = self.get_connection().await?;
573            let statement = client.prepare_cached(q).await?;
574            pg_execute_prepared(&client, &statement, &[&key, &seqno]).await?
575        };
576        if result == 0 {
577            // We weren't able to successfully truncate any rows inspect head to
578            // determine whether the request was valid and there were no records in
579            // the provided range, or the request was invalid because it would have
580            // also deleted head.
581
582            // It's safe to call head in a subsequent transaction rather than doing
583            // so directly in the same transaction because, once a given (seqno, data)
584            // pair exists for our shard, we enforce the invariants that
585            // 1. Our shard will always have _some_ data mapped to it.
586            // 2. All operations that modify the (seqno, data) can only increase
587            //    the sequence number.
588            let current = self.head(key).await?;
589            if current.map_or(true, |data| data.seqno < seqno) {
590                return Err(ExternalError::from(anyhow!(
591                    "upper bound too high for truncate: {:?}",
592                    seqno
593                )));
594            }
595        }
596
597        Ok(Some(usize::cast_from(result)))
598    }
599}
600
#[cfg(test)]
mod tests {
    use mz_ore::assert_err;
    use tracing::info;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    use super::*;

    /// Exercises [PostgresConsensus] against a real database: the generic
    /// consensus test suite, `drop_and_recreate`, and connection pool
    /// exhaustion. Skipped when no test database URL is configured.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn postgres_consensus() -> Result<(), ExternalError> {
        let Some(config) = PostgresConsensusConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        consensus_impl_test(|| PostgresConsensus::open(config.clone())).await?;

        // and now verify the implementation-specific `drop_and_recreate` works as intended
        let consensus = PostgresConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let state = VersionedData {
            seqno: SeqNo(0),
            data: Bytes::from("abc"),
        };

        let committed = consensus.compare_and_set(&key, state.clone()).await;
        assert_eq!(committed, Ok(CaSResult::Committed));

        let head_before = consensus.head(&key).await;
        assert_eq!(head_before, Ok(Some(state.clone())));

        consensus.drop_and_recreate().await?;

        let head_after = consensus.head(&key).await;
        assert_eq!(head_after, Ok(None));

        // This should be a separate postgres_consensus_blocking test, but nextest makes it
        // difficult since we can't specify that both tests touch the consensus table and thus
        // interfere with each other.
        let Some(blocking_config) = PostgresConsensusConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        let blocking_consensus: PostgresConsensus =
            PostgresConsensus::open(blocking_config.clone()).await?;
        // Max size in test is 2... let's saturate the pool.
        let _conn1 = blocking_consensus.get_connection().await?;
        let _conn2 = blocking_consensus.get_connection().await?;

        // And finally, we should see the next connect time out.
        let conn3 = blocking_consensus.get_connection().await;

        assert_err!(conn3);

        Ok(())
    }
}
668        assert_err!(conn3);
669
670        Ok(())
671    }
672}