1use std::str::FromStr;
15use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::{Object, PoolError};
23use dec::Decimal;
24use mz_adapter_types::timestamp_oracle::{
25 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
26 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
27 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
28 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
29 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES, DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
30};
31use mz_ore::error::ErrorExt;
32use mz_ore::instrument;
33use mz_ore::metrics::MetricsRegistry;
34use mz_ore::url::SensitiveUrl;
35use mz_pgrepr::Numeric;
36use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
37use mz_repr::Timestamp;
38use postgres_protocol::escape::escape_identifier;
39use serde::{Deserialize, Serialize};
40use tracing::{debug, info};
41
42use crate::WriteTimestamp;
43use crate::metrics::{Metrics, RetryMetrics};
44use crate::retry::Retry;
45use crate::{GenericNowFn, TimestampOracle};
46
// DDL for the oracle's backing table: one row per timeline (the primary key)
// holding that timeline's read and write timestamps. `DECIMAL(20,0)` has room
// for 20 integer digits, which is enough for any `u64` value
// (`u64::MAX` = 18446744073709551615).
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts DECIMAL(20,0) NOT NULL,
    write_ts DECIMAL(20,0) NOT NULL,
    PRIMARY KEY(timeline)
)
";
58
// CockroachDB-only table options, appended directly after `SCHEMA` in `open`.
// They disable automatic SQL statistics collection for the table. If the
// server rejects them with INVALID_PARAMETER_VALUE or SYNTAX_ERROR (e.g. plain
// Postgres), `open` falls back to creating the table without them.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// CockroachDB-only statement, run by `open` together with the CRDB-flavored
// `SCHEMA`. It sets the table's `gc.ttlseconds` zone setting to 600 seconds
// (10 minutes).
const CRDB_CONFIGURE_ZONE: &str =
    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
76
/// A [`TimestampOracle`] whose state lives in the `timestamp_oracle` table of
/// a Postgres (or CockroachDB) database, one row per timeline.
#[derive(Debug)]
pub struct PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp>,
{
    /// Name of the timeline; used as the key of this oracle's row.
    timeline: String,
    /// Clock consulted for the proposed next write timestamp.
    next: N,
    /// Shared client whose connection pool serves all queries.
    postgres_client: Arc<PostgresClient>,
    /// Operation and retry metrics.
    metrics: Arc<Metrics>,
    /// When `true`, the write paths (`write_ts`, `apply_write`) panic instead
    /// of modifying the table.
    read_only: bool,
}
91
/// Configuration for opening a [`PostgresTimestampOracle`].
#[derive(Clone, Debug)]
pub struct PostgresTimestampOracleConfig {
    /// Connection URL of the backing Postgres/CockroachDB database.
    url: SensitiveUrl,
    /// Metrics shared by every oracle opened from this config.
    metrics: Arc<Metrics>,

    /// Runtime-tunable connection knobs. Because this is an `Arc`, all clones
    /// of a config share the same `DynamicConfig`, so
    /// `TimestampOracleParameters::apply` on one clone is seen by all of them.
    pub dynamic: Arc<DynamicConfig>,
}
102
103impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
104 fn from(config: PostgresTimestampOracleConfig) -> Self {
105 let metrics = config.metrics.postgres_client.clone();
106 PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
107 }
108}
109
110impl PostgresTimestampOracleConfig {
111 pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "METADATA_BACKEND_URL";
112
113 pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
115 let metrics = Arc::new(Metrics::new(metrics_registry));
116
117 let dynamic = DynamicConfig::default();
118
119 PostgresTimestampOracleConfig {
120 url: url.clone(),
121 metrics,
122 dynamic: Arc::new(dynamic),
123 }
124 }
125
126 pub fn new_for_test() -> Option<Self> {
135 let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
136 Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
137 Err(_) => {
138 if mz_ore::env::is_var_truthy("CI") {
139 panic!("CI is supposed to run this test but something has gone wrong!");
140 }
141 return None;
142 }
143 };
144
145 let dynamic = DynamicConfig::default();
146
147 let config = PostgresTimestampOracleConfig {
148 url,
149 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
150 dynamic: Arc::new(dynamic),
151 };
152
153 Some(config)
154 }
155
156 pub(crate) fn metrics(&self) -> &Arc<Metrics> {
158 &self.metrics
159 }
160}
161
/// Connection-pool and TCP knobs for the timestamp oracle's Postgres client
/// that can be changed at runtime (see `TimestampOracleParameters::apply`).
///
/// Every value sits behind an atomic or an `RwLock`, so it can be updated
/// through a shared reference.
#[derive(Debug)]
pub struct DynamicConfig {
    /// Connection pool max size.
    pg_connection_pool_max_size: AtomicUsize,

    /// Connection pool max wait; `None` is passed through to the pool as-is.
    pg_connection_pool_max_wait: RwLock<Option<Duration>>,

    /// Connection pool TTL.
    pg_connection_pool_ttl: RwLock<Duration>,

    /// Connection pool TTL stagger.
    pg_connection_pool_ttl_stagger: RwLock<Duration>,

    /// Connect timeout for new connections.
    pg_connection_pool_connect_timeout: RwLock<Duration>,

    /// TCP user timeout for connections.
    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,

    /// TCP keepalives idle time.
    pg_connection_pool_keepalives_idle: RwLock<Duration>,

    /// TCP keepalives interval.
    pg_connection_pool_keepalives_interval: RwLock<Duration>,

    /// TCP keepalives retry count.
    pg_connection_pool_keepalives_retries: AtomicU32,
}
216
217impl Default for DynamicConfig {
218 fn default() -> Self {
219 Self {
220 pg_connection_pool_max_size: AtomicUsize::new(
224 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
225 ),
226 pg_connection_pool_max_wait: RwLock::new(Some(
227 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
228 )),
229 pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
230 pg_connection_pool_ttl_stagger: RwLock::new(
231 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
232 ),
233 pg_connection_pool_connect_timeout: RwLock::new(
234 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
235 ),
236 pg_connection_pool_tcp_user_timeout: RwLock::new(
237 DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
238 ),
239 pg_connection_pool_keepalives_idle: RwLock::new(
240 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
241 ),
242 pg_connection_pool_keepalives_interval: RwLock::new(
243 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
244 ),
245 pg_connection_pool_keepalives_retries: AtomicU32::new(
246 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES,
247 ),
248 }
249 }
250}
251
252impl DynamicConfig {
253 const LOAD_ORDERING: Ordering = Ordering::SeqCst;
255 const STORE_ORDERING: Ordering = Ordering::SeqCst;
256
257 fn connection_pool_max_size(&self) -> usize {
258 self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
259 }
260
261 fn connection_pool_max_wait(&self) -> Option<Duration> {
262 *self
263 .pg_connection_pool_max_wait
264 .read()
265 .expect("lock poisoned")
266 }
267
268 fn connection_pool_ttl(&self) -> Duration {
269 *self.pg_connection_pool_ttl.read().expect("lock poisoned")
270 }
271
272 fn connection_pool_ttl_stagger(&self) -> Duration {
273 *self
274 .pg_connection_pool_ttl_stagger
275 .read()
276 .expect("lock poisoned")
277 }
278
279 fn connect_timeout(&self) -> Duration {
280 *self
281 .pg_connection_pool_connect_timeout
282 .read()
283 .expect("lock poisoned")
284 }
285
286 fn tcp_user_timeout(&self) -> Duration {
287 *self
288 .pg_connection_pool_tcp_user_timeout
289 .read()
290 .expect("lock poisoned")
291 }
292
293 fn keepalives_idle(&self) -> Duration {
294 *self
295 .pg_connection_pool_keepalives_idle
296 .read()
297 .expect("lock poisoned")
298 }
299
300 fn keepalives_interval(&self) -> Duration {
301 *self
302 .pg_connection_pool_keepalives_interval
303 .read()
304 .expect("lock poisoned")
305 }
306
307 fn keepalives_retries(&self) -> u32 {
308 self.pg_connection_pool_keepalives_retries
309 .load(Self::LOAD_ORDERING)
310 }
311}
312
313impl PostgresClientKnobs for PostgresTimestampOracleConfig {
314 fn connection_pool_max_size(&self) -> usize {
315 self.dynamic.connection_pool_max_size()
316 }
317
318 fn connection_pool_max_wait(&self) -> Option<Duration> {
319 self.dynamic.connection_pool_max_wait()
320 }
321
322 fn connection_pool_ttl(&self) -> Duration {
323 self.dynamic.connection_pool_ttl()
324 }
325
326 fn connection_pool_ttl_stagger(&self) -> Duration {
327 self.dynamic.connection_pool_ttl_stagger()
328 }
329
330 fn connect_timeout(&self) -> Duration {
331 self.dynamic.connect_timeout()
332 }
333
334 fn tcp_user_timeout(&self) -> Duration {
335 self.dynamic.tcp_user_timeout()
336 }
337
338 fn keepalives_idle(&self) -> Duration {
339 self.dynamic.keepalives_idle()
340 }
341
342 fn keepalives_interval(&self) -> Duration {
343 self.dynamic.keepalives_interval()
344 }
345
346 fn keepalives_retries(&self) -> u32 {
347 self.dynamic.keepalives_retries()
348 }
349}
350
/// A (partial) set of updates to the dynamic knobs of a
/// [`PostgresTimestampOracleConfig`].
///
/// A field that is `None` means "leave this knob unchanged". `update` merges
/// two parameter sets, and `apply` installs the `Some` values into a config's
/// `DynamicConfig`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimestampOracleParameters {
    /// New connection pool max size.
    pub pg_connection_pool_max_size: Option<usize>,
    /// New connection pool max wait. The outer `Option` says whether to
    /// update at all; `Some(None)` stores `None` as the max wait (presumably
    /// meaning "no limit" in the pool — confirm).
    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
    /// New connection pool TTL.
    pub pg_connection_pool_ttl: Option<Duration>,
    /// New connection pool TTL stagger.
    pub pg_connection_pool_ttl_stagger: Option<Duration>,
    /// New connect timeout.
    pub pg_connection_pool_connect_timeout: Option<Duration>,
    /// New TCP user timeout.
    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
    /// New TCP keepalives idle time.
    pub pg_connection_pool_keepalives_idle: Option<Duration>,
    /// New TCP keepalives interval.
    pub pg_connection_pool_keepalives_interval: Option<Duration>,
    /// New TCP keepalives retry count.
    pub pg_connection_pool_keepalives_retries: Option<u32>,
}
385
386impl TimestampOracleParameters {
387 pub fn update(&mut self, other: TimestampOracleParameters) {
389 let Self {
392 pg_connection_pool_max_size: self_pg_connection_pool_max_size,
393 pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
394 pg_connection_pool_ttl: self_pg_connection_pool_ttl,
395 pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
396 pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
397 pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
398 pg_connection_pool_keepalives_idle: self_pg_connection_pool_keepalives_idle,
399 pg_connection_pool_keepalives_interval: self_pg_connection_pool_keepalives_interval,
400 pg_connection_pool_keepalives_retries: self_pg_connection_pool_keepalives_retries,
401 } = self;
402 let Self {
403 pg_connection_pool_max_size: other_pg_connection_pool_max_size,
404 pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
405 pg_connection_pool_ttl: other_pg_connection_pool_ttl,
406 pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
407 pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
408 pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
409 pg_connection_pool_keepalives_idle: other_pg_connection_pool_keepalives_idle,
410 pg_connection_pool_keepalives_interval: other_pg_connection_pool_keepalives_interval,
411 pg_connection_pool_keepalives_retries: other_pg_connection_pool_keepalives_retries,
412 } = other;
413 if let Some(v) = other_pg_connection_pool_max_size {
414 *self_pg_connection_pool_max_size = Some(v);
415 }
416 if let Some(v) = other_pg_connection_pool_max_wait {
417 *self_pg_connection_pool_max_wait = Some(v);
418 }
419 if let Some(v) = other_pg_connection_pool_ttl {
420 *self_pg_connection_pool_ttl = Some(v);
421 }
422 if let Some(v) = other_pg_connection_pool_ttl_stagger {
423 *self_pg_connection_pool_ttl_stagger = Some(v);
424 }
425 if let Some(v) = other_pg_connection_pool_connect_timeout {
426 *self_pg_connection_pool_connect_timeout = Some(v);
427 }
428 if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
429 *self_pg_connection_pool_tcp_user_timeout = Some(v);
430 }
431 if let Some(v) = other_pg_connection_pool_keepalives_idle {
432 *self_pg_connection_pool_keepalives_idle = Some(v);
433 }
434 if let Some(v) = other_pg_connection_pool_keepalives_interval {
435 *self_pg_connection_pool_keepalives_interval = Some(v);
436 }
437 if let Some(v) = other_pg_connection_pool_keepalives_retries {
438 *self_pg_connection_pool_keepalives_retries = Some(v);
439 }
440 }
441
442 pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
448 info!(params = ?self, "Applying configuration update!");
449
450 let Self {
452 pg_connection_pool_max_size,
453 pg_connection_pool_max_wait,
454 pg_connection_pool_ttl,
455 pg_connection_pool_ttl_stagger,
456 pg_connection_pool_connect_timeout,
457 pg_connection_pool_tcp_user_timeout,
458 pg_connection_pool_keepalives_idle,
459 pg_connection_pool_keepalives_interval,
460 pg_connection_pool_keepalives_retries,
461 } = self;
462 if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
463 cfg.dynamic
464 .pg_connection_pool_max_size
465 .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
466 }
467 if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
468 let mut max_wait = cfg
469 .dynamic
470 .pg_connection_pool_max_wait
471 .write()
472 .expect("lock poisoned");
473 *max_wait = *pg_connection_pool_max_wait;
474 }
475 if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
476 let mut ttl = cfg
477 .dynamic
478 .pg_connection_pool_ttl
479 .write()
480 .expect("lock poisoned");
481 *ttl = *pg_connection_pool_ttl;
482 }
483 if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
484 let mut ttl_stagger = cfg
485 .dynamic
486 .pg_connection_pool_ttl_stagger
487 .write()
488 .expect("lock poisoned");
489 *ttl_stagger = *pg_connection_pool_ttl_stagger;
490 }
491 if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
492 let mut timeout = cfg
493 .dynamic
494 .pg_connection_pool_connect_timeout
495 .write()
496 .expect("lock poisoned");
497 *timeout = *pg_connection_pool_connect_timeout;
498 }
499 if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
500 let mut timeout = cfg
501 .dynamic
502 .pg_connection_pool_tcp_user_timeout
503 .write()
504 .expect("lock poisoned");
505 *timeout = *pg_connection_pool_tcp_user_timeout;
506 }
507 if let Some(pg_connection_pool_keepalives_idle) = pg_connection_pool_keepalives_idle {
508 let mut timeout = cfg
509 .dynamic
510 .pg_connection_pool_keepalives_idle
511 .write()
512 .expect("lock poisoned");
513 *timeout = *pg_connection_pool_keepalives_idle;
514 }
515 if let Some(pg_connection_pool_keepalives_interval) = pg_connection_pool_keepalives_interval
516 {
517 let mut timeout = cfg
518 .dynamic
519 .pg_connection_pool_keepalives_interval
520 .write()
521 .expect("lock poisoned");
522 *timeout = *pg_connection_pool_keepalives_interval;
523 }
524 if let Some(pg_connection_pool_keepalives_retries) = pg_connection_pool_keepalives_retries {
525 cfg.dynamic.pg_connection_pool_keepalives_retries.store(
526 *pg_connection_pool_keepalives_retries,
527 DynamicConfig::STORE_ORDERING,
528 );
529 }
530 }
531}
532
533impl<N> PostgresTimestampOracle<N>
534where
535 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
536{
537 pub async fn open(
541 config: PostgresTimestampOracleConfig,
542 timeline: String,
543 initially: Timestamp,
544 next: N,
545 read_only: bool,
546 ) -> Self {
547 info!(config = ?config, "opening PostgresTimestampOracle");
548
549 let fallible = || async {
550 let metrics = Arc::clone(&config.metrics);
551
552 let pg_config: Config = config.url.to_string().parse()?;
554 let role = pg_config.get_user().unwrap();
555 let create_schema = format!(
556 "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
557 escape_identifier(role),
558 );
559
560 let postgres_client = PostgresClient::open(config.clone().into())?;
561
562 let client = postgres_client.get_connection().await?;
563
564 let crdb_mode = match client
565 .batch_execute(&format!(
566 "{}; {}{}; {}",
567 create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
568 ))
569 .await
570 {
571 Ok(()) => true,
572 Err(e)
573 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
574 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
575 {
576 info!(
577 "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
578 e
579 );
580 false
581 }
582 Err(e) => return Err(e.into()),
583 };
584
585 if !crdb_mode {
586 client
587 .batch_execute(&format!("{}; {};", create_schema, SCHEMA))
588 .await?;
589 }
590
591 let oracle = PostgresTimestampOracle {
592 timeline: timeline.clone(),
593 next: next.clone(),
594 postgres_client: Arc::new(postgres_client),
595 metrics,
596 read_only,
597 };
598
599 let q = r#"
605 INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
606 VALUES ($1, $2, $3)
607 ON CONFLICT (timeline) DO NOTHING;
608 "#;
609 let statement = client.prepare_cached(q).await?;
610
611 let initially_coerced = Self::ts_to_decimal(initially);
612 let _ = client
613 .execute(
614 &statement,
615 &[&oracle.timeline, &initially_coerced, &initially_coerced],
616 )
617 .await?;
618
619 if !read_only {
623 TimestampOracle::apply_write(&oracle, initially).await;
624 }
625
626 Result::<_, anyhow::Error>::Ok(oracle)
627 };
628
629 let metrics = &config.metrics.retries.open;
630
631 let oracle = retry_fallible(metrics, fallible).await;
632
633 oracle
634 }
635
636 async fn get_connection(&self) -> Result<Object, PoolError> {
637 self.postgres_client.get_connection().await
638 }
639
640 pub async fn get_all_timelines(
652 config: PostgresTimestampOracleConfig,
653 ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
654 let fallible = || async {
655 let postgres_client = PostgresClient::open(config.clone().into())?;
656
657 let mut client = postgres_client.get_connection().await?;
658
659 let txn = client.transaction().await?;
660
661 let q = r#"
666 SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
667 "#;
668 let statement = txn.prepare(q).await?;
669 let exists_row = txn.query_one(&statement, &[]).await?;
670 let exists: bool = exists_row.try_get("exists").expect("missing exists column");
671 if !exists {
672 return Ok(Vec::new());
673 }
674
675 let q = r#"
676 SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
677 "#;
678 let statement = txn.prepare(q).await?;
679 let rows = txn.query(&statement, &[]).await?;
680
681 txn.commit().await?;
682
683 let result = rows
684 .into_iter()
685 .map(|row| {
686 let timeline: String =
687 row.try_get("timeline").expect("missing timeline column");
688 let ts: Numeric = row.try_get("ts").expect("missing ts column");
689 let ts = Self::decimal_to_ts(ts);
690
691 (timeline, ts)
692 })
693 .collect::<Vec<_>>();
694
695 Ok(result)
696 };
697
698 let metrics = &config.metrics.retries.get_all_timelines;
699
700 let result = retry_fallible(metrics, fallible).await;
701
702 Ok(result)
703 }
704
705 #[mz_ore::instrument(name = "oracle::write_ts")]
706 async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
707 if self.read_only {
708 panic!("attempting write_ts in read-only mode");
709 }
710
711 let proposed_next_ts = self.next.now();
712 let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
713
714 let q = r#"
715 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
716 WHERE timeline = $1
717 RETURNING write_ts;
718 "#;
719 let client = self.get_connection().await?;
720 let statement = client.prepare_cached(q).await?;
721 let result = client
722 .query_one(&statement, &[&self.timeline, &proposed_next_ts])
723 .await?;
724
725 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
726 let write_ts = Self::decimal_to_ts(write_ts);
727
728 debug!(
729 timeline = ?self.timeline,
730 write_ts = ?write_ts,
731 proposed_next_ts = ?proposed_next_ts,
732 "returning from write_ts()");
733
734 let advance_to = write_ts.step_forward();
735
736 Ok(WriteTimestamp {
737 timestamp: write_ts,
738 advance_to,
739 })
740 }
741
742 #[mz_ore::instrument(name = "oracle::peek_write_ts")]
743 async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
744 let q = r#"
745 SELECT write_ts FROM timestamp_oracle
746 WHERE timeline = $1;
747 "#;
748 let client = self.get_connection().await?;
749 let statement = client.prepare_cached(q).await?;
750 let result = client.query_one(&statement, &[&self.timeline]).await?;
751
752 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
753 let write_ts = Self::decimal_to_ts(write_ts);
754
755 debug!(
756 timeline = ?self.timeline,
757 write_ts = ?write_ts,
758 "returning from peek_write_ts()");
759
760 Ok(write_ts)
761 }
762
763 #[mz_ore::instrument(name = "oracle::read_ts")]
764 async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
765 let q = r#"
766 SELECT read_ts FROM timestamp_oracle
767 WHERE timeline = $1;
768 "#;
769 let client = self.get_connection().await?;
770 let statement = client.prepare_cached(q).await?;
771 let result = client.query_one(&statement, &[&self.timeline]).await?;
772
773 let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
774 let read_ts = Self::decimal_to_ts(read_ts);
775
776 debug!(
777 timeline = ?self.timeline,
778 read_ts = ?read_ts,
779 "returning from read_ts()");
780
781 Ok(read_ts)
782 }
783
784 #[mz_ore::instrument(name = "oracle::apply_write")]
785 async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
786 if self.read_only {
787 panic!("attempting apply_write in read-only mode");
788 }
789
790 let q = r#"
791 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
792 WHERE timeline = $1;
793 "#;
794 let client = self.get_connection().await?;
795 let statement = client.prepare_cached(q).await?;
796 let write_ts = Self::ts_to_decimal(write_ts);
797
798 let _ = client
799 .execute(&statement, &[&self.timeline, &write_ts])
800 .await?;
801
802 debug!(
803 timeline = ?self.timeline,
804 write_ts = ?write_ts,
805 "returning from apply_write()");
806
807 Ok(())
808 }
809
810 fn ts_to_decimal(ts: Timestamp) -> Numeric {
813 let decimal = Decimal::from(ts);
814 Numeric::from(decimal)
815 }
816
817 fn decimal_to_ts(ts: Numeric) -> Timestamp {
820 ts.0.0.try_into().expect("we only use u64 timestamps")
821 }
822}
823
824#[async_trait]
832impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
833where
834 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
835{
836 #[instrument]
837 async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
838 let metrics = &self.metrics.retries.write_ts;
839
840 let res = retry_fallible(metrics, || {
841 self.metrics
842 .oracle
843 .write_ts
844 .run_op(|| self.fallible_write_ts())
845 })
846 .await;
847
848 res
849 }
850
851 #[instrument]
852 async fn peek_write_ts(&self) -> Timestamp {
853 let metrics = &self.metrics.retries.peek_write_ts;
854
855 let res = retry_fallible(metrics, || {
856 self.metrics
857 .oracle
858 .peek_write_ts
859 .run_op(|| self.fallible_peek_write_ts())
860 })
861 .await;
862
863 res
864 }
865
866 #[instrument]
867 async fn read_ts(&self) -> Timestamp {
868 let metrics = &self.metrics.retries.read_ts;
869
870 let res = retry_fallible(metrics, || {
871 self.metrics
872 .oracle
873 .read_ts
874 .run_op(|| self.fallible_read_ts())
875 })
876 .await;
877
878 res
879 }
880
881 #[instrument]
882 async fn apply_write(&self, write_ts: Timestamp) {
883 let metrics = &self.metrics.retries.apply_write;
884
885 let res = retry_fallible(metrics, || {
886 self.metrics
887 .oracle
888 .apply_write
889 .run_op(|| self.fallible_apply_write(write_ts.clone()))
890 })
891 .await;
892
893 res
894 }
895}
896
/// Threshold compared against `retry.attempt()` in [`retry_fallible`] to pick
/// the log level for a failed attempt. Failures once `retry.attempt()` has
/// reached this value are logged at `info` level.
pub const INFO_MIN_ATTEMPTS: usize = 3;
898
899pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
900where
901 F: std::future::Future<Output = Result<R, anyhow::Error>>,
902 WorkFn: FnMut() -> F,
903{
904 let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
905 loop {
906 match work_fn().await {
907 Ok(x) => {
908 if retry.attempt() > 0 {
909 debug!(
910 "external operation {} succeeded after failing at least once",
911 metrics.name,
912 );
913 }
914 return x;
915 }
916 Err(err)
917 if err
918 .to_string()
919 .contains("\"timestamp_oracle\" does not exist") =>
920 {
921 panic!(
930 "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
931 metrics.name,
932 err.display_with_causes()
933 );
934 }
935 Err(err) => {
936 if retry.attempt() >= INFO_MIN_ATTEMPTS {
937 info!(
938 "external operation {} failed, retrying in {:?}: {}",
939 metrics.name,
940 retry.next_sleep(),
941 err.display_with_causes()
942 );
943 } else {
944 info!(
945 "external operation {} failed, retrying in {:?}: {}",
946 metrics.name,
947 retry.next_sleep(),
948 err.display_with_causes()
949 );
950 }
951 retry = retry.sleep().await;
952 }
953 }
954 }
955}
956
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared timestamp-oracle test suite against an external
    /// Postgres/CockroachDB instance. The test is skipped when none is
    /// configured.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // Needs to talk to an external database.
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let Some(config) = PostgresTimestampOracleConfig::new_for_test() else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            let read_only = false;
            let opening = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                read_only,
            );

            async {
                let oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(opening.await);
                oracle
            }
        })
        .await?;

        Ok(())
    }
}