1use crate::ShardId;
13use crate::cfg::PersistConfig;
14use crate::internal::metrics::{MetricsBlob, MetricsConsensus};
15use crate::internal::state_versions::StateVersions;
16use crate::metrics::Metrics;
17use async_trait::async_trait;
18use bytes::Bytes;
19use mz_build_info::BuildInfo;
20use mz_ore::bytes::SegmentedBytes;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_ore::url::SensitiveUrl;
24use mz_persist::cfg::{BlobConfig, ConsensusConfig};
25use mz_persist::location::{
26 Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, Tasked,
27 VersionedData,
28};
29use std::str::FromStr;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use tracing::warn;
33
// Command-line arguments naming the consensus store and the blob store to talk to.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macro turns `///` doc
// comments on the struct and its fields into `--help` text, so adding doc comments here
// would change what the program prints.
#[derive(Debug, Clone, clap::Parser)]
pub struct StoreArgs {
    // Location of the consensus store. Settable via the long flag clap derives from the
    // field name (`--consensus-uri`) or via the `CONSENSUS_URI` environment variable.
    // `verbatim_doc_comment` only affects how a `///` comment would be rendered as help.
    // NOTE(review): `SensitiveUrl` presumably redacts credentials when printed; confirm
    // against `mz_ore::url`.
    #[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
    pub(crate) consensus_uri: SensitiveUrl,

    // Location of the blob store. Settable via `--blob-uri` or the `BLOB_URI` environment
    // variable.
    #[clap(long, env = "BLOB_URI")]
    pub(crate) blob_uri: SensitiveUrl,
}
63
// Command-line arguments identifying one shard plus the consensus and blob stores that hold
// its state. See `StateArgs::shard_id` and `StateArgs::open` for how they are consumed.
//
// NOTE: plain `//` comments are used on purpose. clap's derive macro turns `///` doc
// comments on the struct and its fields into `--help` text, so adding doc comments here
// would change what the program prints.
#[derive(Debug, Clone, clap::Parser)]
pub struct StateArgs {
    // Shard id, kept as the raw string given on the command line (`--shard-id`). It is only
    // parsed into a `ShardId` when `StateArgs::shard_id` is called, which panics if the
    // string is not a valid id.
    #[clap(long)]
    pub(crate) shard_id: String,

    // Location of the consensus store. Settable via `--consensus-uri` or the `CONSENSUS_URI`
    // environment variable; handed to `make_consensus` by `StateArgs::open`.
    #[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
    pub(crate) consensus_uri: SensitiveUrl,

    // Location of the blob store. Settable via `--blob-uri` or the `BLOB_URI` environment
    // variable; handed to `make_blob` by `StateArgs::open`.
    #[clap(long, env = "BLOB_URI")]
    pub(crate) blob_uri: SensitiveUrl,
}
96
/// Build info used by `StateArgs::open` when constructing its `PersistConfig`.
///
/// The version is a very high placeholder (`99.999.99+test`) and the sha is all zeros.
/// NOTE(review): judging by the name, the high version is presumably there so this tooling
/// can read state written by any real build without tripping a version check; confirm
/// against how `PersistConfig` uses its build info.
pub(crate) const READ_ALL_BUILD_INFO: BuildInfo = BuildInfo {
    version: "99.999.99+test",
    sha: "0000000000000000000000000000000000000000",
};
104
/// Value passed as the `commit` argument of `make_consensus` and `make_blob` by
/// `StateArgs::open`. Because it is `false`, both stores get wrapped in `ReadOnly`, which
/// logs and drops writes instead of applying them.
pub(crate) const NO_COMMIT: bool = false;
107
108impl StateArgs {
109 pub(crate) fn shard_id(&self) -> ShardId {
110 ShardId::from_str(&self.shard_id).expect("invalid shard id")
111 }
112
113 pub(crate) async fn open(&self) -> Result<StateVersions, anyhow::Error> {
114 let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
115 let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
116 let consensus =
117 make_consensus(&cfg, &self.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
118 let blob = make_blob(&cfg, &self.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
119 Ok(StateVersions::new(cfg, consensus, blob, metrics))
120 }
121}
122
123pub(super) async fn make_consensus(
124 cfg: &PersistConfig,
125 consensus_uri: &SensitiveUrl,
126 commit: bool,
127 metrics: Arc<Metrics>,
128) -> anyhow::Result<Arc<dyn Consensus>> {
129 let consensus = ConsensusConfig::try_from(
130 consensus_uri,
131 Box::new(cfg.clone()),
132 metrics.postgres_consensus.clone(),
133 )?;
134 let consensus = consensus.clone().open().await?;
135 let consensus = if commit {
136 consensus
137 } else {
138 Arc::new(ReadOnly::new(consensus))
139 };
140 let consensus = Arc::new(MetricsConsensus::new(consensus, Arc::clone(&metrics)));
141 let consensus = Arc::new(Tasked(consensus));
142 Ok(consensus)
143}
144
145pub(super) async fn make_blob(
146 cfg: &PersistConfig,
147 blob_uri: &SensitiveUrl,
148 commit: bool,
149 metrics: Arc<Metrics>,
150) -> anyhow::Result<Arc<dyn Blob>> {
151 let blob = BlobConfig::try_from(
152 blob_uri,
153 Box::new(cfg.clone()),
154 metrics.s3_blob.clone(),
155 Arc::clone(&cfg.configs),
156 )
157 .await?;
158 let blob = blob.clone().open().await?;
159 let blob = if commit {
160 blob
161 } else {
162 Arc::new(ReadOnly::new(blob))
163 };
164 let blob = Arc::new(MetricsBlob::new(blob, Arc::clone(&metrics)));
165 let blob = Arc::new(Tasked(blob));
166 Ok(blob)
167}
168
/// Wrapper that pairs a store with a flag recording whether a write has been ignored.
///
/// The wrapper itself never touches `store`; the trait impls for specific `T` decide which
/// calls are forwarded and which are dropped, and use the flag to warn on later reads.
#[derive(Debug)]
struct ReadOnly<T> {
    /// The wrapped store.
    store: T,
    /// Set once any write has been dropped; never cleared.
    ignored_write: AtomicBool,
}

impl<T> ReadOnly<T> {
    /// Wraps `store`, starting with no ignored writes recorded.
    fn new(store: T) -> Self {
        let ignored_write = AtomicBool::new(false);
        ReadOnly {
            store,
            ignored_write,
        }
    }

    /// Returns `true` if `ignoring_write` has been called at least once on this wrapper.
    fn ignored_write(&self) -> bool {
        let flag = &self.ignored_write;
        flag.load(Ordering::SeqCst)
    }

    /// Records that a write was dropped.
    fn ignoring_write(&self) {
        let flag = &self.ignored_write;
        flag.store(true, Ordering::SeqCst);
    }
}
195
196#[async_trait]
197impl Blob for ReadOnly<Arc<dyn Blob>> {
198 async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
199 if self.ignored_write() {
200 warn!("potentially-invalid get({key}) after ignored write");
201 }
202 self.store.get(key).await
203 }
204
205 async fn list_keys_and_metadata(
206 &self,
207 key_prefix: &str,
208 f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
209 ) -> Result<(), ExternalError> {
210 if self.ignored_write() {
211 warn!("potentially-invalid list_keys_and_metadata() after ignored write");
212 }
213 self.store.list_keys_and_metadata(key_prefix, f).await
214 }
215
216 async fn set(&self, key: &str, _value: Bytes) -> Result<(), ExternalError> {
217 warn!("ignoring set({key}) in read-only mode");
218 self.ignoring_write();
219 Ok(())
220 }
221
222 async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
223 warn!("ignoring delete({key}) in read-only mode");
224 self.ignoring_write();
225 Ok(None)
226 }
227
228 async fn restore(&self, key: &str) -> Result<(), ExternalError> {
229 warn!("ignoring restore({key}) in read-only mode");
230 self.ignoring_write();
231 Ok(())
232 }
233}
234
235#[async_trait]
236impl Consensus for ReadOnly<Arc<dyn Consensus>> {
237 fn list_keys(&self) -> ResultStream<String> {
238 if self.ignored_write() {
239 warn!("potentially-invalid list_keys() after ignored write");
240 }
241 self.store.list_keys()
242 }
243
244 async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
245 if self.ignored_write() {
246 warn!("potentially-invalid head({key}) after ignored write");
247 }
248 self.store.head(key).await
249 }
250
251 async fn compare_and_set(
252 &self,
253 key: &str,
254 expected: Option<SeqNo>,
255 new: VersionedData,
256 ) -> Result<CaSResult, ExternalError> {
257 warn!(
258 "ignoring cas({key}) in read-only mode ({} bytes at seqno {expected:?})",
259 new.data.len(),
260 );
261 self.ignoring_write();
262 Ok(CaSResult::Committed)
263 }
264
265 async fn scan(
266 &self,
267 key: &str,
268 from: SeqNo,
269 limit: usize,
270 ) -> Result<Vec<VersionedData>, ExternalError> {
271 if self.ignored_write() {
272 warn!("potentially-invalid scan({key}) after ignored write");
273 }
274 self.store.scan(key, from, limit).await
275 }
276
277 async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
278 warn!("ignoring truncate({key}) in read-only mode (to seqno {seqno})");
279 self.ignoring_write();
280 Ok(0)
281 }
282}