1use std::fmt::Formatter;
13use std::str::FromStr;
14use std::sync::Arc;
15use std::time::Duration;
16
17use anyhow::anyhow;
18use async_stream::try_stream;
19use async_trait::async_trait;
20use bytes::Bytes;
21use deadpool_postgres::tokio_postgres::Config;
22use deadpool_postgres::tokio_postgres::types::{FromSql, IsNull, ToSql, Type, to_sql_checked};
23use deadpool_postgres::{Object, PoolError};
24use futures_util::StreamExt;
25use mz_dyncfg::ConfigSet;
26use mz_ore::cast::CastFrom;
27use mz_ore::metrics::MetricsRegistry;
28use mz_ore::url::SensitiveUrl;
29use mz_postgres_client::metrics::PostgresClientMetrics;
30use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
31use postgres_protocol::escape::escape_identifier;
32use tokio_postgres::error::SqlState;
33use tracing::{info, warn};
34
35use crate::error::Error;
36use crate::location::{CaSResult, Consensus, ExternalError, ResultStream, SeqNo, VersionedData};
37
/// Dynamic config flag that opts consensus into a set of queries hand-tuned for
/// vanilla Postgres (see `compare_and_set` and `truncate`).
///
/// The tuned queries are only used when the connected backend was detected as
/// Postgres; CockroachDB always uses the portable queries.
pub const USE_POSTGRES_TUNED_QUERIES: mz_dyncfg::Config<bool> = mz_dyncfg::Config::new(
    "persist_use_postgres_tuned_queries",
    false,
    "Use a set of queries for consensus that have specifically been tuned against
    Postgres to ensure we acquire a minimal number of locks.",
);

/// Portable DDL for the `consensus` table: one row per `(shard,
/// sequence_number)` pair, carrying an opaque `bytea` payload.
///
/// The string intentionally has no trailing `;` so that
/// [`CRDB_SCHEMA_OPTIONS`] can be appended directly after the closing paren.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS consensus (
    shard text NOT NULL,
    sequence_number bigint NOT NULL,
    data bytea NOT NULL,
    PRIMARY KEY(shard, sequence_number)
)
";

/// CockroachDB-only table options appended to [`SCHEMA`].
///
/// Vanilla Postgres rejects this clause. `PostgresConsensus` relies on that
/// rejection (`INVALID_PARAMETER_VALUE` / `SYNTAX_ERROR`) to detect that it is
/// talking to Postgres rather than CockroachDB.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
/// CockroachDB zone configuration for the `consensus` table, setting
/// `gc.ttlseconds` to 600 (10 minutes).
///
/// `CONFIGURE ZONE` is not Postgres syntax, which is likewise treated as a
/// signal that the backend is vanilla Postgres.
const CRDB_CONFIGURE_ZONE: &str = "ALTER TABLE consensus CONFIGURE ZONE USING gc.ttlseconds = 600";
70
71impl ToSql for SeqNo {
72 fn to_sql(
73 &self,
74 ty: &Type,
75 w: &mut bytes::BytesMut,
76 ) -> Result<IsNull, Box<dyn std::error::Error + Sync + Send>> {
77 let value = i64::try_from(self.0)?;
79 <i64 as ToSql>::to_sql(&value, ty, w)
80 }
81
82 fn accepts(ty: &Type) -> bool {
83 <i64 as ToSql>::accepts(ty)
84 }
85
86 to_sql_checked!();
87}
88
89impl<'a> FromSql<'a> for SeqNo {
90 fn from_sql(
91 ty: &Type,
92 raw: &'a [u8],
93 ) -> Result<SeqNo, Box<dyn std::error::Error + Sync + Send>> {
94 let sequence_number = <i64 as FromSql>::from_sql(ty, raw)?;
95
96 let sequence_number = u64::try_from(sequence_number)?;
99 Ok(SeqNo(sequence_number))
100 }
101
102 fn accepts(ty: &Type) -> bool {
103 <i64 as FromSql>::accepts(ty)
104 }
105}
106
/// Configuration for opening a [`PostgresConsensus`].
#[derive(Clone, Debug)]
pub struct PostgresConsensusConfig {
    /// Connection URL of the Postgres/CockroachDB database.
    url: SensitiveUrl,
    /// Connection-pool tuning, forwarded to [`PostgresClientConfig`].
    knobs: Arc<dyn PostgresClientKnobs>,
    /// Metrics for the underlying Postgres client.
    metrics: PostgresClientMetrics,
    /// Dynamic configuration read by consensus (e.g. `USE_POSTGRES_TUNED_QUERIES`).
    dyncfg: Arc<ConfigSet>,
}
115
116impl From<PostgresConsensusConfig> for PostgresClientConfig {
117 fn from(config: PostgresConsensusConfig) -> Self {
118 PostgresClientConfig::new(config.url, config.knobs, config.metrics)
119 }
120}
121
122impl PostgresConsensusConfig {
123 const EXTERNAL_TESTS_POSTGRES_URL: &'static str =
124 "MZ_PERSIST_EXTERNAL_STORAGE_TEST_POSTGRES_URL";
125
126 pub fn new(
128 url: &SensitiveUrl,
129 knobs: Box<dyn PostgresClientKnobs>,
130 metrics: PostgresClientMetrics,
131 dyncfg: Arc<ConfigSet>,
132 ) -> Result<Self, Error> {
133 Ok(PostgresConsensusConfig {
134 url: url.clone(),
135 knobs: Arc::from(knobs),
136 metrics,
137 dyncfg,
138 })
139 }
140
141 pub fn new_for_test() -> Result<Option<Self>, Error> {
151 let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
152 Ok(url) => SensitiveUrl::from_str(&url).map_err(|e| e.to_string())?,
153 Err(_) => {
154 if mz_ore::env::is_var_truthy("CI") {
155 panic!("CI is supposed to run this test but something has gone wrong!");
156 }
157 return Ok(None);
158 }
159 };
160
161 struct TestConsensusKnobs;
162 impl std::fmt::Debug for TestConsensusKnobs {
163 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
164 f.debug_struct("TestConsensusKnobs").finish_non_exhaustive()
165 }
166 }
167 impl PostgresClientKnobs for TestConsensusKnobs {
168 fn connection_pool_max_size(&self) -> usize {
169 2
170 }
171
172 fn connection_pool_max_wait(&self) -> Option<Duration> {
173 Some(Duration::from_secs(1))
174 }
175
176 fn connection_pool_ttl(&self) -> Duration {
177 Duration::MAX
178 }
179 fn connection_pool_ttl_stagger(&self) -> Duration {
180 Duration::MAX
181 }
182 fn connect_timeout(&self) -> Duration {
183 Duration::MAX
184 }
185 fn tcp_user_timeout(&self) -> Duration {
186 Duration::ZERO
187 }
188 }
189
190 let dyncfg = ConfigSet::default().add(&USE_POSTGRES_TUNED_QUERIES);
191 let config = PostgresConsensusConfig::new(
192 &url,
193 Box::new(TestConsensusKnobs),
194 PostgresClientMetrics::new(&MetricsRegistry::new(), "mz_persist"),
195 Arc::new(dyncfg),
196 )?;
197 Ok(Some(config))
198 }
199}
200
/// Which database flavor a [`PostgresConsensus`] is connected to.
///
/// The flavor is detected when the `consensus` table is created.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum PostgresMode {
    /// CockroachDB: the CockroachDB-specific DDL succeeded. This mode is also
    /// assumed when that DDL was rejected with `INSUFFICIENT_PRIVILEGE`
    /// (read-only user).
    CockroachDB,
    /// Vanilla Postgres: the CockroachDB-specific DDL failed with
    /// `INVALID_PARAMETER_VALUE` or `SYNTAX_ERROR`.
    Postgres,
}
209
/// A [`Consensus`] implementation backed by a single `consensus` table in
/// Postgres or CockroachDB.
pub struct PostgresConsensus {
    /// Connection pool; each operation checks out a connection from here.
    postgres_client: PostgresClient,
    /// Dynamic configuration; consulted for `USE_POSTGRES_TUNED_QUERIES`.
    dyncfg: Arc<ConfigSet>,
    /// Backend flavor detected in `open`; selects between query variants.
    mode: PostgresMode,
}
216
217impl std::fmt::Debug for PostgresConsensus {
218 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
219 f.debug_struct("PostgresConsensus").finish_non_exhaustive()
220 }
221}
222
223impl PostgresConsensus {
224 pub async fn open(config: PostgresConsensusConfig) -> Result<Self, ExternalError> {
227 let pg_config: Config = config.url.to_string().parse()?;
229 let role = pg_config.get_user().unwrap();
230 let create_schema = format!(
231 "CREATE SCHEMA IF NOT EXISTS consensus AUTHORIZATION {}",
232 escape_identifier(role),
233 );
234
235 let dyncfg = Arc::clone(&config.dyncfg);
236 let postgres_client = PostgresClient::open(config.into())?;
237
238 let client = postgres_client.get_connection().await?;
239
240 let mode = match client
241 .batch_execute(&format!(
242 "{}; {}{}; {};",
243 create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
244 ))
245 .await
246 {
247 Ok(()) => PostgresMode::CockroachDB,
248 Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
249 warn!(
250 "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
251 );
252 PostgresMode::CockroachDB
253 }
254 Err(e)
257 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
258 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
259 {
260 info!(
261 "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
262 e
263 );
264 PostgresMode::Postgres
265 }
266 Err(e) => return Err(e.into()),
267 };
268
269 if mode != PostgresMode::CockroachDB {
270 client
271 .batch_execute(&format!("{}; {};", create_schema, SCHEMA))
272 .await?;
273 }
274
275 Ok(PostgresConsensus {
276 postgres_client,
277 dyncfg,
278 mode,
279 })
280 }
281
282 pub async fn drop_and_recreate(&self) -> Result<(), ExternalError> {
286 let client = self.get_connection().await?;
288 client.execute("DROP TABLE consensus", &[]).await?;
289 let crdb_mode = match client
290 .batch_execute(&format!(
291 "{}{}; {}",
292 SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
293 ))
294 .await
295 {
296 Ok(()) => true,
297 Err(e) if e.code() == Some(&SqlState::INSUFFICIENT_PRIVILEGE) => {
298 warn!(
299 "unable to ALTER TABLE consensus, this is expected and OK when connecting with a read-only user"
300 );
301 true
302 }
303 Err(e)
304 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
305 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
306 {
307 info!(
308 "unable to initiate consensus with CRDB params, this is expected and OK when running against Postgres: {:?}",
309 e
310 );
311 false
312 }
313 Err(e) => return Err(e.into()),
314 };
315
316 if !crdb_mode {
317 client.execute(SCHEMA, &[]).await?;
318 }
319 Ok(())
320 }
321
322 async fn get_connection(&self) -> Result<Object, PoolError> {
323 self.postgres_client.get_connection().await
324 }
325}
326
/// [`Consensus`] over a single `consensus` table keyed by
/// `(shard, sequence_number)`.
///
/// Every method checks a connection out of the pool and, where possible,
/// confines it to an inner block so it is returned before results are decoded.
#[async_trait]
impl Consensus for PostgresConsensus {
    /// Streams every distinct shard key present in the table.
    ///
    /// The connection is acquired when the stream is first polled and stays
    /// checked out while rows are being yielded.
    fn list_keys(&self) -> ResultStream<String> {
        let q = "SELECT DISTINCT shard FROM consensus";

        Box::pin(try_stream! {
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            // Explicitly typed empty parameter list: `query_raw` is generic over
            // its parameter iterator, so an untyped `&[]` would not infer.
            let params: &[String] = &[];
            // Pinned so that `StreamExt::next` can be called on the row stream.
            let mut rows = Box::pin(client.query_raw(&statement, params).await?);
            while let Some(row) = rows.next().await {
                let shard: String = row?.try_get("shard")?;
                yield shard;
            }
        })
    }

    /// Returns the entry with the largest sequence number for `key`, or `None`
    /// if the shard has no entries.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
        let q = "SELECT sequence_number, data FROM consensus
             WHERE shard = $1 ORDER BY sequence_number DESC LIMIT 1";
        // Scoped so the connection goes back to the pool before decoding.
        let row = {
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            client.query_opt(&statement, &[&key]).await?
        };
        let row = match row {
            None => return Ok(None),
            Some(row) => row,
        };

        let seqno: SeqNo = row.try_get("sequence_number")?;

        let data: Vec<u8> = row.try_get("data")?;
        Ok(Some(VersionedData {
            seqno,
            data: Bytes::from(data),
        }))
    }

    /// Appends `new` to the shard `key`, but only if the shard's latest
    /// sequence number equals `expected`.
    ///
    /// When `expected` is `None`, the append happens only if the shard has no
    /// entries at all.
    ///
    /// Returns [`CaSResult::Committed`] when exactly one row was inserted, and
    /// [`CaSResult::ExpectationMismatch`] otherwise. Returns an error if
    /// `new.seqno` is not strictly greater than `expected`.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError> {
        if let Some(expected) = expected {
            if new.seqno <= expected {
                return Err(Error::from(
                        format!("new seqno must be strictly greater than expected. Got new: {:?} expected: {:?}",
                                 new.seqno, expected)).into());
            }
        }

        let result = if let Some(expected) = expected {
            // Portable query: insert only if the shard's current latest
            // sequence number equals `expected` ($4). Performs the check and the
            // insert in a single statement.
            static CRDB_CAS_QUERY: &str = "
            INSERT INTO consensus (shard, sequence_number, data)
            SELECT $1, $2, $3
            WHERE (SELECT sequence_number FROM consensus
                   WHERE shard = $1
                   ORDER BY sequence_number DESC LIMIT 1) = $4;
            ";

            // Postgres-tuned query: row-locks (`FOR UPDATE`) only the shard's
            // latest row, and inserts only if that row's sequence number
            // equals `expected`.
            // NOTE(review): under READ COMMITTED, a concurrent CAS with the same
            // `expected` may re-acquire the same (now stale) row after the
            // winner commits. The loser then presumably fails with a
            // primary-key violation, i.e. an error rather than
            // `ExpectationMismatch`, when both use the same `new.seqno`.
            // Confirm that this is acceptable.
            static POSTGRES_CAS_QUERY: &str = "
            WITH last_seq AS (
                SELECT sequence_number FROM consensus
                WHERE shard = $1
                ORDER BY sequence_number DESC
                LIMIT 1
                FOR UPDATE
            )
            INSERT INTO consensus (shard, sequence_number, data)
            SELECT $1, $2, $3
            FROM last_seq
            WHERE last_seq.sequence_number = $4;
            ";

            // The tuned query is only used against vanilla Postgres, and only
            // when the dyncfg flag is enabled.
            let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
                && self.mode == PostgresMode::Postgres
            {
                POSTGRES_CAS_QUERY
            } else {
                CRDB_CAS_QUERY
            };
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            client
                .execute(
                    &statement,
                    &[&key, &new.seqno, &new.data.as_ref(), &expected],
                )
                .await?
        } else {
            // First write to a shard: insert only if the shard is empty.
            // `ON CONFLICT DO NOTHING` turns a racing insert of the same
            // `(shard, sequence_number)` into 0 affected rows, reported as
            // `ExpectationMismatch`, instead of an error.
            let q = "INSERT INTO consensus SELECT $1, $2, $3 WHERE
                     NOT EXISTS (
                         SELECT * FROM consensus WHERE shard = $1
                     )
                     ON CONFLICT DO NOTHING";
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            client
                .execute(&statement, &[&key, &new.seqno, &new.data.as_ref()])
                .await?
        };

        if result == 1 {
            Ok(CaSResult::Committed)
        } else {
            Ok(CaSResult::ExpectationMismatch)
        }
    }

    /// Returns up to `limit` entries of shard `key` with a sequence number
    /// `>= from`, in ascending sequence-number order.
    ///
    /// Returns an error if `limit` does not fit in an `i64` (the SQL `LIMIT`
    /// parameter type).
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError> {
        let q = "SELECT sequence_number, data FROM consensus
             WHERE shard = $1 AND sequence_number >= $2
             ORDER BY sequence_number ASC LIMIT $3";
        let Ok(limit) = i64::try_from(limit) else {
            return Err(ExternalError::from(anyhow!(
                "limit must be [0, i64::MAX]. was: {:?}",
                limit
            )));
        };
        // Scoped so the connection goes back to the pool before decoding.
        let rows = {
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            client.query(&statement, &[&key, &from, &limit]).await?
        };
        let mut results = Vec::with_capacity(rows.len());

        for row in rows {
            let seqno: SeqNo = row.try_get("sequence_number")?;
            let data: Vec<u8> = row.try_get("data")?;
            results.push(VersionedData {
                seqno,
                data: Bytes::from(data),
            });
        }
        Ok(results)
    }

    /// Deletes every entry of shard `key` with a sequence number `< seqno`,
    /// but only if some entry with sequence number `>= seqno` exists.
    ///
    /// Returns the number of deleted rows. Returns an error if the shard is
    /// empty, or if its latest sequence number is below `seqno` (the bound is
    /// too high). Returns `Ok(0)` if everything below `seqno` was already gone.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
        // Portable query: the `EXISTS` guard ensures the shard is never
        // truncated past its latest entry.
        static CRDB_TRUNCATE_QUERY: &str = "
        DELETE FROM consensus
        WHERE shard = $1 AND sequence_number < $2 AND
        EXISTS (
            SELECT * FROM consensus WHERE shard = $1 AND sequence_number >= $2
        )
        ";

        // Postgres-tuned query. It row-locks the first entry at or above `$2`
        // (so that entry cannot disappear concurrently) and the rows to be
        // deleted, then deletes the locked rows by `ctid`.
        static POSTGRES_TRUNCATE_QUERY: &str = "
        WITH newer_exists AS (
            SELECT * FROM consensus
            WHERE shard = $1
                AND sequence_number >= $2
            ORDER BY sequence_number ASC
            LIMIT 1
            FOR UPDATE
        ),
        to_lock AS (
            SELECT ctid FROM consensus
            WHERE shard = $1
            AND sequence_number < $2
            AND EXISTS (SELECT * FROM newer_exists)
            ORDER BY sequence_number DESC
            FOR UPDATE
        )
        DELETE FROM consensus
        USING to_lock
        WHERE consensus.ctid = to_lock.ctid;
        ";

        let q = if USE_POSTGRES_TUNED_QUERIES.get(&self.dyncfg)
            && self.mode == PostgresMode::Postgres
        {
            POSTGRES_TRUNCATE_QUERY
        } else {
            CRDB_TRUNCATE_QUERY
        };
        // Scoped so the connection is returned before `head` checks out another.
        let result = {
            let client = self.get_connection().await?;
            let statement = client.prepare_cached(q).await?;
            client.execute(&statement, &[&key, &seqno]).await?
        };
        if result == 0 {
            // Nothing was deleted. Either everything below `seqno` is already
            // gone (fine), or there is no entry at or above `seqno` (the bound
            // is too high, or the shard is empty). Consult `head` to tell which.
            let current = self.head(key).await?;
            if current.map_or(true, |data| data.seqno < seqno) {
                return Err(ExternalError::from(anyhow!(
                    "upper bound too high for truncate: {:?}",
                    seqno
                )));
            }
        }

        Ok(usize::cast_from(result))
    }
}
562
#[cfg(test)]
mod tests {
    use mz_ore::assert_err;
    use tracing::info;
    use uuid::Uuid;

    use crate::location::tests::consensus_impl_test;

    use super::*;

    /// Loads the external-Postgres test config.
    ///
    /// When the URL env var is unset, logs why the test is being skipped and
    /// returns `None`.
    fn config_or_skip() -> Result<Option<PostgresConsensusConfig>, ExternalError> {
        let maybe_config = PostgresConsensusConfig::new_for_test()?;
        if maybe_config.is_none() {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresConsensusConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
        }
        Ok(maybe_config)
    }

    #[mz_ore::test(tokio::test(flavor = "multi_thread"))]
    #[cfg_attr(miri, ignore)] // Talks to an external database.
    async fn postgres_consensus() -> Result<(), ExternalError> {
        let Some(config) = config_or_skip()? else {
            return Ok(());
        };

        consensus_impl_test(|| PostgresConsensus::open(config.clone())).await?;

        // `drop_and_recreate` wipes all shards, so a freshly written key must
        // be gone afterwards.
        let consensus = PostgresConsensus::open(config.clone()).await?;
        let key = Uuid::new_v4().to_string();
        let state = VersionedData {
            seqno: SeqNo(5),
            data: Bytes::from("abc"),
        };

        assert_eq!(
            consensus.compare_and_set(&key, None, state.clone()).await,
            Ok(CaSResult::Committed),
        );
        assert_eq!(consensus.head(&key).await, Ok(Some(state.clone())));
        consensus.drop_and_recreate().await?;
        assert_eq!(consensus.head(&key).await, Ok(None));

        // The test knobs cap the pool at two connections, so checking out a
        // third must fail.
        let Some(config) = config_or_skip()? else {
            return Ok(());
        };
        let consensus: PostgresConsensus = PostgresConsensus::open(config.clone()).await?;
        let _conn1 = consensus.get_connection().await?;
        let _conn2 = consensus.get_connection().await?;
        let conn3 = consensus.get_connection().await;
        assert_err!(conn3);

        Ok(())
    }
}