1use std::str::FromStr;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::{Object, PoolError};
23use dec::Decimal;
24use mz_adapter_types::timestamp_oracle::{
25 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
26 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
27 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
28};
29use mz_ore::error::ErrorExt;
30use mz_ore::instrument;
31use mz_ore::metrics::MetricsRegistry;
32use mz_ore::url::SensitiveUrl;
33use mz_pgrepr::Numeric;
34use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
35use mz_repr::Timestamp;
36use postgres_protocol::escape::escape_identifier;
37use serde::{Deserialize, Serialize};
38use tracing::{debug, info};
39
40use crate::WriteTimestamp;
41use crate::metrics::{Metrics, RetryMetrics};
42use crate::retry::Retry;
43use crate::{GenericNowFn, TimestampOracle};
44
45const SCHEMA: &str = "
49CREATE TABLE IF NOT EXISTS timestamp_oracle (
50 timeline text NOT NULL,
51 read_ts DECIMAL(20,0) NOT NULL,
52 write_ts DECIMAL(20,0) NOT NULL,
53 PRIMARY KEY(timeline)
54)
55";
56
57const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
64const CRDB_CONFIGURE_ZONE: &str =
73 "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
74
75#[derive(Debug)]
77pub struct PostgresTimestampOracle<N>
78where
79 N: GenericNowFn<Timestamp>,
80{
81 timeline: String,
82 next: N,
83 postgres_client: Arc<PostgresClient>,
84 metrics: Arc<Metrics>,
85 read_only: bool,
88}
89
90#[derive(Clone, Debug)]
93pub struct PostgresTimestampOracleConfig {
94 url: SensitiveUrl,
95 pub metrics: Arc<Metrics>,
96
97 pub dynamic: Arc<DynamicConfig>,
99}
100
101impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
102 fn from(config: PostgresTimestampOracleConfig) -> Self {
103 let metrics = config.metrics.postgres_client.clone();
104 PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
105 }
106}
107
108impl PostgresTimestampOracleConfig {
109 pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "COCKROACH_URL";
110
111 pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
113 let metrics = Arc::new(Metrics::new(metrics_registry));
114
115 let dynamic = DynamicConfig::default();
116
117 PostgresTimestampOracleConfig {
118 url: url.clone(),
119 metrics,
120 dynamic: Arc::new(dynamic),
121 }
122 }
123
124 pub fn new_for_test() -> Option<Self> {
133 let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
134 Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
135 Err(_) => {
136 if mz_ore::env::is_var_truthy("CI") {
137 panic!("CI is supposed to run this test but something has gone wrong!");
138 }
139 return None;
140 }
141 };
142
143 let dynamic = DynamicConfig::default();
144
145 let config = PostgresTimestampOracleConfig {
146 url,
147 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
148 dynamic: Arc::new(dynamic),
149 };
150
151 Some(config)
152 }
153}
154
155#[derive(Debug)]
167pub struct DynamicConfig {
168 pg_connection_pool_max_size: AtomicUsize,
170
171 pg_connection_pool_max_wait: RwLock<Option<Duration>>,
173
174 pg_connection_pool_ttl: RwLock<Duration>,
178
179 pg_connection_pool_ttl_stagger: RwLock<Duration>,
187
188 pg_connection_pool_connect_timeout: RwLock<Duration>,
191
192 pg_connection_pool_tcp_user_timeout: RwLock<Duration>,
196}
197
198impl Default for DynamicConfig {
199 fn default() -> Self {
200 Self {
201 pg_connection_pool_max_size: AtomicUsize::new(
205 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
206 ),
207 pg_connection_pool_max_wait: RwLock::new(Some(
208 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
209 )),
210 pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
211 pg_connection_pool_ttl_stagger: RwLock::new(
212 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
213 ),
214 pg_connection_pool_connect_timeout: RwLock::new(
215 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
216 ),
217 pg_connection_pool_tcp_user_timeout: RwLock::new(
218 DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
219 ),
220 }
221 }
222}
223
224impl DynamicConfig {
225 const LOAD_ORDERING: Ordering = Ordering::SeqCst;
227 const STORE_ORDERING: Ordering = Ordering::SeqCst;
228
229 fn connection_pool_max_size(&self) -> usize {
230 self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
231 }
232
233 fn connection_pool_max_wait(&self) -> Option<Duration> {
234 *self
235 .pg_connection_pool_max_wait
236 .read()
237 .expect("lock poisoned")
238 }
239
240 fn connection_pool_ttl(&self) -> Duration {
241 *self.pg_connection_pool_ttl.read().expect("lock poisoned")
242 }
243
244 fn connection_pool_ttl_stagger(&self) -> Duration {
245 *self
246 .pg_connection_pool_ttl_stagger
247 .read()
248 .expect("lock poisoned")
249 }
250
251 fn connect_timeout(&self) -> Duration {
252 *self
253 .pg_connection_pool_connect_timeout
254 .read()
255 .expect("lock poisoned")
256 }
257
258 fn tcp_user_timeout(&self) -> Duration {
259 *self
260 .pg_connection_pool_tcp_user_timeout
261 .read()
262 .expect("lock poisoned")
263 }
264}
265
266impl PostgresClientKnobs for PostgresTimestampOracleConfig {
267 fn connection_pool_max_size(&self) -> usize {
268 self.dynamic.connection_pool_max_size()
269 }
270
271 fn connection_pool_max_wait(&self) -> Option<Duration> {
272 self.dynamic.connection_pool_max_wait()
273 }
274
275 fn connection_pool_ttl(&self) -> Duration {
276 self.dynamic.connection_pool_ttl()
277 }
278
279 fn connection_pool_ttl_stagger(&self) -> Duration {
280 self.dynamic.connection_pool_ttl_stagger()
281 }
282
283 fn connect_timeout(&self) -> Duration {
284 self.dynamic.connect_timeout()
285 }
286
287 fn tcp_user_timeout(&self) -> Duration {
288 self.dynamic.tcp_user_timeout()
289 }
290}
291
292#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
299pub struct PostgresTimestampOracleParameters {
300 pub pg_connection_pool_max_size: Option<usize>,
302 pub pg_connection_pool_max_wait: Option<Option<Duration>>,
311 pub pg_connection_pool_ttl: Option<Duration>,
313 pub pg_connection_pool_ttl_stagger: Option<Duration>,
315 pub pg_connection_pool_connect_timeout: Option<Duration>,
317 pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
319}
320
321impl PostgresTimestampOracleParameters {
322 pub fn update(&mut self, other: PostgresTimestampOracleParameters) {
324 let Self {
327 pg_connection_pool_max_size: self_pg_connection_pool_max_size,
328 pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
329 pg_connection_pool_ttl: self_pg_connection_pool_ttl,
330 pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
331 pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
332 pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
333 } = self;
334 let Self {
335 pg_connection_pool_max_size: other_pg_connection_pool_max_size,
336 pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
337 pg_connection_pool_ttl: other_pg_connection_pool_ttl,
338 pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
339 pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
340 pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
341 } = other;
342 if let Some(v) = other_pg_connection_pool_max_size {
343 *self_pg_connection_pool_max_size = Some(v);
344 }
345 if let Some(v) = other_pg_connection_pool_max_wait {
346 *self_pg_connection_pool_max_wait = Some(v);
347 }
348 if let Some(v) = other_pg_connection_pool_ttl {
349 *self_pg_connection_pool_ttl = Some(v);
350 }
351 if let Some(v) = other_pg_connection_pool_ttl_stagger {
352 *self_pg_connection_pool_ttl_stagger = Some(v);
353 }
354 if let Some(v) = other_pg_connection_pool_connect_timeout {
355 *self_pg_connection_pool_connect_timeout = Some(v);
356 }
357 if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
358 *self_pg_connection_pool_tcp_user_timeout = Some(v);
359 }
360 }
361
362 pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
368 info!(params = ?self, "Applying configuration update!");
369
370 let Self {
372 pg_connection_pool_max_size,
373 pg_connection_pool_max_wait,
374 pg_connection_pool_ttl,
375 pg_connection_pool_ttl_stagger,
376 pg_connection_pool_connect_timeout,
377 pg_connection_pool_tcp_user_timeout,
378 } = self;
379 if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
380 cfg.dynamic
381 .pg_connection_pool_max_size
382 .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
383 }
384 if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
385 let mut max_wait = cfg
386 .dynamic
387 .pg_connection_pool_max_wait
388 .write()
389 .expect("lock poisoned");
390 *max_wait = *pg_connection_pool_max_wait;
391 }
392 if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
393 let mut ttl = cfg
394 .dynamic
395 .pg_connection_pool_ttl
396 .write()
397 .expect("lock poisoned");
398 *ttl = *pg_connection_pool_ttl;
399 }
400 if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
401 let mut ttl_stagger = cfg
402 .dynamic
403 .pg_connection_pool_ttl_stagger
404 .write()
405 .expect("lock poisoned");
406 *ttl_stagger = *pg_connection_pool_ttl_stagger;
407 }
408 if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
409 let mut timeout = cfg
410 .dynamic
411 .pg_connection_pool_connect_timeout
412 .write()
413 .expect("lock poisoned");
414 *timeout = *pg_connection_pool_connect_timeout;
415 }
416 if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
417 let mut timeout = cfg
418 .dynamic
419 .pg_connection_pool_tcp_user_timeout
420 .write()
421 .expect("lock poisoned");
422 *timeout = *pg_connection_pool_tcp_user_timeout;
423 }
424 }
425}
426
427impl<N> PostgresTimestampOracle<N>
428where
429 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
430{
431 pub async fn open(
435 config: PostgresTimestampOracleConfig,
436 timeline: String,
437 initially: Timestamp,
438 next: N,
439 read_only: bool,
440 ) -> Self {
441 info!(config = ?config, "opening PostgresTimestampOracle");
442
443 let fallible = || async {
444 let metrics = Arc::clone(&config.metrics);
445
446 let pg_config: Config = config.url.to_string().parse()?;
448 let role = pg_config.get_user().unwrap();
449 let create_schema = format!(
450 "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
451 escape_identifier(role),
452 );
453
454 let postgres_client = PostgresClient::open(config.clone().into())?;
455
456 let client = postgres_client.get_connection().await?;
457
458 let crdb_mode = match client
459 .batch_execute(&format!(
460 "{}; {}{}; {}",
461 create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
462 ))
463 .await
464 {
465 Ok(()) => true,
466 Err(e)
467 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
468 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
469 {
470 info!(
471 "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
472 e
473 );
474 false
475 }
476 Err(e) => return Err(e.into()),
477 };
478
479 if !crdb_mode {
480 client
481 .batch_execute(&format!("{}; {};", create_schema, SCHEMA))
482 .await?;
483 }
484
485 let oracle = PostgresTimestampOracle {
486 timeline: timeline.clone(),
487 next: next.clone(),
488 postgres_client: Arc::new(postgres_client),
489 metrics,
490 read_only,
491 };
492
493 let q = r#"
499 INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
500 VALUES ($1, $2, $3)
501 ON CONFLICT (timeline) DO NOTHING;
502 "#;
503 let statement = client.prepare_cached(q).await?;
504
505 let initially_coerced = Self::ts_to_decimal(initially);
506 let _ = client
507 .execute(
508 &statement,
509 &[&oracle.timeline, &initially_coerced, &initially_coerced],
510 )
511 .await?;
512
513 if !read_only {
517 TimestampOracle::apply_write(&oracle, initially).await;
518 }
519
520 Result::<_, anyhow::Error>::Ok(oracle)
521 };
522
523 let metrics = &config.metrics.retries.open;
524
525 let oracle = retry_fallible(metrics, fallible).await;
526
527 oracle
528 }
529
530 async fn get_connection(&self) -> Result<Object, PoolError> {
531 self.postgres_client.get_connection().await
532 }
533
534 pub async fn get_all_timelines(
546 config: PostgresTimestampOracleConfig,
547 ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
548 let fallible = || async {
549 let postgres_client = PostgresClient::open(config.clone().into())?;
550
551 let mut client = postgres_client.get_connection().await?;
552
553 let txn = client.transaction().await?;
554
555 let q = r#"
560 SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
561 "#;
562 let statement = txn.prepare(q).await?;
563 let exists_row = txn.query_one(&statement, &[]).await?;
564 let exists: bool = exists_row.try_get("exists").expect("missing exists column");
565 if !exists {
566 return Ok(Vec::new());
567 }
568
569 let q = r#"
570 SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
571 "#;
572 let statement = txn.prepare(q).await?;
573 let rows = txn.query(&statement, &[]).await?;
574
575 txn.commit().await?;
576
577 let result = rows
578 .into_iter()
579 .map(|row| {
580 let timeline: String =
581 row.try_get("timeline").expect("missing timeline column");
582 let ts: Numeric = row.try_get("ts").expect("missing ts column");
583 let ts = Self::decimal_to_ts(ts);
584
585 (timeline, ts)
586 })
587 .collect::<Vec<_>>();
588
589 Ok(result)
590 };
591
592 let metrics = &config.metrics.retries.get_all_timelines;
593
594 let result = retry_fallible(metrics, fallible).await;
595
596 Ok(result)
597 }
598
599 #[mz_ore::instrument(name = "oracle::write_ts")]
600 async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
601 if self.read_only {
602 panic!("attempting write_ts in read-only mode");
603 }
604
605 let proposed_next_ts = self.next.now();
606 let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
607
608 let q = r#"
609 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
610 WHERE timeline = $1
611 RETURNING write_ts;
612 "#;
613 let client = self.get_connection().await?;
614 let statement = client.prepare_cached(q).await?;
615 let result = client
616 .query_one(&statement, &[&self.timeline, &proposed_next_ts])
617 .await?;
618
619 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
620 let write_ts = Self::decimal_to_ts(write_ts);
621
622 debug!(
623 timeline = ?self.timeline,
624 write_ts = ?write_ts,
625 proposed_next_ts = ?proposed_next_ts,
626 "returning from write_ts()");
627
628 let advance_to = write_ts.step_forward();
629
630 Ok(WriteTimestamp {
631 timestamp: write_ts,
632 advance_to,
633 })
634 }
635
636 #[mz_ore::instrument(name = "oracle::peek_write_ts")]
637 async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
638 let q = r#"
639 SELECT write_ts FROM timestamp_oracle
640 WHERE timeline = $1;
641 "#;
642 let client = self.get_connection().await?;
643 let statement = client.prepare_cached(q).await?;
644 let result = client.query_one(&statement, &[&self.timeline]).await?;
645
646 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
647 let write_ts = Self::decimal_to_ts(write_ts);
648
649 debug!(
650 timeline = ?self.timeline,
651 write_ts = ?write_ts,
652 "returning from peek_write_ts()");
653
654 Ok(write_ts)
655 }
656
657 #[mz_ore::instrument(name = "oracle::read_ts")]
658 async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
659 let q = r#"
660 SELECT read_ts FROM timestamp_oracle
661 WHERE timeline = $1;
662 "#;
663 let client = self.get_connection().await?;
664 let statement = client.prepare_cached(q).await?;
665 let result = client.query_one(&statement, &[&self.timeline]).await?;
666
667 let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
668 let read_ts = Self::decimal_to_ts(read_ts);
669
670 debug!(
671 timeline = ?self.timeline,
672 read_ts = ?read_ts,
673 "returning from read_ts()");
674
675 Ok(read_ts)
676 }
677
678 #[mz_ore::instrument(name = "oracle::apply_write")]
679 async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
680 if self.read_only {
681 panic!("attempting apply_write in read-only mode");
682 }
683
684 let q = r#"
685 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
686 WHERE timeline = $1;
687 "#;
688 let client = self.get_connection().await?;
689 let statement = client.prepare_cached(q).await?;
690 let write_ts = Self::ts_to_decimal(write_ts);
691
692 let _ = client
693 .execute(&statement, &[&self.timeline, &write_ts])
694 .await?;
695
696 debug!(
697 timeline = ?self.timeline,
698 write_ts = ?write_ts,
699 "returning from apply_write()");
700
701 Ok(())
702 }
703
704 fn ts_to_decimal(ts: Timestamp) -> Numeric {
707 let decimal = Decimal::from(ts);
708 Numeric::from(decimal)
709 }
710
711 fn decimal_to_ts(ts: Numeric) -> Timestamp {
714 ts.0.0.try_into().expect("we only use u64 timestamps")
715 }
716}
717
718#[async_trait]
726impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
727where
728 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
729{
730 #[instrument]
731 async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
732 let metrics = &self.metrics.retries.write_ts;
733
734 let res = retry_fallible(metrics, || {
735 self.metrics
736 .oracle
737 .write_ts
738 .run_op(|| self.fallible_write_ts())
739 })
740 .await;
741
742 res
743 }
744
745 #[instrument]
746 async fn peek_write_ts(&self) -> Timestamp {
747 let metrics = &self.metrics.retries.peek_write_ts;
748
749 let res = retry_fallible(metrics, || {
750 self.metrics
751 .oracle
752 .peek_write_ts
753 .run_op(|| self.fallible_peek_write_ts())
754 })
755 .await;
756
757 res
758 }
759
760 #[instrument]
761 async fn read_ts(&self) -> Timestamp {
762 let metrics = &self.metrics.retries.read_ts;
763
764 let res = retry_fallible(metrics, || {
765 self.metrics
766 .oracle
767 .read_ts
768 .run_op(|| self.fallible_read_ts())
769 })
770 .await;
771
772 res
773 }
774
775 #[instrument]
776 async fn apply_write(&self, write_ts: Timestamp) {
777 let metrics = &self.metrics.retries.apply_write;
778
779 let res = retry_fallible(metrics, || {
780 self.metrics
781 .oracle
782 .apply_write
783 .run_op(|| self.fallible_apply_write(write_ts.clone()))
784 })
785 .await;
786
787 res
788 }
789}
790
791pub const INFO_MIN_ATTEMPTS: usize = 3;
792
793pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
794where
795 F: std::future::Future<Output = Result<R, anyhow::Error>>,
796 WorkFn: FnMut() -> F,
797{
798 let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
799 loop {
800 match work_fn().await {
801 Ok(x) => {
802 if retry.attempt() > 0 {
803 debug!(
804 "external operation {} succeeded after failing at least once",
805 metrics.name,
806 );
807 }
808 return x;
809 }
810 Err(err)
811 if err
812 .to_string()
813 .contains("\"timestamp_oracle\" does not exist") =>
814 {
815 panic!(
824 "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
825 metrics.name,
826 err.display_with_causes()
827 );
828 }
829 Err(err) => {
830 if retry.attempt() >= INFO_MIN_ATTEMPTS {
831 info!(
832 "external operation {} failed, retrying in {:?}: {}",
833 metrics.name,
834 retry.next_sleep(),
835 err.display_with_causes()
836 );
837 } else {
838 info!(
839 "external operation {} failed, retrying in {:?}: {}",
840 metrics.name,
841 retry.next_sleep(),
842 err.display_with_causes()
843 );
844 }
845 retry = retry.sleep().await;
846 }
847 }
848 }
849}
850
851#[cfg(test)]
852mod tests {
853 use super::*;
854
855 #[mz_ore::test(tokio::test)]
856 #[cfg_attr(miri, ignore)] async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
858 let config = match PostgresTimestampOracleConfig::new_for_test() {
859 Some(config) => config,
860 None => {
861 info!(
862 "{} env not set: skipping test that uses external service",
863 PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
864 );
865 return Ok(());
866 }
867 };
868
869 crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
870 let oracle = PostgresTimestampOracle::open(
871 config.clone(),
872 timeline,
873 initial_ts,
874 now_fn,
875 false, );
877
878 async {
879 let arced_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
880 Arc::new(oracle.await);
881
882 arced_oracle
883 }
884 })
885 .await?;
886
887 Ok(())
888 }
889}