Skip to main content

mz_timestamp_oracle/
postgres_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle backed by "Postgres" for persistence/durability and where
11//! all oracle operations are self-sufficiently linearized, without requiring
12//! any external precautions/machinery.
13
14use std::str::FromStr;
15use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::tokio_postgres::types::ToSql;
23use deadpool_postgres::tokio_postgres::{Row, Statement};
24use deadpool_postgres::{Object, PoolError};
25use dec::Decimal;
26use mz_adapter_types::timestamp_oracle::{
27    DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
28    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
29    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
30    DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
31    DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES, DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
32};
33use mz_ore::error::ErrorExt;
34use mz_ore::instrument;
35use mz_ore::metrics::MetricsRegistry;
36use mz_ore::url::SensitiveUrl;
37use mz_pgrepr::Numeric;
38use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
39use mz_repr::Timestamp;
40use postgres_protocol::escape::escape_identifier;
41use serde::{Deserialize, Serialize};
42use tracing::{debug, info};
43
44use crate::WriteTimestamp;
45use crate::metrics::{Metrics, RetryMetrics};
46use crate::retry::Retry;
47use crate::{GenericNowFn, TimestampOracle};
48
// DDL for the single table backing the oracle: one row per timeline, keyed by
// the timeline name, holding that timeline's read and write timestamps.
//
// The timestamp columns are a `DECIMAL` that is big enough to hold
// `18446744073709551615`, the maximum value of `u64` which is our underlying
// timestamp type.
//
// The statement has no trailing `;`: `open` appends `CRDB_SCHEMA_OPTIONS`
// directly after it when probing for CockroachDB.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts DECIMAL(20,0) NOT NULL,
    write_ts DECIMAL(20,0) NOT NULL,
    PRIMARY KEY(timeline)
)
";
60
// CockroachDB-only table options appended to `SCHEMA`.
//
// The statistics gathered under `sql_stats_automatic_collection_enabled` feed
// the cost-based optimizer, but all the queries against this table are
// single-table and very carefully tuned to hit the primary index, so the
// cost-based optimizer doesn't really get us anything. OTOH, the background
// jobs that crdb creates to collect these stats fill up the jobs table
// (slowing down all sorts of things).
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// CockroachDB-only statement that lowers the table's GC TTL to 600 seconds.
//
// The `timestamp_oracle` table creates and deletes rows at a high
// frequency, generating many tombstoned rows. If Cockroach's GC
// interval is set high (the default is 25h) and these tombstones
// accumulate, scanning over the table will take increasingly and
// prohibitively long.
//
// NOTE(review): the queries visible in this file insert a row once and then
// `UPDATE` it; the garbage is presumably superseded row versions rather than
// deleted rows -- confirm and reword if so.
//
// See: https://github.com/MaterializeInc/database-issues/issues/4001
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
const CRDB_CONFIGURE_ZONE: &str =
    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
78
/// Runs `query` on `client` through `batch_execute` and hands back the
/// driver's result unchanged.
///
/// NOTE: `mz-timestamp-oracle` currently keeps its Postgres surface local; it
/// does not use `mz-postgres-util` wrappers.
async fn pg_batch_execute(
    client: &Object,
    query: &str,
) -> Result<(), deadpool_postgres::tokio_postgres::Error> {
    // The direct driver call is presumably flagged by the workspace's
    // `disallowed_methods` lint configuration; this local wrapper is the one
    // place where it is explicitly permitted.
    #[allow(clippy::disallowed_methods)]
    client.batch_execute(query).await
}
88
/// Runs the prepared `statement` with `params` on `client` through
/// `query_one` and returns the resulting [`Row`], or the driver's error,
/// unchanged.
async fn pg_query_one_prepared(
    client: &Object,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
    // Direct driver call; presumably flagged by the workspace's
    // `disallowed_methods` lint configuration, so it is permitted here
    // explicitly.
    #[allow(clippy::disallowed_methods)]
    client.query_one(statement, params).await
}
97
/// Runs the prepared `statement` with `params` on `client` through `execute`
/// and returns the driver's `u64` result (or its error) unchanged.
async fn pg_execute_prepared(
    client: &Object,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<u64, deadpool_postgres::tokio_postgres::Error> {
    // Direct driver call; presumably flagged by the workspace's
    // `disallowed_methods` lint configuration, so it is permitted here
    // explicitly.
    #[allow(clippy::disallowed_methods)]
    client.execute(statement, params).await
}
106
/// Runs the prepared `statement` with `params` inside the transaction `txn`
/// through `query` and returns all resulting rows (or the driver's error)
/// unchanged.
async fn pg_txn_query_prepared(
    txn: &deadpool_postgres::Transaction<'_>,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Vec<Row>, deadpool_postgres::tokio_postgres::Error> {
    // Direct driver call; presumably flagged by the workspace's
    // `disallowed_methods` lint configuration, so it is permitted here
    // explicitly.
    #[allow(clippy::disallowed_methods)]
    txn.query(statement, params).await
}
115
/// Runs the prepared `statement` with `params` inside the transaction `txn`
/// through `query_one` and returns the resulting [`Row`] (or the driver's
/// error) unchanged.
async fn pg_txn_query_one_prepared(
    txn: &deadpool_postgres::Transaction<'_>,
    statement: &Statement,
    params: &[&(dyn ToSql + Sync)],
) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
    // Direct driver call; presumably flagged by the workspace's
    // `disallowed_methods` lint configuration, so it is permitted here
    // explicitly.
    #[allow(clippy::disallowed_methods)]
    txn.query_one(statement, params).await
}
124
/// A [`TimestampOracle`] backed by "Postgres".
///
/// Each instance serves a single timeline, identified by its row in the
/// `timestamp_oracle` table.
#[derive(Debug)]
pub struct PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp>,
{
    /// Name of the timeline this oracle serves; used as the primary-key value
    /// in the queries against the `timestamp_oracle` table.
    timeline: String,
    /// Source of proposed timestamps; `now()` is called on it when a new
    /// write timestamp is requested.
    next: N,
    /// Client from which connections to Postgres/CRDB are obtained.
    postgres_client: Arc<PostgresClient>,
    /// Metrics handle shared with the config this oracle was opened from.
    metrics: Arc<Metrics>,
    /// A read-only timestamp oracle is NOT allowed to do operations that change
    /// the backing Postgres/CRDB state.
    read_only: bool,
}
139
/// Configuration to connect to a Postgres-backed implementation of
/// [`TimestampOracle`].
///
/// Cloning is cheap with respect to `metrics` and `dynamic`: both are held
/// behind an [`Arc`], so clones share them.
#[derive(Clone, Debug)]
pub struct PostgresTimestampOracleConfig {
    /// Connection URL of the Postgres/CRDB instance.
    url: SensitiveUrl,
    /// Metrics for the oracle, including those handed to the Postgres client.
    metrics: Arc<Metrics>,

    /// Configurations that can be dynamically updated.
    pub dynamic: Arc<DynamicConfig>,
}
150
151impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
152    fn from(config: PostgresTimestampOracleConfig) -> Self {
153        let metrics = config.metrics.postgres_client.clone();
154        PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
155    }
156}
157
158impl PostgresTimestampOracleConfig {
159    pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "METADATA_BACKEND_URL";
160
161    /// Returns a new instance of [`PostgresTimestampOracleConfig`] with default tuning.
162    pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
163        let metrics = Arc::new(Metrics::new(metrics_registry));
164
165        let dynamic = DynamicConfig::default();
166
167        PostgresTimestampOracleConfig {
168            url: url.clone(),
169            metrics,
170            dynamic: Arc::new(dynamic),
171        }
172    }
173
174    /// Returns a new [`PostgresTimestampOracleConfig`] for use in unit tests.
175    ///
176    /// By default, postgres oracle tests are no-ops so that `cargo test` works
177    /// on new environments without any configuration. To activate the tests for
178    /// [`PostgresTimestampOracle`] set the `METADATA_BACKEND_URL` environment variable
179    /// with a valid connection url [1].
180    ///
181    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
182    pub fn new_for_test() -> Option<Self> {
183        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
184            Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
185            Err(_) => {
186                if mz_ore::env::is_var_truthy("CI") {
187                    panic!("CI is supposed to run this test but something has gone wrong!");
188                }
189                return None;
190            }
191        };
192
193        let dynamic = DynamicConfig::default();
194
195        let config = PostgresTimestampOracleConfig {
196            url,
197            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
198            dynamic: Arc::new(dynamic),
199        };
200
201        Some(config)
202    }
203
204    /// Returns the metrics associated with this config.
205    pub(crate) fn metrics(&self) -> &Arc<Metrics> {
206        &self.metrics
207    }
208}
209
/// Part of [`PostgresTimestampOracleConfig`] that can be dynamically updated.
///
/// The timestamp oracle is expected to react to each of these such that
/// updating the value returned by the function takes effect (i.e. no caching
/// it). This should happen "as promptly as reasonably possible" where that's
/// defined by the tradeoffs of complexity vs promptness.
///
/// These are hooked up to LaunchDarkly. Specifically, LaunchDarkly configs are
/// serialized into a [`TimestampOracleParameters`]. In environmentd,
/// these are applied directly via [`TimestampOracleParameters::apply`]
/// to the [`PostgresTimestampOracleConfig`].
///
/// Every field uses interior mutability (an atomic or an [`RwLock`]) so that
/// values can be replaced through a shared reference.
#[derive(Debug)]
pub struct DynamicConfig {
    /// The maximum size of the connection pool to Postgres/CRDB.
    pg_connection_pool_max_size: AtomicUsize,

    /// The maximum time to wait when attempting to obtain a connection from the pool.
    /// `None` means no maximum wait is configured.
    pg_connection_pool_max_wait: RwLock<Option<Duration>>,

    /// The minimum TTL of a connection to Postgres/CRDB before it is
    /// proactively terminated. Connections are routinely culled to balance load
    /// against the downstream database.
    pg_connection_pool_ttl: RwLock<Duration>,

    /// The minimum time between TTLing connections to Postgres/CRDB. This delay
    /// is used to stagger reconnections to avoid stampedes and high tail
    /// latencies. This value should be much less than `pg_connection_pool_ttl`
    /// so that reconnections are biased towards terminating the oldest
    /// connections first. A value of
    /// `pg_connection_pool_ttl / pg_connection_pool_max_size` is likely a good
    /// place to start so that all connections are rotated when the pool is
    /// fully used.
    pg_connection_pool_ttl_stagger: RwLock<Duration>,

    /// The duration to wait for a Postgres/CRDB connection to be made before
    /// retrying.
    pg_connection_pool_connect_timeout: RwLock<Duration>,

    /// The TCP user timeout for a Postgres/CRDB connection. Specifies the
    /// amount of time that transmitted data may remain unacknowledged before
    /// the TCP connection is forcibly closed.
    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,

    /// The amount of idle time before a TCP keepalive packet is sent on
    /// Postgres/CRDB connections.
    pg_connection_pool_keepalives_idle: RwLock<Duration>,

    /// The time interval between TCP keepalive probes on Postgres/CRDB
    /// connections.
    pg_connection_pool_keepalives_interval: RwLock<Duration>,

    /// The maximum number of TCP keepalive probes that will be sent before
    /// dropping a Postgres/CRDB connection.
    pg_connection_pool_keepalives_retries: AtomicU32,
}
264
265impl Default for DynamicConfig {
266    fn default() -> Self {
267        Self {
268            // TODO(aljoscha): These defaults are taken as is from the persist
269            // Consensus tuning. Once we're sufficiently advanced we might want
270            // to pick defaults that are different from the persist defaults.
271            pg_connection_pool_max_size: AtomicUsize::new(
272                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
273            ),
274            pg_connection_pool_max_wait: RwLock::new(Some(
275                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
276            )),
277            pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
278            pg_connection_pool_ttl_stagger: RwLock::new(
279                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
280            ),
281            pg_connection_pool_connect_timeout: RwLock::new(
282                DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
283            ),
284            pg_connection_pool_tcp_user_timeout: RwLock::new(
285                DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
286            ),
287            pg_connection_pool_keepalives_idle: RwLock::new(
288                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
289            ),
290            pg_connection_pool_keepalives_interval: RwLock::new(
291                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
292            ),
293            pg_connection_pool_keepalives_retries: AtomicU32::new(
294                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES,
295            ),
296        }
297    }
298}
299
300impl DynamicConfig {
301    // TODO: Decide if we can relax these.
302    const LOAD_ORDERING: Ordering = Ordering::SeqCst;
303    const STORE_ORDERING: Ordering = Ordering::SeqCst;
304
305    fn connection_pool_max_size(&self) -> usize {
306        self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
307    }
308
309    fn connection_pool_max_wait(&self) -> Option<Duration> {
310        *self
311            .pg_connection_pool_max_wait
312            .read()
313            .expect("lock poisoned")
314    }
315
316    fn connection_pool_ttl(&self) -> Duration {
317        *self.pg_connection_pool_ttl.read().expect("lock poisoned")
318    }
319
320    fn connection_pool_ttl_stagger(&self) -> Duration {
321        *self
322            .pg_connection_pool_ttl_stagger
323            .read()
324            .expect("lock poisoned")
325    }
326
327    fn connect_timeout(&self) -> Duration {
328        *self
329            .pg_connection_pool_connect_timeout
330            .read()
331            .expect("lock poisoned")
332    }
333
334    fn tcp_user_timeout(&self) -> Duration {
335        *self
336            .pg_connection_pool_tcp_user_timeout
337            .read()
338            .expect("lock poisoned")
339    }
340
341    fn keepalives_idle(&self) -> Duration {
342        *self
343            .pg_connection_pool_keepalives_idle
344            .read()
345            .expect("lock poisoned")
346    }
347
348    fn keepalives_interval(&self) -> Duration {
349        *self
350            .pg_connection_pool_keepalives_interval
351            .read()
352            .expect("lock poisoned")
353    }
354
355    fn keepalives_retries(&self) -> u32 {
356        self.pg_connection_pool_keepalives_retries
357            .load(Self::LOAD_ORDERING)
358    }
359}
360
/// Exposes the dynamically updatable settings to the Postgres client.
///
/// Every method forwards to the same-named getter on `self.dynamic`, so each
/// call observes the value currently stored there rather than a cached copy.
impl PostgresClientKnobs for PostgresTimestampOracleConfig {
    fn connection_pool_max_size(&self) -> usize {
        self.dynamic.connection_pool_max_size()
    }

    fn connection_pool_max_wait(&self) -> Option<Duration> {
        self.dynamic.connection_pool_max_wait()
    }

    fn connection_pool_ttl(&self) -> Duration {
        self.dynamic.connection_pool_ttl()
    }

    fn connection_pool_ttl_stagger(&self) -> Duration {
        self.dynamic.connection_pool_ttl_stagger()
    }

    fn connect_timeout(&self) -> Duration {
        self.dynamic.connect_timeout()
    }

    fn tcp_user_timeout(&self) -> Duration {
        self.dynamic.tcp_user_timeout()
    }

    fn keepalives_idle(&self) -> Duration {
        self.dynamic.keepalives_idle()
    }

    fn keepalives_interval(&self) -> Duration {
        self.dynamic.keepalives_interval()
    }

    fn keepalives_retries(&self) -> u32 {
        self.dynamic.keepalives_retries()
    }
}
398
/// Updates to values in [`PostgresTimestampOracleConfig`].
///
/// These reflect updates made to LaunchDarkly.
///
/// Parameters can be set (`Some`) or unset (`None`). Unset parameters should be
/// interpreted to mean "use the previous value".
///
/// Use [`TimestampOracleParameters::update`] to merge two sets of parameters
/// and [`TimestampOracleParameters::apply`] to write the set ones into a
/// config.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct TimestampOracleParameters {
    /// Configures `DynamicConfig::pg_connection_pool_max_size`.
    pub pg_connection_pool_max_size: Option<usize>,
    /// Configures `DynamicConfig::pg_connection_pool_max_wait`.
    ///
    /// NOTE: Yes, this is an `Option<Option<...>>`. Fields in this struct are
    /// all `Option` to signal the presence or absence of a system var
    /// configuration. A `Some(None)` value in this field signals the desire to
    /// have no `max_wait`. This is different from `None`, which signals that
    /// there is no actively set system var, although this case is not currently
    /// possible with how the code around this works.
    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
    /// Configures `DynamicConfig::pg_connection_pool_ttl`.
    pub pg_connection_pool_ttl: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_ttl_stagger`.
    pub pg_connection_pool_ttl_stagger: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_connect_timeout`.
    pub pg_connection_pool_connect_timeout: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_tcp_user_timeout`.
    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_keepalives_idle`.
    pub pg_connection_pool_keepalives_idle: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_keepalives_interval`.
    pub pg_connection_pool_keepalives_interval: Option<Duration>,
    /// Configures `DynamicConfig::pg_connection_pool_keepalives_retries`.
    pub pg_connection_pool_keepalives_retries: Option<u32>,
}
433
434impl TimestampOracleParameters {
435    /// Update the parameter values with the set ones from `other`.
436    pub fn update(&mut self, other: TimestampOracleParameters) {
437        // Deconstruct self and other so we get a compile failure if new fields
438        // are added.
439        let Self {
440            pg_connection_pool_max_size: self_pg_connection_pool_max_size,
441            pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
442            pg_connection_pool_ttl: self_pg_connection_pool_ttl,
443            pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
444            pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
445            pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
446            pg_connection_pool_keepalives_idle: self_pg_connection_pool_keepalives_idle,
447            pg_connection_pool_keepalives_interval: self_pg_connection_pool_keepalives_interval,
448            pg_connection_pool_keepalives_retries: self_pg_connection_pool_keepalives_retries,
449        } = self;
450        let Self {
451            pg_connection_pool_max_size: other_pg_connection_pool_max_size,
452            pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
453            pg_connection_pool_ttl: other_pg_connection_pool_ttl,
454            pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
455            pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
456            pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
457            pg_connection_pool_keepalives_idle: other_pg_connection_pool_keepalives_idle,
458            pg_connection_pool_keepalives_interval: other_pg_connection_pool_keepalives_interval,
459            pg_connection_pool_keepalives_retries: other_pg_connection_pool_keepalives_retries,
460        } = other;
461        if let Some(v) = other_pg_connection_pool_max_size {
462            *self_pg_connection_pool_max_size = Some(v);
463        }
464        if let Some(v) = other_pg_connection_pool_max_wait {
465            *self_pg_connection_pool_max_wait = Some(v);
466        }
467        if let Some(v) = other_pg_connection_pool_ttl {
468            *self_pg_connection_pool_ttl = Some(v);
469        }
470        if let Some(v) = other_pg_connection_pool_ttl_stagger {
471            *self_pg_connection_pool_ttl_stagger = Some(v);
472        }
473        if let Some(v) = other_pg_connection_pool_connect_timeout {
474            *self_pg_connection_pool_connect_timeout = Some(v);
475        }
476        if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
477            *self_pg_connection_pool_tcp_user_timeout = Some(v);
478        }
479        if let Some(v) = other_pg_connection_pool_keepalives_idle {
480            *self_pg_connection_pool_keepalives_idle = Some(v);
481        }
482        if let Some(v) = other_pg_connection_pool_keepalives_interval {
483            *self_pg_connection_pool_keepalives_interval = Some(v);
484        }
485        if let Some(v) = other_pg_connection_pool_keepalives_retries {
486            *self_pg_connection_pool_keepalives_retries = Some(v);
487        }
488    }
489
490    /// Applies the parameter values to the given in-memory config object.
491    ///
492    /// Note that these overrides are not all applied atomically: i.e. it's
493    /// possible for the timestamp oracle/postgres client to race with this and
494    /// see some but not all of the parameters applied.
495    pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
496        info!(params = ?self, "Applying configuration update!");
497
498        // Deconstruct self so we get a compile failure if new fields are added.
499        let Self {
500            pg_connection_pool_max_size,
501            pg_connection_pool_max_wait,
502            pg_connection_pool_ttl,
503            pg_connection_pool_ttl_stagger,
504            pg_connection_pool_connect_timeout,
505            pg_connection_pool_tcp_user_timeout,
506            pg_connection_pool_keepalives_idle,
507            pg_connection_pool_keepalives_interval,
508            pg_connection_pool_keepalives_retries,
509        } = self;
510        if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
511            cfg.dynamic
512                .pg_connection_pool_max_size
513                .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
514        }
515        if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
516            let mut max_wait = cfg
517                .dynamic
518                .pg_connection_pool_max_wait
519                .write()
520                .expect("lock poisoned");
521            *max_wait = *pg_connection_pool_max_wait;
522        }
523        if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
524            let mut ttl = cfg
525                .dynamic
526                .pg_connection_pool_ttl
527                .write()
528                .expect("lock poisoned");
529            *ttl = *pg_connection_pool_ttl;
530        }
531        if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
532            let mut ttl_stagger = cfg
533                .dynamic
534                .pg_connection_pool_ttl_stagger
535                .write()
536                .expect("lock poisoned");
537            *ttl_stagger = *pg_connection_pool_ttl_stagger;
538        }
539        if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
540            let mut timeout = cfg
541                .dynamic
542                .pg_connection_pool_connect_timeout
543                .write()
544                .expect("lock poisoned");
545            *timeout = *pg_connection_pool_connect_timeout;
546        }
547        if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
548            let mut timeout = cfg
549                .dynamic
550                .pg_connection_pool_tcp_user_timeout
551                .write()
552                .expect("lock poisoned");
553            *timeout = *pg_connection_pool_tcp_user_timeout;
554        }
555        if let Some(pg_connection_pool_keepalives_idle) = pg_connection_pool_keepalives_idle {
556            let mut timeout = cfg
557                .dynamic
558                .pg_connection_pool_keepalives_idle
559                .write()
560                .expect("lock poisoned");
561            *timeout = *pg_connection_pool_keepalives_idle;
562        }
563        if let Some(pg_connection_pool_keepalives_interval) = pg_connection_pool_keepalives_interval
564        {
565            let mut timeout = cfg
566                .dynamic
567                .pg_connection_pool_keepalives_interval
568                .write()
569                .expect("lock poisoned");
570            *timeout = *pg_connection_pool_keepalives_interval;
571        }
572        if let Some(pg_connection_pool_keepalives_retries) = pg_connection_pool_keepalives_retries {
573            cfg.dynamic.pg_connection_pool_keepalives_retries.store(
574                *pg_connection_pool_keepalives_retries,
575                DynamicConfig::STORE_ORDERING,
576            );
577        }
578    }
579}
580
581impl<N> PostgresTimestampOracle<N>
582where
583    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
584{
    /// Open a Postgres [`TimestampOracle`] instance with `config`, for the
    /// timeline named `timeline`. `next` generates new timestamps when invoked.
    /// Timestamps that are returned are made durable and will never retract.
    ///
    /// Opening creates the `tsoracle` schema and the `timestamp_oracle` table
    /// if they do not exist, inserts a row for `timeline` at `initially` if
    /// there is none yet and, unless `read_only` is set, calls `apply_write`
    /// with `initially`.
    ///
    /// The whole sequence runs through `retry_fallible`, and this function
    /// returns `Self` rather than a `Result`; presumably it keeps retrying
    /// until an attempt succeeds -- confirm against `retry_fallible`.
    ///
    /// # Panics
    ///
    /// Panics if the URL in `config` does not contain a user name.
    pub async fn open(
        config: PostgresTimestampOracleConfig,
        timeline: String,
        initially: Timestamp,
        next: N,
        read_only: bool,
    ) -> Self {
        info!(config = ?config, "opening PostgresTimestampOracle");

        // One complete attempt at opening the oracle; any error returned from
        // here is handed to `retry_fallible` below.
        let fallible = || async {
            let metrics = Arc::clone(&config.metrics);

            // don't need to unredact here because we're just pulling out the username
            let pg_config: Config = config.url.to_string().parse()?;
            // NOTE(review): panics (instead of returning an error) when the URL
            // carries no user name.
            let role = pg_config.get_user().unwrap();
            let create_schema = format!(
                "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
                escape_identifier(role),
            );

            let postgres_client = PostgresClient::open(config.clone().into())?;

            let client = postgres_client.get_connection().await?;

            // First try the CockroachDB flavour of the setup statements. A
            // syntax or invalid-parameter error is taken to mean that we are
            // talking to plain Postgres; any other error fails this attempt.
            let crdb_mode = match pg_batch_execute(
                &client,
                &format!(
                    "{}; {}{}; {}",
                    create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
                ),
            )
            .await
            {
                Ok(()) => true,
                Err(e)
                    if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
                        || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
                {
                    info!(
                        "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
                        e
                    );
                    false
                }
                Err(e) => return Err(e.into()),
            };

            // Plain Postgres: run the setup again without the CRDB-only parts.
            if !crdb_mode {
                pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
            }

            let oracle = PostgresTimestampOracle {
                timeline: timeline.clone(),
                next: next.clone(),
                postgres_client: Arc::new(postgres_client),
                metrics,
                read_only,
            };

            // Create a row for our timeline, if it doesn't exist. The
            // `apply_write` call below expects the row to be present. If we
            // didn't have this here we would always need CHECK EXISTS calls or
            // something in `apply_write`, making it more complicated, so we
            // only do it once here, on initialization.
            let q = r#"
                    INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
                        VALUES ($1, $2, $3)
                        ON CONFLICT (timeline) DO NOTHING;
                "#;
            let statement = client.prepare_cached(q).await?;

            let initially_coerced = Self::ts_to_decimal(initially);
            // The affected-row count is irrelevant: zero just means the row
            // already existed.
            let _ = pg_execute_prepared(
                &client,
                &statement,
                &[&oracle.timeline, &initially_coerced, &initially_coerced],
            )
            .await?;

            // Forward timestamps to what we're given from outside. Remember,
            // the above query will only create the row at the initial timestamp
            // if it didn't exist before.
            if !read_only {
                TimestampOracle::apply_write(&oracle, initially).await;
            }

            Result::<_, anyhow::Error>::Ok(oracle)
        };

        let metrics = &config.metrics.retries.open;

        let oracle = retry_fallible(metrics, fallible).await;

        oracle
    }
683
    /// Obtains a connection from this oracle's `postgres_client`, passing any
    /// [`PoolError`] through unchanged.
    async fn get_connection(&self) -> Result<Object, PoolError> {
        self.postgres_client.get_connection().await
    }
687
688    /// Returns a `Vec` of all known timelines along with their current greatest
689    /// timestamp (max of read_ts and write_ts).
690    ///
691    /// For use when initializing another [`TimestampOracle`] implementation
692    /// from another oracle's state.
693    ///
694    /// NOTE: This method can have linearizability violations, but it's hard to
695    /// get around those. They will only occur during bootstrap, and only
696    /// if/when we migrate between different oracle implementations. Once we
697    /// migrated from the catalog-backed oracle to the postgres/crdb-backed
698    /// oracle we can remove this code and it's callsite.
699    pub async fn get_all_timelines(
700        config: PostgresTimestampOracleConfig,
701    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
702        let fallible = || async {
703            let postgres_client = PostgresClient::open(config.clone().into())?;
704
705            let mut client = postgres_client.get_connection().await?;
706
707            let txn = client.transaction().await?;
708
709            // Using `table_schema = CURRENT_SCHEMA` makes sure we only include
710            // tables that are queryable by us. Otherwise this check might
711            // return true but then the query below fails with a confusing
712            // "table does not exist" error.
713            let q = r#"
714            SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
715        "#;
716            let statement = txn.prepare(q).await?;
717            let exists_row = pg_txn_query_one_prepared(&txn, &statement, &[]).await?;
718            let exists: bool = exists_row.try_get("exists").expect("missing exists column");
719            if !exists {
720                return Ok(Vec::new());
721            }
722
723            let q = r#"
724            SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
725        "#;
726            let statement = txn.prepare(q).await?;
727            let rows = pg_txn_query_prepared(&txn, &statement, &[]).await?;
728
729            txn.commit().await?;
730
731            let result = rows
732                .into_iter()
733                .map(|row| {
734                    let timeline: String =
735                        row.try_get("timeline").expect("missing timeline column");
736                    let ts: Numeric = row.try_get("ts").expect("missing ts column");
737                    let ts = Self::decimal_to_ts(ts);
738
739                    (timeline, ts)
740                })
741                .collect::<Vec<_>>();
742
743            Ok(result)
744        };
745
746        let metrics = &config.metrics.retries.get_all_timelines;
747
748        let result = retry_fallible(metrics, fallible).await;
749
750        Ok(result)
751    }
752
753    #[mz_ore::instrument(name = "oracle::write_ts")]
754    async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
755        if self.read_only {
756            panic!("attempting write_ts in read-only mode");
757        }
758
759        let proposed_next_ts = self.next.now();
760        let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
761
762        let q = r#"
763            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
764                WHERE timeline = $1
765            RETURNING write_ts;
766        "#;
767        let client = self.get_connection().await?;
768        let statement = client.prepare_cached(q).await?;
769        let result =
770            pg_query_one_prepared(&client, &statement, &[&self.timeline, &proposed_next_ts])
771                .await?;
772
773        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
774        let write_ts = Self::decimal_to_ts(write_ts);
775
776        debug!(
777            timeline = ?self.timeline,
778            write_ts = ?write_ts,
779            proposed_next_ts = ?proposed_next_ts,
780            "returning from write_ts()");
781
782        let advance_to = write_ts.step_forward();
783
784        Ok(WriteTimestamp {
785            timestamp: write_ts,
786            advance_to,
787        })
788    }
789
790    #[mz_ore::instrument(name = "oracle::peek_write_ts")]
791    async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
792        let q = r#"
793            SELECT write_ts FROM timestamp_oracle
794                WHERE timeline = $1;
795        "#;
796        let client = self.get_connection().await?;
797        let statement = client.prepare_cached(q).await?;
798        let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
799
800        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
801        let write_ts = Self::decimal_to_ts(write_ts);
802
803        debug!(
804            timeline = ?self.timeline,
805            write_ts = ?write_ts,
806            "returning from peek_write_ts()");
807
808        Ok(write_ts)
809    }
810
811    #[mz_ore::instrument(name = "oracle::read_ts")]
812    async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
813        let q = r#"
814            SELECT read_ts FROM timestamp_oracle
815                WHERE timeline = $1;
816        "#;
817        let client = self.get_connection().await?;
818        let statement = client.prepare_cached(q).await?;
819        let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
820
821        let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
822        let read_ts = Self::decimal_to_ts(read_ts);
823
824        debug!(
825            timeline = ?self.timeline,
826            read_ts = ?read_ts,
827            "returning from read_ts()");
828
829        Ok(read_ts)
830    }
831
832    #[mz_ore::instrument(name = "oracle::apply_write")]
833    async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
834        if self.read_only {
835            panic!("attempting apply_write in read-only mode");
836        }
837
838        let q = r#"
839            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
840                WHERE timeline = $1;
841        "#;
842        let client = self.get_connection().await?;
843        let statement = client.prepare_cached(q).await?;
844        let write_ts = Self::ts_to_decimal(write_ts);
845
846        let _ = pg_execute_prepared(&client, &statement, &[&self.timeline, &write_ts]).await?;
847
848        debug!(
849            timeline = ?self.timeline,
850            write_ts = ?write_ts,
851            "returning from apply_write()");
852
853        Ok(())
854    }
855
856    // We could add `From` impls for these but for now keep the code local to
857    // the oracle.
858    fn ts_to_decimal(ts: Timestamp) -> Numeric {
859        let decimal = Decimal::from(ts);
860        Numeric::from(decimal)
861    }
862
863    // We could add `From` impls for these but for now keep the code local to
864    // the oracle.
865    fn decimal_to_ts(ts: Numeric) -> Timestamp {
866        ts.0.0.try_into().expect("we only use u64 timestamps")
867    }
868}
869
870// A wrapper around the `fallible_` methods that adds operation metrics and
871// retries.
872//
873// NOTE: This implementation is tied to [`mz_repr::Timestamp`]. We could change
874// that, and also make the types we store in the backing "Postgres" table
875// generic. But in practice we only use oracles for [`mz_repr::Timestamp`] so
876// don't do that extra work for now.
877#[async_trait]
878impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
879where
880    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
881{
882    #[instrument]
883    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
884        let metrics = &self.metrics.retries.write_ts;
885
886        let res = retry_fallible(metrics, || {
887            self.metrics
888                .oracle
889                .write_ts
890                .run_op(|| self.fallible_write_ts())
891        })
892        .await;
893
894        res
895    }
896
897    #[instrument]
898    async fn peek_write_ts(&self) -> Timestamp {
899        let metrics = &self.metrics.retries.peek_write_ts;
900
901        let res = retry_fallible(metrics, || {
902            self.metrics
903                .oracle
904                .peek_write_ts
905                .run_op(|| self.fallible_peek_write_ts())
906        })
907        .await;
908
909        res
910    }
911
912    #[instrument]
913    async fn read_ts(&self) -> Timestamp {
914        let metrics = &self.metrics.retries.read_ts;
915
916        let res = retry_fallible(metrics, || {
917            self.metrics
918                .oracle
919                .read_ts
920                .run_op(|| self.fallible_read_ts())
921        })
922        .await;
923
924        res
925    }
926
927    #[instrument]
928    async fn apply_write(&self, write_ts: Timestamp) {
929        let metrics = &self.metrics.retries.apply_write;
930
931        let res = retry_fallible(metrics, || {
932            self.metrics
933                .oracle
934                .apply_write
935                .run_op(|| self.fallible_apply_write(write_ts.clone()))
936        })
937        .await;
938
939        res
940    }
941}
942
/// Attempt count (as reported by the retry stream's `attempt()`) from which
/// `retry_fallible` logs a failed operation at `info` level; failures below
/// this count are logged at `debug` level.
pub const INFO_MIN_ATTEMPTS: usize = 3;
944
945pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
946where
947    F: std::future::Future<Output = Result<R, anyhow::Error>>,
948    WorkFn: FnMut() -> F,
949{
950    let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
951    loop {
952        match work_fn().await {
953            Ok(x) => {
954                if retry.attempt() > 0 {
955                    debug!(
956                        "external operation {} succeeded after failing at least once",
957                        metrics.name,
958                    );
959                }
960                return x;
961            }
962            Err(err)
963                if err
964                    .to_string()
965                    .contains("\"timestamp_oracle\" does not exist") =>
966            {
967                // TODO(aljoscha): Re-consider this before enabling the oracle
968                // in production.
969                // It's a policy question whether this actually _is_ an
970                // unrecoverable error: an operator could go in an re-create the
971                // `timestamp_oracle` table once the problem is noticed.
972                // However, our tests currently assume that materialize will
973                // fail unrecoverably in cases where we do a rug-pull/remove
974                // expected tables from CRDB.
975                panic!(
976                    "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
977                    metrics.name,
978                    err.display_with_causes()
979                );
980            }
981            Err(err) => {
982                if retry.attempt() >= INFO_MIN_ATTEMPTS {
983                    info!(
984                        "external operation {} failed, retrying in {:?}: {}",
985                        metrics.name,
986                        retry.next_sleep(),
987                        err.display_with_causes()
988                    );
989                } else {
990                    debug!(
991                        "external operation {} failed, retrying in {:?}: {}",
992                        metrics.name,
993                        retry.next_sleep(),
994                        err.display_with_causes()
995                    );
996                }
997                retry = retry.sleep().await;
998            }
999        }
1000    }
1001}
1002
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs the shared oracle test suite against a Postgres-backed oracle.
    /// Returns early (successfully) when no test configuration is available.
    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let Some(config) = PostgresTimestampOracleConfig::new_for_test() else {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
            return Ok(());
        };

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            // Create the `open` future up front so the returned async block
            // owns it and does not borrow from this closure's arguments.
            let open_fut = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                false, /* read-only */
            );

            async {
                let shared_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(open_fut.await);

                shared_oracle
            }
        })
        .await?;

        Ok(())
    }
}