Skip to main content

mz_timestamp_oracle/
postgres_oracle.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! A timestamp oracle backed by "Postgres" for persistence/durability and where
11//! all oracle operations are self-sufficiently linearized, without requiring
12//! any external precautions/machinery.
13
14use std::str::FromStr;
15use std::sync::atomic::{AtomicU32, AtomicUsize, Ordering};
16use std::sync::{Arc, RwLock};
17use std::time::{Duration, SystemTime};
18
19use async_trait::async_trait;
20use deadpool_postgres::tokio_postgres::Config;
21use deadpool_postgres::tokio_postgres::error::SqlState;
22use deadpool_postgres::tokio_postgres::types::ToSql;
23use deadpool_postgres::tokio_postgres::{Row, Statement};
24use deadpool_postgres::{Object, PoolError};
25use dec::Decimal;
26use mz_adapter_types::timestamp_oracle::{
27    DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
28    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
29    DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
30    DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
31    DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES, DEFAULT_PG_TIMESTAMP_ORACLE_STATEMENT_TIMEOUT,
32    DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
33};
34use mz_ore::error::ErrorExt;
35use mz_ore::instrument;
36use mz_ore::metrics::MetricsRegistry;
37use mz_ore::url::SensitiveUrl;
38use mz_pgrepr::Numeric;
39use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
40use mz_repr::Timestamp;
41use postgres_protocol::escape::escape_identifier;
42use serde::{Deserialize, Serialize};
43use tracing::{debug, info};
44
45use crate::WriteTimestamp;
46use crate::metrics::{Metrics, RetryMetrics};
47use crate::retry::Retry;
48use crate::{GenericNowFn, TimestampOracle};
49
// Schema of the oracle's single table: one row per timeline (the primary
// key), holding that timeline's read and write timestamps.
//
// The timestamp columns are a `DECIMAL` that is big enough to hold
// `18446744073709551615`, the maximum value of `u64` which is our underlying
// timestamp type.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
    timeline text NOT NULL,
    read_ts DECIMAL(20,0) NOT NULL,
    write_ts DECIMAL(20,0) NOT NULL,
    PRIMARY KEY(timeline)
)
";

// CRDB-only table options, appended directly after `SCHEMA` when setting up
// against CockroachDB (plain Postgres rejects them).
//
// `sql_stats_automatic_collection_enabled` controls statistics collection for
// the cost-based optimizer, but all the queries against this table are
// single-table and very carefully tuned to hit the primary index, so the
// cost-based optimizer doesn't really get us anything. OTOH, the background
// jobs that crdb creates to collect these stats fill up the jobs table
// (slowing down all sorts of things).
const CRDB_SCHEMA_OPTIONS: &str = "WITH (sql_stats_automatic_collection_enabled = false)";
// CRDB-only zone configuration that lowers the GC TTL of `timestamp_oracle`
// to 600 seconds.
//
// The `timestamp_oracle` table creates and deletes rows at a high
// frequency, generating many tombstoned rows. If Cockroach's GC
// interval is set high (the default is 25h) and these tombstones
// accumulate, scanning over the table will take increasingly and
// prohibitively long.
//
// See: https://github.com/MaterializeInc/database-issues/issues/4001
// See: https://www.cockroachlabs.com/docs/stable/configure-zone.html#variables
const CRDB_CONFIGURE_ZONE: &str =
    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;";
79
80/// NOTE: `mz-timestamp-oracle` currently keeps its Postgres surface local; it
81/// does not use `mz-postgres-util` wrappers.
82async fn pg_batch_execute(
83    client: &Object,
84    query: &str,
85) -> Result<(), deadpool_postgres::tokio_postgres::Error> {
86    #[allow(clippy::disallowed_methods)]
87    client.batch_execute(query).await
88}
89
90async fn pg_query_one_prepared(
91    client: &Object,
92    statement: &Statement,
93    params: &[&(dyn ToSql + Sync)],
94) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
95    #[allow(clippy::disallowed_methods)]
96    client.query_one(statement, params).await
97}
98
99async fn pg_execute_prepared(
100    client: &Object,
101    statement: &Statement,
102    params: &[&(dyn ToSql + Sync)],
103) -> Result<u64, deadpool_postgres::tokio_postgres::Error> {
104    #[allow(clippy::disallowed_methods)]
105    client.execute(statement, params).await
106}
107
108async fn pg_txn_query_prepared(
109    txn: &deadpool_postgres::Transaction<'_>,
110    statement: &Statement,
111    params: &[&(dyn ToSql + Sync)],
112) -> Result<Vec<Row>, deadpool_postgres::tokio_postgres::Error> {
113    #[allow(clippy::disallowed_methods)]
114    txn.query(statement, params).await
115}
116
117async fn pg_txn_query_one_prepared(
118    txn: &deadpool_postgres::Transaction<'_>,
119    statement: &Statement,
120    params: &[&(dyn ToSql + Sync)],
121) -> Result<Row, deadpool_postgres::tokio_postgres::Error> {
122    #[allow(clippy::disallowed_methods)]
123    txn.query_one(statement, params).await
124}
125
126/// A [`TimestampOracle`] backed by "Postgres".
127#[derive(Debug)]
128pub struct PostgresTimestampOracle<N>
129where
130    N: GenericNowFn<Timestamp>,
131{
132    timeline: String,
133    next: N,
134    postgres_client: Arc<PostgresClient>,
135    metrics: Arc<Metrics>,
136    /// A read-only timestamp oracle is NOT allowed to do operations that change
137    /// the backing Postgres/CRDB state.
138    read_only: bool,
139}
140
141/// Configuration to connect to a Postgres-backed implementation of
142/// [`TimestampOracle`].
143#[derive(Clone, Debug)]
144pub struct PostgresTimestampOracleConfig {
145    url: SensitiveUrl,
146    metrics: Arc<Metrics>,
147
148    /// Configurations that can be dynamically updated.
149    pub dynamic: Arc<DynamicConfig>,
150}
151
152impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
153    fn from(config: PostgresTimestampOracleConfig) -> Self {
154        let metrics = config.metrics.postgres_client.clone();
155        PostgresClientConfig::new(config.url.clone(), Arc::new(config), metrics)
156    }
157}
158
159impl PostgresTimestampOracleConfig {
160    pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "METADATA_BACKEND_URL";
161
162    /// Returns a new instance of [`PostgresTimestampOracleConfig`] with default tuning.
163    pub fn new(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
164        let metrics = Arc::new(Metrics::new(metrics_registry));
165
166        let dynamic = DynamicConfig::default();
167
168        PostgresTimestampOracleConfig {
169            url: url.clone(),
170            metrics,
171            dynamic: Arc::new(dynamic),
172        }
173    }
174
175    /// Returns a new [`PostgresTimestampOracleConfig`] for use in unit tests.
176    ///
177    /// By default, postgres oracle tests are no-ops so that `cargo test` works
178    /// on new environments without any configuration. To activate the tests for
179    /// [`PostgresTimestampOracle`] set the `METADATA_BACKEND_URL` environment variable
180    /// with a valid connection url [1].
181    ///
182    /// [1]: https://docs.rs/tokio-postgres/latest/tokio_postgres/config/struct.Config.html#url
183    pub fn new_for_test() -> Option<Self> {
184        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
185            Ok(url) => SensitiveUrl::from_str(&url).expect("invalid Postgres URL"),
186            Err(_) => {
187                if mz_ore::env::is_var_truthy("CI") {
188                    panic!("CI is supposed to run this test but something has gone wrong!");
189                }
190                return None;
191            }
192        };
193
194        let dynamic = DynamicConfig::default();
195
196        let config = PostgresTimestampOracleConfig {
197            url,
198            metrics: Arc::new(Metrics::new(&MetricsRegistry::new())),
199            dynamic: Arc::new(dynamic),
200        };
201
202        Some(config)
203    }
204
205    /// Returns the metrics associated with this config.
206    pub(crate) fn metrics(&self) -> &Arc<Metrics> {
207        &self.metrics
208    }
209}
210
211/// Part of [`PostgresTimestampOracleConfig`] that can be dynamically updated.
212///
213/// The timestamp oracle is expected to react to each of these such that
214/// updating the value returned by the function takes effect (i.e. no caching
215/// it). This should happen "as promptly as reasonably possible" where that's
216/// defined by the tradeoffs of complexity vs promptness.
217///
218/// These are hooked up to LaunchDarkly. Specifically, LaunchDarkly configs are
219/// serialized into a [`TimestampOracleParameters`]. In environmentd,
220/// these are applied directly via [`TimestampOracleParameters::apply`]
221/// to the [`PostgresTimestampOracleConfig`].
222#[derive(Debug)]
223pub struct DynamicConfig {
224    /// The maximum size of the connection pool to Postgres/CRDB.
225    pg_connection_pool_max_size: AtomicUsize,
226
227    /// The maximum time to wait when attempting to obtain a connection from the pool.
228    pg_connection_pool_max_wait: RwLock<Option<Duration>>,
229
230    /// The minimum TTL of a connection to Postgres/CRDB before it is
231    /// proactively terminated. Connections are routinely culled to balance load
232    /// against the downstream database.
233    pg_connection_pool_ttl: RwLock<Duration>,
234
235    /// The minimum time between TTLing connections to Postgres/CRDB. This delay
236    /// is used to stagger reconnections to avoid stampedes and high tail
237    /// latencies. This value should be much less than `connection_pool_ttl` so
238    /// that reconnections are biased towards terminating the oldest connections
239    /// first. A value of `connection_pool_ttl / connection_pool_max_size` is
240    /// likely a good place to start so that all connections are rotated when
241    /// the pool is fully used.
242    pg_connection_pool_ttl_stagger: RwLock<Duration>,
243
244    /// The duration to wait for a Postgres/CRDB connection to be made before
245    /// retrying.
246    pg_connection_pool_connect_timeout: RwLock<Duration>,
247
248    /// The TCP user timeout for a Postgres/CRDB connection. Specifies the
249    /// amount of time that transmitted data may remain unacknowledged before
250    /// the TCP connection is forcibly closed.
251    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,
252
253    /// The amount of idle time before a TCP keepalive packet is sent on
254    /// Postgres/CRDB connections.
255    pg_connection_pool_keepalives_idle: RwLock<Duration>,
256
257    /// The time interval between TCP keepalive probes on Postgres/CRDB
258    /// connections.
259    pg_connection_pool_keepalives_interval: RwLock<Duration>,
260
261    /// The maximum number of TCP keepalive probes that will be sent before
262    /// dropping a Postgres/CRDB connection.
263    pg_connection_pool_keepalives_retries: AtomicU32,
264
265    /// The server-side `statement_timeout` to set on each Postgres/CRDB
266    /// connection. A zero value is a sentinel that means "do not set a
267    /// statement timeout".
268    pg_statement_timeout: RwLock<Duration>,
269}
270
271impl Default for DynamicConfig {
272    fn default() -> Self {
273        Self {
274            // TODO(aljoscha): These defaults are taken as is from the persist
275            // Consensus tuning. Once we're sufficiently advanced we might want
276            // to pick defaults that are different from the persist defaults.
277            pg_connection_pool_max_size: AtomicUsize::new(
278                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
279            ),
280            pg_connection_pool_max_wait: RwLock::new(Some(
281                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
282            )),
283            pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
284            pg_connection_pool_ttl_stagger: RwLock::new(
285                DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
286            ),
287            pg_connection_pool_connect_timeout: RwLock::new(
288                DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
289            ),
290            pg_connection_pool_tcp_user_timeout: RwLock::new(
291                DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
292            ),
293            pg_connection_pool_keepalives_idle: RwLock::new(
294                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_IDLE,
295            ),
296            pg_connection_pool_keepalives_interval: RwLock::new(
297                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_INTERVAL,
298            ),
299            pg_connection_pool_keepalives_retries: AtomicU32::new(
300                DEFAULT_PG_TIMESTAMP_ORACLE_KEEPALIVES_RETRIES,
301            ),
302            pg_statement_timeout: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_STATEMENT_TIMEOUT),
303        }
304    }
305}
306
307impl DynamicConfig {
308    // TODO: Decide if we can relax these.
309    const LOAD_ORDERING: Ordering = Ordering::SeqCst;
310    const STORE_ORDERING: Ordering = Ordering::SeqCst;
311
312    fn connection_pool_max_size(&self) -> usize {
313        self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
314    }
315
316    fn connection_pool_max_wait(&self) -> Option<Duration> {
317        *self
318            .pg_connection_pool_max_wait
319            .read()
320            .expect("lock poisoned")
321    }
322
323    fn connection_pool_ttl(&self) -> Duration {
324        *self.pg_connection_pool_ttl.read().expect("lock poisoned")
325    }
326
327    fn connection_pool_ttl_stagger(&self) -> Duration {
328        *self
329            .pg_connection_pool_ttl_stagger
330            .read()
331            .expect("lock poisoned")
332    }
333
334    fn connect_timeout(&self) -> Duration {
335        *self
336            .pg_connection_pool_connect_timeout
337            .read()
338            .expect("lock poisoned")
339    }
340
341    fn tcp_user_timeout(&self) -> Duration {
342        *self
343            .pg_connection_pool_tcp_user_timeout
344            .read()
345            .expect("lock poisoned")
346    }
347
348    fn keepalives_idle(&self) -> Duration {
349        *self
350            .pg_connection_pool_keepalives_idle
351            .read()
352            .expect("lock poisoned")
353    }
354
355    fn keepalives_interval(&self) -> Duration {
356        *self
357            .pg_connection_pool_keepalives_interval
358            .read()
359            .expect("lock poisoned")
360    }
361
362    fn keepalives_retries(&self) -> u32 {
363        self.pg_connection_pool_keepalives_retries
364            .load(Self::LOAD_ORDERING)
365    }
366
367    fn statement_timeout(&self) -> Duration {
368        *self.pg_statement_timeout.read().expect("lock poisoned")
369    }
370}
371
372impl PostgresClientKnobs for PostgresTimestampOracleConfig {
373    fn connection_pool_max_size(&self) -> usize {
374        self.dynamic.connection_pool_max_size()
375    }
376
377    fn connection_pool_max_wait(&self) -> Option<Duration> {
378        self.dynamic.connection_pool_max_wait()
379    }
380
381    fn connection_pool_ttl(&self) -> Duration {
382        self.dynamic.connection_pool_ttl()
383    }
384
385    fn connection_pool_ttl_stagger(&self) -> Duration {
386        self.dynamic.connection_pool_ttl_stagger()
387    }
388
389    fn connect_timeout(&self) -> Duration {
390        self.dynamic.connect_timeout()
391    }
392
393    fn tcp_user_timeout(&self) -> Duration {
394        self.dynamic.tcp_user_timeout()
395    }
396
397    fn keepalives_idle(&self) -> Duration {
398        self.dynamic.keepalives_idle()
399    }
400
401    fn keepalives_interval(&self) -> Duration {
402        self.dynamic.keepalives_interval()
403    }
404
405    fn keepalives_retries(&self) -> u32 {
406        self.dynamic.keepalives_retries()
407    }
408
409    fn statement_timeout(&self) -> Duration {
410        self.dynamic.statement_timeout()
411    }
412}
413
414/// Updates to values in [`PostgresTimestampOracleConfig`].
415///
416/// These reflect updates made to LaunchDarkly.
417///
418/// Parameters can be set (`Some`) or unset (`None`). Unset parameters should be
419/// interpreted to mean "use the previous value".
420#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
421pub struct TimestampOracleParameters {
422    /// Configures `DynamicConfig::pg_connection_pool_max_size`.
423    pub pg_connection_pool_max_size: Option<usize>,
424    /// Configures `DynamicConfig::pg_connection_pool_max_wait`.
425    ///
426    /// NOTE: Yes, this is an `Option<Option<...>>`. Fields in this struct are
427    /// all `Option` to signal the presence or absence of a system var
428    /// configuration. A `Some(None)` value in this field signals the desire to
429    /// have no `max_wait`. This is different from `None`, which signals that
430    /// there is no actively set system var, although this case is not currently
431    /// possible with how the code around this works.
432    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
433    /// Configures `DynamicConfig::pg_connection_pool_ttl`.
434    pub pg_connection_pool_ttl: Option<Duration>,
435    /// Configures `DynamicConfig::pg_connection_pool_ttl_stagger`.
436    pub pg_connection_pool_ttl_stagger: Option<Duration>,
437    /// Configures `DynamicConfig::pg_connection_pool_connect_timeout`.
438    pub pg_connection_pool_connect_timeout: Option<Duration>,
439    /// Configures `DynamicConfig::pg_connection_pool_tcp_user_timeout`.
440    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
441    /// Configures `DynamicConfig::pg_connection_pool_keepalives_idle`.
442    pub pg_connection_pool_keepalives_idle: Option<Duration>,
443    /// Configures `DynamicConfig::pg_connection_pool_keepalives_interval`.
444    pub pg_connection_pool_keepalives_interval: Option<Duration>,
445    /// Configures `DynamicConfig::pg_connection_pool_keepalives_retries`.
446    pub pg_connection_pool_keepalives_retries: Option<u32>,
447    /// Configures `DynamicConfig::pg_statement_timeout`.
448    pub pg_statement_timeout: Option<Duration>,
449}
450
451impl TimestampOracleParameters {
452    /// Update the parameter values with the set ones from `other`.
453    pub fn update(&mut self, other: TimestampOracleParameters) {
454        // Deconstruct self and other so we get a compile failure if new fields
455        // are added.
456        let Self {
457            pg_connection_pool_max_size: self_pg_connection_pool_max_size,
458            pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
459            pg_connection_pool_ttl: self_pg_connection_pool_ttl,
460            pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
461            pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
462            pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
463            pg_connection_pool_keepalives_idle: self_pg_connection_pool_keepalives_idle,
464            pg_connection_pool_keepalives_interval: self_pg_connection_pool_keepalives_interval,
465            pg_connection_pool_keepalives_retries: self_pg_connection_pool_keepalives_retries,
466            pg_statement_timeout: self_pg_statement_timeout,
467        } = self;
468        let Self {
469            pg_connection_pool_max_size: other_pg_connection_pool_max_size,
470            pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
471            pg_connection_pool_ttl: other_pg_connection_pool_ttl,
472            pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
473            pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
474            pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
475            pg_connection_pool_keepalives_idle: other_pg_connection_pool_keepalives_idle,
476            pg_connection_pool_keepalives_interval: other_pg_connection_pool_keepalives_interval,
477            pg_connection_pool_keepalives_retries: other_pg_connection_pool_keepalives_retries,
478            pg_statement_timeout: other_pg_statement_timeout,
479        } = other;
480        if let Some(v) = other_pg_connection_pool_max_size {
481            *self_pg_connection_pool_max_size = Some(v);
482        }
483        if let Some(v) = other_pg_connection_pool_max_wait {
484            *self_pg_connection_pool_max_wait = Some(v);
485        }
486        if let Some(v) = other_pg_connection_pool_ttl {
487            *self_pg_connection_pool_ttl = Some(v);
488        }
489        if let Some(v) = other_pg_connection_pool_ttl_stagger {
490            *self_pg_connection_pool_ttl_stagger = Some(v);
491        }
492        if let Some(v) = other_pg_connection_pool_connect_timeout {
493            *self_pg_connection_pool_connect_timeout = Some(v);
494        }
495        if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
496            *self_pg_connection_pool_tcp_user_timeout = Some(v);
497        }
498        if let Some(v) = other_pg_connection_pool_keepalives_idle {
499            *self_pg_connection_pool_keepalives_idle = Some(v);
500        }
501        if let Some(v) = other_pg_connection_pool_keepalives_interval {
502            *self_pg_connection_pool_keepalives_interval = Some(v);
503        }
504        if let Some(v) = other_pg_connection_pool_keepalives_retries {
505            *self_pg_connection_pool_keepalives_retries = Some(v);
506        }
507        if let Some(v) = other_pg_statement_timeout {
508            *self_pg_statement_timeout = Some(v);
509        }
510    }
511
512    /// Applies the parameter values to the given in-memory config object.
513    ///
514    /// Note that these overrides are not all applied atomically: i.e. it's
515    /// possible for the timestamp oracle/postgres client to race with this and
516    /// see some but not all of the parameters applied.
517    pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
518        info!(params = ?self, "Applying configuration update!");
519
520        // Deconstruct self so we get a compile failure if new fields are added.
521        let Self {
522            pg_connection_pool_max_size,
523            pg_connection_pool_max_wait,
524            pg_connection_pool_ttl,
525            pg_connection_pool_ttl_stagger,
526            pg_connection_pool_connect_timeout,
527            pg_connection_pool_tcp_user_timeout,
528            pg_connection_pool_keepalives_idle,
529            pg_connection_pool_keepalives_interval,
530            pg_connection_pool_keepalives_retries,
531            pg_statement_timeout,
532        } = self;
533        if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
534            cfg.dynamic
535                .pg_connection_pool_max_size
536                .store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
537        }
538        if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
539            let mut max_wait = cfg
540                .dynamic
541                .pg_connection_pool_max_wait
542                .write()
543                .expect("lock poisoned");
544            *max_wait = *pg_connection_pool_max_wait;
545        }
546        if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
547            let mut ttl = cfg
548                .dynamic
549                .pg_connection_pool_ttl
550                .write()
551                .expect("lock poisoned");
552            *ttl = *pg_connection_pool_ttl;
553        }
554        if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
555            let mut ttl_stagger = cfg
556                .dynamic
557                .pg_connection_pool_ttl_stagger
558                .write()
559                .expect("lock poisoned");
560            *ttl_stagger = *pg_connection_pool_ttl_stagger;
561        }
562        if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
563            let mut timeout = cfg
564                .dynamic
565                .pg_connection_pool_connect_timeout
566                .write()
567                .expect("lock poisoned");
568            *timeout = *pg_connection_pool_connect_timeout;
569        }
570        if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
571            let mut timeout = cfg
572                .dynamic
573                .pg_connection_pool_tcp_user_timeout
574                .write()
575                .expect("lock poisoned");
576            *timeout = *pg_connection_pool_tcp_user_timeout;
577        }
578        if let Some(pg_connection_pool_keepalives_idle) = pg_connection_pool_keepalives_idle {
579            let mut timeout = cfg
580                .dynamic
581                .pg_connection_pool_keepalives_idle
582                .write()
583                .expect("lock poisoned");
584            *timeout = *pg_connection_pool_keepalives_idle;
585        }
586        if let Some(pg_connection_pool_keepalives_interval) = pg_connection_pool_keepalives_interval
587        {
588            let mut timeout = cfg
589                .dynamic
590                .pg_connection_pool_keepalives_interval
591                .write()
592                .expect("lock poisoned");
593            *timeout = *pg_connection_pool_keepalives_interval;
594        }
595        if let Some(pg_connection_pool_keepalives_retries) = pg_connection_pool_keepalives_retries {
596            cfg.dynamic.pg_connection_pool_keepalives_retries.store(
597                *pg_connection_pool_keepalives_retries,
598                DynamicConfig::STORE_ORDERING,
599            );
600        }
601        if let Some(pg_statement_timeout) = pg_statement_timeout {
602            let mut timeout = cfg
603                .dynamic
604                .pg_statement_timeout
605                .write()
606                .expect("lock poisoned");
607            *timeout = *pg_statement_timeout;
608        }
609    }
610}
611
612impl<N> PostgresTimestampOracle<N>
613where
614    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
615{
616    /// Open a Postgres [`TimestampOracle`] instance with `config`, for the
617    /// timeline named `timeline`. `next` generates new timestamps when invoked.
618    /// Timestamps that are returned are made durable and will never retract.
619    pub async fn open(
620        config: PostgresTimestampOracleConfig,
621        timeline: String,
622        initially: Timestamp,
623        next: N,
624        read_only: bool,
625    ) -> Self {
626        info!(config = ?config, "opening PostgresTimestampOracle");
627
628        let fallible = || async {
629            let metrics = Arc::clone(&config.metrics);
630
631            // don't need to unredact here because we're just pulling out the username
632            let pg_config: Config = config.url.to_string().parse()?;
633            let role = pg_config.get_user().unwrap();
634            let create_schema = format!(
635                "CREATE SCHEMA IF NOT EXISTS tsoracle AUTHORIZATION {}",
636                escape_identifier(role),
637            );
638
639            let postgres_client = PostgresClient::open(config.clone().into())?;
640
641            let client = postgres_client.get_connection().await?;
642
643            let crdb_mode = match pg_batch_execute(
644                &client,
645                &format!(
646                    "{}; {}{}; {}",
647                    create_schema, SCHEMA, CRDB_SCHEMA_OPTIONS, CRDB_CONFIGURE_ZONE,
648                ),
649            )
650            .await
651            {
652                Ok(()) => true,
653                Err(e)
654                    if e.code() == Some(&SqlState::INVALID_PARAMETER_VALUE)
655                        || e.code() == Some(&SqlState::SYNTAX_ERROR) =>
656                {
657                    info!(
658                        "unable to initiate timestamp_oracle with CRDB params, this is expected and OK when running against Postgres: {:?}",
659                        e
660                    );
661                    false
662                }
663                Err(e) => return Err(e.into()),
664            };
665
666            if !crdb_mode {
667                pg_batch_execute(&client, &format!("{}; {};", create_schema, SCHEMA)).await?;
668            }
669
670            let oracle = PostgresTimestampOracle {
671                timeline: timeline.clone(),
672                next: next.clone(),
673                postgres_client: Arc::new(postgres_client),
674                metrics,
675                read_only,
676            };
677
678            // Create a row for our timeline, if it doesn't exist. The
679            // `apply_write` call below expects the row to be present. If we
680            // didn't have this here we would always need CHECK EXISTS calls or
681            // something in `apply_write`, making it more complicated, so we
682            // only do it once here, on initialization.
683            let q = r#"
684                    INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
685                        VALUES ($1, $2, $3)
686                        ON CONFLICT (timeline) DO NOTHING;
687                "#;
688            let statement = client.prepare_cached(q).await?;
689
690            let initially_coerced = Self::ts_to_decimal(initially);
691            let _ = pg_execute_prepared(
692                &client,
693                &statement,
694                &[&oracle.timeline, &initially_coerced, &initially_coerced],
695            )
696            .await?;
697
698            // Forward timestamps to what we're given from outside. Remember,
699            // the above query will only create the row at the initial timestamp
700            // if it didn't exist before.
701            if !read_only {
702                TimestampOracle::apply_write(&oracle, initially).await;
703            }
704
705            Result::<_, anyhow::Error>::Ok(oracle)
706        };
707
708        let metrics = &config.metrics.retries.open;
709
710        let oracle = retry_fallible(metrics, fallible).await;
711
712        oracle
713    }
714
715    async fn get_connection(&self) -> Result<Object, PoolError> {
716        self.postgres_client.get_connection().await
717    }
718
719    /// Returns a `Vec` of all known timelines along with their current greatest
720    /// timestamp (max of read_ts and write_ts).
721    ///
722    /// For use when initializing another [`TimestampOracle`] implementation
723    /// from another oracle's state.
724    ///
725    /// NOTE: This method can have linearizability violations, but it's hard to
726    /// get around those. They will only occur during bootstrap, and only
727    /// if/when we migrate between different oracle implementations. Once we
728    /// migrated from the catalog-backed oracle to the postgres/crdb-backed
729    /// oracle we can remove this code and it's callsite.
730    pub async fn get_all_timelines(
731        config: PostgresTimestampOracleConfig,
732    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
733        let fallible = || async {
734            let postgres_client = PostgresClient::open(config.clone().into())?;
735
736            let mut client = postgres_client.get_connection().await?;
737
738            let txn = client.transaction().await?;
739
740            // Using `table_schema = CURRENT_SCHEMA` makes sure we only include
741            // tables that are queryable by us. Otherwise this check might
742            // return true but then the query below fails with a confusing
743            // "table does not exist" error.
744            let q = r#"
745            SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
746        "#;
747            let statement = txn.prepare(q).await?;
748            let exists_row = pg_txn_query_one_prepared(&txn, &statement, &[]).await?;
749            let exists: bool = exists_row.try_get("exists").expect("missing exists column");
750            if !exists {
751                return Ok(Vec::new());
752            }
753
754            let q = r#"
755            SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
756        "#;
757            let statement = txn.prepare(q).await?;
758            let rows = pg_txn_query_prepared(&txn, &statement, &[]).await?;
759
760            txn.commit().await?;
761
762            let result = rows
763                .into_iter()
764                .map(|row| {
765                    let timeline: String =
766                        row.try_get("timeline").expect("missing timeline column");
767                    let ts: Numeric = row.try_get("ts").expect("missing ts column");
768                    let ts = Self::decimal_to_ts(ts);
769
770                    (timeline, ts)
771                })
772                .collect::<Vec<_>>();
773
774            Ok(result)
775        };
776
777        let metrics = &config.metrics.retries.get_all_timelines;
778
779        let result = retry_fallible(metrics, fallible).await;
780
781        Ok(result)
782    }
783
784    #[mz_ore::instrument(name = "oracle::write_ts")]
785    async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
786        if self.read_only {
787            panic!("attempting write_ts in read-only mode");
788        }
789
790        let proposed_next_ts = self.next.now();
791        let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
792
793        let q = r#"
794            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
795                WHERE timeline = $1
796            RETURNING write_ts;
797        "#;
798        let client = self.get_connection().await?;
799        let statement = client.prepare_cached(q).await?;
800        let result =
801            pg_query_one_prepared(&client, &statement, &[&self.timeline, &proposed_next_ts])
802                .await?;
803
804        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
805        let write_ts = Self::decimal_to_ts(write_ts);
806
807        debug!(
808            timeline = ?self.timeline,
809            write_ts = ?write_ts,
810            proposed_next_ts = ?proposed_next_ts,
811            "returning from write_ts()");
812
813        let advance_to = write_ts.step_forward();
814
815        Ok(WriteTimestamp {
816            timestamp: write_ts,
817            advance_to,
818        })
819    }
820
821    #[mz_ore::instrument(name = "oracle::peek_write_ts")]
822    async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
823        let q = r#"
824            SELECT write_ts FROM timestamp_oracle
825                WHERE timeline = $1;
826        "#;
827        let client = self.get_connection().await?;
828        let statement = client.prepare_cached(q).await?;
829        let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
830
831        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
832        let write_ts = Self::decimal_to_ts(write_ts);
833
834        debug!(
835            timeline = ?self.timeline,
836            write_ts = ?write_ts,
837            "returning from peek_write_ts()");
838
839        Ok(write_ts)
840    }
841
842    #[mz_ore::instrument(name = "oracle::read_ts")]
843    async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
844        let q = r#"
845            SELECT read_ts FROM timestamp_oracle
846                WHERE timeline = $1;
847        "#;
848        let client = self.get_connection().await?;
849        let statement = client.prepare_cached(q).await?;
850        let result = pg_query_one_prepared(&client, &statement, &[&self.timeline]).await?;
851
852        let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
853        let read_ts = Self::decimal_to_ts(read_ts);
854
855        debug!(
856            timeline = ?self.timeline,
857            read_ts = ?read_ts,
858            "returning from read_ts()");
859
860        Ok(read_ts)
861    }
862
863    #[mz_ore::instrument(name = "oracle::apply_write")]
864    async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
865        if self.read_only {
866            panic!("attempting apply_write in read-only mode");
867        }
868
869        let q = r#"
870            UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
871                WHERE timeline = $1;
872        "#;
873        let client = self.get_connection().await?;
874        let statement = client.prepare_cached(q).await?;
875        let write_ts = Self::ts_to_decimal(write_ts);
876
877        let _ = pg_execute_prepared(&client, &statement, &[&self.timeline, &write_ts]).await?;
878
879        debug!(
880            timeline = ?self.timeline,
881            write_ts = ?write_ts,
882            "returning from apply_write()");
883
884        Ok(())
885    }
886
887    // We could add `From` impls for these but for now keep the code local to
888    // the oracle.
889    fn ts_to_decimal(ts: Timestamp) -> Numeric {
890        let decimal = Decimal::from(ts);
891        Numeric::from(decimal)
892    }
893
894    // We could add `From` impls for these but for now keep the code local to
895    // the oracle.
896    fn decimal_to_ts(ts: Numeric) -> Timestamp {
897        ts.0.0.try_into().expect("we only use u64 timestamps")
898    }
899}
900
901// A wrapper around the `fallible_` methods that adds operation metrics and
902// retries.
903//
904// NOTE: This implementation is tied to [`mz_repr::Timestamp`]. We could change
905// that, and also make the types we store in the backing "Postgres" table
906// generic. But in practice we only use oracles for [`mz_repr::Timestamp`] so
907// don't do that extra work for now.
908#[async_trait]
909impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
910where
911    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
912{
913    #[instrument]
914    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
915        let metrics = &self.metrics.retries.write_ts;
916
917        let res = retry_fallible(metrics, || {
918            self.metrics
919                .oracle
920                .write_ts
921                .run_op(|| self.fallible_write_ts())
922        })
923        .await;
924
925        res
926    }
927
928    #[instrument]
929    async fn peek_write_ts(&self) -> Timestamp {
930        let metrics = &self.metrics.retries.peek_write_ts;
931
932        let res = retry_fallible(metrics, || {
933            self.metrics
934                .oracle
935                .peek_write_ts
936                .run_op(|| self.fallible_peek_write_ts())
937        })
938        .await;
939
940        res
941    }
942
943    #[instrument]
944    async fn read_ts(&self) -> Timestamp {
945        let metrics = &self.metrics.retries.read_ts;
946
947        let res = retry_fallible(metrics, || {
948            self.metrics
949                .oracle
950                .read_ts
951                .run_op(|| self.fallible_read_ts())
952        })
953        .await;
954
955        res
956    }
957
958    #[instrument]
959    async fn apply_write(&self, write_ts: Timestamp) {
960        let metrics = &self.metrics.retries.apply_write;
961
962        let res = retry_fallible(metrics, || {
963            self.metrics
964                .oracle
965                .apply_write
966                .run_op(|| self.fallible_apply_write(write_ts.clone()))
967        })
968        .await;
969
970        res
971    }
972}
973
974pub const INFO_MIN_ATTEMPTS: usize = 3;
975
976pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
977where
978    F: std::future::Future<Output = Result<R, anyhow::Error>>,
979    WorkFn: FnMut() -> F,
980{
981    let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
982    loop {
983        match work_fn().await {
984            Ok(x) => {
985                if retry.attempt() > 0 {
986                    debug!(
987                        "external operation {} succeeded after failing at least once",
988                        metrics.name,
989                    );
990                }
991                return x;
992            }
993            Err(err)
994                if err
995                    .to_string()
996                    .contains("\"timestamp_oracle\" does not exist") =>
997            {
998                // TODO(aljoscha): Re-consider this before enabling the oracle
999                // in production.
1000                // It's a policy question whether this actually _is_ an
1001                // unrecoverable error: an operator could go in an re-create the
1002                // `timestamp_oracle` table once the problem is noticed.
1003                // However, our tests currently assume that materialize will
1004                // fail unrecoverably in cases where we do a rug-pull/remove
1005                // expected tables from CRDB.
1006                panic!(
1007                    "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
1008                    metrics.name,
1009                    err.display_with_causes()
1010                );
1011            }
1012            Err(err) => {
1013                if retry.attempt() >= INFO_MIN_ATTEMPTS {
1014                    info!(
1015                        "external operation {} failed, retrying in {:?}: {}",
1016                        metrics.name,
1017                        retry.next_sleep(),
1018                        err.display_with_causes()
1019                    );
1020                } else {
1021                    debug!(
1022                        "external operation {} failed, retrying in {:?}: {}",
1023                        metrics.name,
1024                        retry.next_sleep(),
1025                        err.display_with_causes()
1026                    );
1027                }
1028                retry = retry.sleep().await;
1029            }
1030        }
1031    }
1032}
1033
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns the config for tests that need an external Postgres, or `None`
    /// (after logging that the test is skipped) when the env var naming that
    /// Postgres is not set.
    fn config_or_skip() -> Option<PostgresTimestampOracleConfig> {
        let config = PostgresTimestampOracleConfig::new_for_test();
        if config.is_none() {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
        }
        config
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let Some(config) = config_or_skip() else {
            return Ok(());
        };

        crate::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            let open_oracle = PostgresTimestampOracle::open(
                config.clone(),
                timeline,
                initial_ts,
                now_fn,
                false, /* read-only */
            );

            async {
                let arced_oracle: Arc<dyn TimestampOracle<Timestamp> + Send + Sync> =
                    Arc::new(open_oracle.await);
                arced_oracle
            }
        })
        .await?;

        Ok(())
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)] // error: unsupported operation: can't call foreign function `TLS_client_method` on OS `linux`
    async fn test_postgres_statement_timeout() -> Result<(), anyhow::Error> {
        let Some(config) = config_or_skip() else {
            return Ok(());
        };

        // The default config leaves the statement timeout unset (the zero
        // sentinel), so a query that outlives a short sleep must still finish.
        let default_client = PostgresClient::open(config.clone().into())?;
        let default_conn = default_client.get_connection().await?;
        pg_batch_execute(&default_conn, "SELECT pg_sleep(0.1)")
            .await
            .expect("query should not be aborted when no statement timeout is set");
        drop(default_conn);

        // Install a short statement timeout via the dynamic-config path;
        // connections opened afterwards pick it up in their per-session setup.
        TimestampOracleParameters {
            pg_statement_timeout: Some(Duration::from_millis(100)),
            ..Default::default()
        }
        .apply(&config);

        let limited_client = PostgresClient::open(config.clone().into())?;
        let limited_conn = limited_client.get_connection().await?;

        // Sleeping far longer than the timeout must get the query cancelled
        // by the server.
        let err = pg_batch_execute(&limited_conn, "SELECT pg_sleep(5)")
            .await
            .expect_err("query should have been aborted by the statement timeout");
        assert_eq!(
            err.code(),
            Some(&SqlState::QUERY_CANCELED),
            "unexpected error, expected a statement timeout: {err:?}"
        );

        Ok(())
    }
}