mz_timestamp_oracle/
config.rs1use std::sync::Arc;
17
18use mz_ore::metrics::MetricsRegistry;
19use mz_ore::now::NowFn;
20use mz_ore::url::SensitiveUrl;
21use mz_repr::Timestamp;
22
23use crate::TimestampOracle;
24#[cfg(any(target_os = "linux", feature = "foundationdb"))]
25use crate::foundationdb_oracle::{FdbTimestampOracle, FdbTimestampOracleConfig};
26use crate::metrics::Metrics;
27use crate::postgres_oracle::{
28 PostgresTimestampOracle, PostgresTimestampOracleConfig, TimestampOracleParameters,
29};
30
31#[derive(Clone, Debug)]
36pub enum TimestampOracleConfig {
37 Postgres(PostgresTimestampOracleConfig),
39 #[cfg(any(target_os = "linux", feature = "foundationdb"))]
41 Fdb(FdbTimestampOracleConfig),
42}
43
44impl TimestampOracleConfig {
45 pub fn from_url(
53 url: &SensitiveUrl,
54 metrics_registry: &MetricsRegistry,
55 ) -> Result<Self, anyhow::Error> {
56 let scheme = url.scheme();
57 match scheme {
58 "postgres" | "postgresql" => Ok(Self::new_postgres(url, metrics_registry)),
59 #[cfg(any(target_os = "linux", feature = "foundationdb"))]
60 "foundationdb" => Ok(Self::new_fdb(url.clone(), metrics_registry)),
61 #[cfg(not(any(target_os = "linux", feature = "foundationdb")))]
62 "foundationdb" => {
63 anyhow::bail!("FoundationDB timestamp oracle is not supported on this platform")
64 }
65 _ => {
66 anyhow::bail!(
67 "unsupported timestamp oracle URL scheme: '{}'. \
68 Supported schemes: postgres, postgresql, foundationdb",
69 scheme
70 )
71 }
72 }
73 }
74
75 pub fn new_postgres(url: &SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
77 TimestampOracleConfig::Postgres(PostgresTimestampOracleConfig::new(url, metrics_registry))
78 }
79
80 #[cfg(any(target_os = "linux", feature = "foundationdb"))]
82 pub fn new_fdb(url: SensitiveUrl, metrics_registry: &MetricsRegistry) -> Self {
83 TimestampOracleConfig::Fdb(FdbTimestampOracleConfig::new(url, metrics_registry))
84 }
85
86 pub fn metrics(&self) -> Arc<Metrics> {
88 match self {
89 TimestampOracleConfig::Postgres(config) => Arc::clone(config.metrics()),
90 #[cfg(any(target_os = "linux", feature = "foundationdb"))]
91 TimestampOracleConfig::Fdb(config) => Arc::clone(config.metrics()),
92 }
93 }
94
95 pub async fn open(
97 &self,
98 timeline: String,
99 initially: Timestamp,
100 now_fn: NowFn,
101 read_only: bool,
102 ) -> Arc<dyn TimestampOracle<Timestamp> + Send + Sync> {
103 match self {
104 TimestampOracleConfig::Postgres(config) => Arc::new(
105 PostgresTimestampOracle::open(
106 config.clone(),
107 timeline,
108 initially,
109 now_fn.clone(),
110 read_only,
111 )
112 .await,
113 ),
114 #[cfg(any(target_os = "linux", feature = "foundationdb"))]
115 TimestampOracleConfig::Fdb(config) => {
116 let fdb_oracle = FdbTimestampOracle::open(
117 config.clone(),
118 timeline,
119 initially,
120 now_fn,
121 read_only,
122 )
123 .await
124 .expect("failed to open FdbTimestampOracle");
125 Arc::new(fdb_oracle)
126 }
127 }
128 }
129
130 pub async fn get_all_timelines(&self) -> Result<Vec<(String, Timestamp)>, anyhow::Error> {
134 match self {
135 TimestampOracleConfig::Postgres(config) => {
136 PostgresTimestampOracle::<NowFn>::get_all_timelines(config.clone()).await
137 }
138 #[cfg(any(target_os = "linux", feature = "foundationdb"))]
139 TimestampOracleConfig::Fdb(config) => {
140 FdbTimestampOracle::<NowFn>::get_all_timelines(config.clone()).await
141 }
142 }
143 }
144
145 pub fn apply_parameters(&self, params: TimestampOracleParameters) {
149 #[allow(irrefutable_let_patterns)]
151 if let TimestampOracleConfig::Postgres(pg_config) = self {
152 params.apply(pg_config)
153 }
154 }
155}