1use std::fmt::Formatter;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use async_stream::try_stream;
19use async_trait::async_trait;
20use bytes::Bytes;
21use deadpool_postgres::tokio_postgres::Config;
22use deadpool_postgres::tokio_postgres::types::{FromSql, IsNull, ToSql, Type, to_sql_checked};
23use deadpool_postgres::{Object, PoolError};
24use futures_util::StreamExt;
25use mz_dyncfg::ConfigSet;
26use mz_ore::cast::CastFrom;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::url::SensitiveUrl;
29use mz_postgres_client::metrics::PostgresClientMetrics;
30use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
31use postgres_protocol::escape::escape_identifier;
32use tokio_postgres::error::SqlState;
33use tokio_postgres::{Row, Statement};
34use tracing::{info, warn};
35
36use crate::error::Error;
37use crate::location::{CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData};
38
/// Dynamic flag selecting the Postgres-tuned variants of the `compare_and_set`
/// and `truncate` queries, which take explicit row locks (`FOR UPDATE`).
///
/// The flag only takes effect when [`PostgresConsensus::open`] detected a
/// vanilla Postgres backend; against CockroachDB the default queries are
/// always used.
pub const USE_POSTGRES_TUNED_QUERIES: mz_dyncfg::Config<bool> = mz_dyncfg::Config::new(
    "persist_use_postgres_tuned_queries",
    // NOTE(review): presumably the default value, i.e. the tuned queries are
    // opt-in — confirm against `mz_dyncfg::Config::new`.
    false,
    "Use a set of queries for consensus that have specifically been tuned against
    Postgres to ensure we acquire a minimal number of locks.",
);
46
/// DDL for the `consensus` table: one row per `(shard, sequence_number)`
/// holding that version's `data` bytes.
///
/// The statement has no trailing semicolon, which lets the CockroachDB table
/// options (`CRDB_SCHEMA_OPTIONS`) be appended right after the closing
/// parenthesis when the schema is created.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS consensus (
    shard text NOT NULL,
    sequence_number bigint NOT NULL,
    data bytea NOT NULL,
    PRIMARY KEY(shard, sequence_number)
)
";
55
/// CockroachDB-only table storage option that turns off automatic SQL
/// statistics collection for the `consensus` table.
///
/// It is appended directly to `SCHEMA`. A backend that rejects it is treated
/// as vanilla Postgres when opening consensus.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
/// CockroachDB-only zone configuration that sets `gc.ttlseconds = 600` on the
/// `consensus` table.
///
/// `CONFIGURE ZONE` is not Postgres syntax. Its failure is one of the signals
/// used to detect a vanilla Postgres backend.
const CRDB_CONFIGURE_ZONE: &str = "ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600";
71
72async fn pg_batch_execute(client: &Object, query: &str) -> Result<(), tokio_postgres::Error> {
75 #[allow(clippy::disallowed_methods)]
76 client.batch_execute(query).await
77}
78
79async fn pg_query_prepared(
80 client: &Object,
81 statement: &Statement,
82 params: &[&(dyn ToSql + Sync)],
83) -> Result<Vec<Row>, tokio_postgres::Error> {
84 #[allow(clippy::disallowed_methods)]
85 client.query(statement, params).await
86}
87
88async fn pg_query_opt_prepared(
89 client: &Object,
90 statement: &Statement,
91 params: &[&(dyn ToSql + Sync)],
92) -> Result<Option<Row>, tokio_postgres::Error> {
93 #[allow(clippy::disallowed_methods)]
94 client.query_opt(statement, params).await
95}
96
97async fn pg_execute_prepared(
98 client: &Object,
99 statement: &Statement,
100 params: &[&(dyn ToSql + Sync)],
101) -> Result<u64, tokio_postgres::Error> {
102 #[allow(clippy::disallowed_methods)]
103 client.execute(statement, params).await
104}
105
106impl ToSql for SeqNo {
107 fn to_sql(
108 &self,
109 ty: &Type,
110 w: &mut bytes::BytesMut,
111 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
112 let value = i64::try_from(self.0)?;
114 <i64 as ToSql>::to_sql(&value, ty, w)
115 }
116
117 fn accepts(ty: &Type) -> bool {
118 <i64 as ToSql>::accepts(ty)
119 }
120
121 to_sql_checked!();
122}
123
124impl<'a> FromSql<'a> for SeqNo {
125 fn from_sql(
126 ty: &Type,
127 raw: &'a [u8],
128 ) -> Result<SeqNo, Box<dyn std::error::Error + Sync + Send>> {
129 let sequence_number = <i64 as FromSql>::from_sql(ty, raw)?;
130
131 let sequence_number = u64::try_from(sequence_number)?;
134 Ok(SeqNo(sequence_number))
135 }
136
137 fn accepts(ty: &Type) -> bool {
138 <i64 as FromSql>::accepts(ty)
139 }
140}
141
/// Everything needed to open a [`PostgresConsensus`].
///
/// When the underlying client is opened, this converts into a
/// [`PostgresClientConfig`], and the consensus-only `dyncfg` is dropped.
#[derive(Clone, Debug)]
pub struct PostgresConsensusConfig {
    /// Connection URL of the Postgres-compatible database.
    url: SensitiveUrl,
    /// Connection-pool and per-connection tuning knobs handed to the client.
    knobs: Arc<dyn PostgresClientKnobs>,
    /// Metrics for the underlying Postgres client.
    metrics: PostgresClientMetrics,
    /// Dynamic configuration; consensus reads `USE_POSTGRES_TUNED_QUERIES`
    /// from it.
    dyncfg: Arc<ConfigSet>,
}
150
151impl From<PostgresConsensusConfig> for PostgresClientConfig {
152 fn from(config: PostgresConsensusConfig) -> Self {
153 PostgresClientConfig::new(config.url, config.knobs, config.metrics)
154 }
155}
156
157impl PostgresConsensusConfig {
158 const EXTERNAL_TESTS_POSTGRES_URL: &'static str =
159 "MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL";
160
161 pub fn new(
163 url: &SensitiveUrl,
164 knobs: Box<dyn PostgresClientKnobs>,
165 metrics: PostgresClientMetrics,
166 dyncfg: Arc<ConfigSet>,
167 ) -> Result<Self, Error> {
168 Ok(PostgresConsensusConfig {
169 url: url.clone(),
170 knobs: Arc::from(knobs),
171 metrics,
172 dyncfg,
173 })
174 }
175
176 pub fn new_for_test() -> Result<Option<Self>, Error> {
186 let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
187 Ok(url) => SensitiveUrl::from_str(&url).map_err(|e| e.to_string())?,
188 Err(_) => {
189 if mz_ore::env::is_var_truthy("CI") {
190 panic!("CI is supposed to run this test but something has gone wrong!");
191 }
192 return Ok(None);
193 }
194 };
195
196 struct TestConsensusKnobs;
197 impl std::fmt::Debug for TestConsensusKnobs {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("TestConsensusKnobs").finish_non_exhaustive()
200 }
201 }
202 impl PostgresClientKnobs for TestConsensusKnobs {
203 fn connection_pool_max_size(&self) -> usize {
204 2
205 }
206
207 fn connection_pool_max_wait(&self) -> Option<Duration> {
208 Some(Duration::from_secs(1))
209 }
210
211 fn connection_pool_ttl(&self) -> Duration {
212 Duration::MAX
213 }
214 fn connection_pool_ttl_stagger(&self) -> Duration {
215 Duration::MAX
216 }
217 fn connect_timeout(&self) -> Duration {
218 Duration::MAX
219 }
220 fn tcp_user_timeout(&self) -> Duration {
221 Duration::ZERO
222 }
223
224 fn keepalives_idle(&self) -> Duration {
225 Duration::from_secs(10)
226 }
227
228 fn keepalives_interval(&self) -> Duration {
229 Duration::from_secs(5)
230 }
231
232 fn keepalives_retries(&self) -> u32 {
233 5
234 }
235
236 fn statement_timeout(&self) -> Duration {
237 Duration::ZERO
238 }
239 }
240
241 let dyncfg = ConfigSet::default().add(&USE_POSTGRES_TUNED_QUERIES);
242 let config = PostgresConsensusConfig::new(
243 &url,
244 Box::new(TestConsensusKnobs),
245 PostgresClientMetrics::new(&MetricsRegistry::new(), "mz_persist"),
246 Arc::new(dyncfg),
247 )?;
248 Ok(Some(config))
249 }
250}
251
/// Which flavor of Postgres-compatible backend a [`PostgresConsensus`] is
/// talking to.
///
/// The mode is detected once, in [`PostgresConsensus::open`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostgresMode {
    /// The CockroachDB-specific schema setup succeeded, or failed only with
    /// `INSUFFICIENT_PRIVILEGE`.
    CockroachDB,
    /// The backend rejected the CockroachDB-specific setup as an invalid
    /// parameter or a syntax error, i.e. vanilla Postgres.
    ///
    /// Only in this mode can `USE_POSTGRES_TUNED_QUERIES` select the
    /// Postgres-tuned queries.
    Postgres,
}
260
/// [`Consensus`] implementation backed by a `consensus` table in Postgres or
/// CockroachDB.
pub struct PostgresConsensus {
    /// Pooled client; every operation checks out a connection from it.
    postgres_client: PostgresClient,
    /// Dynamic configuration, read on each `compare_and_set`/`truncate` for
    /// `USE_POSTGRES_TUNED_QUERIES`.
    dyncfg: Arc<ConfigSet>,
    /// Backend flavor detected when the consensus was opened.
    mode: PostgresMode,
}
267
268impl std::fmt::Debug for PostgresConsensus {
269 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
270 f.debug_struct("PostgresConsensus").finish_non_exhaustive()
271 }
272}
273
274impl PostgresConsensus {
275 pub async fn open(config: PostgresConsensusConfig) -> Result<Self, ExternalError> {
278 let pg_config: Config = config.url.to_string().parse()?;
280 let role = pg_config.get_user().expect("failed to get PostgreSQL user");
281 let create_schema = format!(
282 "CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION {}",
283 escape_identifier(role),
284 );
285
286 let dyncfg = Arc::clone(&config.dyncfg);
287 let postgres_client = PostgresClient::open(config.into())?;
288
289 let client = postgres_client.get_connection().await?;
290
291 let mode = match pg_batch_execute(
292 &client,
293 &format!(
294 "{}; {}{}; {};",
295 create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
296 ),
297 )
298 .await
299 {
300 Ok(()) => PostgresMode::CockroachDB,
301 Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
302 warn!(
303 "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
304 );
305 PostgresMode::CockroachDB
306 }
307 Err(e)
310 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
311 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
312 {
313 info!(
314 "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
315 e
316 );
317 PostgresMode::Postgres
318 }
319 Err(e) => return Err(e.into()),
320 };
321
322 if mode != PostgresMode::CockroachDB {
323 pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
324 }
325
326 Ok(PostgresConsensus {
327 postgres_client,
328 dyncfg,
329 mode,
330 })
331 }
332
333 pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> {
337 let client = self.get_connection().await?;
339 pg_batch_execute(&client, "DROP TABLE consensus").await?;
340 let crdb_mode = match pg_batch_execute(
341 &client,
342 &format!("{}{}; {}", SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,),
343 )
344 .await
345 {
346 Ok(()) => true,
347 Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
348 warn!(
349 "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
350 );
351 true
352 }
353 Err(e)
354 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
355 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
356 {
357 info!(
358 "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
359 e
360 );
361 false
362 }
363 Err(e) => return Err(e.into()),
364 };
365
366 if !crdb_mode {
367 pg_batch_execute(&client, SCHEMA).await?;
368 }
369 Ok(())
370 }
371
372 async fn get_connection(&self) -> Result<Object, PoolError> {
373 self.postgres_client.get_connection().await
374 }
375}
376
#[async_trait]
impl Consensus for PostgresConsensus {
    /// Streams every distinct shard key stored in the `consensus` table.
    fn list_keys(&self) -> ResultStream<'_, String> {
        let q = "SELECT DISTINCT shard FROM consensus";

        Box::pin(try_stream! {
            // The pooled connection is owned by the stream body. It must stay
            // alive while rows are pulled lazily from `query_raw` below, so it
            // is not released before the stream finishes.
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            // Explicitly typed empty parameter list, so that `query_raw`'s
            // generic parameter type can be inferred.
            let params: &[String] = &[];
            let mut rows = Box::pin(client.query_raw(&statement, params).await?);
            while let Some(row) = rows.next().await {
                let shard: String = row?.try_get("shard")?;
                yield shard;
            }
        })
    }

    /// Returns the entry with the greatest `sequence_number` for `key`, or
    /// `None` if the shard has no entries.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
        let q = "SELECT sequence_number, data FROM consensus
             WHERE shard = $1 ORDER BY sequence_number DESC LIMIT 1";
        // Scoped so the connection is dropped (presumably returned to the
        // pool) before the row is decoded.
        let row = {
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            pg_query_opt_prepared(&client, &statement, &[&key]).await?
        };
        let row = match row {
            None => return Ok(None),
            Some(row) => row,
        };

        // Decoding goes through `FromSql for SeqNo`, which rejects negative values.
        let seqno: SeqNo = row.try_get("sequence_number")?;

        let data: Vec<u8> = row.try_get("data")?;
        Ok(Some(VersionedData {
            seqno,
            data: Bytes::from(data),
        }))
    }

    /// Appends `new` to `key`'s history only if the shard's current head is
    /// the predecessor of `new.seqno`.
    ///
    /// When `new.seqno` has no predecessor (per `SeqNo::previous`; presumably
    /// only `SeqNo(0)`, to be confirmed), the insert succeeds only if the shard
    /// has no entries at all.
    ///
    /// Returns [`CaSResult::Committed`] when exactly one row was inserted,
    /// and [`CaSResult::ExpectationMismatch`] otherwise.
    async fn compare_and_set(
        &self,
        key: &str,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError> {
        let expected = new.seqno.previous();

        let result = if let Some(expected) = expected {
            // Default query for both backends: insert the new row only if
            // the shard's latest `sequence_number` equals `$4`. For an empty
            // shard the subquery yields NULL, the comparison is not true, and
            // nothing is inserted.
            static CRDB_CAS_QUERY: &str = "
            INSERT INTO consensus (shard, sequence_number, data)
            SELECT $1, $2, $3
            WHERE (SELECT sequence_number FROM consensus
                   WHERE shard = $1
                   ORDER BY sequence_number DESC LIMIT 1) = $4;
            ";

            // Postgres-tuned variant. It locks only the shard's current
            // latest row (`FOR UPDATE`) and inserts only if that row's
            // `sequence_number` equals `$4`. An empty shard yields no
            // `last_seq` row, so nothing is inserted.
            static POSTGRES_CAS_QUERY: &str = "
            WITH last_seq AS (
                SELECT sequence_number FROM consensus
                WHERE shard = $1
                ORDER BY sequence_number DESC
                LIMIT 1
                FOR UPDATE
            )
            INSERT INTO consensus (shard, sequence_number, data)
            SELECT $1, $2, $3
            FROM last_seq
            WHERE last_seq.sequence_number = $4;
            ";

            // The tuned query is used only when the flag is on AND the backend
            // was detected as vanilla Postgres.
            let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
                && self.mode == PostgresMode::Postgres
            {
                POSTGRES_CAS_QUERY
            } else {
                CRDB_CAS_QUERY
            };
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            pg_execute_prepared(
                &client,
                &statement,
                &[&key, &new.seqno, &new.data.as_ref(), &expected],
            )
            .await?
        } else {
            // First write for a shard: insert only if the shard has no rows yet.
            // `ON CONFLICT DO NOTHING` makes a racing insert of the same
            // `(shard, sequence_number)` primary key report 0 rows instead of
            // raising a unique-violation error.
            let q = "INSERT INTO consensus SELECT $1, $2, $3 WHERE
                     NOT EXISTS (
                         SELECT * FROM consensus WHERE shard = $1
                     )
                     ON CONFLICT DO NOTHING";
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            pg_execute_prepared(&client, &statement, &[&key, &new.seqno, &new.data.as_ref()])
                .await?
        };

        // `result` is the number of rows inserted. It is 1 on success, and 0
        // when the expected head (or the empty-shard precondition) did not hold.
        if result == 1 {
            Ok(CaSResult::Committed)
        } else {
            Ok(CaSResult::ExpectationMismatch)
        }
    }

    /// Returns up to `limit` entries of `key` with `sequence_number >= from`,
    /// in ascending `sequence_number` order.
    ///
    /// Errors if `limit` does not fit in an `i64`, the type bound to `LIMIT $3`.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError> {
        let q = "SELECT sequence_number, data FROM consensus
             WHERE shard = $1 AND sequence_number >= $2
             ORDER BY sequence_number ASC LIMIT $3";
        let Ok(limit) = i64::try_from(limit) else {
            return Err(ExternalError::from(anyhow!(
                "limit must be [0, i64::MAX]. was: {:?}",
                limit
            )));
        };
        // Scoped so the connection is dropped before the rows are decoded.
        let rows = {
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            pg_query_prepared(&client, &statement, &[&key, &from, &limit]).await?
        };
        let mut results = Vec::with_capacity(rows.len());

        for row in rows {
            let seqno: SeqNo = row.try_get("sequence_number")?;
            let data: Vec<u8> = row.try_get("data")?;
            results.push(VersionedData {
                seqno,
                data: Bytes::from(data),
            });
        }
        Ok(results)
    }

    /// Deletes every entry of `key` with `sequence_number < seqno`.
    ///
    /// The delete happens only if an entry at or above `seqno` exists, so the
    /// shard's head is never removed.
    ///
    /// Returns the number of deleted rows, always wrapped in `Some`. Errors if
    /// nothing was deleted and the shard is empty or its head is below
    /// `seqno`.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
        // Default query for both backends: a single DELETE guarded by an
        // EXISTS check for an entry at or above `$2`.
        static CRDB_TRUNCATE_QUERY: &str = "
        DELETE FROM consensus
        WHERE shard = $1 AND sequence_number < $2 AND
        EXISTS (
            SELECT * FROM consensus WHERE shard = $1 AND sequence_number >= $2
        )
        ";

        // Postgres-tuned variant, which takes explicit row locks:
        // - `newer_exists` locks (`FOR UPDATE`) the oldest entry at or above `$2`;
        // - `to_lock` locks each entry below `$2`, but only if such a newer
        //   entry exists;
        // - exactly those locked rows are then deleted, matched by `ctid`.
        static POSTGRES_TRUNCATE_QUERY: &str = "
        WITH newer_exists AS (
            SELECT * FROM consensus
            WHERE shard = $1
                AND sequence_number >= $2
            ORDER BY sequence_number ASC
            LIMIT 1
            FOR UPDATE
        ),
        to_lock AS (
            SELECT ctid FROM consensus
            WHERE shard = $1
            AND sequence_number < $2
            AND EXISTS (SELECT * FROM newer_exists)
            ORDER BY sequence_number DESC
            FOR UPDATE
        )
        DELETE FROM consensus
        USING to_lock
        WHERE consensus.ctid = to_lock.ctid;
        ";

        // The tuned query is used only when the flag is on AND the backend
        // was detected as vanilla Postgres.
        let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
            && self.mode == PostgresMode::Postgres
        {
            POSTGRES_TRUNCATE_QUERY
        } else {
            CRDB_TRUNCATE_QUERY
        };
        let result = {
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            pg_execute_prepared(&client, &statement, &[&key, &seqno]).await?
        };
        if result == 0 {
            // Nothing was deleted. Look at the head to tell apart:
            // - a valid request with nothing left below `seqno` (head >= seqno);
            // - an invalid request whose bound lies beyond the head, or an
            //   empty shard.
            let current = self.head(key).await?;
            if current.map_or(true, |data| data.seqno < seqno) {
                return Err(ExternalError::from(anyhow!(
                    "upper bound too high for truncate: {:?}",
                    seqno
                )));
            }
        }

        Ok(Some(usize::cast_from(result)))
    }
}
604
#[cfg(test)]
mod tests {
    use mz_ore::assert_err;
    use tracing::info;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    use super::*;

    /// Loads the external-database test config.
    ///
    /// If the config is unavailable, logs why the test is being skipped and
    /// returns `None`.
    fn config_or_skip() -> Result<Option<PostgresConsensusConfig>, ExternalError> {
        let config = PostgresConsensusConfig::new_for_test()?;
        if config.is_none() {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
        }
        Ok(config)
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // requires an external database service
    async fn postgres_consensus() -> Result<(), ExternalError> {
        let Some(config) = config_or_skip()? else {
            return Ok(());
        };

        // Run the shared `Consensus` conformance suite against this backend.
        consensus_impl_test(|| PostgresConsensus::open(config.clone())).await?;

        // `drop_and_recreate` must wipe previously committed state.
        let consensus = PostgresConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let state = VersionedData {
            seqno: SeqNo(0),
            data: Bytes::from("abc"),
        };
        assert_eq!(
            consensus.compare_and_set(&key, state.clone()).await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));
        consensus.drop_and_recreate().await?;
        assert_eq!(consensus.head(&key).await, Ok(None));

        // The test knobs cap the pool at 2 connections with a 1s checkout
        // wait, so a third concurrent checkout must fail instead of blocking.
        let Some(config) = config_or_skip()? else {
            return Ok(());
        };
        let consensus: PostgresConsensus = PostgresConsensus::open(config.clone()).await?;
        let _conn1 = consensus.get_connection().await?;
        let _conn2 = consensus.get_connection().await?;
        let conn3 = consensus.get_connection().await;
        assert_err!(conn3);

        Ok(())
    }
}