// mz_timestamp_oracle/postgres_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle backed by "Postgres" for persistence/durability and where
11//! all oracle operations are self-sufficiently linearized, without requiring
12//! any external precautions/machinery.
13
14use std::str::FromStr;
15use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::{Object, PoolError};
23use dec::Decimal;
24use mz_adapter_types::timestamp_oracle::{
25    DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
26    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
27    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
28    DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
29    DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES, DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
30};
31use mz_ore::error::ErrorExt;
32use mz_ore::instrument;
33use mz_ore::metrics::MetricsRegistry;
34use mz_ore::url::SensitiveUrl;
35use mz_pgrepr::Numeric;
36use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
37use mz_repr::Timestamp;
38use postgres_protocol::escape::escape_identifier;
39use serde::{Deserialize, Serialize};
40use tracing::{debug, info};
41
42use crate::WriteTimestamp;
43use crate::metrics::{Metrics, RetryMetrics};
44use crate::retry::Retry;
45use crate::{GenericNowFn, TimestampOracle};
46
// DDL for the single table backing the oracle: one row per timeline, keyed by
// `timeline`, holding that timeline's read and write timestamps.
//
// The timestamp columns are a `DECIMAL` that is big enough to hold
// `18446744073709551615`, the maximum value of `u64` which is our underlying
// timestamp type.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts DECIMAL(20,0) NOT NULL,
    write_ts DECIMAL(20,0) NOT NULL,
    PRIMARY KEY(timeline)
)
";

// Table options appended directly after `SCHEMA` when initializing against
// CockroachDB.
//
// The `sql_stats_automatic_collection_enabled` stats are for the cost-based
// optimizer but all the queries against this table are single-table and very
// carefully tuned to hit the primary index, so the cost-based optimizer doesn't
// really get us anything. OTOH, the background jobs that crdb creates to
// collect these stats fill up the jobs table (slowing down all sorts of
// things).
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// Lowers Cockroach's GC TTL for the `timestamp_oracle` table to 600 seconds.
//
// The `timestamp_oracle` table is modified at a high frequency, generating
// many tombstoned rows. If Cockroach's GC interval is set high (the default
// is 25h) and these tombstones accumulate, scanning over the table will take
// increasingly and prohibitively long.
//
// NOTE(review): the code visible in this file only inserts a row once per
// timeline and then updates it; the tombstones presumably stem from
// superseded row versions rather than explicit deletes — confirm.
//
// See: https://github.com/MaterializeInc/database-issues/issues/4001
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
const CRDB_CONFIGURE_ZONE: &str =
    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
76
/// A [`TimestampOracle`] backed by "Postgres".
///
/// Each instance serves exactly one timeline, whose read and write timestamps
/// live in one row of the `timestamp_oracle` table.
#[derive(Debug)]
pub struct PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp>,
{
    /// Name of the timeline this oracle serves; used as the key (`timeline`
    /// column) in every query against the `timestamp_oracle` table.
    timeline: String,
    /// Source of proposed timestamps: `now()` is consulted whenever a new
    /// write timestamp is requested.
    next: N,
    /// Client from which connections to Postgres/CRDB are obtained.
    postgres_client: Arc<PostgresClient>,
    /// Metrics handle, shared with the config this oracle was opened with.
    metrics: Arc<Metrics>,
    /// A read-only timestamp oracle is NOT allowed to do operations that change
    /// the backing Postgres/CRDB state.
    read_only: bool,
}
91
/// Configuration to connect to a Postgres-backed implementation of
/// [`TimestampOracle`].
///
/// Cloning is cheap with respect to `metrics` and `dynamic`: both are behind
/// an `Arc`, so clones share the same metrics and the same dynamically
/// updatable values.
#[derive(Clone, Debug)]
pub struct PostgresTimestampOracleConfig {
    /// Connection URL of the backing Postgres/CRDB instance.
    url: SensitiveUrl,
    /// Metrics for the oracle and for the underlying Postgres client.
    metrics: Arc<Metrics>,

    /// Configurations that can be dynamically updated.
    pub dynamic: Arc<DynamicConfig>,
}
102
103impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
104    fn from(config: PostgresTimestampOracleConfig) -> Self {
105        let metrics = config.metrics.postgres_client.clone();
106        PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
107    }
108}
109
110impl PostgresTimestampOracleConfig {
111    pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "METADATA_BACKEND_URL";
112
113    /// Returns a new instance of [`PostgresTimestampOracleConfig`] with default tuning.
114    pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
115        let metrics = Arc::new(Metrics::new(metrics_registry));
116
117        let dynamic = DynamicConfig::default();
118
119        PostgresTimestampOracleConfig {
120            url: url.clone(),
121            metrics,
122            dynamic: Arc::new(dynamic),
123        }
124    }
125
126    /// Returns a new [`PostgresTimestampOracleConfig`] for use in unit tests.
127    ///
128    /// By default, postgres oracle tests are no-ops so that `cargo test` works
129    /// on new environments without any configuration. To activate the tests for
130    /// [`PostgresTimestampOracle`] set the `METADATA_BACKEND_URL` environment variable
131    /// with a valid connection url [1].
132    ///
133    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
134    pub fn new_for_test() -> Option<Self> {
135        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
136            Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
137            Err(_) => {
138                if mz_ore::env::is_var_truthy("CI") {
139                    panic!("CI is supposed to run this test but something has gone wrong!");
140                }
141                return None;
142            }
143        };
144
145        let dynamic = DynamicConfig::default();
146
147        let config = PostgresTimestampOracleConfig {
148            url,
149            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
150            dynamic: Arc::new(dynamic),
151        };
152
153        Some(config)
154    }
155
156    /// Returns the metrics associated with this config.
157    pub(crate) fn metrics(&self) -> &Arc<Metrics> {
158        &self.metrics
159    }
160}
161
/// Part of [`PostgresTimestampOracleConfig`] that can be dynamically updated.
///
/// The timestamp oracle is expected to react to each of these such that
/// updating the value returned by the function takes effect (i.e. no caching
/// it). This should happen "as promptly as reasonably possible" where that's
/// defined by the tradeoffs of complexity vs promptness.
///
/// These are hooked up to LaunchDarkly. Specifically, LaunchDarkly configs are
/// serialized into a [`TimestampOracleParameters`]. In environmentd,
/// these are applied directly via [`TimestampOracleParameters::apply`]
/// to the [`PostgresTimestampOracleConfig`].
///
/// Every field is an atomic or sits behind an `RwLock`, so values can be
/// replaced through a shared reference.
#[derive(Debug)]
pub struct DynamicConfig {
    /// The maximum size of the connection pool to Postgres/CRDB.
    pg_connection_pool_max_size: AtomicUsize,

    /// The maximum time to wait when attempting to obtain a connection from the pool.
    /// `None` means no maximum wait is configured.
    pg_connection_pool_max_wait: RwLock<Option<Duration>>,

    /// The minimum TTL of a connection to Postgres/CRDB before it is
    /// proactively terminated. Connections are routinely culled to balance load
    /// against the downstream database.
    pg_connection_pool_ttl: RwLock<Duration>,

    /// The minimum time between TTLing connections to Postgres/CRDB. This delay
    /// is used to stagger reconnections to avoid stampedes and high tail
    /// latencies. This value should be much less than `connection_pool_ttl` so
    /// that reconnections are biased towards terminating the oldest connections
    /// first. A value of `connection_pool_ttl / connection_pool_max_size` is
    /// likely a good place to start so that all connections are rotated when
    /// the pool is fully used.
    pg_connection_pool_ttl_stagger: RwLock<Duration>,

    /// The duration to wait for a Postgres/CRDB connection to be made before
    /// retrying.
    pg_connection_pool_connect_timeout: RwLock<Duration>,

    /// The TCP user timeout for a Postgres/CRDB connection. Specifies the
    /// amount of time that transmitted data may remain unacknowledged before
    /// the TCP connection is forcibly closed.
    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,

    /// The amount of idle time before a TCP keepalive packet is sent on
    /// Postgres/CRDB connections.
    pg_connection_pool_keepalives_idle: RwLock<Duration>,

    /// The time interval between TCP keepalive probes on Postgres/CRDB
    /// connections.
    pg_connection_pool_keepalives_interval: RwLock<Duration>,

    /// The maximum number of TCP keepalive probes that will be sent before
    /// dropping a Postgres/CRDB connection.
    pg_connection_pool_keepalives_retries: AtomicU32,
}
216
217impl Default for DynamicConfig {
218    fn default() -> Self {
219        Self {
220            // TODO(aljoscha): These defaults are taken as is from the persist
221            // Consensus tuning. Once we're sufficiently advanced we might want
222            // to pick defaults that are different from the persist defaults.
223            pg_connection_pool_max_size: AtomicUsize::new(
224                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
225            ),
226            pg_connection_pool_max_wait: RwLock::new(Some(
227                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
228            )),
229            pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
230            pg_connection_pool_ttl_stagger: RwLock::new(
231                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
232            ),
233            pg_connection_pool_connect_timeout: RwLock::new(
234                DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
235            ),
236            pg_connection_pool_tcp_user_timeout: RwLock::new(
237                DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
238            ),
239            pg_connection_pool_keepalives_idle: RwLock::new(
240                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
241            ),
242            pg_connection_pool_keepalives_interval: RwLock::new(
243                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
244            ),
245            pg_connection_pool_keepalives_retries: AtomicU32::new(
246                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES,
247            ),
248        }
249    }
250}
251
252impl DynamicConfig {
253    // TODO: Decide if we can relax these.
254    const LOAD_ORDERING: Ordering = Ordering::SeqCst;
255    const STORE_ORDERING: Ordering = Ordering::SeqCst;
256
257    fn connection_pool_max_size(&self) -> usize {
258        self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
259    }
260
261    fn connection_pool_max_wait(&self) -> Option<Duration> {
262        *self
263            .pg_connection_pool_max_wait
264            .read()
265            .expect("lock poisoned")
266    }
267
268    fn connection_pool_ttl(&self) -> Duration {
269        *self.pg_connection_pool_ttl.read().expect("lock poisoned")
270    }
271
272    fn connection_pool_ttl_stagger(&self) -> Duration {
273        *self
274            .pg_connection_pool_ttl_stagger
275            .read()
276            .expect("lock poisoned")
277    }
278
279    fn connect_timeout(&self) -> Duration {
280        *self
281            .pg_connection_pool_connect_timeout
282            .read()
283            .expect("lock poisoned")
284    }
285
286    fn tcp_user_timeout(&self) -> Duration {
287        *self
288            .pg_connection_pool_tcp_user_timeout
289            .read()
290            .expect("lock poisoned")
291    }
292
293    fn keepalives_idle(&self) -> Duration {
294        *self
295            .pg_connection_pool_keepalives_idle
296            .read()
297            .expect("lock poisoned")
298    }
299
300    fn keepalives_interval(&self) -> Duration {
301        *self
302            .pg_connection_pool_keepalives_interval
303            .read()
304            .expect("lock poisoned")
305    }
306
307    fn keepalives_retries(&self) -> u32 {
308        self.pg_connection_pool_keepalives_retries
309            .load(Self::LOAD_ORDERING)
310    }
311}
312
313impl PostgresClientKnobs for PostgresTimestampOracleConfig {
314    fn connection_pool_max_size(&self) -> usize {
315        self.dynamic.connection_pool_max_size()
316    }
317
318    fn connection_pool_max_wait(&self) -> Option<Duration> {
319        self.dynamic.connection_pool_max_wait()
320    }
321
322    fn connection_pool_ttl(&self) -> Duration {
323        self.dynamic.connection_pool_ttl()
324    }
325
326    fn connection_pool_ttl_stagger(&self) -> Duration {
327        self.dynamic.connection_pool_ttl_stagger()
328    }
329
330    fn connect_timeout(&self) -> Duration {
331        self.dynamic.connect_timeout()
332    }
333
334    fn tcp_user_timeout(&self) -> Duration {
335        self.dynamic.tcp_user_timeout()
336    }
337
338    fn keepalives_idle(&self) -> Duration {
339        self.dynamic.keepalives_idle()
340    }
341
342    fn keepalives_interval(&self) -> Duration {
343        self.dynamic.keepalives_interval()
344    }
345
346    fn keepalives_retries(&self) -> u32 {
347        self.dynamic.keepalives_retries()
348    }
349}
350
/// Updates to values in [`PostgresTimestampOracleConfig`].
///
/// These reflect updates made to LaunchDarkly.
///
/// Parameters can be set (`Some`) or unset (`None`). Unset parameters should be
/// interpreted to mean "use the previous value".
///
/// The default value has every parameter unset.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimestampOracleParameters {
    /// Configures `DynamicConfig::pg_connection_pool_max_size`.
    pub pg_connection_pool_max_size: Option<usize>,
    /// Configures `DynamicConfig::pg_connection_pool_max_wait`.
    ///
    /// NOTE: Yes, this is an `Option<Option<...>>`. Fields in this struct are
    /// all `Option` to signal the presence or absence of a system var
    /// configuration. A `Some(None)` value in this field signals the desire to
    /// have no `max_wait`. This is different from `None`, which signals that
    /// there is no actively set system var, although this case is not currently
    /// possible with how the code around this works.
    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
    /// Configures `DynamicConfig::pg_connection_pool_ttl`.
    pub pg_connection_pool_ttl: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_ttl_stagger`.
    pub pg_connection_pool_ttl_stagger: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_connect_timeout`.
    pub pg_connection_pool_connect_timeout: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_tcp_user_timeout`.
    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_keepalives_idle`.
    pub pg_connection_pool_keepalives_idle: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_keepalives_interval`.
    pub pg_connection_pool_keepalives_interval: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_keepalives_retries`.
    pub pg_connection_pool_keepalives_retries: Option<u32>,
}
385
386impl TimestampOracleParameters {
387    /// Update the parameter values with the set ones from `other`.
388    pub fn update(&mut self, other: TimestampOracleParameters) {
389        // Deconstruct self and other so we get a compile failure if new fields
390        // are added.
391        let Self {
392            pg_connection_pool_max_size: self_pg_connection_pool_max_size,
393            pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
394            pg_connection_pool_ttl: self_pg_connection_pool_ttl,
395            pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
396            pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
397            pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
398            pg_connection_pool_keepalives_idle: self_pg_connection_pool_keepalives_idle,
399            pg_connection_pool_keepalives_interval: self_pg_connection_pool_keepalives_interval,
400            pg_connection_pool_keepalives_retries: self_pg_connection_pool_keepalives_retries,
401        } = self;
402        let Self {
403            pg_connection_pool_max_size: other_pg_connection_pool_max_size,
404            pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
405            pg_connection_pool_ttl: other_pg_connection_pool_ttl,
406            pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
407            pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
408            pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
409            pg_connection_pool_keepalives_idle: other_pg_connection_pool_keepalives_idle,
410            pg_connection_pool_keepalives_interval: other_pg_connection_pool_keepalives_interval,
411            pg_connection_pool_keepalives_retries: other_pg_connection_pool_keepalives_retries,
412        } = other;
413        if let Some(v) = other_pg_connection_pool_max_size {
414            *self_pg_connection_pool_max_size = Some(v);
415        }
416        if let Some(v) = other_pg_connection_pool_max_wait {
417            *self_pg_connection_pool_max_wait = Some(v);
418        }
419        if let Some(v) = other_pg_connection_pool_ttl {
420            *self_pg_connection_pool_ttl = Some(v);
421        }
422        if let Some(v) = other_pg_connection_pool_ttl_stagger {
423            *self_pg_connection_pool_ttl_stagger = Some(v);
424        }
425        if let Some(v) = other_pg_connection_pool_connect_timeout {
426            *self_pg_connection_pool_connect_timeout = Some(v);
427        }
428        if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
429            *self_pg_connection_pool_tcp_user_timeout = Some(v);
430        }
431        if let Some(v) = other_pg_connection_pool_keepalives_idle {
432            *self_pg_connection_pool_keepalives_idle = Some(v);
433        }
434        if let Some(v) = other_pg_connection_pool_keepalives_interval {
435            *self_pg_connection_pool_keepalives_interval = Some(v);
436        }
437        if let Some(v) = other_pg_connection_pool_keepalives_retries {
438            *self_pg_connection_pool_keepalives_retries = Some(v);
439        }
440    }
441
442    /// Applies the parameter values to the given in-memory config object.
443    ///
444    /// Note that these overrides are not all applied atomically: i.e. it's
445    /// possible for the timestamp oracle/postgres client to race with this and
446    /// see some but not all of the parameters applied.
447    pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
448        info!(params = ?self, "Applying configuration update!");
449
450        // Deconstruct self so we get a compile failure if new fields are added.
451        let Self {
452            pg_connection_pool_max_size,
453            pg_connection_pool_max_wait,
454            pg_connection_pool_ttl,
455            pg_connection_pool_ttl_stagger,
456            pg_connection_pool_connect_timeout,
457            pg_connection_pool_tcp_user_timeout,
458            pg_connection_pool_keepalives_idle,
459            pg_connection_pool_keepalives_interval,
460            pg_connection_pool_keepalives_retries,
461        } = self;
462        if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
463            cfg.dynamic
464                .pg_connection_pool_max_size
465                .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
466        }
467        if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
468            let mut max_wait = cfg
469                .dynamic
470                .pg_connection_pool_max_wait
471                .write()
472                .expect("lock poisoned");
473            *max_wait = *pg_connection_pool_max_wait;
474        }
475        if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
476            let mut ttl = cfg
477                .dynamic
478                .pg_connection_pool_ttl
479                .write()
480                .expect("lock poisoned");
481            *ttl = *pg_connection_pool_ttl;
482        }
483        if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
484            let mut ttl_stagger = cfg
485                .dynamic
486                .pg_connection_pool_ttl_stagger
487                .write()
488                .expect("lock poisoned");
489            *ttl_stagger = *pg_connection_pool_ttl_stagger;
490        }
491        if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
492            let mut timeout = cfg
493                .dynamic
494                .pg_connection_pool_connect_timeout
495                .write()
496                .expect("lock poisoned");
497            *timeout = *pg_connection_pool_connect_timeout;
498        }
499        if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
500            let mut timeout = cfg
501                .dynamic
502                .pg_connection_pool_tcp_user_timeout
503                .write()
504                .expect("lock poisoned");
505            *timeout = *pg_connection_pool_tcp_user_timeout;
506        }
507        if let Some(pg_connection_pool_keepalives_idle) = pg_connection_pool_keepalives_idle {
508            let mut timeout = cfg
509                .dynamic
510                .pg_connection_pool_keepalives_idle
511                .write()
512                .expect("lock poisoned");
513            *timeout = *pg_connection_pool_keepalives_idle;
514        }
515        if let Some(pg_connection_pool_keepalives_interval) = pg_connection_pool_keepalives_interval
516        {
517            let mut timeout = cfg
518                .dynamic
519                .pg_connection_pool_keepalives_interval
520                .write()
521                .expect("lock poisoned");
522            *timeout = *pg_connection_pool_keepalives_interval;
523        }
524        if let Some(pg_connection_pool_keepalives_retries) = pg_connection_pool_keepalives_retries {
525            cfg.dynamic.pg_connection_pool_keepalives_retries.store(
526                *pg_connection_pool_keepalives_retries,
527                DynamicConfig::STORE_ORDERING,
528            );
529        }
530    }
531}
532
533impl<N> PostgresTimestampOracle<N>
534where
535    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
536{
537    /// Open a Postgres [`TimestampOracle`] instance with `config`, for the
538    /// timeline named `timeline`. `next` generates new timestamps when invoked.
539    /// Timestamps that are returned are made durable and will never retract.
540    pub async fn open(
541        config: PostgresTimestampOracleConfig,
542        timeline: String,
543        initially: Timestamp,
544        next: N,
545        read_only: bool,
546    ) -> Self {
547        info!(config = ?config, "opening PostgresTimestampOracle");
548
549        let fallible = || async {
550            let metrics = Arc::clone(&config.metrics);
551
552            // don't need to unredact here because we're just pulling out the username
553            let pg_config: Config = config.url.to_string().parse()?;
554            let role = pg_config.get_user().unwrap();
555            let create_schema = format!(
556                "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
557                escape_identifier(role),
558            );
559
560            let postgres_client = PostgresClient::open(config.clone().into())?;
561
562            let client = postgres_client.get_connection().await?;
563
564            let crdb_mode = match client
565                .batch_execute(&format!(
566                    "{}; {}{}; {}",
567                    create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
568                ))
569                .await
570            {
571                Ok(()) => true,
572                Err(e)
573                    if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
574                        || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
575                {
576                    info!(
577                        "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
578                        e
579                    );
580                    false
581                }
582                Err(e) => return Err(e.into()),
583            };
584
585            if !crdb_mode {
586                client
587                    .batch_execute(&format!("{}; {};", create_schema, SCHEMA))
588                    .await?;
589            }
590
591            let oracle = PostgresTimestampOracle {
592                timeline: timeline.clone(),
593                next: next.clone(),
594                postgres_client: Arc::new(postgres_client),
595                metrics,
596                read_only,
597            };
598
599            // Create a row for our timeline, if it doesn't exist. The
600            // `apply_write` call below expects the row to be present. If we
601            // didn't have this here we would always need CHECK EXISTS calls or
602            // something in `apply_write`, making it more complicated, so we
603            // only do it once here, on initialization.
604            let q = r#"
605                    INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
606                        VALUES ($1, $2, $3)
607                        ON CONFLICT (timeline) DO NOTHING;
608                "#;
609            let statement = client.prepare_cached(q).await?;
610
611            let initially_coerced = Self::ts_to_decimal(initially);
612            let _ = client
613                .execute(
614                    &statement,
615                    &[&oracle.timeline, &initially_coerced, &initially_coerced],
616                )
617                .await?;
618
619            // Forward timestamps to what we're given from outside. Remember,
620            // the above query will only create the row at the initial timestamp
621            // if it didn't exist before.
622            if !read_only {
623                TimestampOracle::apply_write(&oracle, initially).await;
624            }
625
626            Result::<_, anyhow::Error>::Ok(oracle)
627        };
628
629        let metrics = &config.metrics.retries.open;
630
631        let oracle = retry_fallible(metrics, fallible).await;
632
633        oracle
634    }
635
636    async fn get_connection(&self) -> Result<Object, PoolError> {
637        self.postgres_client.get_connection().await
638    }
639
640    /// Returns a `Vec` of all known timelines along with their current greatest
641    /// timestamp (max of read_ts and write_ts).
642    ///
643    /// For use when initializing another [`TimestampOracle`] implementation
644    /// from another oracle's state.
645    ///
646    /// NOTE: This method can have linearizability violations, but it's hard to
647    /// get around those. They will only occur during bootstrap, and only
648    /// if/when we migrate between different oracle implementations. Once we
649    /// migrated from the catalog-backed oracle to the postgres/crdb-backed
650    /// oracle we can remove this code and it's callsite.
651    pub async fn get_all_timelines(
652        config: PostgresTimestampOracleConfig,
653    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
654        let fallible = || async {
655            let postgres_client = PostgresClient::open(config.clone().into())?;
656
657            let mut client = postgres_client.get_connection().await?;
658
659            let txn = client.transaction().await?;
660
661            // Using `table_schema = CURRENT_SCHEMA` makes sure we only include
662            // tables that are queryable by us. Otherwise this check might
663            // return true but then the query below fails with a confusing
664            // "table does not exist" error.
665            let q = r#"
666            SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
667        "#;
668            let statement = txn.prepare(q).await?;
669            let exists_row = txn.query_one(&statement, &[]).await?;
670            let exists: bool = exists_row.try_get("exists").expect("missing exists column");
671            if !exists {
672                return Ok(Vec::new());
673            }
674
675            let q = r#"
676            SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
677        "#;
678            let statement = txn.prepare(q).await?;
679            let rows = txn.query(&statement, &[]).await?;
680
681            txn.commit().await?;
682
683            let result = rows
684                .into_iter()
685                .map(|row| {
686                    let timeline: String =
687                        row.try_get("timeline").expect("missing timeline column");
688                    let ts: Numeric = row.try_get("ts").expect("missing ts column");
689                    let ts = Self::decimal_to_ts(ts);
690
691                    (timeline, ts)
692                })
693                .collect::<Vec<_>>();
694
695            Ok(result)
696        };
697
698        let metrics = &config.metrics.retries.get_all_timelines;
699
700        let result = retry_fallible(metrics, fallible).await;
701
702        Ok(result)
703    }
704
705    #[mz_ore::instrument(name = "oracle::write_ts")]
706    async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
707        if self.read_only {
708            panic!("attempting write_ts in read-only mode");
709        }
710
711        let proposed_next_ts = self.next.now();
712        let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
713
714        let q = r#"
715            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
716                WHERE timeline = $1
717            RETURNING write_ts;
718        "#;
719        let client = self.get_connection().await?;
720        let statement = client.prepare_cached(q).await?;
721        let result = client
722            .query_one(&statement, &[&self.timeline, &proposed_next_ts])
723            .await?;
724
725        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
726        let write_ts = Self::decimal_to_ts(write_ts);
727
728        debug!(
729            timeline = ?self.timeline,
730            write_ts = ?write_ts,
731            proposed_next_ts = ?proposed_next_ts,
732            "returning from write_ts()");
733
734        let advance_to = write_ts.step_forward();
735
736        Ok(WriteTimestamp {
737            timestamp: write_ts,
738            advance_to,
739        })
740    }
741
742    #[mz_ore::instrument(name = "oracle::peek_write_ts")]
743    async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
744        let q = r#"
745            SELECT write_ts FROM timestamp_oracle
746                WHERE timeline = $1;
747        "#;
748        let client = self.get_connection().await?;
749        let statement = client.prepare_cached(q).await?;
750        let result = client.query_one(&statement, &[&self.timeline]).await?;
751
752        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
753        let write_ts = Self::decimal_to_ts(write_ts);
754
755        debug!(
756            timeline = ?self.timeline,
757            write_ts = ?write_ts,
758            "returning from peek_write_ts()");
759
760        Ok(write_ts)
761    }
762
763    #[mz_ore::instrument(name = "oracle::read_ts")]
764    async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
765        let q = r#"
766            SELECT read_ts FROM timestamp_oracle
767                WHERE timeline = $1;
768        "#;
769        let client = self.get_connection().await?;
770        let statement = client.prepare_cached(q).await?;
771        let result = client.query_one(&statement, &[&self.timeline]).await?;
772
773        let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
774        let read_ts = Self::decimal_to_ts(read_ts);
775
776        debug!(
777            timeline = ?self.timeline,
778            read_ts = ?read_ts,
779            "returning from read_ts()");
780
781        Ok(read_ts)
782    }
783
784    #[mz_ore::instrument(name = "oracle::apply_write")]
785    async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
786        if self.read_only {
787            panic!("attempting apply_write in read-only mode");
788        }
789
790        let q = r#"
791            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
792                WHERE timeline = $1;
793        "#;
794        let client = self.get_connection().await?;
795        let statement = client.prepare_cached(q).await?;
796        let write_ts = Self::ts_to_decimal(write_ts);
797
798        let _ = client
799            .execute(&statement, &[&self.timeline, &write_ts])
800            .await?;
801
802        debug!(
803            timeline = ?self.timeline,
804            write_ts = ?write_ts,
805            "returning from apply_write()");
806
807        Ok(())
808    }
809
810    // We could add `From` impls for these but for now keep the code local to
811    // the oracle.
812    fn ts_to_decimal(ts: Timestamp) -> Numeric {
813        let decimal = Decimal::from(ts);
814        Numeric::from(decimal)
815    }
816
817    // We could add `From` impls for these but for now keep the code local to
818    // the oracle.
819    fn decimal_to_ts(ts: Numeric) -> Timestamp {
820        ts.0.0.try_into().expect("we only use u64 timestamps")
821    }
822}
823
824// A wrapper around the `fallible_` methods that adds operation metrics and
825// retries.
826//
827// NOTE: This implementation is tied to [`mz_repr::Timestamp`]. We could change
828// that, and also make the types we store in the backing "Postgres" table
829// generic. But in practice we only use oracles for [`mz_repr::Timestamp`] so
830// don't do that extra work for now.
831#[async_trait]
832impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
833where
834    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
835{
836    #[instrument]
837    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
838        let metrics = &self.metrics.retries.write_ts;
839
840        let res = retry_fallible(metrics, || {
841            self.metrics
842                .oracle
843                .write_ts
844                .run_op(|| self.fallible_write_ts())
845        })
846        .await;
847
848        res
849    }
850
851    #[instrument]
852    async fn peek_write_ts(&self) -> Timestamp {
853        let metrics = &self.metrics.retries.peek_write_ts;
854
855        let res = retry_fallible(metrics, || {
856            self.metrics
857                .oracle
858                .peek_write_ts
859                .run_op(|| self.fallible_peek_write_ts())
860        })
861        .await;
862
863        res
864    }
865
866    #[instrument]
867    async fn read_ts(&self) -> Timestamp {
868        let metrics = &self.metrics.retries.read_ts;
869
870        let res = retry_fallible(metrics, || {
871            self.metrics
872                .oracle
873                .read_ts
874                .run_op(|| self.fallible_read_ts())
875        })
876        .await;
877
878        res
879    }
880
881    #[instrument]
882    async fn apply_write(&self, write_ts: Timestamp) {
883        let metrics = &self.metrics.retries.apply_write;
884
885        let res = retry_fallible(metrics, || {
886            self.metrics
887                .oracle
888                .apply_write
889                .run_op(|| self.fallible_apply_write(write_ts.clone()))
890        })
891        .await;
892
893        res
894    }
895}
896
897pub const INFO_MIN_ATTEMPTS: usize = 3;
898
899pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
900where
901    F: std::future::Future<Output = Result<R, anyhow::Error>>,
902    WorkFn: FnMut() -> F,
903{
904    let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
905    loop {
906        match work_fn().await {
907            Ok(x) => {
908                if retry.attempt() > 0 {
909                    debug!(
910                        "external operation {} succeeded after failing at least once",
911                        metrics.name,
912                    );
913                }
914                return x;
915            }
916            Err(err)
917                if err
918                    .to_string()
919                    .contains("\"timestamp_oracle\" does not exist") =>
920            {
921                // TODO(aljoscha): Re-consider this before enabling the oracle
922                // in production.
923                // It's a policy question whether this actually _is_ an
924                // unrecoverable error: an operator could go in an re-create the
925                // `timestamp_oracle` table once the problem is noticed.
926                // However, our tests currently assume that materialize will
927                // fail unrecoverably in cases where we do a rug-pull/remove
928                // expected tables from CRDB.
929                panic!(
930                    "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
931                    metrics.name,
932                    err.display_with_causes()
933                );
934            }
935            Err(err) => {
936                if retry.attempt() >= INFO_MIN_ATTEMPTS {
937                    info!(
938                        "external operation {} failed, retrying in {:?}: {}",
939                        metrics.name,
940                        retry.next_sleep(),
941                        err.display_with_causes()
942                    );
943                } else {
944                    info!(
945                        "external operation {} failed, retrying in {:?}: {}",
946                        metrics.name,
947                        retry.next_sleep(),
948                        err.display_with_causes()
949                    );
950                }
951                retry = retry.sleep().await;
952            }
953        }
954    }
955}
956
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared `timestamp_oracle_impl_test` suite against
    /// `PostgresTimestampOracle`. Returns early (successfully) when
    /// `new_for_test` yields no configuration.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let Some(config) = PostgresTimestampOracleConfig::new_for_test() else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            // Build the open future up front; it is awaited inside the
            // returned async block.
            let opening = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                false, /* read-only */
            );

            async {
                let shared: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(opening.await);

                shared
            }
        })
        .await?;

        Ok(())
    }
}