1use std::str::FromStr;
15use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::tokio_postgres::types::ToSql;
23use deadpool_postgres::tokio_postgres::{Row, Statement};
24use deadpool_postgres::{Object, PoolError};
25use dec::Decimal;
26use mz_adapter_types::timestamp_oracle::{
27 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
28 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
29 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
30 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
31 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES, DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
32};
33use mz_ore::error::ErrorExt;
34use mz_ore::instrument;
35use mz_ore::metrics::MetricsRegistry;
36use mz_ore::url::SensitiveUrl;
37use mz_pgrepr::Numeric;
38use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
39use mz_repr::Timestamp;
40use postgres_protocol::escape::escape_identifier;
41use serde::{Deserialize, Serialize};
42use tracing::{debug, info};
43
44use crate::WriteTimestamp;
45use crate::metrics::{Metrics, RetryMetrics};
46use crate::retry::Retry;
47use crate::{GenericNowFn, TimestampOracle};
48
/// DDL for the oracle's backing table: one row per `timeline`, holding that
/// timeline's current read and write timestamps.
///
/// The timestamp columns are `DECIMAL(20,0)`: 20 digits is enough to hold
/// `u64::MAX` (`18446744073709551615`). Values are converted to and from this
/// representation by `ts_to_decimal` / `decimal_to_ts`.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts DECIMAL(20,0) NOT NULL,
    write_ts DECIMAL(20,0) NOT NULL,
    PRIMARY KEY(timeline)
)
";

/// CockroachDB-only table options appended directly after `SCHEMA`; they turn
/// off automatic SQL statistics collection for the table.
///
/// Postgres rejects this clause. `open` uses that rejection to detect that it
/// is not talking to CockroachDB and falls back to the plain `SCHEMA`.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
/// CockroachDB-only zone configuration setting the table's `gc.ttlseconds` to
/// 600 (10 minutes).
// NOTE(review): presumably lowered to limit MVCC garbage from the table's very
// frequent single-row updates — confirm the rationale.
const CRDB_CONFIGURE_ZONE: &str =
    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
78
79async fn pg_batch_execute(
82 client: &Object,
83 query: &str,
84) -> Result<(), deadpool_postgres::tokio_postgres::Error> {
85 #[allow(clippy::disallowed_methods)]
86 client.batch_execute(query).await
87}
88
89async fn pg_query_one_prepared(
90 client: &Object,
91 statement: &Statement,
92 params: &[&(dyn ToSql + Sync)],
93) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
94 #[allow(clippy::disallowed_methods)]
95 client.query_one(statement, params).await
96}
97
98async fn pg_execute_prepared(
99 client: &Object,
100 statement: &Statement,
101 params: &[&(dyn ToSql + Sync)],
102) -> Result<u64, deadpool_postgres::tokio_postgres::Error> {
103 #[allow(clippy::disallowed_methods)]
104 client.execute(statement, params).await
105}
106
107async fn pg_txn_query_prepared(
108 txn: &deadpool_postgres::Transaction<'_>,
109 statement: &Statement,
110 params: &[&(dyn ToSql + Sync)],
111) -> Result<Vec<Row>, deadpool_postgres::tokio_postgres::Error> {
112 #[allow(clippy::disallowed_methods)]
113 txn.query(statement, params).await
114}
115
116async fn pg_txn_query_one_prepared(
117 txn: &deadpool_postgres::Transaction<'_>,
118 statement: &Statement,
119 params: &[&(dyn ToSql + Sync)],
120) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
121 #[allow(clippy::disallowed_methods)]
122 txn.query_one(statement, params).await
123}
124
/// A [`TimestampOracle`] whose state lives in one row of the
/// `timestamp_oracle` table of a Postgres or CockroachDB database.
#[derive(Debug)]
pub struct PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp>,
{
    /// Primary key of this oracle's row in `timestamp_oracle`.
    timeline: String,
    /// Queried via `now()` to propose the next write timestamp.
    next: N,
    /// Connection pool used by every query this oracle issues.
    postgres_client: Arc<PostgresClient>,
    /// Per-operation and retry metrics.
    metrics: Arc<Metrics>,
    /// When `true`, the write paths (`write_ts`, `apply_write`) panic instead
    /// of modifying the row.
    read_only: bool,
}
139
/// Settings for connecting a [`PostgresTimestampOracle`] to its backing
/// database.
#[derive(Clone, Debug)]
pub struct PostgresTimestampOracleConfig {
    /// Connection URL of the Postgres/CockroachDB instance.
    url: SensitiveUrl,
    /// Metrics handle shared (via `Arc`) with every oracle opened from this
    /// config.
    metrics: Arc<Metrics>,

    /// Connection-pool knobs that can be changed at runtime through
    /// `TimestampOracleParameters::apply`; shared by all clones of this config.
    pub dynamic: Arc<DynamicConfig>,
}
150
151impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
152 fn from(config: PostgresTimestampOracleConfig) -> Self {
153 let metrics = config.metrics.postgres_client.clone();
154 PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
155 }
156}
157
158impl PostgresTimestampOracleConfig {
159 pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "METADATA_BACKEND_URL";
160
161 pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
163 let metrics = Arc::new(Metrics::new(metrics_registry));
164
165 let dynamic = DynamicConfig::default();
166
167 PostgresTimestampOracleConfig {
168 url: url.clone(),
169 metrics,
170 dynamic: Arc::new(dynamic),
171 }
172 }
173
174 pub fn new_for_test() -> Option<Self> {
183 let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
184 Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
185 Err(_) => {
186 if mz_ore::env::is_var_truthy("CI") {
187 panic!("CI is supposed to run this test but something has gone wrong!");
188 }
189 return None;
190 }
191 };
192
193 let dynamic = DynamicConfig::default();
194
195 let config = PostgresTimestampOracleConfig {
196 url,
197 metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
198 dynamic: Arc::new(dynamic),
199 };
200
201 Some(config)
202 }
203
204 pub(crate) fn metrics(&self) -> &Arc<Metrics> {
206 &self.metrics
207 }
208}
209
/// Connection-pool settings that can change while the process is running.
///
/// Every knob sits behind an atomic or an `RwLock`, so it can be read through a
/// shared reference (each read returns the current value) while
/// `TimestampOracleParameters::apply` overwrites it.
#[derive(Debug)]
pub struct DynamicConfig {
    /// Backs `PostgresClientKnobs::connection_pool_max_size`.
    pg_connection_pool_max_size: AtomicUsize,

    /// Backs `PostgresClientKnobs::connection_pool_max_wait`.
    // NOTE(review): `None` presumably means "wait indefinitely" — confirm
    // against `mz_postgres_client`.
    pg_connection_pool_max_wait: RwLock<Option<Duration>>,

    /// Backs `PostgresClientKnobs::connection_pool_ttl`.
    pg_connection_pool_ttl: RwLock<Duration>,

    /// Backs `PostgresClientKnobs::connection_pool_ttl_stagger`.
    pg_connection_pool_ttl_stagger: RwLock<Duration>,

    /// Backs `PostgresClientKnobs::connect_timeout`.
    pg_connection_pool_connect_timeout: RwLock<Duration>,

    /// Backs `PostgresClientKnobs::tcp_user_timeout`.
    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,

    /// Backs `PostgresClientKnobs::keepalives_idle`.
    pg_connection_pool_keepalives_idle: RwLock<Duration>,

    /// Backs `PostgresClientKnobs::keepalives_interval`.
    pg_connection_pool_keepalives_interval: RwLock<Duration>,

    /// Backs `PostgresClientKnobs::keepalives_retries`.
    pg_connection_pool_keepalives_retries: AtomicU32,
}
264
265impl Default for DynamicConfig {
266 fn default() -> Self {
267 Self {
268 pg_connection_pool_max_size: AtomicUsize::new(
272 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
273 ),
274 pg_connection_pool_max_wait: RwLock::new(Some(
275 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
276 )),
277 pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
278 pg_connection_pool_ttl_stagger: RwLock::new(
279 DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
280 ),
281 pg_connection_pool_connect_timeout: RwLock::new(
282 DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
283 ),
284 pg_connection_pool_tcp_user_timeout: RwLock::new(
285 DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
286 ),
287 pg_connection_pool_keepalives_idle: RwLock::new(
288 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
289 ),
290 pg_connection_pool_keepalives_interval: RwLock::new(
291 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
292 ),
293 pg_connection_pool_keepalives_retries: AtomicU32::new(
294 DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES,
295 ),
296 }
297 }
298}
299
300impl DynamicConfig {
301 const LOAD_ORDERING: Ordering = Ordering::SeqCst;
303 const STORE_ORDERING: Ordering = Ordering::SeqCst;
304
305 fn connection_pool_max_size(&self) -> usize {
306 self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
307 }
308
309 fn connection_pool_max_wait(&self) -> Option<Duration> {
310 *self
311 .pg_connection_pool_max_wait
312 .read()
313 .expect("lock poisoned")
314 }
315
316 fn connection_pool_ttl(&self) -> Duration {
317 *self.pg_connection_pool_ttl.read().expect("lock poisoned")
318 }
319
320 fn connection_pool_ttl_stagger(&self) -> Duration {
321 *self
322 .pg_connection_pool_ttl_stagger
323 .read()
324 .expect("lock poisoned")
325 }
326
327 fn connect_timeout(&self) -> Duration {
328 *self
329 .pg_connection_pool_connect_timeout
330 .read()
331 .expect("lock poisoned")
332 }
333
334 fn tcp_user_timeout(&self) -> Duration {
335 *self
336 .pg_connection_pool_tcp_user_timeout
337 .read()
338 .expect("lock poisoned")
339 }
340
341 fn keepalives_idle(&self) -> Duration {
342 *self
343 .pg_connection_pool_keepalives_idle
344 .read()
345 .expect("lock poisoned")
346 }
347
348 fn keepalives_interval(&self) -> Duration {
349 *self
350 .pg_connection_pool_keepalives_interval
351 .read()
352 .expect("lock poisoned")
353 }
354
355 fn keepalives_retries(&self) -> u32 {
356 self.pg_connection_pool_keepalives_retries
357 .load(Self::LOAD_ORDERING)
358 }
359}
360
361impl PostgresClientKnobs for PostgresTimestampOracleConfig {
362 fn connection_pool_max_size(&self) -> usize {
363 self.dynamic.connection_pool_max_size()
364 }
365
366 fn connection_pool_max_wait(&self) -> Option<Duration> {
367 self.dynamic.connection_pool_max_wait()
368 }
369
370 fn connection_pool_ttl(&self) -> Duration {
371 self.dynamic.connection_pool_ttl()
372 }
373
374 fn connection_pool_ttl_stagger(&self) -> Duration {
375 self.dynamic.connection_pool_ttl_stagger()
376 }
377
378 fn connect_timeout(&self) -> Duration {
379 self.dynamic.connect_timeout()
380 }
381
382 fn tcp_user_timeout(&self) -> Duration {
383 self.dynamic.tcp_user_timeout()
384 }
385
386 fn keepalives_idle(&self) -> Duration {
387 self.dynamic.keepalives_idle()
388 }
389
390 fn keepalives_interval(&self) -> Duration {
391 self.dynamic.keepalives_interval()
392 }
393
394 fn keepalives_retries(&self) -> u32 {
395 self.dynamic.keepalives_retries()
396 }
397}
398
/// A possibly partial set of updates to the [`DynamicConfig`] knobs.
///
/// A `None` field means "leave unchanged". `TimestampOracleParameters::update`
/// only overwrites fields that are `Some` in its argument, and
/// `TimestampOracleParameters::apply` only writes knobs whose field is `Some`.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimestampOracleParameters {
    /// New value for the connection pool's maximum size.
    pub pg_connection_pool_max_size: Option<usize>,
    /// New maximum connection wait.
    ///
    /// The outer `Option` says whether to change the knob. The inner `Option`
    /// is the value stored into it.
    // NOTE(review): with plain serde derives, `Some(None)` serializes as `null`,
    // which deserializes back as `None` (i.e. "no change") in formats like
    // JSON. Confirm that disabling the max wait never needs to survive a serde
    // round-trip.
    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
    /// New connection-pool TTL.
    pub pg_connection_pool_ttl: Option<Duration>,
    /// New connection-pool TTL stagger.
    pub pg_connection_pool_ttl_stagger: Option<Duration>,
    /// New connect timeout.
    pub pg_connection_pool_connect_timeout: Option<Duration>,
    /// New TCP user timeout.
    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
    /// New TCP keepalive idle time.
    pub pg_connection_pool_keepalives_idle: Option<Duration>,
    /// New TCP keepalive interval.
    pub pg_connection_pool_keepalives_interval: Option<Duration>,
    /// New TCP keepalive retry count.
    pub pg_connection_pool_keepalives_retries: Option<u32>,
}
433
434impl TimestampOracleParameters {
435 pub fn update(&mut self, other: TimestampOracleParameters) {
437 let Self {
440 pg_connection_pool_max_size: self_pg_connection_pool_max_size,
441 pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
442 pg_connection_pool_ttl: self_pg_connection_pool_ttl,
443 pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
444 pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
445 pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
446 pg_connection_pool_keepalives_idle: self_pg_connection_pool_keepalives_idle,
447 pg_connection_pool_keepalives_interval: self_pg_connection_pool_keepalives_interval,
448 pg_connection_pool_keepalives_retries: self_pg_connection_pool_keepalives_retries,
449 } = self;
450 let Self {
451 pg_connection_pool_max_size: other_pg_connection_pool_max_size,
452 pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
453 pg_connection_pool_ttl: other_pg_connection_pool_ttl,
454 pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
455 pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
456 pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
457 pg_connection_pool_keepalives_idle: other_pg_connection_pool_keepalives_idle,
458 pg_connection_pool_keepalives_interval: other_pg_connection_pool_keepalives_interval,
459 pg_connection_pool_keepalives_retries: other_pg_connection_pool_keepalives_retries,
460 } = other;
461 if let Some(v) = other_pg_connection_pool_max_size {
462 *self_pg_connection_pool_max_size = Some(v);
463 }
464 if let Some(v) = other_pg_connection_pool_max_wait {
465 *self_pg_connection_pool_max_wait = Some(v);
466 }
467 if let Some(v) = other_pg_connection_pool_ttl {
468 *self_pg_connection_pool_ttl = Some(v);
469 }
470 if let Some(v) = other_pg_connection_pool_ttl_stagger {
471 *self_pg_connection_pool_ttl_stagger = Some(v);
472 }
473 if let Some(v) = other_pg_connection_pool_connect_timeout {
474 *self_pg_connection_pool_connect_timeout = Some(v);
475 }
476 if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
477 *self_pg_connection_pool_tcp_user_timeout = Some(v);
478 }
479 if let Some(v) = other_pg_connection_pool_keepalives_idle {
480 *self_pg_connection_pool_keepalives_idle = Some(v);
481 }
482 if let Some(v) = other_pg_connection_pool_keepalives_interval {
483 *self_pg_connection_pool_keepalives_interval = Some(v);
484 }
485 if let Some(v) = other_pg_connection_pool_keepalives_retries {
486 *self_pg_connection_pool_keepalives_retries = Some(v);
487 }
488 }
489
490 pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
496 info!(params = ?self, "Applying configuration update!");
497
498 let Self {
500 pg_connection_pool_max_size,
501 pg_connection_pool_max_wait,
502 pg_connection_pool_ttl,
503 pg_connection_pool_ttl_stagger,
504 pg_connection_pool_connect_timeout,
505 pg_connection_pool_tcp_user_timeout,
506 pg_connection_pool_keepalives_idle,
507 pg_connection_pool_keepalives_interval,
508 pg_connection_pool_keepalives_retries,
509 } = self;
510 if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
511 cfg.dynamic
512 .pg_connection_pool_max_size
513 .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
514 }
515 if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
516 let mut max_wait = cfg
517 .dynamic
518 .pg_connection_pool_max_wait
519 .write()
520 .expect("lock poisoned");
521 *max_wait = *pg_connection_pool_max_wait;
522 }
523 if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
524 let mut ttl = cfg
525 .dynamic
526 .pg_connection_pool_ttl
527 .write()
528 .expect("lock poisoned");
529 *ttl = *pg_connection_pool_ttl;
530 }
531 if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
532 let mut ttl_stagger = cfg
533 .dynamic
534 .pg_connection_pool_ttl_stagger
535 .write()
536 .expect("lock poisoned");
537 *ttl_stagger = *pg_connection_pool_ttl_stagger;
538 }
539 if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
540 let mut timeout = cfg
541 .dynamic
542 .pg_connection_pool_connect_timeout
543 .write()
544 .expect("lock poisoned");
545 *timeout = *pg_connection_pool_connect_timeout;
546 }
547 if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
548 let mut timeout = cfg
549 .dynamic
550 .pg_connection_pool_tcp_user_timeout
551 .write()
552 .expect("lock poisoned");
553 *timeout = *pg_connection_pool_tcp_user_timeout;
554 }
555 if let Some(pg_connection_pool_keepalives_idle) = pg_connection_pool_keepalives_idle {
556 let mut timeout = cfg
557 .dynamic
558 .pg_connection_pool_keepalives_idle
559 .write()
560 .expect("lock poisoned");
561 *timeout = *pg_connection_pool_keepalives_idle;
562 }
563 if let Some(pg_connection_pool_keepalives_interval) = pg_connection_pool_keepalives_interval
564 {
565 let mut timeout = cfg
566 .dynamic
567 .pg_connection_pool_keepalives_interval
568 .write()
569 .expect("lock poisoned");
570 *timeout = *pg_connection_pool_keepalives_interval;
571 }
572 if let Some(pg_connection_pool_keepalives_retries) = pg_connection_pool_keepalives_retries {
573 cfg.dynamic.pg_connection_pool_keepalives_retries.store(
574 *pg_connection_pool_keepalives_retries,
575 DynamicConfig::STORE_ORDERING,
576 );
577 }
578 }
579}
580
581impl<N> PostgresTimestampOracle<N>
582where
583 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
584{
585 pub async fn open(
589 config: PostgresTimestampOracleConfig,
590 timeline: String,
591 initially: Timestamp,
592 next: N,
593 read_only: bool,
594 ) -> Self {
595 info!(config = ?config, "opening PostgresTimestampOracle");
596
597 let fallible = || async {
598 let metrics = Arc::clone(&config.metrics);
599
600 let pg_config: Config = config.url.to_string().parse()?;
602 let role = pg_config.get_user().unwrap();
603 let create_schema = format!(
604 "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
605 escape_identifier(role),
606 );
607
608 let postgres_client = PostgresClient::open(config.clone().into())?;
609
610 let client = postgres_client.get_connection().await?;
611
612 let crdb_mode = match pg_batch_execute(
613 &client,
614 &format!(
615 "{}; {}{}; {}",
616 create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
617 ),
618 )
619 .await
620 {
621 Ok(()) => true,
622 Err(e)
623 if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
624 || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
625 {
626 info!(
627 "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
628 e
629 );
630 false
631 }
632 Err(e) => return Err(e.into()),
633 };
634
635 if !crdb_mode {
636 pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
637 }
638
639 let oracle = PostgresTimestampOracle {
640 timeline: timeline.clone(),
641 next: next.clone(),
642 postgres_client: Arc::new(postgres_client),
643 metrics,
644 read_only,
645 };
646
647 let q = r#"
653 INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
654 VALUES ($1, $2, $3)
655 ON CONFLICT (timeline) DO NOTHING;
656 "#;
657 let statement = client.prepare_cached(q).await?;
658
659 let initially_coerced = Self::ts_to_decimal(initially);
660 let _ = pg_execute_prepared(
661 &client,
662 &statement,
663 &[&oracle.timeline, &initially_coerced, &initially_coerced],
664 )
665 .await?;
666
667 if !read_only {
671 TimestampOracle::apply_write(&oracle, initially).await;
672 }
673
674 Result::<_, anyhow::Error>::Ok(oracle)
675 };
676
677 let metrics = &config.metrics.retries.open;
678
679 let oracle = retry_fallible(metrics, fallible).await;
680
681 oracle
682 }
683
684 async fn get_connection(&self) -> Result<Object, PoolError> {
685 self.postgres_client.get_connection().await
686 }
687
688 pub async fn get_all_timelines(
700 config: PostgresTimestampOracleConfig,
701 ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
702 let fallible = || async {
703 let postgres_client = PostgresClient::open(config.clone().into())?;
704
705 let mut client = postgres_client.get_connection().await?;
706
707 let txn = client.transaction().await?;
708
709 let q = r#"
714 SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
715 "#;
716 let statement = txn.prepare(q).await?;
717 let exists_row = pg_txn_query_one_prepared(&txn, &statement, &[]).await?;
718 let exists: bool = exists_row.try_get("exists").expect("missing exists column");
719 if !exists {
720 return Ok(Vec::new());
721 }
722
723 let q = r#"
724 SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
725 "#;
726 let statement = txn.prepare(q).await?;
727 let rows = pg_txn_query_prepared(&txn, &statement, &[]).await?;
728
729 txn.commit().await?;
730
731 let result = rows
732 .into_iter()
733 .map(|row| {
734 let timeline: String =
735 row.try_get("timeline").expect("missing timeline column");
736 let ts: Numeric = row.try_get("ts").expect("missing ts column");
737 let ts = Self::decimal_to_ts(ts);
738
739 (timeline, ts)
740 })
741 .collect::<Vec<_>>();
742
743 Ok(result)
744 };
745
746 let metrics = &config.metrics.retries.get_all_timelines;
747
748 let result = retry_fallible(metrics, fallible).await;
749
750 Ok(result)
751 }
752
753 #[mz_ore::instrument(name = "oracle::write_ts")]
754 async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
755 if self.read_only {
756 panic!("attempting write_ts in read-only mode");
757 }
758
759 let proposed_next_ts = self.next.now();
760 let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
761
762 let q = r#"
763 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
764 WHERE timeline = $1
765 RETURNING write_ts;
766 "#;
767 let client = self.get_connection().await?;
768 let statement = client.prepare_cached(q).await?;
769 let result =
770 pg_query_one_prepared(&client, &statement, &[&self.timeline, &proposed_next_ts])
771 .await?;
772
773 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
774 let write_ts = Self::decimal_to_ts(write_ts);
775
776 debug!(
777 timeline = ?self.timeline,
778 write_ts = ?write_ts,
779 proposed_next_ts = ?proposed_next_ts,
780 "returning from write_ts()");
781
782 let advance_to = write_ts.step_forward();
783
784 Ok(WriteTimestamp {
785 timestamp: write_ts,
786 advance_to,
787 })
788 }
789
790 #[mz_ore::instrument(name = "oracle::peek_write_ts")]
791 async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
792 let q = r#"
793 SELECT write_ts FROM timestamp_oracle
794 WHERE timeline = $1;
795 "#;
796 let client = self.get_connection().await?;
797 let statement = client.prepare_cached(q).await?;
798 let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
799
800 let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
801 let write_ts = Self::decimal_to_ts(write_ts);
802
803 debug!(
804 timeline = ?self.timeline,
805 write_ts = ?write_ts,
806 "returning from peek_write_ts()");
807
808 Ok(write_ts)
809 }
810
811 #[mz_ore::instrument(name = "oracle::read_ts")]
812 async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
813 let q = r#"
814 SELECT read_ts FROM timestamp_oracle
815 WHERE timeline = $1;
816 "#;
817 let client = self.get_connection().await?;
818 let statement = client.prepare_cached(q).await?;
819 let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
820
821 let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
822 let read_ts = Self::decimal_to_ts(read_ts);
823
824 debug!(
825 timeline = ?self.timeline,
826 read_ts = ?read_ts,
827 "returning from read_ts()");
828
829 Ok(read_ts)
830 }
831
832 #[mz_ore::instrument(name = "oracle::apply_write")]
833 async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
834 if self.read_only {
835 panic!("attempting apply_write in read-only mode");
836 }
837
838 let q = r#"
839 UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
840 WHERE timeline = $1;
841 "#;
842 let client = self.get_connection().await?;
843 let statement = client.prepare_cached(q).await?;
844 let write_ts = Self::ts_to_decimal(write_ts);
845
846 let _ = pg_execute_prepared(&client, &statement, &[&self.timeline, &write_ts]).await?;
847
848 debug!(
849 timeline = ?self.timeline,
850 write_ts = ?write_ts,
851 "returning from apply_write()");
852
853 Ok(())
854 }
855
856 fn ts_to_decimal(ts: Timestamp) -> Numeric {
859 let decimal = Decimal::from(ts);
860 Numeric::from(decimal)
861 }
862
863 fn decimal_to_ts(ts: Numeric) -> Timestamp {
866 ts.0.0.try_into().expect("we only use u64 timestamps")
867 }
868}
869
870#[async_trait]
878impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
879where
880 N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
881{
882 #[instrument]
883 async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
884 let metrics = &self.metrics.retries.write_ts;
885
886 let res = retry_fallible(metrics, || {
887 self.metrics
888 .oracle
889 .write_ts
890 .run_op(|| self.fallible_write_ts())
891 })
892 .await;
893
894 res
895 }
896
897 #[instrument]
898 async fn peek_write_ts(&self) -> Timestamp {
899 let metrics = &self.metrics.retries.peek_write_ts;
900
901 let res = retry_fallible(metrics, || {
902 self.metrics
903 .oracle
904 .peek_write_ts
905 .run_op(|| self.fallible_peek_write_ts())
906 })
907 .await;
908
909 res
910 }
911
912 #[instrument]
913 async fn read_ts(&self) -> Timestamp {
914 let metrics = &self.metrics.retries.read_ts;
915
916 let res = retry_fallible(metrics, || {
917 self.metrics
918 .oracle
919 .read_ts
920 .run_op(|| self.fallible_read_ts())
921 })
922 .await;
923
924 res
925 }
926
927 #[instrument]
928 async fn apply_write(&self, write_ts: Timestamp) {
929 let metrics = &self.metrics.retries.apply_write;
930
931 let res = retry_fallible(metrics, || {
932 self.metrics
933 .oracle
934 .apply_write
935 .run_op(|| self.fallible_apply_write(write_ts.clone()))
936 })
937 .await;
938
939 res
940 }
941}
942
/// Once `retry.attempt()` reaches this value, [`retry_fallible`] logs further
/// failures at `info` instead of `debug` level.
pub const INFO_MIN_ATTEMPTS: usize = 3;
944
945pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
946where
947 F: std::future::Future<Output = Result<R, anyhow::Error>>,
948 WorkFn: FnMut() -> F,
949{
950 let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
951 loop {
952 match work_fn().await {
953 Ok(x) => {
954 if retry.attempt() > 0 {
955 debug!(
956 "external operation {} succeeded after failing at least once",
957 metrics.name,
958 );
959 }
960 return x;
961 }
962 Err(err)
963 if err
964 .to_string()
965 .contains("\"timestamp_oracle\" does not exist") =>
966 {
967 panic!(
976 "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
977 metrics.name,
978 err.display_with_causes()
979 );
980 }
981 Err(err) => {
982 if retry.attempt() >= INFO_MIN_ATTEMPTS {
983 info!(
984 "external operation {} failed, retrying in {:?}: {}",
985 metrics.name,
986 retry.next_sleep(),
987 err.display_with_causes()
988 );
989 } else {
990 debug!(
991 "external operation {} failed, retrying in {:?}: {}",
992 metrics.name,
993 retry.next_sleep(),
994 err.display_with_causes()
995 );
996 }
997 retry = retry.sleep().await;
998 }
999 }
1000 }
1001}
1002
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared timestamp-oracle test suite against a real
    /// Postgres/CockroachDB instance. It is skipped when no URL is configured.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // NOTE(review): needs an external database, which Miri cannot reach.
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let Some(config) = PostgresTimestampOracleConfig::new_for_test() else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            let config = config.clone();
            async move {
                let oracle = PostgresTimestampOracle::open(
                    config,
                    timeline,
                    initial_ts,
                    now_fn,
                    /* read_only */ false,
                )
                .await;
                Arc::new(oracle) as Arc<dyn TimestampOracle<Timestamp> + Send + Sync>
            }
        })
        .await?;

        Ok(())
    }
}