use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use std::time::{Duration, SystemTime};
use async_trait::async_trait;
use deadpool_postgres::{Object, PoolError};
use dec::Decimal;
use mz_adapter_types::timestamp_oracle::{
DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT, DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL,
DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER, DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
};
use mz_ore::error::ErrorExt;
use mz_ore::metrics::MetricsRegistry;
use mz_pgrepr::Numeric;
use mz_postgres_client::{PostgresClient, PostgresClientConfig, PostgresClientKnobs};
use mz_repr::Timestamp;
use serde::{Deserialize, Serialize};
use tracing::{debug, info};
use crate::coord::timeline::WriteTimestamp;
use crate::coord::timestamp_oracle::metrics::{Metrics, RetryMetrics};
use crate::coord::timestamp_oracle::retry::Retry;
use crate::coord::timestamp_oracle::{
self, GenericNowFn, ShareableTimestampOracle, TimestampOracle,
};
/// DDL for the table backing every [`PostgresTimestampOracle`]: one row per
/// timeline, keyed by `timeline`, holding that timeline's read and write
/// timestamps.
///
/// Timestamps are stored as `DECIMAL(20,0)`, which has room for every `u64`
/// value (the largest `u64` has 20 decimal digits).
///
/// NOTE(review): the `sql_stats_automatic_collection_enabled` storage
/// parameter appears to be CockroachDB-specific; presumably this DDL is only
/// meant to run against CockroachDB — confirm.
const SCHEMA: &str = "
CREATE TABLE IF NOT EXISTS timestamp_oracle (
timeline text NOT NULL,
read_ts DECIMAL(20,0) NOT NULL,
write_ts DECIMAL(20,0) NOT NULL,
PRIMARY KEY(timeline)
) WITH (sql_stats_automatic_collection_enabled = false);
";
/// A [`TimestampOracle`] whose state lives in one row of the `timestamp_oracle`
/// table of an external Postgres-compatible database.
///
/// Additional handles created via `get_shared` share `postgres_client` and
/// `metrics` with this one through their `Arc`s.
#[derive(Debug)]
pub struct PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp>,
{
    /// Name of the timeline; the primary key of this oracle's row.
    timeline: String,
    /// Source of proposed "now" timestamps. `write_ts` never hands out a value
    /// smaller than what this returns (see `fallible_write_ts`).
    next: N,
    /// Client whose connection pool serves every query this oracle issues.
    postgres_client: Arc<PostgresClient>,
    /// Per-operation and retry metrics.
    metrics: Arc<Metrics>,
}
/// Configuration for [`PostgresTimestampOracle`]: the connection URL plus the
/// metrics and connection-pool knobs shared by every oracle opened from it.
///
/// NOTE(review): the derived `Debug` includes `url`, and `open` logs the whole
/// config at `info` level; confirm the URL never embeds credentials.
#[derive(Clone, Debug)]
pub struct PostgresTimestampOracleConfig {
    /// Connection URL of the backing database.
    url: String,
    /// Metrics, shared (via `Arc`) by all clones of this config.
    pub metrics: Arc<Metrics>,
    /// Connection-pool knobs; updated in place by
    /// `PostgresTimestampOracleParameters::apply`.
    pub dynamic: Arc<DynamicConfig>,
}
impl From<PostgresTimestampOracleConfig> for PostgresClientConfig {
    /// Converts an oracle config into a client config.
    ///
    /// The URL and the client-specific metrics are copied out first. The
    /// oracle config itself (which implements `PostgresClientKnobs`) is then
    /// handed to the client behind an `Arc`.
    fn from(config: PostgresTimestampOracleConfig) -> Self {
        let url = config.url.clone();
        let client_metrics = config.metrics.postgres_client.clone();
        PostgresClientConfig::new(url, Arc::new(config), client_metrics)
    }
}
impl PostgresTimestampOracleConfig {
    /// Environment variable that tests read to find an external database to
    /// run against.
    pub(crate) const EXTERNAL_TESTS_POSTGRES_URL: &'static str = "COCKROACH_URL";

    /// Creates a config for `url`.
    ///
    /// Builds the oracle metrics against `metrics_registry` and starts every
    /// connection-pool knob at its default value.
    pub fn new(url: &str, metrics_registry: &MetricsRegistry) -> Self {
        let metrics = Arc::new(timestamp_oracle::metrics::Metrics::new(metrics_registry));
        let dynamic = DynamicConfig::default();
        PostgresTimestampOracleConfig {
            url: url.to_string(),
            metrics,
            dynamic: Arc::new(dynamic),
        }
    }

    /// Returns a config for tests that use an external database.
    ///
    /// The URL is taken from the [`Self::EXTERNAL_TESTS_POSTGRES_URL`]
    /// environment variable, and metrics go to a fresh, throwaway
    /// `MetricsRegistry`. Returns `None` when the variable is unset (or not
    /// valid Unicode) so callers can skip the test.
    ///
    /// # Panics
    ///
    /// Panics if the variable is missing while the `CI` environment variable
    /// is truthy, since CI is expected to provide the external service.
    pub fn new_for_test() -> Option<Self> {
        let url = match std::env::var(Self::EXTERNAL_TESTS_POSTGRES_URL) {
            Ok(url) => url,
            Err(_) => {
                if mz_ore::env::is_var_truthy("CI") {
                    panic!("CI is supposed to run this test but something has gone wrong!");
                }
                return None;
            }
        };
        // Reuse `new` rather than duplicating its construction logic.
        Some(Self::new(&url, &MetricsRegistry::new()))
    }
}
/// Connection-pool settings that can be changed while oracles are running.
///
/// Each knob sits behind its own atomic or `RwLock`, so a reader always sees a
/// whole value. `PostgresTimestampOracleParameters::apply` updates the knobs
/// one at a time, however, so a multi-knob update is not applied as one
/// atomic group.
#[derive(Debug)]
pub struct DynamicConfig {
    /// Maximum number of pooled connections.
    pg_connection_pool_max_size: AtomicUsize,
    /// Maximum wait for a pooled connection (`None`: presumably no limit —
    /// confirm against `PostgresClient`).
    pg_connection_pool_max_wait: RwLock<Option<Duration>>,
    /// Connection time-to-live.
    pg_connection_pool_ttl: RwLock<Duration>,
    /// Stagger applied to the connection TTL.
    pg_connection_pool_ttl_stagger: RwLock<Duration>,
    /// Timeout for establishing a connection.
    pg_connection_pool_connect_timeout: RwLock<Duration>,
    /// TCP user timeout for connections.
    pg_connection_pool_tcp_user_timeout: RwLock<Duration>,
}
impl Default for DynamicConfig {
fn default() -> Self {
Self {
pg_connection_pool_max_size: AtomicUsize::new(
DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_SIZE,
),
pg_connection_pool_max_wait: RwLock::new(Some(
DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_MAX_WAIT,
)),
pg_connection_pool_ttl: RwLock::new(DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL),
pg_connection_pool_ttl_stagger: RwLock::new(
DEFAULT_PG_TIMESTAMP_ORACLE_CONNPOOL_TTL_STAGGER,
),
pg_connection_pool_connect_timeout: RwLock::new(
DEFAULT_PG_TIMESTAMP_ORACLE_CONNECT_TIMEOUT,
),
pg_connection_pool_tcp_user_timeout: RwLock::new(
DEFAULT_PG_TIMESTAMP_ORACLE_TCP_USER_TIMEOUT,
),
}
}
}
impl DynamicConfig {
const LOAD_ORDERING: Ordering = Ordering::SeqCst;
const STORE_ORDERING: Ordering = Ordering::SeqCst;
fn connection_pool_max_size(&self) -> usize {
self.pg_connection_pool_max_size.load(Self::LOAD_ORDERING)
}
fn connection_pool_max_wait(&self) -> Option<Duration> {
*self
.pg_connection_pool_max_wait
.read()
.expect("lock poisoned")
}
fn connection_pool_ttl(&self) -> Duration {
*self.pg_connection_pool_ttl.read().expect("lock poisoned")
}
fn connection_pool_ttl_stagger(&self) -> Duration {
*self
.pg_connection_pool_ttl_stagger
.read()
.expect("lock poisoned")
}
fn connect_timeout(&self) -> Duration {
*self
.pg_connection_pool_connect_timeout
.read()
.expect("lock poisoned")
}
fn tcp_user_timeout(&self) -> Duration {
*self
.pg_connection_pool_tcp_user_timeout
.read()
.expect("lock poisoned")
}
}
impl PostgresClientKnobs for PostgresTimestampOracleConfig {
fn connection_pool_max_size(&self) -> usize {
self.dynamic.connection_pool_max_size()
}
fn connection_pool_max_wait(&self) -> Option<Duration> {
self.dynamic.connection_pool_max_wait()
}
fn connection_pool_ttl(&self) -> Duration {
self.dynamic.connection_pool_ttl()
}
fn connection_pool_ttl_stagger(&self) -> Duration {
self.dynamic.connection_pool_ttl_stagger()
}
fn connect_timeout(&self) -> Duration {
self.dynamic.connect_timeout()
}
fn tcp_user_timeout(&self) -> Duration {
self.dynamic.tcp_user_timeout()
}
}
/// A possibly partial set of overrides for the [`DynamicConfig`] knobs.
///
/// For every field, `None` means "leave unchanged": `update` merges only `Some`
/// fields, and `apply` writes only `Some` fields. For
/// `pg_connection_pool_max_wait`, the inner `Option` is the value itself, so
/// `Some(None)` sets the max wait to `None`.
///
/// NOTE(review): with a plain serde derive, a serialized `Some(None)` (e.g.
/// JSON `null`) deserializes back as the outer `None`. Confirm this does not
/// matter wherever these parameters cross a serialization boundary.
#[derive(Clone, Debug, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct PostgresTimestampOracleParameters {
    pub pg_connection_pool_max_size: Option<usize>,
    pub pg_connection_pool_max_wait: Option<Option<Duration>>,
    pub pg_connection_pool_ttl: Option<Duration>,
    pub pg_connection_pool_ttl_stagger: Option<Duration>,
    pub pg_connection_pool_connect_timeout: Option<Duration>,
    pub pg_connection_pool_tcp_user_timeout: Option<Duration>,
}
impl PostgresTimestampOracleParameters {
pub fn update(&mut self, other: PostgresTimestampOracleParameters) {
let Self {
pg_connection_pool_max_size: self_pg_connection_pool_max_size,
pg_connection_pool_max_wait: self_pg_connection_pool_max_wait,
pg_connection_pool_ttl: self_pg_connection_pool_ttl,
pg_connection_pool_ttl_stagger: self_pg_connection_pool_ttl_stagger,
pg_connection_pool_connect_timeout: self_pg_connection_pool_connect_timeout,
pg_connection_pool_tcp_user_timeout: self_pg_connection_pool_tcp_user_timeout,
} = self;
let Self {
pg_connection_pool_max_size: other_pg_connection_pool_max_size,
pg_connection_pool_max_wait: other_pg_connection_pool_max_wait,
pg_connection_pool_ttl: other_pg_connection_pool_ttl,
pg_connection_pool_ttl_stagger: other_pg_connection_pool_ttl_stagger,
pg_connection_pool_connect_timeout: other_pg_connection_pool_connect_timeout,
pg_connection_pool_tcp_user_timeout: other_pg_connection_pool_tcp_user_timeout,
} = other;
if let Some(v) = other_pg_connection_pool_max_size {
*self_pg_connection_pool_max_size = Some(v);
}
if let Some(v) = other_pg_connection_pool_max_wait {
*self_pg_connection_pool_max_wait = Some(v);
}
if let Some(v) = other_pg_connection_pool_ttl {
*self_pg_connection_pool_ttl = Some(v);
}
if let Some(v) = other_pg_connection_pool_ttl_stagger {
*self_pg_connection_pool_ttl_stagger = Some(v);
}
if let Some(v) = other_pg_connection_pool_connect_timeout {
*self_pg_connection_pool_connect_timeout = Some(v);
}
if let Some(v) = other_pg_connection_pool_tcp_user_timeout {
*self_pg_connection_pool_tcp_user_timeout = Some(v);
}
}
pub fn apply(&self, cfg: &PostgresTimestampOracleConfig) {
info!(params = ?self, "Applying configuration update!");
let Self {
pg_connection_pool_max_size,
pg_connection_pool_max_wait,
pg_connection_pool_ttl,
pg_connection_pool_ttl_stagger,
pg_connection_pool_connect_timeout,
pg_connection_pool_tcp_user_timeout,
} = self;
if let Some(pg_connection_pool_max_size) = pg_connection_pool_max_size {
cfg.dynamic
.pg_connection_pool_max_size
.store(*pg_connection_pool_max_size, DynamicConfig::STORE_ORDERING);
}
if let Some(pg_connection_pool_max_wait) = pg_connection_pool_max_wait {
let mut max_wait = cfg
.dynamic
.pg_connection_pool_max_wait
.write()
.expect("lock poisoned");
*max_wait = *pg_connection_pool_max_wait;
}
if let Some(pg_connection_pool_ttl) = pg_connection_pool_ttl {
let mut ttl = cfg
.dynamic
.pg_connection_pool_ttl
.write()
.expect("lock poisoned");
*ttl = *pg_connection_pool_ttl;
}
if let Some(pg_connection_pool_ttl_stagger) = pg_connection_pool_ttl_stagger {
let mut ttl_stagger = cfg
.dynamic
.pg_connection_pool_ttl_stagger
.write()
.expect("lock poisoned");
*ttl_stagger = *pg_connection_pool_ttl_stagger;
}
if let Some(pg_connection_pool_connect_timeout) = pg_connection_pool_connect_timeout {
let mut timeout = cfg
.dynamic
.pg_connection_pool_connect_timeout
.write()
.expect("lock poisoned");
*timeout = *pg_connection_pool_connect_timeout;
}
if let Some(pg_connection_pool_tcp_user_timeout) = pg_connection_pool_tcp_user_timeout {
let mut timeout = cfg
.dynamic
.pg_connection_pool_tcp_user_timeout
.write()
.expect("lock poisoned");
*timeout = *pg_connection_pool_tcp_user_timeout;
}
}
}
impl<N> PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
{
    /// Opens the oracle for `timeline`.
    ///
    /// Creates the `timestamp_oracle` table if necessary and makes sure the
    /// timeline has a row whose timestamps are at least `initially`.
    ///
    /// Every step runs inside [`retry_fallible`], so this keeps retrying until
    /// it succeeds, or panics if the table is reported missing. An existing
    /// row is kept (`ON CONFLICT ... DO NOTHING`). The final `apply_write`
    /// then only moves its timestamps up to `initially` (via `GREATEST`),
    /// never back.
    pub(crate) async fn open(
        config: PostgresTimestampOracleConfig,
        timeline: String,
        initially: Timestamp,
        next: N,
    ) -> Self {
        info!(config = ?config, "opening PostgresTimestampOracle");
        let fallible = || async {
            let metrics = Arc::clone(&config.metrics);
            let postgres_client = PostgresClient::open(config.clone().into())?;
            let client = postgres_client.get_connection().await?;
            // Create the table if needed and set its zone's `gc.ttlseconds`
            // to 600.
            client
                .batch_execute(&format!(
                    "{} {}",
                    SCHEMA,
                    "ALTER TABLE timestamp_oracle CONFIGURE ZONE USING gc.ttlseconds = 600;",
                ))
                .await?;
            let oracle = PostgresTimestampOracle {
                timeline: timeline.clone(),
                next: next.clone(),
                postgres_client: Arc::new(postgres_client),
                metrics,
            };
            // Seed the timeline's row; an existing row is left untouched here.
            let q = r#"
INSERT INTO timestamp_oracle (timeline, read_ts, write_ts)
VALUES ($1, $2, $3)
ON CONFLICT (timeline) DO NOTHING;
"#;
            let statement = client.prepare_cached(q).await?;
            let initially_coerced = Self::ts_to_decimal(initially);
            let _ = client
                .execute(
                    &statement,
                    &[&oracle.timeline, &initially_coerced, &initially_coerced],
                )
                .await?;
            // Bring a pre-existing row up to at least `initially`.
            ShareableTimestampOracle::apply_write(&oracle, initially).await;
            Result::<_, anyhow::Error>::Ok(oracle)
        };
        retry_fallible(&config.metrics.retries.open, fallible).await
    }

    /// Gets a connection from `postgres_client`.
    async fn get_connection(&self) -> Result<Object, PoolError> {
        self.postgres_client.get_connection().await
    }

    /// Returns every timeline stored in `timestamp_oracle`, paired with the
    /// larger of its `read_ts` and `write_ts`.
    ///
    /// Returns an empty list when the table does not exist in the current
    /// schema.
    ///
    /// # Errors
    ///
    /// Each attempt opens its own client from `config`, and failures are
    /// retried via [`retry_fallible`] until one succeeds. The `Err` case is
    /// therefore currently never produced.
    pub(crate) async fn get_all_timelines(
        config: PostgresTimestampOracleConfig,
    ) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
        let fallible = || async {
            let postgres_client = PostgresClient::open(config.clone().into())?;
            let mut client = postgres_client.get_connection().await?;
            let txn = client.transaction().await?;
            // Probe for the table first so a fresh environment yields an
            // empty result instead of a "does not exist" error.
            let q = r#"
SELECT EXISTS (SELECT * FROM information_schema.tables WHERE table_name = 'timestamp_oracle' AND table_schema = CURRENT_SCHEMA);
"#;
            let statement = txn.prepare(q).await?;
            let exists_row = txn.query_one(&statement, &[]).await?;
            let exists: bool = exists_row.try_get("exists").expect("missing exists column");
            if !exists {
                return Ok(Vec::new());
            }
            let q = r#"
SELECT timeline, GREATEST(read_ts, write_ts) as ts FROM timestamp_oracle;
"#;
            let statement = txn.prepare(q).await?;
            let rows = txn.query(&statement, &[]).await?;
            txn.commit().await?;
            let result = rows
                .into_iter()
                .map(|row| {
                    let timeline: String =
                        row.try_get("timeline").expect("missing timeline column");
                    let ts: Numeric = row.try_get("ts").expect("missing ts column");
                    let ts = Self::decimal_to_ts(ts);
                    (timeline, ts)
                })
                .collect::<Vec<_>>();
            Ok(result)
        };
        let metrics = &config.metrics.retries.get_all_timelines;
        Ok(retry_fallible(metrics, fallible).await)
    }

    /// Allocates the next write timestamp (single attempt).
    ///
    /// In one `UPDATE ... RETURNING` statement this sets the row's `write_ts`
    /// to `max(write_ts + 1, next.now())` and returns the new value, with
    /// `advance_to` one step past it.
    #[tracing::instrument(name = "oracle::write_ts", level = "debug", skip_all)]
    async fn fallible_write_ts(&self) -> Result<WriteTimestamp<Timestamp>, anyhow::Error> {
        let proposed_next_ts = self.next.now();
        let proposed_next_ts = Self::ts_to_decimal(proposed_next_ts);
        let q = r#"
UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts+1, $2)
WHERE timeline = $1
RETURNING write_ts;
"#;
        let client = self.get_connection().await?;
        let statement = client.prepare_cached(q).await?;
        let result = client
            .query_one(&statement, &[&self.timeline, &proposed_next_ts])
            .await?;
        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
        let write_ts = Self::decimal_to_ts(write_ts);
        debug!(
            timeline = ?self.timeline,
            write_ts = ?write_ts,
            proposed_next_ts = ?proposed_next_ts,
            "returning from write_ts()");
        let advance_to = write_ts.step_forward();
        Ok(WriteTimestamp {
            timestamp: write_ts,
            advance_to,
        })
    }

    /// Reads the row's current `write_ts` without advancing it (single attempt).
    #[tracing::instrument(name = "oracle::peek_write_ts", level = "debug", skip_all)]
    async fn fallible_peek_write_ts(&self) -> Result<Timestamp, anyhow::Error> {
        let q = r#"
SELECT write_ts FROM timestamp_oracle
WHERE timeline = $1;
"#;
        let client = self.get_connection().await?;
        let statement = client.prepare_cached(q).await?;
        let result = client.query_one(&statement, &[&self.timeline]).await?;
        let write_ts: Numeric = result.try_get("write_ts").expect("missing column write_ts");
        let write_ts = Self::decimal_to_ts(write_ts);
        debug!(
            timeline = ?self.timeline,
            write_ts = ?write_ts,
            "returning from peek_write_ts()");
        Ok(write_ts)
    }

    /// Reads the row's current `read_ts` (single attempt).
    #[tracing::instrument(name = "oracle::read_ts", level = "debug", skip_all)]
    async fn fallible_read_ts(&self) -> Result<Timestamp, anyhow::Error> {
        let q = r#"
SELECT read_ts FROM timestamp_oracle
WHERE timeline = $1;
"#;
        let client = self.get_connection().await?;
        let statement = client.prepare_cached(q).await?;
        let result = client.query_one(&statement, &[&self.timeline]).await?;
        let read_ts: Numeric = result.try_get("read_ts").expect("missing column read_ts");
        let read_ts = Self::decimal_to_ts(read_ts);
        debug!(
            timeline = ?self.timeline,
            read_ts = ?read_ts,
            "returning from read_ts()");
        Ok(read_ts)
    }

    /// Raises both `write_ts` and `read_ts` to at least `write_ts`, never
    /// lowering either (single attempt).
    #[tracing::instrument(name = "oracle::apply_write", level = "debug", skip_all)]
    async fn fallible_apply_write(&self, write_ts: Timestamp) -> Result<(), anyhow::Error> {
        let q = r#"
UPDATE timestamp_oracle SET write_ts = GREATEST(write_ts, $2), read_ts = GREATEST(read_ts, $2)
WHERE timeline = $1;
"#;
        let client = self.get_connection().await?;
        let statement = client.prepare_cached(q).await?;
        let write_ts = Self::ts_to_decimal(write_ts);
        let _ = client
            .execute(&statement, &[&self.timeline, &write_ts])
            .await?;
        debug!(
            timeline = ?self.timeline,
            write_ts = ?write_ts,
            "returning from apply_write()");
        Ok(())
    }

    /// Converts a timestamp into the `Numeric` representation stored in the
    /// `DECIMAL(20,0)` columns.
    fn ts_to_decimal(ts: Timestamp) -> Numeric {
        let decimal = Decimal::from(ts);
        Numeric::from(decimal)
    }

    /// Converts a stored `Numeric` back into a timestamp.
    ///
    /// # Panics
    ///
    /// Panics if the conversion fails; stored values are expected to always
    /// be valid `u64` timestamps.
    fn decimal_to_ts(ts: Numeric) -> Timestamp {
        ts.0 .0.try_into().expect("we only use u64 timestamps")
    }
}
/// Every operation runs the corresponding `fallible_*` method under its
/// per-operation metric and retries it via [`retry_fallible`] until it
/// succeeds.
#[async_trait]
impl<N> ShareableTimestampOracle<Timestamp> for PostgresTimestampOracle<N>
where
    N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
{
    /// Allocates a new write timestamp (see `fallible_write_ts`).
    async fn write_ts(&self) -> WriteTimestamp<Timestamp> {
        retry_fallible(&self.metrics.retries.write_ts, || {
            self.metrics
                .oracle
                .write_ts
                .run_op(|| self.fallible_write_ts())
        })
        .await
    }

    /// Returns the current write timestamp without advancing it.
    async fn peek_write_ts(&self) -> Timestamp {
        retry_fallible(&self.metrics.retries.peek_write_ts, || {
            self.metrics
                .oracle
                .peek_write_ts
                .run_op(|| self.fallible_peek_write_ts())
        })
        .await
    }

    /// Returns the current read timestamp.
    async fn read_ts(&self) -> Timestamp {
        retry_fallible(&self.metrics.retries.read_ts, || {
            self.metrics
                .oracle
                .read_ts
                .run_op(|| self.fallible_read_ts())
        })
        .await
    }

    /// Raises the read and write timestamps to at least `write_ts`.
    async fn apply_write(&self, write_ts: Timestamp) {
        // `Timestamp` is `Copy`, so each retry simply reuses the value.
        retry_fallible(&self.metrics.retries.apply_write, || {
            self.metrics
                .oracle
                .apply_write
                .run_op(|| self.fallible_apply_write(write_ts))
        })
        .await
    }
}
/// Attempt count from which [`retry_fallible`] logs failures at `info` level
/// instead of `debug`.
pub const INFO_MIN_ATTEMPTS: usize = 3;

/// Runs `work_fn` until it returns `Ok` and returns the successful value.
///
/// Between attempts it sleeps according to the oracle's default [`Retry`]
/// policy; the retry stream is wrapped by `metrics.stream`. A failure is
/// logged at `debug` level until `retry.attempt()` reaches
/// [`INFO_MIN_ATTEMPTS`], and at `info` level from then on.
///
/// # Panics
///
/// Panics if any error in the failure's cause chain says the
/// `"timestamp_oracle"` relation does not exist. Retrying cannot fix that:
/// someone removed the database, schema, or table.
pub async fn retry_fallible<R, F, WorkFn>(metrics: &RetryMetrics, mut work_fn: WorkFn) -> R
where
    F: std::future::Future<Output = Result<R, anyhow::Error>>,
    WorkFn: FnMut() -> F,
{
    let mut retry = metrics.stream(Retry::oracle_defaults(SystemTime::now()).into_retry_stream());
    loop {
        match work_fn().await {
            Ok(x) => {
                if retry.attempt() > 0 {
                    debug!(
                        "external operation {} failed at least once",
                        metrics.name,
                    );
                }
                return x;
            }
            // Inspect the whole cause chain, not only the outermost message:
            // a wrapped database error must still trip this unrecoverable
            // case instead of being retried forever.
            Err(err)
                if err.chain().any(|cause| {
                    cause
                        .to_string()
                        .contains("\"timestamp_oracle\" does not exist")
                }) =>
            {
                panic!(
                    "external operation {} failed unrecoverably, someone removed our database/schema/table: {}",
                    metrics.name,
                    err.display_with_causes()
                );
            }
            Err(err) => {
                if retry.attempt() >= INFO_MIN_ATTEMPTS {
                    info!(
                        "external operation {} failed, retrying in {:?}: {}",
                        metrics.name,
                        retry.next_sleep(),
                        err.display_with_causes()
                    );
                } else {
                    debug!(
                        "external operation {} failed, retrying in {:?}: {}",
                        metrics.name,
                        retry.next_sleep(),
                        err.display_with_causes()
                    );
                }
                retry = retry.sleep().await;
            }
        }
    }
}
#[async_trait(?Send)]
impl<N> TimestampOracle<Timestamp> for PostgresTimestampOracle<N>
where
N: GenericNowFn<Timestamp> + std::fmt::Debug + 'static,
{
#[tracing::instrument(name = "oracle::write_ts", level = "debug", skip_all)]
async fn write_ts(&mut self) -> WriteTimestamp<Timestamp> {
ShareableTimestampOracle::write_ts(self).await
}
#[tracing::instrument(name = "oracle::peek_write_ts", level = "debug", skip_all)]
async fn peek_write_ts(&self) -> Timestamp {
ShareableTimestampOracle::peek_write_ts(self).await
}
#[tracing::instrument(name = "oracle::read_ts", level = "debug", skip_all)]
async fn read_ts(&self) -> Timestamp {
ShareableTimestampOracle::read_ts(self).await
}
#[tracing::instrument(name = "oracle::apply_write", level = "debug", skip_all)]
async fn apply_write(&mut self, write_ts: Timestamp) {
ShareableTimestampOracle::apply_write(self, write_ts).await
}
fn get_shared(&self) -> Option<Arc<dyn ShareableTimestampOracle<Timestamp> + Send + Sync>> {
let shallow_clone = Self {
timeline: self.timeline.clone(),
next: self.next.clone(),
postgres_client: Arc::clone(&self.postgres_client),
metrics: Arc::clone(&self.metrics),
};
Some(Arc::new(shallow_clone))
}
}
#[cfg(test)]
mod tests {
    use crate::coord::timestamp_oracle;

    use super::*;

    /// Returns the external-service config, or logs that the calling test is
    /// skipped and returns `None`.
    fn config_or_skip() -> Option<PostgresTimestampOracleConfig> {
        let maybe_config = PostgresTimestampOracleConfig::new_for_test();
        if maybe_config.is_none() {
            info!(
                "{} env not set: skipping test that uses external service",
                PostgresTimestampOracleConfig::EXTERNAL_TESTS_POSTGRES_URL
            );
        }
        maybe_config
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let config = match config_or_skip() {
            Some(config) => config,
            None => return Ok(()),
        };
        timestamp_oracle::tests::timestamp_oracle_impl_test(|timeline, now_fn, initial_ts| {
            PostgresTimestampOracle::open(config.clone(), timeline, initial_ts, now_fn)
        })
        .await?;
        Ok(())
    }

    #[mz_ore::test(tokio::test)]
    #[cfg_attr(miri, ignore)]
    async fn test_shareable_postgres_timestamp_oracle() -> Result<(), anyhow::Error> {
        let config = match config_or_skip() {
            Some(config) => config,
            None => return Ok(()),
        };
        timestamp_oracle::tests::shareable_timestamp_oracle_impl_test(
            |timeline, now_fn, initial_ts| {
                let opening =
                    PostgresTimestampOracle::open(config.clone(), timeline, initial_ts, now_fn);
                async {
                    let oracle = opening.await;
                    oracle.get_shared().expect("postgres oracle is shareable")
                }
            },
        )
        .await?;
        Ok(())
    }
}