Skip to main content

mz_persist/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of [Consensus] backed by Postgres.
11
12use std::fmt::Formatter;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use async_stream::try_stream;
19use async_trait::async_trait;
20use bytes::Bytes;
21use deadpool_postgres::tokio_postgres::Config;
22use deadpool_postgres::tokio_postgres::types::{FromSql, IsNull, ToSql, Type, to_sql_checked};
23use deadpool_postgres::{Object, PoolError};
24use futures_util::StreamExt;
25use mz_dyncfg::ConfigSet;
26use mz_ore::cast::CastFrom;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::url::SensitiveUrl;
29use mz_postgres_client::metrics::PostgresClientMetrics;
30use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
31use postgres_protocol::escape::escape_identifier;
32use tokio_postgres::error::SqlState;
33use tracing::{info, warn};
34
35use crate::error::Error;
36use crate::location::{CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData};
37
38/// Flag to use concensus queries that are tuned for vanilla Postgres.
39pub const USE_POSTGRES_TUNED_QUERIES: mz_dyncfg::Config<bool> = mz_dyncfg::Config::new(
40    "persist_use_postgres_tuned_queries",
41    false,
42    "Use a set of queries for consensus that have specifically been tuned against
43    Postgres to ensure we acquire a minimal number of locks.",
44);
45
/// DDL that creates the `consensus` table if it does not already exist.
///
/// Each row is keyed by `(shard, sequence_number)` and carries the `data`
/// payload for that sequence number.
///
/// The statement deliberately has no trailing `;` so that callers can append
/// [CRDB_SCHEMA_OPTIONS] directly after the closing parenthesis.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS consensus (
    shard text NOT NULL,
    sequence_number bigint NOT NULL,
    data bytea NOT NULL,
    PRIMARY KEY(shard, sequence_number)
)
";
54
// Table options appended to `SCHEMA` when targeting CockroachDB.
//
// The `sql_stats_automatic_collection_enabled` statistics are for the
// cost-based optimizer, but all the queries against this table are
// single-table and very carefully tuned to hit the primary index, so the
// cost-based optimizer doesn't really get us anything. OTOH, the background
// jobs that crdb creates to collect these stats fill up the jobs table
// (slowing down all sorts of things).
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// Zone configuration applied to the `consensus` table on CockroachDB.
//
// The `consensus` table creates and deletes rows at a high frequency, generating many
// tombstoned rows. If Cockroach's GC interval is set high (the default is 25h) and
// these tombstones accumulate, scanning over the table will take increasingly and
// prohibitively long.
//
// See: https://github.com/MaterializeInc/database-issues/issues/4001
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
const CRDB_CONFIGURE_ZONE: &str = "ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600";
70
71impl ToSql for SeqNo {
72    fn to_sql(
73        &self,
74        ty: &Type,
75        w: &mut bytes::BytesMut,
76    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
77        // We can only represent sequence numbers in the range [0, i64::MAX].
78        let value = i64::try_from(self.0)?;
79        <i64 as ToSql>::to_sql(&value, ty, w)
80    }
81
82    fn accepts(ty: &Type) -> bool {
83        <i64 as ToSql>::accepts(ty)
84    }
85
86    to_sql_checked!();
87}
88
89impl<'a> FromSql<'a> for SeqNo {
90    fn from_sql(
91        ty: &Type,
92        raw: &'a [u8],
93    ) -> Result<SeqNo, Box<dyn std::error::Error + Sync + Send>> {
94        let sequence_number = <i64 as FromSql>::from_sql(ty, raw)?;
95
96        // Sanity check that the sequence number we received falls in the
97        // [0, i64::MAX] range.
98        let sequence_number = u64::try_from(sequence_number)?;
99        Ok(SeqNo(sequence_number))
100    }
101
102    fn accepts(ty: &Type) -> bool {
103        <i64 as FromSql>::accepts(ty)
104    }
105}
106
/// Configuration to connect to a Postgres backed implementation of [Consensus].
#[derive(Clone, Debug)]
pub struct PostgresConsensusConfig {
    /// Connection URL of the backing database.
    url: SensitiveUrl,
    /// Connection knobs, forwarded to the [PostgresClientConfig].
    knobs: Arc<dyn PostgresClientKnobs>,
    /// Metrics handle, forwarded to the [PostgresClientConfig].
    metrics: PostgresClientMetrics,
    /// Dynamic configuration consulted when choosing which queries to run
    /// (see [USE_POSTGRES_TUNED_QUERIES]).
    dyncfg: Arc<ConfigSet>,
}
115
116impl From<PostgresConsensusConfig> for PostgresClientConfig {
117    fn from(config: PostgresConsensusConfig) -> Self {
118        PostgresClientConfig::new(config.url, config.knobs, config.metrics)
119    }
120}
121
122impl PostgresConsensusConfig {
123    const EXTERNAL_TESTS_POSTGRES_URL: &'static str =
124        "MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL";
125
126    /// Returns a new [PostgresConsensusConfig] for use in production.
127    pub fn new(
128        url: &SensitiveUrl,
129        knobs: Box<dyn PostgresClientKnobs>,
130        metrics: PostgresClientMetrics,
131        dyncfg: Arc<ConfigSet>,
132    ) -> Result<Self, Error> {
133        Ok(PostgresConsensusConfig {
134            url: url.clone(),
135            knobs: Arc::from(knobs),
136            metrics,
137            dyncfg,
138        })
139    }
140
141    /// Returns a new [PostgresConsensusConfig] for use in unit tests.
142    ///
143    /// By default, persist tests that use external storage (like Postgres) are
144    /// no-ops so that `cargo test` works on new environments without any
145    /// configuration. To activate the tests for [PostgresConsensus] set the
146    /// `MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL` environment variable
147    /// with a valid connection url [1].
148    ///
149    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
150    pub fn new_for_test() -> Result<Option<Self>, Error> {
151        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
152            Ok(url) => SensitiveUrl::from_str(&url).map_err(|e| e.to_string())?,
153            Err(_) => {
154                if mz_ore::env::is_var_truthy("CI") {
155                    panic!("CI is supposed to run this test but something has gone wrong!");
156                }
157                return Ok(None);
158            }
159        };
160
161        struct TestConsensusKnobs;
162        impl std::fmt::Debug for TestConsensusKnobs {
163            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164                f.debug_struct("TestConsensusKnobs").finish_non_exhaustive()
165            }
166        }
167        impl PostgresClientKnobs for TestConsensusKnobs {
168            fn connection_pool_max_size(&self) -> usize {
169                2
170            }
171
172            fn connection_pool_max_wait(&self) -> Option<Duration> {
173                Some(Duration::from_secs(1))
174            }
175
176            fn connection_pool_ttl(&self) -> Duration {
177                Duration::MAX
178            }
179            fn connection_pool_ttl_stagger(&self) -> Duration {
180                Duration::MAX
181            }
182            fn connect_timeout(&self) -> Duration {
183                Duration::MAX
184            }
185            fn tcp_user_timeout(&self) -> Duration {
186                Duration::ZERO
187            }
188
189            fn keepalives_idle(&self) -> Duration {
190                Duration::from_secs(10)
191            }
192
193            fn keepalives_interval(&self) -> Duration {
194                Duration::from_secs(5)
195            }
196
197            fn keepalives_retries(&self) -> u32 {
198                5
199            }
200        }
201
202        let dyncfg = ConfigSet::default().add(&USE_POSTGRES_TUNED_QUERIES);
203        let config = PostgresConsensusConfig::new(
204            &url,
205            Box::new(TestConsensusKnobs),
206            PostgresClientMetrics::new(&MetricsRegistry::new(), "mz_persist"),
207            Arc::new(dyncfg),
208        )?;
209        Ok(Some(config))
210    }
211}
212
/// What flavor of Postgres are we connected to for consensus.
///
/// The flavor is detected when the consensus instance is opened, based on
/// whether the database accepts the CockroachDB-specific schema statements.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostgresMode {
    /// CockroachDB, used in our cloud offering.
    CockroachDB,
    /// Vanilla Postgres, the default for our self-hosted offering.
    Postgres,
}
221
/// Implementation of [Consensus] over a Postgres database.
pub struct PostgresConsensus {
    /// Client from which every operation obtains its connection.
    postgres_client: PostgresClient,
    /// Dynamic configuration consulted when choosing which queries to run.
    dyncfg: Arc<ConfigSet>,
    /// Flavor of the database we are connected to.
    mode: PostgresMode,
}
228
229impl std::fmt::Debug for PostgresConsensus {
230    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
231        f.debug_struct("PostgresConsensus").finish_non_exhaustive()
232    }
233}
234
235impl PostgresConsensus {
236    /// Open a Postgres [Consensus] instance with `config`, for the collection
237    /// named `shard`.
238    pub async fn open(config: PostgresConsensusConfig) -> Result<Self, ExternalError> {
239        // don't need to unredact here because we just want to pull out the username
240        let pg_config: Config = config.url.to_string().parse()?;
241        let role = pg_config.get_user().expect("failed to get PostgreSQL user");
242        let create_schema = format!(
243            "CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION {}",
244            escape_identifier(role),
245        );
246
247        let dyncfg = Arc::clone(&config.dyncfg);
248        let postgres_client = PostgresClient::open(config.into())?;
249
250        let client = postgres_client.get_connection().await?;
251
252        let mode = match client
253            .batch_execute(&format!(
254                "{}; {}{}; {};",
255                create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
256            ))
257            .await
258        {
259            Ok(()) => PostgresMode::CockroachDB,
260            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
261                warn!(
262                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
263                );
264                PostgresMode::CockroachDB
265            }
266            // Vanilla Postgres doesn't support the Cockroach zone configuration
267            // that we attempted, so we use that to determine what mode we're in.
268            Err(e)
269                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
270                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
271            {
272                info!(
273                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
274                    e
275                );
276                PostgresMode::Postgres
277            }
278            Err(e) => return Err(e.into()),
279        };
280
281        if mode != PostgresMode::CockroachDB {
282            client
283                .batch_execute(&format!("{}; {};", create_schema, SCHEMA))
284                .await?;
285        }
286
287        Ok(PostgresConsensus {
288            postgres_client,
289            dyncfg,
290            mode,
291        })
292    }
293
294    /// Drops and recreates the `consensus` table in Postgres
295    ///
296    /// ONLY FOR TESTING
297    pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> {
298        // this could be a TRUNCATE if we're confident the db won't reuse any state
299        let client = self.get_connection().await?;
300        client.execute("DROP TABLE consensus", &[]).await?;
301        let crdb_mode = match client
302            .batch_execute(&format!(
303                "{}{}; {}",
304                SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
305            ))
306            .await
307        {
308            Ok(()) => true,
309            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
310                warn!(
311                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
312                );
313                true
314            }
315            Err(e)
316                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
317                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
318            {
319                info!(
320                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
321                    e
322                );
323                false
324            }
325            Err(e) => return Err(e.into()),
326        };
327
328        if !crdb_mode {
329            client.execute(SCHEMA, &[]).await?;
330        }
331        Ok(())
332    }
333
334    async fn get_connection(&self) -> Result<Object, PoolError> {
335        self.postgres_client.get_connection().await
336    }
337}
338
339#[async_trait]
340impl Consensus for PostgresConsensus {
341    fn list_keys(&self) -> ResultStream<'_, String> {
342        let q = "SELECT DISTINCT shard FROM consensus";
343
344        Box::pin(try_stream! {
345            // NB: it's important that we hang on to this client for the lifetime of the stream,
346            // to avoid returning it to the pool prematurely.
347            let client = self.get_connection().await?;
348            let statement = client.prepare_cached(q).await?;
349            let params: &[String] = &[];
350            let mut rows = Box::pin(client.query_raw(&statement, params).await?);
351            while let Some(row) = rows.next().await {
352                let shard: String = row?.try_get("shard")?;
353                yield shard;
354            }
355        })
356    }
357
358    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
359        let q = "SELECT sequence_number, data FROM consensus
360             WHERE shard = $1 ORDER BY sequence_number DESC LIMIT 1";
361        let row = {
362            let client = self.get_connection().await?;
363            let statement = client.prepare_cached(q).await?;
364            client.query_opt(&statement, &[&key]).await?
365        };
366        let row = match row {
367            None => return Ok(None),
368            Some(row) => row,
369        };
370
371        let seqno: SeqNo = row.try_get("sequence_number")?;
372
373        let data: Vec<u8> = row.try_get("data")?;
374        Ok(Some(VersionedData {
375            seqno,
376            data: Bytes::from(data),
377        }))
378    }
379
380    async fn compare_and_set(
381        &self,
382        key: &str,
383        new: VersionedData,
384    ) -> Result<CaSResult, ExternalError> {
385        let expected = new.seqno.previous();
386
387        let result = if let Some(expected) = expected {
388            /// This query has been written to execute within a single
389            /// network round-trip. The insert performance has been tuned
390            /// against CockroachDB, ensuring it goes through the fast-path
391            /// 1-phase commit of CRDB. Any changes to this query should
392            /// confirm an EXPLAIN ANALYZE (VERBOSE) query plan contains
393            /// `auto commit`
394            static CRDB_CAS_QUERY: &str = "
395                INSERT INTO consensus (shard, sequence_number, data)
396                SELECT $1, $2, $3
397                WHERE (SELECT sequence_number FROM consensus
398                       WHERE shard = $1
399                       ORDER BY sequence_number DESC LIMIT 1) = $4;
400            ";
401
402            /// This query has been written to ensure we only get row level
403            /// locks on the `(shard, seq_no)` we're trying to update. The insert
404            /// performance has been tuned against Postgres 15 to ensure it
405            /// minimizes possible serialization conflicts.
406            static POSTGRES_CAS_QUERY: &str = "
407            WITH last_seq AS (
408                SELECT sequence_number FROM consensus
409                WHERE shard = $1
410                ORDER BY sequence_number DESC
411                LIMIT 1
412                FOR UPDATE
413            )
414            INSERT INTO consensus (shard, sequence_number, data)
415            SELECT $1, $2, $3
416            FROM last_seq
417            WHERE last_seq.sequence_number = $4;
418            ";
419
420            let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
421                && self.mode == PostgresMode::Postgres
422            {
423                POSTGRES_CAS_QUERY
424            } else {
425                CRDB_CAS_QUERY
426            };
427            let client = self.get_connection().await?;
428            let statement = client.prepare_cached(q).await?;
429            client
430                .execute(
431                    &statement,
432                    &[&key, &new.seqno, &new.data.as_ref(), &expected],
433                )
434                .await?
435        } else {
436            // Insert the new row as long as no other row exists for the same shard.
437            let q = "INSERT INTO consensus SELECT $1, $2, $3 WHERE
438                     NOT EXISTS (
439                         SELECT * FROM consensus WHERE shard = $1
440                     )
441                     ON CONFLICT DO NOTHING";
442            let client = self.get_connection().await?;
443            let statement = client.prepare_cached(q).await?;
444            client
445                .execute(&statement, &[&key, &new.seqno, &new.data.as_ref()])
446                .await?
447        };
448
449        if result == 1 {
450            Ok(CaSResult::Committed)
451        } else {
452            Ok(CaSResult::ExpectationMismatch)
453        }
454    }
455
456    async fn scan(
457        &self,
458        key: &str,
459        from: SeqNo,
460        limit: usize,
461    ) -> Result<Vec<VersionedData>, ExternalError> {
462        let q = "SELECT sequence_number, data FROM consensus
463             WHERE shard = $1 AND sequence_number >= $2
464             ORDER BY sequence_number ASC LIMIT $3";
465        let Ok(limit) = i64::try_from(limit) else {
466            return Err(ExternalError::from(anyhow!(
467                "limit must be [0, i64::MAX]. was: {:?}",
468                limit
469            )));
470        };
471        let rows = {
472            let client = self.get_connection().await?;
473            let statement = client.prepare_cached(q).await?;
474            client.query(&statement, &[&key, &from, &limit]).await?
475        };
476        let mut results = Vec::with_capacity(rows.len());
477
478        for row in rows {
479            let seqno: SeqNo = row.try_get("sequence_number")?;
480            let data: Vec<u8> = row.try_get("data")?;
481            results.push(VersionedData {
482                seqno,
483                data: Bytes::from(data),
484            });
485        }
486        Ok(results)
487    }
488
489    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
490        static CRDB_TRUNCATE_QUERY: &str = "
491        DELETE FROM consensus
492        WHERE shard = $1 AND sequence_number < $2 AND
493        EXISTS (
494            SELECT * FROM consensus WHERE shard = $1 AND sequence_number >= $2
495        )
496        ";
497
498        /// This query has been specifically tuned to ensure we get the minimal
499        /// number of __row__ locks possible, and that it doesn't conflict with
500        /// concurrently running compare and swap operations that are trying to
501        /// evolve the shard.
502        ///
503        /// It's performance has been benchmarked against Postgres 15.
504        ///
505        /// Note: The `ORDER BY` in the newer_exists CTE exists so we obtain a
506        /// row lock on the lowest possible sequence number. This ensures
507        /// minimal conflict between concurrently running truncate and append
508        /// operations.
509        static POSTGRES_TRUNCATE_QUERY: &str = "
510        WITH newer_exists AS (
511            SELECT * FROM consensus
512            WHERE shard = $1
513                AND sequence_number >= $2
514            ORDER BY sequence_number ASC
515            LIMIT 1
516            FOR UPDATE
517        ),
518        to_lock AS (
519            SELECT ctid FROM consensus
520            WHERE shard = $1
521            AND sequence_number < $2
522            AND EXISTS (SELECT * FROM newer_exists)
523            ORDER BY sequence_number DESC
524            FOR UPDATE
525        )
526        DELETE FROM consensus
527        USING to_lock
528        WHERE consensus.ctid = to_lock.ctid;
529        ";
530
531        let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
532            && self.mode == PostgresMode::Postgres
533        {
534            POSTGRES_TRUNCATE_QUERY
535        } else {
536            CRDB_TRUNCATE_QUERY
537        };
538        let result = {
539            let client = self.get_connection().await?;
540            let statement = client.prepare_cached(q).await?;
541            client.execute(&statement, &[&key, &seqno]).await?
542        };
543        if result == 0 {
544            // We weren't able to successfully truncate any rows inspect head to
545            // determine whether the request was valid and there were no records in
546            // the provided range, or the request was invalid because it would have
547            // also deleted head.
548
549            // It's safe to call head in a subsequent transaction rather than doing
550            // so directly in the same transaction because, once a given (seqno, data)
551            // pair exists for our shard, we enforce the invariants that
552            // 1. Our shard will always have _some_ data mapped to it.
553            // 2. All operations that modify the (seqno, data) can only increase
554            //    the sequence number.
555            let current = self.head(key).await?;
556            if current.map_or(true, |data| data.seqno < seqno) {
557                return Err(ExternalError::from(anyhow!(
558                    "upper bound too high for truncate: {:?}",
559                    seqno
560                )));
561            }
562        }
563
564        Ok(Some(usize::cast_from(result)))
565    }
566}
567
#[cfg(test)]
mod tests {
    use mz_ore::assert_err;
    use tracing::info;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    use super::*;

    /// Runs the shared consensus test-suite against Postgres, then checks
    /// `drop_and_recreate` and the behavior of a saturated connection pool.
    ///
    /// The test is a no-op unless the external Postgres URL is configured.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn postgres_consensus() -> Result<(), ExternalError> {
        let Some(config) = PostgresConsensusConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        consensus_impl_test(|| PostgresConsensus::open(config.clone())).await?;

        // and now verify the implementation-specific `drop_and_recreate` works as intended
        let consensus = PostgresConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let state = VersionedData {
            seqno: SeqNo(0),
            data: Bytes::from("abc"),
        };

        let committed = consensus.compare_and_set(&key, state.clone()).await;
        assert_eq!(committed, Ok(CaSResult::Committed));

        let before_drop = consensus.head(&key).await;
        assert_eq!(before_drop, Ok(Some(state.clone())));

        consensus.drop_and_recreate().await?;

        let after_drop = consensus.head(&key).await;
        assert_eq!(after_drop, Ok(None));

        // This should be a separate postgres_consensus_blocking test, but nextest makes it
        // difficult since we can't specify that both tests touch the consensus table and thus
        // interfere with each other.
        let Some(config) = PostgresConsensusConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        let consensus: PostgresConsensus = PostgresConsensus::open(config.clone()).await?;
        // Max size in test is 2... let's saturate the pool.
        let _conn1 = consensus.get_connection().await?;
        let _conn2 = consensus.get_connection().await?;

        // And finally, with the pool exhausted, the next attempt to get a
        // connection should fail.
        let conn3 = consensus.get_connection().await;
        assert_err!(conn3);

        Ok(())
    }
}