1use std::str::FromStr;
15use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::tokio_postgres::types::ToSql;
23use deadpool_postgres::tokio_postgres::{Row, Statement};
24use deadpool_postgres::{Object, PoolError};
25use dec::Decimal;
26use mz_adapter_types::timestamp_oracle::{
27 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
28 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
29 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
30 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
31 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES, DEFAULT_PG_TIMESTAMP_ORACLE_STATEMENT_TIMEOUT,
32 DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
33};
34use mz_ore::error::ErrorExt;
35use mz_ore::instrument;
36use mz_ore::metrics::MetricsRegistry;
37use mz_ore::url::SensitiveUrl;
38use mz_pgrepr::Numeric;
39use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
40use mz_repr::Timestamp;
41use postgres_protocol::escape::escape_identifier;
42use serde::{Deserialize, Serialize};
43use tracing::{debug, info};
44
45use crate::WriteTimestamp;
46use crate::metrics::{Metrics, RetryMetrics};
47use crate::retry::Retry;
48use crate::{GenericNowFn, TimestampOracle};
49
/// DDL for the single table backing the oracle: one row per timeline, holding
/// that timeline's read and write timestamps.
///
/// Timestamps are stored as `DECIMAL(20,0)`, which can hold any `u64` (at most
/// 20 decimal digits).
///
/// Note: the statement deliberately has no trailing `;` so that
/// `CRDB_SCHEMA_OPTIONS` can be appended directly after it when opening.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts DECIMAL(20,0) NOT NULL,
    write_ts DECIMAL(20,0) NOT NULL,
    PRIMARY KEY(timeline)
)
";
61
/// CockroachDB-only table option appended right after `SCHEMA`; it turns off
/// `sql_stats_automatic_collection_enabled` for the oracle table. Postgres
/// rejects it, which `open` detects and then falls back to the plain DDL.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
/// CockroachDB-only zone configuration for the oracle table, setting
/// `gc.ttlseconds` to 600 (10 minutes).
///
/// NOTE(review): presumably meant to limit garbage from the frequently
/// rewritten timestamp rows — confirm the rationale.
const CRDB_CONFIGURE_ZONE: &str =
    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
79
/// Runs `query` — which may contain several `;`-separated statements — on
/// `client` via `batch_execute`. No rows are returned.
async fn pg_batch_execute(
    client: &Object,
    query: &str,
) -> Result<(), deadpool_postgres::tokio_postgres::Error> {
    // NOTE(review): the workspace clippy config presumably disallows calling
    // this client method directly, making this wrapper the sanctioned call
    // site — confirm and record the rationale here.
    #[allow(clippy::disallowed_methods)]
    client.batch_execute(query).await
}
89
/// Executes the prepared `statement` with `params` on `client`, expecting a
/// single result row, and returns it.
async fn pg_query_one_prepared(
    client: &Object,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
    // NOTE(review): direct client query methods appear to be disallowed by the
    // clippy config; this wrapper is the allowed call site — confirm.
    #[allow(clippy::disallowed_methods)]
    client.query_one(statement, params).await
}
98
/// Executes the prepared `statement` with `params` on `client` and returns the
/// row count reported by `execute`.
async fn pg_execute_prepared(
    client: &Object,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<u64, deadpool_postgres::tokio_postgres::Error> {
    // NOTE(review): direct client query methods appear to be disallowed by the
    // clippy config; this wrapper is the allowed call site — confirm.
    #[allow(clippy::disallowed_methods)]
    client.execute(statement, params).await
}
107
/// Executes the prepared `statement` with `params` inside `txn` and returns
/// all result rows.
async fn pg_txn_query_prepared(
    txn: &deadpool_postgres::Transaction<'_>,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, deadpool_postgres::tokio_postgres::Error> {
    // NOTE(review): direct transaction query methods appear to be disallowed
    // by the clippy config; this wrapper is the allowed call site — confirm.
    #[allow(clippy::disallowed_methods)]
    txn.query(statement, params).await
}
116
/// Executes the prepared `statement` with `params` inside `txn`, expecting a
/// single result row, and returns it.
async fn pg_txn_query_one_prepared(
    txn: &deadpool_postgres::Transaction<'_>,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
    // NOTE(review): direct transaction query methods appear to be disallowed
    // by the clippy config; this wrapper is the allowed call site — confirm.
    #[allow(clippy::disallowed_methods)]
    txn.query_one(statement, params).await
}
125
/// A timestamp oracle whose state lives in the `timestamp_oracle` table of a
/// Postgres-compatible database (Postgres or CockroachDB).
#[derive(Debug)]
pub struct PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp>,
{
    /// Key of this oracle's row in `timestamp_oracle`.
    timeline: String,
    /// Source of "now", used to propose new write timestamps.
    next: N,
    /// Pool from which every query checks out a connection.
    postgres_client: Arc<PostgresClient>,
    /// Retry and per-operation metrics.
    metrics: Arc<Metrics>,
    /// When `true`, `write_ts`/`apply_write` panic and `open` does not forward
    /// the stored timestamps.
    read_only: bool,
}
140
/// Configuration for [`PostgresTimestampOracle`].
///
/// `metrics` and `dynamic` are `Arc`s, so all clones of a config share them.
/// Updates made through [`TimestampOracleParameters::apply`] are therefore
/// visible to every clone.
#[derive(Clone, Debug)]
pub struct PostgresTimestampOracleConfig {
    /// Connection URL of the backing database.
    url: SensitiveUrl,
    /// Metrics shared by every oracle and client built from this config.
    metrics: Arc<Metrics>,

    /// Connection-pool knobs that can be changed at runtime.
    pub dynamic: Arc<DynamicConfig>,
}
151
152impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
153 fn from(config: PostgresTimestampOracleConfig) -> Self {
154 let metrics = config.metrics.postgres_client.clone();
155 PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
156 }
157}
158
159impl PostgresTimestampOracleConfig {
160 pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "METADATA_BACKEND_URL";
161
162 pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
164 let metrics = Arc::new(Metrics::new(metrics_registry));
165
166 let dynamic = DynamicConfig::default();
167
168 PostgresTimestampOracleConfig {
169 url: url.clone(),
170 metrics,
171 dynamic: Arc::new(dynamic),
172 }
173 }
174
175 pub fn new_for_test() -> Option<Self> {
184 let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
185 Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
186 Err(_) => {
187 if mz_ore::env::is_var_truthy("CI") {
188 panic!("CI is supposed to run this test but something has gone wrong!");
189 }
190 return None;
191 }
192 };
193
194 let dynamic = DynamicConfig::default();
195
196 let config = PostgresTimestampOracleConfig {
197 url,
198 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
199 dynamic: Arc::new(dynamic),
200 };
201
202 Some(config)
203 }
204
205 pub(crate) fn metrics(&self) -> &Arc<Metrics> {
207 &self.metrics
208 }
209}
210
/// Connection-pool and session knobs for the oracle that can change at runtime.
///
/// Every field uses interior mutability (atomics or `RwLock`s).
/// [`TimestampOracleParameters::apply`] can therefore update values through a
/// shared reference while the config is in use. The getters read the current
/// value on each call.
#[derive(Debug)]
pub struct DynamicConfig {
    /// Maximum number of pooled connections.
    pg_connection_pool_max_size: AtomicUsize,

    /// Maximum time to wait for a pooled connection. NOTE(review): `None` is
    /// passed to the client unchanged; presumably it means "no limit" — confirm.
    pg_connection_pool_max_wait: RwLock<Option<Duration>>,

    /// Time-to-live of pooled connections.
    pg_connection_pool_ttl: RwLock<Duration>,

    /// Stagger applied to connection TTLs. NOTE(review): presumably spreads
    /// out reconnects — confirm against `PostgresClient`.
    pg_connection_pool_ttl_stagger: RwLock<Duration>,

    /// Timeout for establishing a new connection.
    pg_connection_pool_connect_timeout: RwLock<Duration>,

    /// TCP user timeout applied to connections.
    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,

    /// TCP keepalive idle time.
    pg_connection_pool_keepalives_idle: RwLock<Duration>,

    /// TCP keepalive probe interval.
    pg_connection_pool_keepalives_interval: RwLock<Duration>,

    /// Number of TCP keepalive probes.
    pg_connection_pool_keepalives_retries: AtomicU32,

    /// Statement timeout for queries issued by the oracle.
    pg_statement_timeout: RwLock<Duration>,
}
270
271impl Default for DynamicConfig {
272 fn default() -> Self {
273 Self {
274 pg_connection_pool_max_size: AtomicUsize::new(
278 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
279 ),
280 pg_connection_pool_max_wait: RwLock::new(Some(
281 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
282 )),
283 pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
284 pg_connection_pool_ttl_stagger: RwLock::new(
285 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
286 ),
287 pg_connection_pool_connect_timeout: RwLock::new(
288 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
289 ),
290 pg_connection_pool_tcp_user_timeout: RwLock::new(
291 DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
292 ),
293 pg_connection_pool_keepalives_idle: RwLock::new(
294 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
295 ),
296 pg_connection_pool_keepalives_interval: RwLock::new(
297 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
298 ),
299 pg_connection_pool_keepalives_retries: AtomicU32::new(
300 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES,
301 ),
302 pg_statement_timeout: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_STATEMENT_TIMEOUT),
303 }
304 }
305}
306
307impl DynamicConfig {
308 const LOAD_ORDERING: Ordering = Ordering::SeqCst;
310 const STORE_ORDERING: Ordering = Ordering::SeqCst;
311
312 fn connection_pool_max_size(&self) -> usize {
313 self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
314 }
315
316 fn connection_pool_max_wait(&self) -> Option<Duration> {
317 *self
318 .pg_connection_pool_max_wait
319 .read()
320 .expect("lock poisoned")
321 }
322
323 fn connection_pool_ttl(&self) -> Duration {
324 *self.pg_connection_pool_ttl.read().expect("lock poisoned")
325 }
326
327 fn connection_pool_ttl_stagger(&self) -> Duration {
328 *self
329 .pg_connection_pool_ttl_stagger
330 .read()
331 .expect("lock poisoned")
332 }
333
334 fn connect_timeout(&self) -> Duration {
335 *self
336 .pg_connection_pool_connect_timeout
337 .read()
338 .expect("lock poisoned")
339 }
340
341 fn tcp_user_timeout(&self) -> Duration {
342 *self
343 .pg_connection_pool_tcp_user_timeout
344 .read()
345 .expect("lock poisoned")
346 }
347
348 fn keepalives_idle(&self) -> Duration {
349 *self
350 .pg_connection_pool_keepalives_idle
351 .read()
352 .expect("lock poisoned")
353 }
354
355 fn keepalives_interval(&self) -> Duration {
356 *self
357 .pg_connection_pool_keepalives_interval
358 .read()
359 .expect("lock poisoned")
360 }
361
362 fn keepalives_retries(&self) -> u32 {
363 self.pg_connection_pool_keepalives_retries
364 .load(Self::LOAD_ORDERING)
365 }
366
367 fn statement_timeout(&self) -> Duration {
368 *self.pg_statement_timeout.read().expect("lock poisoned")
369 }
370}
371
372impl PostgresClientKnobs for PostgresTimestampOracleConfig {
373 fn connection_pool_max_size(&self) -> usize {
374 self.dynamic.connection_pool_max_size()
375 }
376
377 fn connection_pool_max_wait(&self) -> Option<Duration> {
378 self.dynamic.connection_pool_max_wait()
379 }
380
381 fn connection_pool_ttl(&self) -> Duration {
382 self.dynamic.connection_pool_ttl()
383 }
384
385 fn connection_pool_ttl_stagger(&self) -> Duration {
386 self.dynamic.connection_pool_ttl_stagger()
387 }
388
389 fn connect_timeout(&self) -> Duration {
390 self.dynamic.connect_timeout()
391 }
392
393 fn tcp_user_timeout(&self) -> Duration {
394 self.dynamic.tcp_user_timeout()
395 }
396
397 fn keepalives_idle(&self) -> Duration {
398 self.dynamic.keepalives_idle()
399 }
400
401 fn keepalives_interval(&self) -> Duration {
402 self.dynamic.keepalives_interval()
403 }
404
405 fn keepalives_retries(&self) -> u32 {
406 self.dynamic.keepalives_retries()
407 }
408
409 fn statement_timeout(&self) -> Duration {
410 self.dynamic.statement_timeout()
411 }
412}
413
/// A set of optional overrides for the knobs in [`DynamicConfig`].
///
/// `None` means "leave unchanged". Overrides are combined with `update` and
/// pushed into a live config with `apply`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimestampOracleParameters {
    /// Override for `DynamicConfig::pg_connection_pool_max_size`.
    pub pg_connection_pool_max_size: Option<usize>,
    /// Override for `DynamicConfig::pg_connection_pool_max_wait`. The outer
    /// `Option` selects whether to change it. The inner value is stored as-is,
    /// so `Some(None)` sets the knob to `None`.
    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
    /// Override for `DynamicConfig::pg_connection_pool_ttl`.
    pub pg_connection_pool_ttl: Option<Duration>,
    /// Override for `DynamicConfig::pg_connection_pool_ttl_stagger`.
    pub pg_connection_pool_ttl_stagger: Option<Duration>,
    /// Override for `DynamicConfig::pg_connection_pool_connect_timeout`.
    pub pg_connection_pool_connect_timeout: Option<Duration>,
    /// Override for `DynamicConfig::pg_connection_pool_tcp_user_timeout`.
    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
    /// Override for `DynamicConfig::pg_connection_pool_keepalives_idle`.
    pub pg_connection_pool_keepalives_idle: Option<Duration>,
    /// Override for `DynamicConfig::pg_connection_pool_keepalives_interval`.
    pub pg_connection_pool_keepalives_interval: Option<Duration>,
    /// Override for `DynamicConfig::pg_connection_pool_keepalives_retries`.
    pub pg_connection_pool_keepalives_retries: Option<u32>,
    /// Override for `DynamicConfig::pg_statement_timeout`.
    pub pg_statement_timeout: Option<Duration>,
}
450
451impl TimestampOracleParameters {
452 pub fn update(&mut self, other: TimestampOracleParameters) {
454 let Self {
457 pg_connection_pool_max_size: self_pg_connection_pool_max_size,
458 pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
459 pg_connection_pool_ttl: self_pg_connection_pool_ttl,
460 pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
461 pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
462 pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
463 pg_connection_pool_keepalives_idle: self_pg_connection_pool_keepalives_idle,
464 pg_connection_pool_keepalives_interval: self_pg_connection_pool_keepalives_interval,
465 pg_connection_pool_keepalives_retries: self_pg_connection_pool_keepalives_retries,
466 pg_statement_timeout: self_pg_statement_timeout,
467 } = self;
468 let Self {
469 pg_connection_pool_max_size: other_pg_connection_pool_max_size,
470 pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
471 pg_connection_pool_ttl: other_pg_connection_pool_ttl,
472 pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
473 pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
474 pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
475 pg_connection_pool_keepalives_idle: other_pg_connection_pool_keepalives_idle,
476 pg_connection_pool_keepalives_interval: other_pg_connection_pool_keepalives_interval,
477 pg_connection_pool_keepalives_retries: other_pg_connection_pool_keepalives_retries,
478 pg_statement_timeout: other_pg_statement_timeout,
479 } = other;
480 if let Some(v) = other_pg_connection_pool_max_size {
481 *self_pg_connection_pool_max_size = Some(v);
482 }
483 if let Some(v) = other_pg_connection_pool_max_wait {
484 *self_pg_connection_pool_max_wait = Some(v);
485 }
486 if let Some(v) = other_pg_connection_pool_ttl {
487 *self_pg_connection_pool_ttl = Some(v);
488 }
489 if let Some(v) = other_pg_connection_pool_ttl_stagger {
490 *self_pg_connection_pool_ttl_stagger = Some(v);
491 }
492 if let Some(v) = other_pg_connection_pool_connect_timeout {
493 *self_pg_connection_pool_connect_timeout = Some(v);
494 }
495 if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
496 *self_pg_connection_pool_tcp_user_timeout = Some(v);
497 }
498 if let Some(v) = other_pg_connection_pool_keepalives_idle {
499 *self_pg_connection_pool_keepalives_idle = Some(v);
500 }
501 if let Some(v) = other_pg_connection_pool_keepalives_interval {
502 *self_pg_connection_pool_keepalives_interval = Some(v);
503 }
504 if let Some(v) = other_pg_connection_pool_keepalives_retries {
505 *self_pg_connection_pool_keepalives_retries = Some(v);
506 }
507 if let Some(v) = other_pg_statement_timeout {
508 *self_pg_statement_timeout = Some(v);
509 }
510 }
511
512 pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
518 info!(params = ?self, "Applying configuration update!");
519
520 let Self {
522 pg_connection_pool_max_size,
523 pg_connection_pool_max_wait,
524 pg_connection_pool_ttl,
525 pg_connection_pool_ttl_stagger,
526 pg_connection_pool_connect_timeout,
527 pg_connection_pool_tcp_user_timeout,
528 pg_connection_pool_keepalives_idle,
529 pg_connection_pool_keepalives_interval,
530 pg_connection_pool_keepalives_retries,
531 pg_statement_timeout,
532 } = self;
533 if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
534 cfg.dynamic
535 .pg_connection_pool_max_size
536 .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
537 }
538 if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
539 let mut max_wait = cfg
540 .dynamic
541 .pg_connection_pool_max_wait
542 .write()
543 .expect("lock poisoned");
544 *max_wait = *pg_connection_pool_max_wait;
545 }
546 if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
547 let mut ttl = cfg
548 .dynamic
549 .pg_connection_pool_ttl
550 .write()
551 .expect("lock poisoned");
552 *ttl = *pg_connection_pool_ttl;
553 }
554 if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
555 let mut ttl_stagger = cfg
556 .dynamic
557 .pg_connection_pool_ttl_stagger
558 .write()
559 .expect("lock poisoned");
560 *ttl_stagger = *pg_connection_pool_ttl_stagger;
561 }
562 if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
563 let mut timeout = cfg
564 .dynamic
565 .pg_connection_pool_connect_timeout
566 .write()
567 .expect("lock poisoned");
568 *timeout = *pg_connection_pool_connect_timeout;
569 }
570 if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
571 let mut timeout = cfg
572 .dynamic
573 .pg_connection_pool_tcp_user_timeout
574 .write()
575 .expect("lock poisoned");
576 *timeout = *pg_connection_pool_tcp_user_timeout;
577 }
578 if let Some(pg_connection_pool_keepalives_idle) = pg_connection_pool_keepalives_idle {
579 let mut timeout = cfg
580 .dynamic
581 .pg_connection_pool_keepalives_idle
582 .write()
583 .expect("lock poisoned");
584 *timeout = *pg_connection_pool_keepalives_idle;
585 }
586 if let Some(pg_connection_pool_keepalives_interval) = pg_connection_pool_keepalives_interval
587 {
588 let mut timeout = cfg
589 .dynamic
590 .pg_connection_pool_keepalives_interval
591 .write()
592 .expect("lock poisoned");
593 *timeout = *pg_connection_pool_keepalives_interval;
594 }
595 if let Some(pg_connection_pool_keepalives_retries) = pg_connection_pool_keepalives_retries {
596 cfg.dynamic.pg_connection_pool_keepalives_retries.store(
597 *pg_connection_pool_keepalives_retries,
598 DynamicConfig::STORE_ORDERING,
599 );
600 }
601 if let Some(pg_statement_timeout) = pg_statement_timeout {
602 let mut timeout = cfg
603 .dynamic
604 .pg_statement_timeout
605 .write()
606 .expect("lock poisoned");
607 *timeout = *pg_statement_timeout;
608 }
609 }
610}
611
612impl<N> PostgresTimestampOracle<N>
613where
614 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
615{
616 pub async fn open(
620 config: PostgresTimestampOracleConfig,
621 timeline: String,
622 initially: Timestamp,
623 next: N,
624 read_only: bool,
625 ) -> Self {
626 info!(config = ?config, "opening PostgresTimestampOracle");
627
628 let fallible = || async {
629 let metrics = Arc::clone(&config.metrics);
630
631 let pg_config: Config = config.url.to_string().parse()?;
633 let role = pg_config.get_user().unwrap();
634 let create_schema = format!(
635 "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
636 escape_identifier(role),
637 );
638
639 let postgres_client = PostgresClient::open(config.clone().into())?;
640
641 let client = postgres_client.get_connection().await?;
642
643 let crdb_mode = match pg_batch_execute(
644 &client,
645 &format!(
646 "{}; {}{}; {}",
647 create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
648 ),
649 )
650 .await
651 {
652 Ok(()) => true,
653 Err(e)
654 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
655 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
656 {
657 info!(
658 "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
659 e
660 );
661 false
662 }
663 Err(e) => return Err(e.into()),
664 };
665
666 if !crdb_mode {
667 pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
668 }
669
670 let oracle = PostgresTimestampOracle {
671 timeline: timeline.clone(),
672 next: next.clone(),
673 postgres_client: Arc::new(postgres_client),
674 metrics,
675 read_only,
676 };
677
678 let q = r#"
684 INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
685 VALUES ($1, $2, $3)
686 ON CONFLICT (timeline) DO NOTHING;
687 "#;
688 let statement = client.prepare_cached(q).await?;
689
690 let initially_coerced = Self::ts_to_decimal(initially);
691 let _ = pg_execute_prepared(
692 &client,
693 &statement,
694 &[&oracle.timeline, &initially_coerced, &initially_coerced],
695 )
696 .await?;
697
698 if !read_only {
702 TimestampOracle::apply_write(&oracle, initially).await;
703 }
704
705 Result::<_, anyhow::Error>::Ok(oracle)
706 };
707
708 let metrics = &config.metrics.retries.open;
709
710 let oracle = retry_fallible(metrics, fallible).await;
711
712 oracle
713 }
714
715 async fn get_connection(&self) -> Result<Object, PoolError> {
716 self.postgres_client.get_connection().await
717 }
718
719 pub async fn get_all_timelines(
731 config: PostgresTimestampOracleConfig,
732 ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
733 let fallible = || async {
734 let postgres_client = PostgresClient::open(config.clone().into())?;
735
736 let mut client = postgres_client.get_connection().await?;
737
738 let txn = client.transaction().await?;
739
740 let q = r#"
745 SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
746 "#;
747 let statement = txn.prepare(q).await?;
748 let exists_row = pg_txn_query_one_prepared(&txn, &statement, &[]).await?;
749 let exists: bool = exists_row.try_get("exists").expect("missing exists column");
750 if !exists {
751 return Ok(Vec::new());
752 }
753
754 let q = r#"
755 SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
756 "#;
757 let statement = txn.prepare(q).await?;
758 let rows = pg_txn_query_prepared(&txn, &statement, &[]).await?;
759
760 txn.commit().await?;
761
762 let result = rows
763 .into_iter()
764 .map(|row| {
765 let timeline: String =
766 row.try_get("timeline").expect("missing timeline column");
767 let ts: Numeric = row.try_get("ts").expect("missing ts column");
768 let ts = Self::decimal_to_ts(ts);
769
770 (timeline, ts)
771 })
772 .collect::<Vec<_>>();
773
774 Ok(result)
775 };
776
777 let metrics = &config.metrics.retries.get_all_timelines;
778
779 let result = retry_fallible(metrics, fallible).await;
780
781 Ok(result)
782 }
783
784 #[mz_ore::instrument(name = "oracle::write_ts")]
785 async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
786 if self.read_only {
787 panic!("attempting write_ts in read-only mode");
788 }
789
790 let proposed_next_ts = self.next.now();
791 let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
792
793 let q = r#"
794 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
795 WHERE timeline = $1
796 RETURNING write_ts;
797 "#;
798 let client = self.get_connection().await?;
799 let statement = client.prepare_cached(q).await?;
800 let result =
801 pg_query_one_prepared(&client, &statement, &[&self.timeline, &proposed_next_ts])
802 .await?;
803
804 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
805 let write_ts = Self::decimal_to_ts(write_ts);
806
807 debug!(
808 timeline = ?self.timeline,
809 write_ts = ?write_ts,
810 proposed_next_ts = ?proposed_next_ts,
811 "returning from write_ts()");
812
813 let advance_to = write_ts.step_forward();
814
815 Ok(WriteTimestamp {
816 timestamp: write_ts,
817 advance_to,
818 })
819 }
820
821 #[mz_ore::instrument(name = "oracle::peek_write_ts")]
822 async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
823 let q = r#"
824 SELECT write_ts FROM timestamp_oracle
825 WHERE timeline = $1;
826 "#;
827 let client = self.get_connection().await?;
828 let statement = client.prepare_cached(q).await?;
829 let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
830
831 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
832 let write_ts = Self::decimal_to_ts(write_ts);
833
834 debug!(
835 timeline = ?self.timeline,
836 write_ts = ?write_ts,
837 "returning from peek_write_ts()");
838
839 Ok(write_ts)
840 }
841
842 #[mz_ore::instrument(name = "oracle::read_ts")]
843 async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
844 let q = r#"
845 SELECT read_ts FROM timestamp_oracle
846 WHERE timeline = $1;
847 "#;
848 let client = self.get_connection().await?;
849 let statement = client.prepare_cached(q).await?;
850 let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
851
852 let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
853 let read_ts = Self::decimal_to_ts(read_ts);
854
855 debug!(
856 timeline = ?self.timeline,
857 read_ts = ?read_ts,
858 "returning from read_ts()");
859
860 Ok(read_ts)
861 }
862
863 #[mz_ore::instrument(name = "oracle::apply_write")]
864 async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
865 if self.read_only {
866 panic!("attempting apply_write in read-only mode");
867 }
868
869 let q = r#"
870 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
871 WHERE timeline = $1;
872 "#;
873 let client = self.get_connection().await?;
874 let statement = client.prepare_cached(q).await?;
875 let write_ts = Self::ts_to_decimal(write_ts);
876
877 let _ = pg_execute_prepared(&client, &statement, &[&self.timeline, &write_ts]).await?;
878
879 debug!(
880 timeline = ?self.timeline,
881 write_ts = ?write_ts,
882 "returning from apply_write()");
883
884 Ok(())
885 }
886
887 fn ts_to_decimal(ts: Timestamp) -> Numeric {
890 let decimal = Decimal::from(ts);
891 Numeric::from(decimal)
892 }
893
894 fn decimal_to_ts(ts: Numeric) -> Timestamp {
897 ts.0.0.try_into().expect("we only use u64 timestamps")
898 }
899}
900
901#[async_trait]
909impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
910where
911 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
912{
913 #[instrument]
914 async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
915 let metrics = &self.metrics.retries.write_ts;
916
917 let res = retry_fallible(metrics, || {
918 self.metrics
919 .oracle
920 .write_ts
921 .run_op(|| self.fallible_write_ts())
922 })
923 .await;
924
925 res
926 }
927
928 #[instrument]
929 async fn peek_write_ts(&self) -> Timestamp {
930 let metrics = &self.metrics.retries.peek_write_ts;
931
932 let res = retry_fallible(metrics, || {
933 self.metrics
934 .oracle
935 .peek_write_ts
936 .run_op(|| self.fallible_peek_write_ts())
937 })
938 .await;
939
940 res
941 }
942
943 #[instrument]
944 async fn read_ts(&self) -> Timestamp {
945 let metrics = &self.metrics.retries.read_ts;
946
947 let res = retry_fallible(metrics, || {
948 self.metrics
949 .oracle
950 .read_ts
951 .run_op(|| self.fallible_read_ts())
952 })
953 .await;
954
955 res
956 }
957
958 #[instrument]
959 async fn apply_write(&self, write_ts: Timestamp) {
960 let metrics = &self.metrics.retries.apply_write;
961
962 let res = retry_fallible(metrics, || {
963 self.metrics
964 .oracle
965 .apply_write
966 .run_op(|| self.fallible_apply_write(write_ts.clone()))
967 })
968 .await;
969
970 res
971 }
972}
973
/// Number of attempts after which `retry_fallible` logs retried failures at
/// `info` level instead of `debug`.
pub const INFO_MIN_ATTEMPTS: usize = 3;
975
976pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
977where
978 F: std::future::Future<Output = Result<R, anyhow::Error>>,
979 WorkFn: FnMut() -> F,
980{
981 let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
982 loop {
983 match work_fn().await {
984 Ok(x) => {
985 if retry.attempt() > 0 {
986 debug!(
987 "external operation {} succeeded after failing at least once",
988 metrics.name,
989 );
990 }
991 return x;
992 }
993 Err(err)
994 if err
995 .to_string()
996 .contains("\"timestamp_oracle\" does not exist") =>
997 {
998 panic!(
1007 "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
1008 metrics.name,
1009 err.display_with_causes()
1010 );
1011 }
1012 Err(err) => {
1013 if retry.attempt() >= INFO_MIN_ATTEMPTS {
1014 info!(
1015 "external operation {} failed, retrying in {:?}: {}",
1016 metrics.name,
1017 retry.next_sleep(),
1018 err.display_with_causes()
1019 );
1020 } else {
1021 debug!(
1022 "external operation {} failed, retrying in {:?}: {}",
1023 metrics.name,
1024 retry.next_sleep(),
1025 err.display_with_causes()
1026 );
1027 }
1028 retry = retry.sleep().await;
1029 }
1030 }
1031 }
1032}
1033
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the external-service test config. If the URL env var is unset,
    /// it logs and returns `None` so the caller can skip.
    fn external_config_or_skip() -> Option<PostgresTimestampOracleConfig> {
        let config = PostgresTimestampOracleConfig::new_for_test();
        if config.is_none() {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
        }
        config
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let Some(config) = external_config_or_skip() else {
            return Ok(());
        };

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            let oracle = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                false, // read_only
            );

            async {
                let arced_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(oracle.await);
                arced_oracle
            }
        })
        .await?;

        Ok(())
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_postgres_statement_timeout() -> Result<(), anyhow::Error> {
        let Some(config) = external_config_or_skip() else {
            return Ok(());
        };

        // Baseline: a short sleep completes with the default knobs.
        let no_timeout_client = PostgresClient::open(config.clone().into())?;
        {
            let conn = no_timeout_client.get_connection().await?;
            pg_batch_execute(&conn, "SELECT pg_sleep(0.1)")
                .await
                .expect("query should not be aborted when no statement timeout is set");
        }

        // Lower the statement timeout, then open a client that picks it up.
        TimestampOracleParameters {
            pg_statement_timeout: Some(Duration::from_millis(100)),
            ..Default::default()
        }
        .apply(&config);

        let timeout_client = PostgresClient::open(config.clone().into())?;
        let conn = timeout_client.get_connection().await?;

        let err = pg_batch_execute(&conn, "SELECT pg_sleep(5)")
            .await
            .expect_err("query should have been aborted by the statement timeout");
        assert_eq!(
            err.code(),
            Some(&SqlState::QUERY_CANCELED),
            "unexpected error, expected a statement timeout: {err:?}"
        );

        Ok(())
    }
}