1use crate::ShardId;
13use crate::cfg::PersistConfig;
14use crate::internal::metrics::{MetricsBlob, MetricsConsensus};
15use crate::internal::state_versions::StateVersions;
16use crate::metrics::Metrics;
17use async_trait::async_trait;
18use bytes::Bytes;
19use mz_build_info::BuildInfo;
20use mz_ore::bytes::SegmentedBytes;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_ore::url::SensitiveUrl;
24use mz_persist::cfg::{BlobConfig, ConsensusConfig};
25use mz_persist::location::{
26    Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, Tasked,
27    VersionedData,
28};
29use std::str::FromStr;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use tracing::warn;
33
// Command-line arguments naming the two backing stores: consensus and blob.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive turns
// `///` doc comments into help text, so adding them would change CLI output.
#[derive(Debug, Clone, clap::Parser)]
pub struct StoreArgs {
    // URI of the consensus store. Can also be supplied through the
    // `CONSENSUS_URI` environment variable.
    #[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
    pub(crate) consensus_uri: SensitiveUrl,

    // URI of the blob store. Can also be supplied through the `BLOB_URI`
    // environment variable.
    #[clap(long, env = "BLOB_URI")]
    pub(crate) blob_uri: SensitiveUrl,
}
63
// Command-line arguments identifying one shard plus the two backing stores
// (consensus and blob) that hold its state.
//
// NOTE(review): plain `//` comments are used on purpose. clap's derive turns
// `///` doc comments into help text, so adding them would change CLI output.
#[derive(Debug, Clone, clap::Parser)]
pub struct StateArgs {
    // Shard id, kept as the raw string given on the command line. It is parsed
    // (and validated) lazily by `StateArgs::shard_id`, which panics on an
    // invalid value.
    #[clap(long)]
    pub(crate) shard_id: String,

    // URI of the consensus store. Can also be supplied through the
    // `CONSENSUS_URI` environment variable.
    #[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
    pub(crate) consensus_uri: SensitiveUrl,

    // URI of the blob store. Can also be supplied through the `BLOB_URI`
    // environment variable.
    #[clap(long, env = "BLOB_URI")]
    pub(crate) blob_uri: SensitiveUrl,
}
96
/// Fixed `BuildInfo` that `StateArgs::open` uses to construct its
/// `PersistConfig`.
///
/// NOTE(review): the name and the very high version string suggest this is
/// meant to be able to read state written by any real build — confirm.
pub(crate) const READ_ALL_BUILD_INFO: BuildInfo = BuildInfo {
    // Not a real release version.
    version: "99.999.99+test",
    // All-zero placeholder rather than a real commit hash.
    sha: "0000000000000000000000000000000000000000",
};
104
/// Value for the `commit` argument of `make_consensus` / `make_blob`.
///
/// Because it is `false`, stores opened with it are wrapped in `ReadOnly`,
/// which ignores writes instead of forwarding them.
pub(crate) const NO_COMMIT: bool = false;
107
108impl StateArgs {
109    pub(crate) fn shard_id(&self) -> ShardId {
110        ShardId::from_str(&self.shard_id).expect("invalid shard id")
111    }
112
113    pub(crate) async fn open(&self) -> Result<StateVersions, anyhow::Error> {
114        let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
115        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
116        let consensus =
117            make_consensus(&cfg, &self.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
118        let blob = make_blob(&cfg, &self.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
119        Ok(StateVersions::new(cfg, consensus, blob, metrics))
120    }
121}
122
123pub(super) async fn make_consensus(
124    cfg: &PersistConfig,
125    consensus_uri: &SensitiveUrl,
126    commit: bool,
127    metrics: Arc<Metrics>,
128) -> anyhow::Result<Arc<dyn Consensus>> {
129    let consensus = ConsensusConfig::try_from(
130        consensus_uri,
131        Box::new(cfg.clone()),
132        metrics.postgres_consensus.clone(),
133        Arc::clone(&cfg.configs),
134    )?;
135    let consensus = consensus.clone().open().await?;
136    let consensus = if commit {
137        consensus
138    } else {
139        Arc::new(ReadOnly::new(consensus))
140    };
141    let consensus = Arc::new(MetricsConsensus::new(consensus, Arc::clone(&metrics)));
142    let consensus = Arc::new(Tasked(consensus));
143    Ok(consensus)
144}
145
146pub(super) async fn make_blob(
147    cfg: &PersistConfig,
148    blob_uri: &SensitiveUrl,
149    commit: bool,
150    metrics: Arc<Metrics>,
151) -> anyhow::Result<Arc<dyn Blob>> {
152    let blob = BlobConfig::try_from(
153        blob_uri,
154        Box::new(cfg.clone()),
155        metrics.s3_blob.clone(),
156        Arc::clone(&cfg.configs),
157    )
158    .await?;
159    let blob = blob.clone().open().await?;
160    let blob = if commit {
161        blob
162    } else {
163        Arc::new(ReadOnly::new(blob))
164    };
165    let blob = Arc::new(MetricsBlob::new(blob, Arc::clone(&metrics)));
166    let blob = Arc::new(Tasked(blob));
167    Ok(blob)
168}
169
/// Wrapper around a store `T` that remembers whether a write has been ignored.
///
/// The flag only ever goes from `false` to `true`; nothing in this type
/// resets it.
#[derive(Debug)]
struct ReadOnly<T> {
    /// The wrapped store.
    store: T,
    /// Set once `ignoring_write` has been called.
    ignored_write: AtomicBool,
}

impl<T> ReadOnly<T> {
    /// Wraps `store`, with no ignored write recorded yet.
    fn new(store: T) -> Self {
        let ignored_write = AtomicBool::new(false);
        ReadOnly { store, ignored_write }
    }

    /// Returns `true` once `ignoring_write` has been called on this wrapper.
    fn ignored_write(&self) -> bool {
        let flag = &self.ignored_write;
        flag.load(Ordering::SeqCst)
    }

    /// Records that a write was ignored.
    fn ignoring_write(&self) {
        let flag = &self.ignored_write;
        flag.store(true, Ordering::SeqCst);
    }
}
196
197#[async_trait]
198impl Blob for ReadOnly<Arc<dyn Blob>> {
199    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
200        if self.ignored_write() {
201            warn!("potentially-invalid get({key}) after ignored write");
202        }
203        self.store.get(key).await
204    }
205
206    async fn list_keys_and_metadata(
207        &self,
208        key_prefix: &str,
209        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
210    ) -> Result<(), ExternalError> {
211        if self.ignored_write() {
212            warn!("potentially-invalid list_keys_and_metadata() after ignored write");
213        }
214        self.store.list_keys_and_metadata(key_prefix, f).await
215    }
216
217    async fn set(&self, key: &str, _value: Bytes) -> Result<(), ExternalError> {
218        warn!("ignoring set({key}) in read-only mode");
219        self.ignoring_write();
220        Ok(())
221    }
222
223    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
224        warn!("ignoring delete({key}) in read-only mode");
225        self.ignoring_write();
226        Ok(None)
227    }
228
229    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
230        warn!("ignoring restore({key}) in read-only mode");
231        self.ignoring_write();
232        Ok(())
233    }
234}
235
236#[async_trait]
237impl Consensus for ReadOnly<Arc<dyn Consensus>> {
238    fn list_keys(&self) -> ResultStream<'_, String> {
239        if self.ignored_write() {
240            warn!("potentially-invalid list_keys() after ignored write");
241        }
242        self.store.list_keys()
243    }
244
245    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
246        if self.ignored_write() {
247            warn!("potentially-invalid head({key}) after ignored write");
248        }
249        self.store.head(key).await
250    }
251
252    async fn compare_and_set(
253        &self,
254        key: &str,
255        expected: Option<SeqNo>,
256        new: VersionedData,
257    ) -> Result<CaSResult, ExternalError> {
258        warn!(
259            "ignoring cas({key}) in read-only mode ({} bytes at seqno {expected:?})",
260            new.data.len(),
261        );
262        self.ignoring_write();
263        Ok(CaSResult::Committed)
264    }
265
266    async fn scan(
267        &self,
268        key: &str,
269        from: SeqNo,
270        limit: usize,
271    ) -> Result<Vec<VersionedData>, ExternalError> {
272        if self.ignored_write() {
273            warn!("potentially-invalid scan({key}) after ignored write");
274        }
275        self.store.scan(key, from, limit).await
276    }
277
278    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
279        warn!("ignoring truncate({key}) in read-only mode (to seqno {seqno})");
280        self.ignoring_write();
281        Ok(0)
282    }
283}