mz_persist/
postgres.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Implementation of [Consensus] backed by Postgres.
11
12use std::fmt::Formatter;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use async_stream::try_stream;
19use async_trait::async_trait;
20use bytes::Bytes;
21use deadpool_postgres::tokio_postgres::Config;
22use deadpool_postgres::tokio_postgres::types::{FromSql, IsNull, ToSql, Type, to_sql_checked};
23use deadpool_postgres::{Object, PoolError};
24use futures_util::StreamExt;
25use mz_dyncfg::ConfigSet;
26use mz_ore::cast::CastFrom;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::url::SensitiveUrl;
29use mz_postgres_client::metrics::PostgresClientMetrics;
30use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
31use postgres_protocol::escape::escape_identifier;
32use tokio_postgres::error::SqlState;
33use tracing::{info, warn};
34
35use crate::error::Error;
36use crate::location::{CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData};
37
38/// Flag to use concensus queries that are tuned for vanilla Postgres.
39pub const USE_POSTGRES_TUNED_QUERIES: mz_dyncfg::Config<bool> = mz_dyncfg::Config::new(
40    "persist_use_postgres_tuned_queries",
41    false,
42    "Use a set of queries for consensus that have specifically been tuned against
43    Postgres to ensure we acquire a minimal number of locks.",
44);
45
/// DDL for the `consensus` table: one row per `(shard, sequence_number)`,
/// holding that version's `data` bytes.
///
/// Note that the string deliberately ends right after the closing parenthesis
/// (plus a newline) so that CockroachDB-specific table options can be appended
/// directly after it when building the setup batch.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS consensus (
    shard text NOT NULL,
    sequence_number bigint NOT NULL,
    data bytea NOT NULL,
    PRIMARY KEY(shard, sequence_number)
)
";
54
// The `sql_stats_automatic_collection_enabled` table option controls
// CockroachDB's automatic statistics collection, which only feeds the
// cost-based optimizer. All the queries against this table are single-table and
// very carefully tuned to hit the primary index, so the cost-based optimizer
// doesn't really get us anything. OTOH, the background jobs that crdb creates
// to collect these stats fill up the jobs table (slowing down all sorts of
// things).
//
// This string is appended directly after `SCHEMA`'s closing parenthesis, so it
// must remain a valid `CREATE TABLE` suffix.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// The `consensus` table creates and deletes rows at a high frequency, generating many
// tombstoned rows. If Cockroach's GC interval is set high (the default is 25h) and
// these tombstones accumulate, scanning over the table will take increasingly and
// prohibitively long. This lowers the GC TTL for the table to 600 seconds (10 minutes).
//
// Vanilla Postgres rejects this statement; opening the consensus backend relies on
// that rejection to tell the two database flavors apart.
//
// See: https://github.com/MaterializeInc/database-issues/issues/4001
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
const CRDB_CONFIGURE_ZONE: &str = "ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600";
70
71impl ToSql for SeqNo {
72    fn to_sql(
73        &self,
74        ty: &Type,
75        w: &mut bytes::BytesMut,
76    ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
77        // We can only represent sequence numbers in the range [0, i64::MAX].
78        let value = i64::try_from(self.0)?;
79        <i64 as ToSql>::to_sql(&value, ty, w)
80    }
81
82    fn accepts(ty: &Type) -> bool {
83        <i64 as ToSql>::accepts(ty)
84    }
85
86    to_sql_checked!();
87}
88
89impl<'a> FromSql<'a> for SeqNo {
90    fn from_sql(
91        ty: &Type,
92        raw: &'a [u8],
93    ) -> Result<SeqNo, Box<dyn std::error::Error + Sync + Send>> {
94        let sequence_number = <i64 as FromSql>::from_sql(ty, raw)?;
95
96        // Sanity check that the sequence number we received falls in the
97        // [0, i64::MAX] range.
98        let sequence_number = u64::try_from(sequence_number)?;
99        Ok(SeqNo(sequence_number))
100    }
101
102    fn accepts(ty: &Type) -> bool {
103        <i64 as FromSql>::accepts(ty)
104    }
105}
106
/// Configuration to connect to a Postgres backed implementation of [Consensus].
#[derive(Clone, Debug)]
pub struct PostgresConsensusConfig {
    /// Connection URL for the backing database.
    url: SensitiveUrl,
    /// Connection and pool tuning knobs, shared between clones of this config.
    knobs: Arc<dyn PostgresClientKnobs>,
    /// Metrics handed through to the underlying [PostgresClientConfig].
    metrics: PostgresClientMetrics,
    /// Dynamic configuration; [PostgresConsensus] reads
    /// [USE_POSTGRES_TUNED_QUERIES] from it.
    dyncfg: Arc<ConfigSet>,
}
115
116impl From<PostgresConsensusConfig> for PostgresClientConfig {
117    fn from(config: PostgresConsensusConfig) -> Self {
118        PostgresClientConfig::new(config.url, config.knobs, config.metrics)
119    }
120}
121
122impl PostgresConsensusConfig {
123    const EXTERNAL_TESTS_POSTGRES_URL: &'static str =
124        "MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL";
125
126    /// Returns a new [PostgresConsensusConfig] for use in production.
127    pub fn new(
128        url: &SensitiveUrl,
129        knobs: Box<dyn PostgresClientKnobs>,
130        metrics: PostgresClientMetrics,
131        dyncfg: Arc<ConfigSet>,
132    ) -> Result<Self, Error> {
133        Ok(PostgresConsensusConfig {
134            url: url.clone(),
135            knobs: Arc::from(knobs),
136            metrics,
137            dyncfg,
138        })
139    }
140
141    /// Returns a new [PostgresConsensusConfig] for use in unit tests.
142    ///
143    /// By default, persist tests that use external storage (like Postgres) are
144    /// no-ops so that `cargo test` works on new environments without any
145    /// configuration. To activate the tests for [PostgresConsensus] set the
146    /// `MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL` environment variable
147    /// with a valid connection url [1].
148    ///
149    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
150    pub fn new_for_test() -> Result<Option<Self>, Error> {
151        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
152            Ok(url) => SensitiveUrl::from_str(&url).map_err(|e| e.to_string())?,
153            Err(_) => {
154                if mz_ore::env::is_var_truthy("CI") {
155                    panic!("CI is supposed to run this test but something has gone wrong!");
156                }
157                return Ok(None);
158            }
159        };
160
161        struct TestConsensusKnobs;
162        impl std::fmt::Debug for TestConsensusKnobs {
163            fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164                f.debug_struct("TestConsensusKnobs").finish_non_exhaustive()
165            }
166        }
167        impl PostgresClientKnobs for TestConsensusKnobs {
168            fn connection_pool_max_size(&self) -> usize {
169                2
170            }
171
172            fn connection_pool_max_wait(&self) -> Option<Duration> {
173                Some(Duration::from_secs(1))
174            }
175
176            fn connection_pool_ttl(&self) -> Duration {
177                Duration::MAX
178            }
179            fn connection_pool_ttl_stagger(&self) -> Duration {
180                Duration::MAX
181            }
182            fn connect_timeout(&self) -> Duration {
183                Duration::MAX
184            }
185            fn tcp_user_timeout(&self) -> Duration {
186                Duration::ZERO
187            }
188        }
189
190        let dyncfg = ConfigSet::default().add(&USE_POSTGRES_TUNED_QUERIES);
191        let config = PostgresConsensusConfig::new(
192            &url,
193            Box::new(TestConsensusKnobs),
194            PostgresClientMetrics::new(&MetricsRegistry::new(), "mz_persist"),
195            Arc::new(dyncfg),
196        )?;
197        Ok(Some(config))
198    }
199}
200
/// What flavor of Postgres are we connected to for consensus.
///
/// Determined when a [PostgresConsensus] is opened, based on whether the
/// database accepts the CockroachDB-specific table options and zone
/// configuration.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostgresMode {
    /// CockroachDB, used in our cloud offering.
    CockroachDB,
    /// Vanilla Postgres, the default for our self-hosted offering.
    Postgres,
}
209
/// Implementation of [Consensus] over a Postgres database.
pub struct PostgresConsensus {
    /// Pool of connections to the backing database.
    postgres_client: PostgresClient,
    /// Dynamic configuration; read for [USE_POSTGRES_TUNED_QUERIES].
    dyncfg: Arc<ConfigSet>,
    /// Database flavor detected when this instance was opened. Together with
    /// `dyncfg`, it selects which query variants are used.
    mode: PostgresMode,
}
216
217impl std::fmt::Debug for PostgresConsensus {
218    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219        f.debug_struct("PostgresConsensus").finish_non_exhaustive()
220    }
221}
222
223impl PostgresConsensus {
224    /// Open a Postgres [Consensus] instance with `config`, for the collection
225    /// named `shard`.
226    pub async fn open(config: PostgresConsensusConfig) -> Result<Self, ExternalError> {
227        // don't need to unredact here because we just want to pull out the username
228        let pg_config: Config = config.url.to_string().parse()?;
229        let role = pg_config.get_user().unwrap();
230        let create_schema = format!(
231            "CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION {}",
232            escape_identifier(role),
233        );
234
235        let dyncfg = Arc::clone(&config.dyncfg);
236        let postgres_client = PostgresClient::open(config.into())?;
237
238        let client = postgres_client.get_connection().await?;
239
240        let mode = match client
241            .batch_execute(&format!(
242                "{}; {}{}; {};",
243                create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
244            ))
245            .await
246        {
247            Ok(()) => PostgresMode::CockroachDB,
248            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
249                warn!(
250                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
251                );
252                PostgresMode::CockroachDB
253            }
254            // Vanilla Postgres doesn't support the Cockroach zone configuration
255            // that we attempted, so we use that to determine what mode we're in.
256            Err(e)
257                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
258                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
259            {
260                info!(
261                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
262                    e
263                );
264                PostgresMode::Postgres
265            }
266            Err(e) => return Err(e.into()),
267        };
268
269        if mode != PostgresMode::CockroachDB {
270            client
271                .batch_execute(&format!("{}; {};", create_schema, SCHEMA))
272                .await?;
273        }
274
275        Ok(PostgresConsensus {
276            postgres_client,
277            dyncfg,
278            mode,
279        })
280    }
281
282    /// Drops and recreates the `consensus` table in Postgres
283    ///
284    /// ONLY FOR TESTING
285    pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> {
286        // this could be a TRUNCATE if we're confident the db won't reuse any state
287        let client = self.get_connection().await?;
288        client.execute("DROP TABLE consensus", &[]).await?;
289        let crdb_mode = match client
290            .batch_execute(&format!(
291                "{}{}; {}",
292                SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
293            ))
294            .await
295        {
296            Ok(()) => true,
297            Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
298                warn!(
299                    "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
300                );
301                true
302            }
303            Err(e)
304                if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
305                    || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
306            {
307                info!(
308                    "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
309                    e
310                );
311                false
312            }
313            Err(e) => return Err(e.into()),
314        };
315
316        if !crdb_mode {
317            client.execute(SCHEMA, &[]).await?;
318        }
319        Ok(())
320    }
321
322    async fn get_connection(&self) -> Result<Object, PoolError> {
323        self.postgres_client.get_connection().await
324    }
325}
326
327#[async_trait]
328impl Consensus for PostgresConsensus {
329    fn list_keys(&self) -> ResultStream<String> {
330        let q = "SELECT DISTINCT shard FROM consensus";
331
332        Box::pin(try_stream! {
333            // NB: it's important that we hang on to this client for the lifetime of the stream,
334            // to avoid returning it to the pool prematurely.
335            let client = self.get_connection().await?;
336            let statement = client.prepare_cached(q).await?;
337            let params: &[String] = &[];
338            let mut rows = Box::pin(client.query_raw(&statement, params).await?);
339            while let Some(row) = rows.next().await {
340                let shard: String = row?.try_get("shard")?;
341                yield shard;
342            }
343        })
344    }
345
346    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
347        let q = "SELECT sequence_number, data FROM consensus
348             WHERE shard = $1 ORDER BY sequence_number DESC LIMIT 1";
349        let row = {
350            let client = self.get_connection().await?;
351            let statement = client.prepare_cached(q).await?;
352            client.query_opt(&statement, &[&key]).await?
353        };
354        let row = match row {
355            None => return Ok(None),
356            Some(row) => row,
357        };
358
359        let seqno: SeqNo = row.try_get("sequence_number")?;
360
361        let data: Vec<u8> = row.try_get("data")?;
362        Ok(Some(VersionedData {
363            seqno,
364            data: Bytes::from(data),
365        }))
366    }
367
368    async fn compare_and_set(
369        &self,
370        key: &str,
371        expected: Option<SeqNo>,
372        new: VersionedData,
373    ) -> Result<CaSResult, ExternalError> {
374        if let Some(expected) = expected {
375            if new.seqno <= expected {
376                return Err(Error::from(
377                        format!("new seqno must be strictly greater than expected. Got new: {:?} expected: {:?}",
378                                 new.seqno, expected)).into());
379            }
380        }
381
382        let result = if let Some(expected) = expected {
383            /// This query has been written to execute within a single
384            /// network round-trip. The insert performance has been tuned
385            /// against CockroachDB, ensuring it goes through the fast-path
386            /// 1-phase commit of CRDB. Any changes to this query should
387            /// confirm an EXPLAIN ANALYZE (VERBOSE) query plan contains
388            /// `auto commit`
389            static CRDB_CAS_QUERY: &str = "
390                INSERT INTO consensus (shard, sequence_number, data)
391                SELECT $1, $2, $3
392                WHERE (SELECT sequence_number FROM consensus
393                       WHERE shard = $1
394                       ORDER BY sequence_number DESC LIMIT 1) = $4;
395            ";
396
397            /// This query has been written to ensure we only get row level
398            /// locks on the `(shard, seq_no)` we're trying to update. The insert
399            /// performance has been tuned against Postgres 15 to ensure it
400            /// minimizes possible serialization conflicts.
401            static POSTGRES_CAS_QUERY: &str = "
402            WITH last_seq AS (
403                SELECT sequence_number FROM consensus
404                WHERE shard = $1
405                ORDER BY sequence_number DESC
406                LIMIT 1
407                FOR UPDATE
408            )
409            INSERT INTO consensus (shard, sequence_number, data)
410            SELECT $1, $2, $3
411            FROM last_seq
412            WHERE last_seq.sequence_number = $4;
413            ";
414
415            let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
416                && self.mode == PostgresMode::Postgres
417            {
418                POSTGRES_CAS_QUERY
419            } else {
420                CRDB_CAS_QUERY
421            };
422            let client = self.get_connection().await?;
423            let statement = client.prepare_cached(q).await?;
424            client
425                .execute(
426                    &statement,
427                    &[&key, &new.seqno, &new.data.as_ref(), &expected],
428                )
429                .await?
430        } else {
431            // Insert the new row as long as no other row exists for the same shard.
432            let q = "INSERT INTO consensus SELECT $1, $2, $3 WHERE
433                     NOT EXISTS (
434                         SELECT * FROM consensus WHERE shard = $1
435                     )
436                     ON CONFLICT DO NOTHING";
437            let client = self.get_connection().await?;
438            let statement = client.prepare_cached(q).await?;
439            client
440                .execute(&statement, &[&key, &new.seqno, &new.data.as_ref()])
441                .await?
442        };
443
444        if result == 1 {
445            Ok(CaSResult::Committed)
446        } else {
447            Ok(CaSResult::ExpectationMismatch)
448        }
449    }
450
451    async fn scan(
452        &self,
453        key: &str,
454        from: SeqNo,
455        limit: usize,
456    ) -> Result<Vec<VersionedData>, ExternalError> {
457        let q = "SELECT sequence_number, data FROM consensus
458             WHERE shard = $1 AND sequence_number >= $2
459             ORDER BY sequence_number ASC LIMIT $3";
460        let Ok(limit) = i64::try_from(limit) else {
461            return Err(ExternalError::from(anyhow!(
462                "limit must be [0, i64::MAX]. was: {:?}",
463                limit
464            )));
465        };
466        let rows = {
467            let client = self.get_connection().await?;
468            let statement = client.prepare_cached(q).await?;
469            client.query(&statement, &[&key, &from, &limit]).await?
470        };
471        let mut results = Vec::with_capacity(rows.len());
472
473        for row in rows {
474            let seqno: SeqNo = row.try_get("sequence_number")?;
475            let data: Vec<u8> = row.try_get("data")?;
476            results.push(VersionedData {
477                seqno,
478                data: Bytes::from(data),
479            });
480        }
481        Ok(results)
482    }
483
484    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
485        static CRDB_TRUNCATE_QUERY: &str = "
486        DELETE FROM consensus
487        WHERE shard = $1 AND sequence_number < $2 AND
488        EXISTS (
489            SELECT * FROM consensus WHERE shard = $1 AND sequence_number >= $2
490        )
491        ";
492
493        /// This query has been specifically tuned to ensure we get the minimal
494        /// number of __row__ locks possible, and that it doesn't conflict with
495        /// concurrently running compare and swap operations that are trying to
496        /// evolve the shard.
497        ///
498        /// It's performance has been benchmarked against Postgres 15.
499        ///
500        /// Note: The `ORDER BY` in the newer_exists CTE exists so we obtain a
501        /// row lock on the lowest possible sequence number. This ensures
502        /// minimal conflict between concurrently running truncate and append
503        /// operations.
504        static POSTGRES_TRUNCATE_QUERY: &str = "
505        WITH newer_exists AS (
506            SELECT * FROM consensus
507            WHERE shard = $1
508                AND sequence_number >= $2
509            ORDER BY sequence_number ASC
510            LIMIT 1
511            FOR UPDATE
512        ),
513        to_lock AS (
514            SELECT ctid FROM consensus
515            WHERE shard = $1
516            AND sequence_number < $2
517            AND EXISTS (SELECT * FROM newer_exists)
518            ORDER BY sequence_number DESC
519            FOR UPDATE
520        )
521        DELETE FROM consensus
522        USING to_lock
523        WHERE consensus.ctid = to_lock.ctid;
524        ";
525
526        let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
527            && self.mode == PostgresMode::Postgres
528        {
529            POSTGRES_TRUNCATE_QUERY
530        } else {
531            CRDB_TRUNCATE_QUERY
532        };
533        let result = {
534            let client = self.get_connection().await?;
535            let statement = client.prepare_cached(q).await?;
536            client.execute(&statement, &[&key, &seqno]).await?
537        };
538        if result == 0 {
539            // We weren't able to successfully truncate any rows inspect head to
540            // determine whether the request was valid and there were no records in
541            // the provided range, or the request was invalid because it would have
542            // also deleted head.
543
544            // It's safe to call head in a subsequent transaction rather than doing
545            // so directly in the same transaction because, once a given (seqno, data)
546            // pair exists for our shard, we enforce the invariants that
547            // 1. Our shard will always have _some_ data mapped to it.
548            // 2. All operations that modify the (seqno, data) can only increase
549            //    the sequence number.
550            let current = self.head(key).await?;
551            if current.map_or(true, |data| data.seqno < seqno) {
552                return Err(ExternalError::from(anyhow!(
553                    "upper bound too high for truncate: {:?}",
554                    seqno
555                )));
556            }
557        }
558
559        Ok(usize::cast_from(result))
560    }
561}
562
#[cfg(test)]
mod tests {
    use mz_ore::assert_err;
    use tracing::info;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    use super::*;

    /// Runs the generic consensus conformance tests against Postgres. It then
    /// checks `drop_and_recreate` and that connection-pool exhaustion
    /// surfaces as an error.
    ///
    /// Skipped (returns `Ok`) unless the external test URL is configured.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn postgres_consensus() -> Result<(), ExternalError> {
        let config = match PostgresConsensusConfig::new_for_test()? {
            Some(config) => config,
            None => {
                info!(
                    "{} env not set: skipping test that uses external service",
                    PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
                );
                return Ok(());
            }
        };

        consensus_impl_test(|| PostgresConsensus::open(config.clone())).await?;

        // and now verify the implementation-specific `drop_and_recreate` works as intended
        let consensus = PostgresConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );

        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));

        consensus.drop_and_recreate().await?;

        assert_eq!(consensus.head(&key).await, Ok(None));

        // This should be a separate postgres_consensus_blocking test, but nextest makes it
        // difficult since we can't specify that both tests touch the consensus table and thus
        // interfere with each other. The config obtained above is reused; re-reading the
        // environment here could never skip, since we'd already have returned above.
        let consensus: PostgresConsensus = PostgresConsensus::open(config).await?;
        // Max size in test is 2... let's saturate the pool.
        let _conn1 = consensus.get_connection().await?;
        let _conn2 = consensus.get_connection().await?;

        // And finally, we should see the next connect time out.
        let conn3 = consensus.get_connection().await;

        assert_err!(conn3);

        Ok(())
    }
}