1use std::fmt::Formatter;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use async_stream::try_stream;
19use async_trait::async_trait;
20use bytes::Bytes;
21use deadpool_postgres::tokio_postgres::Config;
22use deadpool_postgres::tokio_postgres::types::{FromSql, IsNull, ToSql, Type, to_sql_checked};
23use deadpool_postgres::{Object, PoolError};
24use futures_util::StreamExt;
25use mz_dyncfg::ConfigSet;
26use mz_ore::cast::CastFrom;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::url::SensitiveUrl;
29use mz_postgres_client::metrics::PostgresClientMetrics;
30use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
31use postgres_protocol::escape::escape_identifier;
32use tokio_postgres::error::SqlState;
33use tokio_postgres::{Row, Statement};
34use tracing::{info, warn};
35
36use crate::error::Error;
37use crate::location::{CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData};
38
/// Dynamic config flag `persist_use_postgres_tuned_queries`, defaulting to `false`.
///
/// The consensus implementation in this file reads it when choosing the SQL for
/// `compare_and_set` and `truncate`: the Postgres-specific queries are used only
/// when this flag is on *and* the backing store was detected as `PostgresMode::Postgres`.
pub const USE_POSTGRES_TUNED_QUERIES: mz_dyncfg::Config<bool> = mz_dyncfg::Config::new(
    "persist_use_postgres_tuned_queries",
    false,
    "Use a set of queries for consensus that have specifically been tuned against
 Postgres to ensure we acquire a minimal number of locks.",
);
46
/// DDL that creates the `consensus` table if it does not exist yet.
///
/// Each row is keyed by `(shard, sequence_number)` and stores that version's
/// payload in `data`. The statement carries no trailing `;`: callers either
/// append `CRDB_SCHEMA_OPTIONS` directly after it or terminate it themselves.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS consensus (
 shard text NOT NULL,
 sequence_number bigint NOT NULL,
 data bytea NOT NULL,
 PRIMARY KEY(shard, sequence_number)
)
";
55
/// Table options appended directly after `SCHEMA` on the first table-creation
/// attempt. The setting name indicates it turns off automatic SQL statistics
/// collection for the table.
/// NOTE(review): a backend that rejects this clause is treated as vanilla Postgres
/// by the callers in this file — confirm that is the only expected failure source.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";

/// Statement executed after table creation on that same first attempt; it sets
/// the `consensus` table's zone configuration `gc.ttlseconds` to 600.
const CRDB_CONFIGURE_ZONE: &str = "ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600";
71
/// Executes `query` on `client` through `batch_execute`.
///
/// Callers in this file pass strings containing several `;`-separated statements.
///
/// NOTE(review): the `clippy::disallowed_methods` allow suggests that calling
/// `batch_execute` directly is linted against elsewhere in the project and that
/// this wrapper is the sanctioned call site — confirm against the clippy config.
async fn pg_batch_execute(client: &Object, query: &str) -> Result<(), tokio_postgres::Error> {
    // The lint is suppressed for this one call only.
    #[allow(clippy::disallowed_methods)]
    client.batch_execute(query).await
}
78
/// Runs the already-prepared `statement` with `params` on `client` and returns
/// all resulting rows.
///
/// NOTE(review): the `clippy::disallowed_methods` allow suggests direct `query`
/// calls are linted against elsewhere in the project — confirm against the clippy config.
async fn pg_query_prepared(
    client: &Object,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, tokio_postgres::Error> {
    // The lint is suppressed for this one call only.
    #[allow(clippy::disallowed_methods)]
    client.query(statement, params).await
}
87
/// Runs the already-prepared `statement` with `params` on `client` through
/// `query_opt`, yielding `None` when the query produced no row.
///
/// NOTE(review): the `clippy::disallowed_methods` allow suggests direct `query_opt`
/// calls are linted against elsewhere in the project — confirm against the clippy config.
async fn pg_query_opt_prepared(
    client: &Object,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Option<Row>, tokio_postgres::Error> {
    // The lint is suppressed for this one call only.
    #[allow(clippy::disallowed_methods)]
    client.query_opt(statement, params).await
}
96
/// Runs the already-prepared `statement` with `params` on `client` through
/// `execute` and returns the `u64` it reports (callers in this file treat it as
/// the number of affected rows).
///
/// NOTE(review): the `clippy::disallowed_methods` allow suggests direct `execute`
/// calls are linted against elsewhere in the project — confirm against the clippy config.
async fn pg_execute_prepared(
    client: &Object,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<u64, tokio_postgres::Error> {
    // The lint is suppressed for this one call only.
    #[allow(clippy::disallowed_methods)]
    client.execute(statement, params).await
}
105
106impl ToSql for SeqNo {
107 fn to_sql(
108 &self,
109 ty: &Type,
110 w: &mut bytes::BytesMut,
111 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
112 let value = i64::try_from(self.0)?;
114 <i64 as ToSql>::to_sql(&value, ty, w)
115 }
116
117 fn accepts(ty: &Type) -> bool {
118 <i64 as ToSql>::accepts(ty)
119 }
120
121 to_sql_checked!();
122}
123
124impl<'a> FromSql<'a> for SeqNo {
125 fn from_sql(
126 ty: &Type,
127 raw: &'a [u8],
128 ) -> Result<SeqNo, Box<dyn std::error::Error + Sync + Send>> {
129 let sequence_number = <i64 as FromSql>::from_sql(ty, raw)?;
130
131 let sequence_number = u64::try_from(sequence_number)?;
134 Ok(SeqNo(sequence_number))
135 }
136
137 fn accepts(ty: &Type) -> bool {
138 <i64 as FromSql>::accepts(ty)
139 }
140}
141
/// Configuration needed to open a `PostgresConsensus`.
#[derive(Clone, Debug)]
pub struct PostgresConsensusConfig {
    /// Connection URL. On open it is parsed to find the database user and is
    /// forwarded to the Postgres client configuration.
    url: SensitiveUrl,
    /// Client knobs (pool size, timeouts, keepalives, ...) forwarded to
    /// `PostgresClientConfig`.
    knobs: Arc<dyn PostgresClientKnobs>,
    /// Client metrics forwarded to `PostgresClientConfig`.
    metrics: PostgresClientMetrics,
    /// Dynamic configuration; consulted for `USE_POSTGRES_TUNED_QUERIES`.
    dyncfg: Arc<ConfigSet>,
}
150
151impl From<PostgresConsensusConfig> for PostgresClientConfig {
152 fn from(config: PostgresConsensusConfig) -> Self {
153 PostgresClientConfig::new(config.url, config.knobs, config.metrics)
154 }
155}
156
157impl PostgresConsensusConfig {
158 const EXTERNAL_TESTS_POSTGRES_URL: &'static str =
159 "MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL";
160
161 pub fn new(
163 url: &SensitiveUrl,
164 knobs: Box<dyn PostgresClientKnobs>,
165 metrics: PostgresClientMetrics,
166 dyncfg: Arc<ConfigSet>,
167 ) -> Result<Self, Error> {
168 Ok(PostgresConsensusConfig {
169 url: url.clone(),
170 knobs: Arc::from(knobs),
171 metrics,
172 dyncfg,
173 })
174 }
175
176 pub fn new_for_test() -> Result<Option<Self>, Error> {
186 let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
187 Ok(url) => SensitiveUrl::from_str(&url).map_err(|e| e.to_string())?,
188 Err(_) => {
189 if mz_ore::env::is_var_truthy("CI") {
190 panic!("CI is supposed to run this test but something has gone wrong!");
191 }
192 return Ok(None);
193 }
194 };
195
196 struct TestConsensusKnobs;
197 impl std::fmt::Debug for TestConsensusKnobs {
198 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
199 f.debug_struct("TestConsensusKnobs").finish_non_exhaustive()
200 }
201 }
202 impl PostgresClientKnobs for TestConsensusKnobs {
203 fn connection_pool_max_size(&self) -> usize {
204 2
205 }
206
207 fn connection_pool_max_wait(&self) -> Option<Duration> {
208 Some(Duration::from_secs(1))
209 }
210
211 fn connection_pool_ttl(&self) -> Duration {
212 Duration::MAX
213 }
214 fn connection_pool_ttl_stagger(&self) -> Duration {
215 Duration::MAX
216 }
217 fn connect_timeout(&self) -> Duration {
218 Duration::MAX
219 }
220 fn tcp_user_timeout(&self) -> Duration {
221 Duration::ZERO
222 }
223
224 fn keepalives_idle(&self) -> Duration {
225 Duration::from_secs(10)
226 }
227
228 fn keepalives_interval(&self) -> Duration {
229 Duration::from_secs(5)
230 }
231
232 fn keepalives_retries(&self) -> u32 {
233 5
234 }
235 }
236
237 let dyncfg = ConfigSet::default().add(&USE_POSTGRES_TUNED_QUERIES);
238 let config = PostgresConsensusConfig::new(
239 &url,
240 Box::new(TestConsensusKnobs),
241 PostgresClientMetrics::new(&MetricsRegistry::new(), "mz_persist"),
242 Arc::new(dyncfg),
243 )?;
244 Ok(Some(config))
245 }
246}
247
/// Flavor of the backing database, as determined while opening a `PostgresConsensus`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostgresMode {
    /// The CockroachDB-specific DDL succeeded, or failed only with an
    /// insufficient-privilege error.
    CockroachDB,
    /// The CockroachDB-specific DDL was rejected with an invalid-parameter-value
    /// or syntax error, so the plain schema was created instead.
    Postgres,
}
256
/// Implementation of `Consensus` that stores its data in a `consensus` table
/// reached through a `PostgresClient`.
pub struct PostgresConsensus {
    /// Client from which connections are obtained for every operation.
    postgres_client: PostgresClient,
    /// Dynamic configuration; consulted for `USE_POSTGRES_TUNED_QUERIES`.
    dyncfg: Arc<ConfigSet>,
    /// Database flavor detected when this instance was opened.
    mode: PostgresMode,
}
263
264impl std::fmt::Debug for PostgresConsensus {
265 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
266 f.debug_struct("PostgresConsensus").finish_non_exhaustive()
267 }
268}
269
270impl PostgresConsensus {
271 pub async fn open(config: PostgresConsensusConfig) -> Result<Self, ExternalError> {
274 let pg_config: Config = config.url.to_string().parse()?;
276 let role = pg_config.get_user().expect("failed to get PostgreSQL user");
277 let create_schema = format!(
278 "CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION {}",
279 escape_identifier(role),
280 );
281
282 let dyncfg = Arc::clone(&config.dyncfg);
283 let postgres_client = PostgresClient::open(config.into())?;
284
285 let client = postgres_client.get_connection().await?;
286
287 let mode = match pg_batch_execute(
288 &client,
289 &format!(
290 "{}; {}{}; {};",
291 create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
292 ),
293 )
294 .await
295 {
296 Ok(()) => PostgresMode::CockroachDB,
297 Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
298 warn!(
299 "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
300 );
301 PostgresMode::CockroachDB
302 }
303 Err(e)
306 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
307 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
308 {
309 info!(
310 "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
311 e
312 );
313 PostgresMode::Postgres
314 }
315 Err(e) => return Err(e.into()),
316 };
317
318 if mode != PostgresMode::CockroachDB {
319 pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
320 }
321
322 Ok(PostgresConsensus {
323 postgres_client,
324 dyncfg,
325 mode,
326 })
327 }
328
329 pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> {
333 let client = self.get_connection().await?;
335 pg_batch_execute(&client, "DROP TABLE consensus").await?;
336 let crdb_mode = match pg_batch_execute(
337 &client,
338 &format!("{}{}; {}", SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,),
339 )
340 .await
341 {
342 Ok(()) => true,
343 Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
344 warn!(
345 "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
346 );
347 true
348 }
349 Err(e)
350 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
351 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
352 {
353 info!(
354 "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
355 e
356 );
357 false
358 }
359 Err(e) => return Err(e.into()),
360 };
361
362 if !crdb_mode {
363 pg_batch_execute(&client, SCHEMA).await?;
364 }
365 Ok(())
366 }
367
368 async fn get_connection(&self) -> Result<Object, PoolError> {
369 self.postgres_client.get_connection().await
370 }
371}
372
373#[async_trait]
374impl Consensus for PostgresConsensus {
375 fn list_keys(&self) -> ResultStream<'_, String> {
376 let q = "SELECT DISTINCT shard FROM consensus";
377
378 Box::pin(try_stream! {
379 let client = self.get_connection().await?;
382 let statement = client.prepare_cached(q).await?;
383 let params: &[String] = &[];
384 let mut rows = Box::pin(client.query_raw(&statement, params).await?);
385 while let Some(row) = rows.next().await {
386 let shard: String = row?.try_get("shard")?;
387 yield shard;
388 }
389 })
390 }
391
392 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
393 let q = "SELECT sequence_number, data FROM consensus
394 WHERE shard = $1 ORDER BY sequence_number DESC LIMIT 1";
395 let row = {
396 let client = self.get_connection().await?;
397 let statement = client.prepare_cached(q).await?;
398 pg_query_opt_prepared(&client, &statement, &[&key]).await?
399 };
400 let row = match row {
401 None => return Ok(None),
402 Some(row) => row,
403 };
404
405 let seqno: SeqNo = row.try_get("sequence_number")?;
406
407 let data: Vec<u8> = row.try_get("data")?;
408 Ok(Some(VersionedData {
409 seqno,
410 data: Bytes::from(data),
411 }))
412 }
413
414 async fn compare_and_set(
415 &self,
416 key: &str,
417 new: VersionedData,
418 ) -> Result<CaSResult, ExternalError> {
419 let expected = new.seqno.previous();
420
421 let result = if let Some(expected) = expected {
422 static CRDB_CAS_QUERY: &str = "
429 INSERT INTO consensus (shard, sequence_number, data)
430 SELECT $1, $2, $3
431 WHERE (SELECT sequence_number FROM consensus
432 WHERE shard = $1
433 ORDER BY sequence_number DESC LIMIT 1) = $4;
434 ";
435
436 static POSTGRES_CAS_QUERY: &str = "
441 WITH last_seq AS (
442 SELECT sequence_number FROM consensus
443 WHERE shard = $1
444 ORDER BY sequence_number DESC
445 LIMIT 1
446 FOR UPDATE
447 )
448 INSERT INTO consensus (shard, sequence_number, data)
449 SELECT $1, $2, $3
450 FROM last_seq
451 WHERE last_seq.sequence_number = $4;
452 ";
453
454 let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
455 && self.mode == PostgresMode::Postgres
456 {
457 POSTGRES_CAS_QUERY
458 } else {
459 CRDB_CAS_QUERY
460 };
461 let client = self.get_connection().await?;
462 let statement = client.prepare_cached(q).await?;
463 pg_execute_prepared(
464 &client,
465 &statement,
466 &[&key, &new.seqno, &new.data.as_ref(), &expected],
467 )
468 .await?
469 } else {
470 let q = "INSERT INTO consensus SELECT $1, $2, $3 WHERE
472 NOT EXISTS (
473 SELECT * FROM consensus WHERE shard = $1
474 )
475 ON CONFLICT DO NOTHING";
476 let client = self.get_connection().await?;
477 let statement = client.prepare_cached(q).await?;
478 pg_execute_prepared(&client, &statement, &[&key, &new.seqno, &new.data.as_ref()])
479 .await?
480 };
481
482 if result == 1 {
483 Ok(CaSResult::Committed)
484 } else {
485 Ok(CaSResult::ExpectationMismatch)
486 }
487 }
488
489 async fn scan(
490 &self,
491 key: &str,
492 from: SeqNo,
493 limit: usize,
494 ) -> Result<Vec<VersionedData>, ExternalError> {
495 let q = "SELECT sequence_number, data FROM consensus
496 WHERE shard = $1 AND sequence_number >= $2
497 ORDER BY sequence_number ASC LIMIT $3";
498 let Ok(limit) = i64::try_from(limit) else {
499 return Err(ExternalError::from(anyhow!(
500 "limit must be [0, i64::MAX]. was: {:?}",
501 limit
502 )));
503 };
504 let rows = {
505 let client = self.get_connection().await?;
506 let statement = client.prepare_cached(q).await?;
507 pg_query_prepared(&client, &statement, &[&key, &from, &limit]).await?
508 };
509 let mut results = Vec::with_capacity(rows.len());
510
511 for row in rows {
512 let seqno: SeqNo = row.try_get("sequence_number")?;
513 let data: Vec<u8> = row.try_get("data")?;
514 results.push(VersionedData {
515 seqno,
516 data: Bytes::from(data),
517 });
518 }
519 Ok(results)
520 }
521
522 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<Option<usize>, ExternalError> {
523 static CRDB_TRUNCATE_QUERY: &str = "
524 DELETE FROM consensus
525 WHERE shard = $1 AND sequence_number < $2 AND
526 EXISTS (
527 SELECT * FROM consensus WHERE shard = $1 AND sequence_number >= $2
528 )
529 ";
530
531 static POSTGRES_TRUNCATE_QUERY: &str = "
543 WITH newer_exists AS (
544 SELECT * FROM consensus
545 WHERE shard = $1
546 AND sequence_number >= $2
547 ORDER BY sequence_number ASC
548 LIMIT 1
549 FOR UPDATE
550 ),
551 to_lock AS (
552 SELECT ctid FROM consensus
553 WHERE shard = $1
554 AND sequence_number < $2
555 AND EXISTS (SELECT * FROM newer_exists)
556 ORDER BY sequence_number DESC
557 FOR UPDATE
558 )
559 DELETE FROM consensus
560 USING to_lock
561 WHERE consensus.ctid = to_lock.ctid;
562 ";
563
564 let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
565 && self.mode == PostgresMode::Postgres
566 {
567 POSTGRES_TRUNCATE_QUERY
568 } else {
569 CRDB_TRUNCATE_QUERY
570 };
571 let result = {
572 let client = self.get_connection().await?;
573 let statement = client.prepare_cached(q).await?;
574 pg_execute_prepared(&client, &statement, &[&key, &seqno]).await?
575 };
576 if result == 0 {
577 let current = self.head(key).await?;
589 if current.map_or(true, |data| data.seqno < seqno) {
590 return Err(ExternalError::from(anyhow!(
591 "upper bound too high for truncate: {:?}",
592 seqno
593 )));
594 }
595 }
596
597 Ok(Some(usize::cast_from(result)))
598 }
599}
600
#[cfg(test)]
mod tests {
    use mz_ore::assert_err;
    use tracing::info;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    use super::*;

    /// Exercises `PostgresConsensus` against an external database: the shared
    /// consensus test suite, `drop_and_recreate`, and connection pool exhaustion.
    /// The test is skipped when no test URL is configured.
    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)]
    async fn postgres_consensus() -> Result<(), ExternalError> {
        let Some(config) = PostgresConsensusConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        consensus_impl_test(|| PostgresConsensus::open(config.clone())).await?;

        // Verify that `drop_and_recreate` leaves an empty table behind.
        let consensus = PostgresConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let state = VersionedData {
            seqno: SeqNo(0),
            data: Bytes::from("abc"),
        };

        let committed = consensus.compare_and_set(&key, state.clone()).await;
        assert_eq!(committed, Ok(CaSResult::Committed));

        let before_drop = consensus.head(&key).await;
        assert_eq!(before_drop, Ok(Some(state.clone())));

        consensus.drop_and_recreate().await?;

        let after_drop = consensus.head(&key).await;
        assert_eq!(after_drop, Ok(None));

        // Pool exhaustion check, using a freshly built configuration.
        let Some(config) = PostgresConsensusConfig::new_for_test()? else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        let consensus = PostgresConsensus::open(config.clone()).await?;
        // The test knobs cap the pool at two connections; hold both.
        let _first = consensus.get_connection().await?;
        let _second = consensus.get_connection().await?;

        // A third checkout is expected to fail.
        let third = consensus.get_connection().await;

        assert_err!(third);

        Ok(())
    }
}