mz_timestamp_oracle/
postgres_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle backed by "Postgres" for persistence/durability and where
11//! all oracle operations are self-sufficiently linearized, without requiring
12//! any external precautions/machinery.
13
14use std::str::FromStr;
15use std::sync::atomic::{AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::{Object, PoolError};
23use dec::Decimal;
24use mz_adapter_types::timestamp_oracle::{
25    DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
26    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
27    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
28};
29use mz_ore::error::ErrorExt;
30use mz_ore::instrument;
31use mz_ore::metrics::MetricsRegistry;
32use mz_ore::url::SensitiveUrl;
33use mz_pgrepr::Numeric;
34use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
35use mz_repr::Timestamp;
36use postgres_protocol::escape::escape_identifier;
37use serde::{Deserialize, Serialize};
38use tracing::{debug, info};
39
40use crate::WriteTimestamp;
41use crate::metrics::{Metrics, RetryMetrics};
42use crate::retry::Retry;
43use crate::{GenericNowFn, TimestampOracle};
44
// Schema of the single table that backs every timeline. One row per timeline,
// keyed by `timeline`.
//
// The timestamp columns are a `DECIMAL` that is big enough to hold
// `18446744073709551615`, the maximum value of `u64`, which is our underlying
// timestamp type (20 digits, no fractional part).
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts DECIMAL(20,0) NOT NULL,
    write_ts DECIMAL(20,0) NOT NULL,
    PRIMARY KEY(timeline)
)
";

// CRDB-only table options, appended directly after `SCHEMA` when creating the
// table against CockroachDB.
//
// Automatic statistics collection (`sql_stats_automatic_collection_enabled`)
// exists for the cost-based optimizer. But every query against this table is
// single-table and carefully tuned to hit the primary index, so the optimizer
// doesn't really get us anything. On the other hand, the background jobs that
// CRDB creates to collect these stats fill up the jobs table, which slows down
// all sorts of things.
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// Lowers CRDB's garbage-collection TTL for `timestamp_oracle` to 600 seconds
// (10 minutes).
//
// The `timestamp_oracle` table creates and deletes rows at a high frequency,
// generating many tombstoned rows. If Cockroach's GC interval is set high (the
// default is 25h) and these tombstones accumulate, scanning over the table
// takes increasingly and prohibitively long.
//
// See: https://github.com/MaterializeInc/database-issues/issues/4001
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
const CRDB_CONFIGURE_ZONE: &str =
    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
74
75/// A [`TimestampOracle`] backed by "Postgres".
76#[derive(Debug)]
77pub struct PostgresTimestampOracle<N>
78where
79    N: GenericNowFn<Timestamp>,
80{
81    timeline: String,
82    next: N,
83    postgres_client: Arc<PostgresClient>,
84    metrics: Arc<Metrics>,
85    /// A read-only timestamp oracle is NOT allowed to do operations that change
86    /// the backing Postgres/CRDB state.
87    read_only: bool,
88}
89
90/// Configuration to connect to a Postgres-backed implementation of
91/// [`TimestampOracle`].
92#[derive(Clone, Debug)]
93pub struct PostgresTimestampOracleConfig {
94    url: SensitiveUrl,
95    pub metrics: Arc<Metrics>,
96
97    /// Configurations that can be dynamically updated.
98    pub dynamic: Arc<DynamicConfig>,
99}
100
101impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
102    fn from(config: PostgresTimestampOracleConfig) -> Self {
103        let metrics = config.metrics.postgres_client.clone();
104        PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
105    }
106}
107
108impl PostgresTimestampOracleConfig {
109    pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "COCKROACH_URL";
110
111    /// Returns a new instance of [`PostgresTimestampOracleConfig`] with default tuning.
112    pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
113        let metrics = Arc::new(Metrics::new(metrics_registry));
114
115        let dynamic = DynamicConfig::default();
116
117        PostgresTimestampOracleConfig {
118            url: url.clone(),
119            metrics,
120            dynamic: Arc::new(dynamic),
121        }
122    }
123
124    /// Returns a new [`PostgresTimestampOracleConfig`] for use in unit tests.
125    ///
126    /// By default, postgres oracle tests are no-ops so that `cargo test` works
127    /// on new environments without any configuration. To activate the tests for
128    /// [`PostgresTimestampOracle`] set the `COCKROACH_URL` environment variable
129    /// with a valid connection url [1].
130    ///
131    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
132    pub fn new_for_test() -> Option<Self> {
133        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
134            Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
135            Err(_) => {
136                if mz_ore::env::is_var_truthy("CI") {
137                    panic!("CI is supposed to run this test but something has gone wrong!");
138                }
139                return None;
140            }
141        };
142
143        let dynamic = DynamicConfig::default();
144
145        let config = PostgresTimestampOracleConfig {
146            url,
147            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
148            dynamic: Arc::new(dynamic),
149        };
150
151        Some(config)
152    }
153}
154
/// Part of [`PostgresTimestampOracleConfig`] that can be dynamically updated.
///
/// The timestamp oracle is expected to react to each of these such that
/// updating a value takes effect on the next read of it (i.e. no caching it).
/// This should happen "as promptly as reasonably possible", where that's
/// defined by the tradeoffs of complexity vs promptness.
///
/// These are hooked up to LaunchDarkly. Specifically, LaunchDarkly configs are
/// serialized into a [`PostgresTimestampOracleParameters`]. In environmentd,
/// these are applied directly via [`PostgresTimestampOracleParameters::apply`]
/// to the [`PostgresTimestampOracleConfig`].
#[derive(Debug)]
pub struct DynamicConfig {
    /// The maximum size of the connection pool to Postgres/CRDB.
    pg_connection_pool_max_size: AtomicUsize,

    /// The maximum time to wait when attempting to obtain a connection from the
    /// pool. `None` means no maximum wait is configured.
    pg_connection_pool_max_wait: RwLock<Option<Duration>>,

    /// The minimum TTL of a connection to Postgres/CRDB before it is
    /// proactively terminated. Connections are routinely culled to balance load
    /// against the downstream database.
    pg_connection_pool_ttl: RwLock<Duration>,

    /// The minimum time between TTLing connections to Postgres/CRDB. This delay
    /// is used to stagger reconnections to avoid stampedes and high tail
    /// latencies. This value should be much less than `pg_connection_pool_ttl`
    /// so that reconnections are biased towards terminating the oldest
    /// connections first. A value of
    /// `pg_connection_pool_ttl / pg_connection_pool_max_size` is likely a good
    /// place to start so that all connections are rotated when the pool is
    /// fully used.
    pg_connection_pool_ttl_stagger: RwLock<Duration>,

    /// The duration to wait for a Postgres/CRDB connection to be made before
    /// retrying.
    pg_connection_pool_connect_timeout: RwLock<Duration>,

    /// The TCP user timeout for a Postgres/CRDB connection. Specifies the
    /// amount of time that transmitted data may remain unacknowledged before
    /// the TCP connection is forcibly closed.
    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,
}
197
198impl Default for DynamicConfig {
199    fn default() -> Self {
200        Self {
201            // TODO(aljoscha): These defaults are taken as is from the persist
202            // Consensus tuning. Once we're sufficiently advanced we might want
203            // to pick defaults that are different from the persist defaults.
204            pg_connection_pool_max_size: AtomicUsize::new(
205                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
206            ),
207            pg_connection_pool_max_wait: RwLock::new(Some(
208                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
209            )),
210            pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
211            pg_connection_pool_ttl_stagger: RwLock::new(
212                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
213            ),
214            pg_connection_pool_connect_timeout: RwLock::new(
215                DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
216            ),
217            pg_connection_pool_tcp_user_timeout: RwLock::new(
218                DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
219            ),
220        }
221    }
222}
223
224impl DynamicConfig {
225    // TODO: Decide if we can relax these.
226    const LOAD_ORDERING: Ordering = Ordering::SeqCst;
227    const STORE_ORDERING: Ordering = Ordering::SeqCst;
228
229    fn connection_pool_max_size(&self) -> usize {
230        self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
231    }
232
233    fn connection_pool_max_wait(&self) -> Option<Duration> {
234        *self
235            .pg_connection_pool_max_wait
236            .read()
237            .expect("lock poisoned")
238    }
239
240    fn connection_pool_ttl(&self) -> Duration {
241        *self.pg_connection_pool_ttl.read().expect("lock poisoned")
242    }
243
244    fn connection_pool_ttl_stagger(&self) -> Duration {
245        *self
246            .pg_connection_pool_ttl_stagger
247            .read()
248            .expect("lock poisoned")
249    }
250
251    fn connect_timeout(&self) -> Duration {
252        *self
253            .pg_connection_pool_connect_timeout
254            .read()
255            .expect("lock poisoned")
256    }
257
258    fn tcp_user_timeout(&self) -> Duration {
259        *self
260            .pg_connection_pool_tcp_user_timeout
261            .read()
262            .expect("lock poisoned")
263    }
264}
265
266impl PostgresClientKnobs for PostgresTimestampOracleConfig {
267    fn connection_pool_max_size(&self) -> usize {
268        self.dynamic.connection_pool_max_size()
269    }
270
271    fn connection_pool_max_wait(&self) -> Option<Duration> {
272        self.dynamic.connection_pool_max_wait()
273    }
274
275    fn connection_pool_ttl(&self) -> Duration {
276        self.dynamic.connection_pool_ttl()
277    }
278
279    fn connection_pool_ttl_stagger(&self) -> Duration {
280        self.dynamic.connection_pool_ttl_stagger()
281    }
282
283    fn connect_timeout(&self) -> Duration {
284        self.dynamic.connect_timeout()
285    }
286
287    fn tcp_user_timeout(&self) -> Duration {
288        self.dynamic.tcp_user_timeout()
289    }
290}
291
292/// Updates to values in [`PostgresTimestampOracleConfig`].
293///
294/// These reflect updates made to LaunchDarkly.
295///
296/// Parameters can be set (`Some`) or unset (`None`). Unset parameters should be
297/// interpreted to mean "use the previous value".
298#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
299pub struct PostgresTimestampOracleParameters {
300    /// Configures `DynamicConfig::pg_connection_pool_max_size`.
301    pub pg_connection_pool_max_size: Option<usize>,
302    /// Configures `DynamicConfig::pg_connection_pool_max_wait`.
303    ///
304    /// NOTE: Yes, this is an `Option<Option<...>>`. Fields in this struct are
305    /// all `Option` to signal the presence or absence of a system var
306    /// configuration. A `Some(None)` value in this field signals the desire to
307    /// have no `max_wait`. This is different from `None`, which signals that
308    /// there is no actively set system var, although this case is not currently
309    /// possible with how the code around this works.
310    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
311    /// Configures `DynamicConfig::pg_connection_pool_ttl`.
312    pub pg_connection_pool_ttl: Option<Duration>,
313    /// Configures `DynamicConfig::pg_connection_pool_ttl_stagger`.
314    pub pg_connection_pool_ttl_stagger: Option<Duration>,
315    /// Configures `DynamicConfig::pg_connection_pool_connect_timeout`.
316    pub pg_connection_pool_connect_timeout: Option<Duration>,
317    /// Configures `DynamicConfig::pg_connection_pool_tcp_user_timeout`.
318    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
319}
320
321impl PostgresTimestampOracleParameters {
322    /// Update the parameter values with the set ones from `other`.
323    pub fn update(&mut self, other: PostgresTimestampOracleParameters) {
324        // Deconstruct self and other so we get a compile failure if new fields
325        // are added.
326        let Self {
327            pg_connection_pool_max_size: self_pg_connection_pool_max_size,
328            pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
329            pg_connection_pool_ttl: self_pg_connection_pool_ttl,
330            pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
331            pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
332            pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
333        } = self;
334        let Self {
335            pg_connection_pool_max_size: other_pg_connection_pool_max_size,
336            pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
337            pg_connection_pool_ttl: other_pg_connection_pool_ttl,
338            pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
339            pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
340            pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
341        } = other;
342        if let Some(v) = other_pg_connection_pool_max_size {
343            *self_pg_connection_pool_max_size = Some(v);
344        }
345        if let Some(v) = other_pg_connection_pool_max_wait {
346            *self_pg_connection_pool_max_wait = Some(v);
347        }
348        if let Some(v) = other_pg_connection_pool_ttl {
349            *self_pg_connection_pool_ttl = Some(v);
350        }
351        if let Some(v) = other_pg_connection_pool_ttl_stagger {
352            *self_pg_connection_pool_ttl_stagger = Some(v);
353        }
354        if let Some(v) = other_pg_connection_pool_connect_timeout {
355            *self_pg_connection_pool_connect_timeout = Some(v);
356        }
357        if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
358            *self_pg_connection_pool_tcp_user_timeout = Some(v);
359        }
360    }
361
362    /// Applies the parameter values to the given in-memory config object.
363    ///
364    /// Note that these overrides are not all applied atomically: i.e. it's
365    /// possible for the timestamp oracle/postgres client to race with this and
366    /// see some but not all of the parameters applied.
367    pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
368        info!(params = ?self, "Applying configuration update!");
369
370        // Deconstruct self so we get a compile failure if new fields are added.
371        let Self {
372            pg_connection_pool_max_size,
373            pg_connection_pool_max_wait,
374            pg_connection_pool_ttl,
375            pg_connection_pool_ttl_stagger,
376            pg_connection_pool_connect_timeout,
377            pg_connection_pool_tcp_user_timeout,
378        } = self;
379        if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
380            cfg.dynamic
381                .pg_connection_pool_max_size
382                .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
383        }
384        if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
385            let mut max_wait = cfg
386                .dynamic
387                .pg_connection_pool_max_wait
388                .write()
389                .expect("lock poisoned");
390            *max_wait = *pg_connection_pool_max_wait;
391        }
392        if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
393            let mut ttl = cfg
394                .dynamic
395                .pg_connection_pool_ttl
396                .write()
397                .expect("lock poisoned");
398            *ttl = *pg_connection_pool_ttl;
399        }
400        if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
401            let mut ttl_stagger = cfg
402                .dynamic
403                .pg_connection_pool_ttl_stagger
404                .write()
405                .expect("lock poisoned");
406            *ttl_stagger = *pg_connection_pool_ttl_stagger;
407        }
408        if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
409            let mut timeout = cfg
410                .dynamic
411                .pg_connection_pool_connect_timeout
412                .write()
413                .expect("lock poisoned");
414            *timeout = *pg_connection_pool_connect_timeout;
415        }
416        if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
417            let mut timeout = cfg
418                .dynamic
419                .pg_connection_pool_tcp_user_timeout
420                .write()
421                .expect("lock poisoned");
422            *timeout = *pg_connection_pool_tcp_user_timeout;
423        }
424    }
425}
426
427impl<N> PostgresTimestampOracle<N>
428where
429    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
430{
431    /// Open a Postgres [`TimestampOracle`] instance with `config`, for the
432    /// timeline named `timeline`. `next` generates new timestamps when invoked.
433    /// Timestamps that are returned are made durable and will never retract.
434    pub async fn open(
435        config: PostgresTimestampOracleConfig,
436        timeline: String,
437        initially: Timestamp,
438        next: N,
439        read_only: bool,
440    ) -> Self {
441        info!(config = ?config, "opening PostgresTimestampOracle");
442
443        let fallible = || async {
444            let metrics = Arc::clone(&config.metrics);
445
446            // don't need to unredact here because we're just pulling out the username
447            let pg_config: Config = config.url.to_string().parse()?;
448            let role = pg_config.get_user().unwrap();
449            let create_schema = format!(
450                "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
451                escape_identifier(role),
452            );
453
454            let postgres_client = PostgresClient::open(config.clone().into())?;
455
456            let client = postgres_client.get_connection().await?;
457
458            let crdb_mode = match client
459                .batch_execute(&format!(
460                    "{}; {}{}; {}",
461                    create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
462                ))
463                .await
464            {
465                Ok(()) => true,
466                Err(e)
467                    if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
468                        || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
469                {
470                    info!(
471                        "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
472                        e
473                    );
474                    false
475                }
476                Err(e) => return Err(e.into()),
477            };
478
479            if !crdb_mode {
480                client
481                    .batch_execute(&format!("{}; {};", create_schema, SCHEMA))
482                    .await?;
483            }
484
485            let oracle = PostgresTimestampOracle {
486                timeline: timeline.clone(),
487                next: next.clone(),
488                postgres_client: Arc::new(postgres_client),
489                metrics,
490                read_only,
491            };
492
493            // Create a row for our timeline, if it doesn't exist. The
494            // `apply_write` call below expects the row to be present. If we
495            // didn't have this here we would always need CHECK EXISTS calls or
496            // something in `apply_write`, making it more complicated, so we
497            // only do it once here, on initialization.
498            let q = r#"
499                    INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
500                        VALUES ($1, $2, $3)
501                        ON CONFLICT (timeline) DO NOTHING;
502                "#;
503            let statement = client.prepare_cached(q).await?;
504
505            let initially_coerced = Self::ts_to_decimal(initially);
506            let _ = client
507                .execute(
508                    &statement,
509                    &[&oracle.timeline, &initially_coerced, &initially_coerced],
510                )
511                .await?;
512
513            // Forward timestamps to what we're given from outside. Remember,
514            // the above query will only create the row at the initial timestamp
515            // if it didn't exist before.
516            if !read_only {
517                TimestampOracle::apply_write(&oracle, initially).await;
518            }
519
520            Result::<_, anyhow::Error>::Ok(oracle)
521        };
522
523        let metrics = &config.metrics.retries.open;
524
525        let oracle = retry_fallible(metrics, fallible).await;
526
527        oracle
528    }
529
530    async fn get_connection(&self) -> Result<Object, PoolError> {
531        self.postgres_client.get_connection().await
532    }
533
534    /// Returns a `Vec` of all known timelines along with their current greatest
535    /// timestamp (max of read_ts and write_ts).
536    ///
537    /// For use when initializing another [`TimestampOracle`] implementation
538    /// from another oracle's state.
539    ///
540    /// NOTE: This method can have linearizability violations, but it's hard to
541    /// get around those. They will only occur during bootstrap, and only
542    /// if/when we migrate between different oracle implementations. Once we
543    /// migrated from the catalog-backed oracle to the postgres/crdb-backed
544    /// oracle we can remove this code and it's callsite.
545    pub async fn get_all_timelines(
546        config: PostgresTimestampOracleConfig,
547    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
548        let fallible = || async {
549            let postgres_client = PostgresClient::open(config.clone().into())?;
550
551            let mut client = postgres_client.get_connection().await?;
552
553            let txn = client.transaction().await?;
554
555            // Using `table_schema = CURRENT_SCHEMA` makes sure we only include
556            // tables that are queryable by us. Otherwise this check might
557            // return true but then the query below fails with a confusing
558            // "table does not exist" error.
559            let q = r#"
560            SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
561        "#;
562            let statement = txn.prepare(q).await?;
563            let exists_row = txn.query_one(&statement, &[]).await?;
564            let exists: bool = exists_row.try_get("exists").expect("missing exists column");
565            if !exists {
566                return Ok(Vec::new());
567            }
568
569            let q = r#"
570            SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
571        "#;
572            let statement = txn.prepare(q).await?;
573            let rows = txn.query(&statement, &[]).await?;
574
575            txn.commit().await?;
576
577            let result = rows
578                .into_iter()
579                .map(|row| {
580                    let timeline: String =
581                        row.try_get("timeline").expect("missing timeline column");
582                    let ts: Numeric = row.try_get("ts").expect("missing ts column");
583                    let ts = Self::decimal_to_ts(ts);
584
585                    (timeline, ts)
586                })
587                .collect::<Vec<_>>();
588
589            Ok(result)
590        };
591
592        let metrics = &config.metrics.retries.get_all_timelines;
593
594        let result = retry_fallible(metrics, fallible).await;
595
596        Ok(result)
597    }
598
599    #[mz_ore::instrument(name = "oracle::write_ts")]
600    async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
601        if self.read_only {
602            panic!("attempting write_ts in read-only mode");
603        }
604
605        let proposed_next_ts = self.next.now();
606        let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
607
608        let q = r#"
609            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
610                WHERE timeline = $1
611            RETURNING write_ts;
612        "#;
613        let client = self.get_connection().await?;
614        let statement = client.prepare_cached(q).await?;
615        let result = client
616            .query_one(&statement, &[&self.timeline, &proposed_next_ts])
617            .await?;
618
619        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
620        let write_ts = Self::decimal_to_ts(write_ts);
621
622        debug!(
623            timeline = ?self.timeline,
624            write_ts = ?write_ts,
625            proposed_next_ts = ?proposed_next_ts,
626            "returning from write_ts()");
627
628        let advance_to = write_ts.step_forward();
629
630        Ok(WriteTimestamp {
631            timestamp: write_ts,
632            advance_to,
633        })
634    }
635
636    #[mz_ore::instrument(name = "oracle::peek_write_ts")]
637    async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
638        let q = r#"
639            SELECT write_ts FROM timestamp_oracle
640                WHERE timeline = $1;
641        "#;
642        let client = self.get_connection().await?;
643        let statement = client.prepare_cached(q).await?;
644        let result = client.query_one(&statement, &[&self.timeline]).await?;
645
646        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
647        let write_ts = Self::decimal_to_ts(write_ts);
648
649        debug!(
650            timeline = ?self.timeline,
651            write_ts = ?write_ts,
652            "returning from peek_write_ts()");
653
654        Ok(write_ts)
655    }
656
657    #[mz_ore::instrument(name = "oracle::read_ts")]
658    async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
659        let q = r#"
660            SELECT read_ts FROM timestamp_oracle
661                WHERE timeline = $1;
662        "#;
663        let client = self.get_connection().await?;
664        let statement = client.prepare_cached(q).await?;
665        let result = client.query_one(&statement, &[&self.timeline]).await?;
666
667        let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
668        let read_ts = Self::decimal_to_ts(read_ts);
669
670        debug!(
671            timeline = ?self.timeline,
672            read_ts = ?read_ts,
673            "returning from read_ts()");
674
675        Ok(read_ts)
676    }
677
678    #[mz_ore::instrument(name = "oracle::apply_write")]
679    async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
680        if self.read_only {
681            panic!("attempting apply_write in read-only mode");
682        }
683
684        let q = r#"
685            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
686                WHERE timeline = $1;
687        "#;
688        let client = self.get_connection().await?;
689        let statement = client.prepare_cached(q).await?;
690        let write_ts = Self::ts_to_decimal(write_ts);
691
692        let _ = client
693            .execute(&statement, &[&self.timeline, &write_ts])
694            .await?;
695
696        debug!(
697            timeline = ?self.timeline,
698            write_ts = ?write_ts,
699            "returning from apply_write()");
700
701        Ok(())
702    }
703
704    // We could add `From` impls for these but for now keep the code local to
705    // the oracle.
706    fn ts_to_decimal(ts: Timestamp) -> Numeric {
707        let decimal = Decimal::from(ts);
708        Numeric::from(decimal)
709    }
710
711    // We could add `From` impls for these but for now keep the code local to
712    // the oracle.
713    fn decimal_to_ts(ts: Numeric) -> Timestamp {
714        ts.0.0.try_into().expect("we only use u64 timestamps")
715    }
716}
717
718// A wrapper around the `fallible_` methods that adds operation metrics and
719// retries.
720//
721// NOTE: This implementation is tied to [`mz_repr::Timestamp`]. We could change
722// that, and also make the types we store in the backing "Postgres" table
723// generic. But in practice we only use oracles for [`mz_repr::Timestamp`] so
724// don't do that extra work for now.
725#[async_trait]
726impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
727where
728    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
729{
730    #[instrument]
731    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
732        let metrics = &self.metrics.retries.write_ts;
733
734        let res = retry_fallible(metrics, || {
735            self.metrics
736                .oracle
737                .write_ts
738                .run_op(|| self.fallible_write_ts())
739        })
740        .await;
741
742        res
743    }
744
745    #[instrument]
746    async fn peek_write_ts(&self) -> Timestamp {
747        let metrics = &self.metrics.retries.peek_write_ts;
748
749        let res = retry_fallible(metrics, || {
750            self.metrics
751                .oracle
752                .peek_write_ts
753                .run_op(|| self.fallible_peek_write_ts())
754        })
755        .await;
756
757        res
758    }
759
760    #[instrument]
761    async fn read_ts(&self) -> Timestamp {
762        let metrics = &self.metrics.retries.read_ts;
763
764        let res = retry_fallible(metrics, || {
765            self.metrics
766                .oracle
767                .read_ts
768                .run_op(|| self.fallible_read_ts())
769        })
770        .await;
771
772        res
773    }
774
775    #[instrument]
776    async fn apply_write(&self, write_ts: Timestamp) {
777        let metrics = &self.metrics.retries.apply_write;
778
779        let res = retry_fallible(metrics, || {
780            self.metrics
781                .oracle
782                .apply_write
783                .run_op(|| self.fallible_apply_write(write_ts.clone()))
784        })
785        .await;
786
787        res
788    }
789}
790
791pub const INFO_MIN_ATTEMPTS: usize = 3;
792
793pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
794where
795    F: std::future::Future<Output = Result<R, anyhow::Error>>,
796    WorkFn: FnMut() -> F,
797{
798    let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
799    loop {
800        match work_fn().await {
801            Ok(x) => {
802                if retry.attempt() > 0 {
803                    debug!(
804                        "external operation {} succeeded after failing at least once",
805                        metrics.name,
806                    );
807                }
808                return x;
809            }
810            Err(err)
811                if err
812                    .to_string()
813                    .contains("\"timestamp_oracle\" does not exist") =>
814            {
815                // TODO(aljoscha): Re-consider this before enabling the oracle
816                // in production.
817                // It's a policy question whether this actually _is_ an
818                // unrecoverable error: an operator could go in an re-create the
819                // `timestamp_oracle` table once the problem is noticed.
820                // However, our tests currently assume that materialize will
821                // fail unrecoverably in cases where we do a rug-pull/remove
822                // expected tables from CRDB.
823                panic!(
824                    "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
825                    metrics.name,
826                    err.display_with_causes()
827                );
828            }
829            Err(err) => {
830                if retry.attempt() >= INFO_MIN_ATTEMPTS {
831                    info!(
832                        "external operation {} failed, retrying in {:?}: {}",
833                        metrics.name,
834                        retry.next_sleep(),
835                        err.display_with_causes()
836                    );
837                } else {
838                    info!(
839                        "external operation {} failed, retrying in {:?}: {}",
840                        metrics.name,
841                        retry.next_sleep(),
842                        err.display_with_causes()
843                    );
844                }
845                retry = retry.sleep().await;
846            }
847        }
848    }
849}
850
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `crate::tests::timestamp_oracle_impl_test` against oracles opened
    /// with `PostgresTimestampOracle::open`.
    ///
    /// Returns `Ok(())` early, without testing anything, when
    /// `PostgresTimestampOracleConfig::new_for_test()` yields no config.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let Some(config) = PostgresTimestampOracleConfig::new_for_test() else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            let open_fut = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                false, /* read-only */
            );

            async {
                let shared: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(open_fut.await);
                shared
            }
        })
        .await?;

        Ok(())
    }
}