mz_persist_client/cli/args.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! CLI argument types for persist
11
12use crate::ShardId;
13use crate::cfg::PersistConfig;
14use crate::internal::metrics::{MetricsBlob, MetricsConsensus};
15use crate::internal::state_versions::StateVersions;
16use crate::metrics::Metrics;
17use async_trait::async_trait;
18use bytes::Bytes;
19use mz_build_info::BuildInfo;
20use mz_ore::bytes::SegmentedBytes;
21use mz_ore::metrics::MetricsRegistry;
22use mz_ore::now::SYSTEM_TIME;
23use mz_ore::url::SensitiveUrl;
24use mz_persist::cfg::{BlobConfig, ConsensusConfig};
25use mz_persist::location::{
26    Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, Tasked,
27    VersionedData,
28};
29use std::str::FromStr;
30use std::sync::Arc;
31use std::sync::atomic::{AtomicBool, Ordering};
32use tracing::warn;
33
// NOTE: clap's derive uses the `///` doc comments on this struct and its fields as help text
// (note the `verbatim_doc_comment` below), so they are kept verbatim; these `//` notes are not
// part of the help output.
/// Arguments for commands that work over both backing stores.
/// TODO: squish this into `StateArgs`.
#[derive(Debug, Clone, clap::Parser)]
pub struct StoreArgs {
    // Settable as a `long` flag (clap derives the name from the field, i.e. `--consensus-uri`)
    // or via the `CONSENSUS_URI` environment variable.
    /// Consensus to use.
    ///
    /// When connecting to a deployed environment's consensus table, the Postgres/CRDB connection
    /// string must contain the database name and `options=--search_path=consensus`.
    ///
    /// When connecting to Cockroach Cloud, use the following format:
    ///
    /// ```text
    /// postgresql://<user>:$COCKROACH_PW@<hostname>:<port>/environment_<environment-id>
    ///   ?sslmode=verify-full
    ///   &sslrootcert=/path/to/cockroach-cloud/certs/cluster-ca.crt
    ///   &options=--search_path=consensus
    /// ```
    ///
    #[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
    pub(crate) consensus_uri: SensitiveUrl,

    // Settable as a `long` flag (`--blob-uri`) or via the `BLOB_URI` environment variable.
    /// Blob to use
    ///
    /// When connecting to a deployed environment's blob, the necessary connection glue must be in
    /// place. e.g. for S3, sign into SSO, set AWS_PROFILE and AWS_REGION appropriately, with a blob
    /// URI scoped to the environment's bucket prefix.
    #[clap(long, env = "BLOB_URI")]
    pub(crate) blob_uri: SensitiveUrl,
}
63
// NOTE: clap's derive uses the `///` doc comments on this struct and its fields as help text,
// so they are kept verbatim; these `//` notes are not part of the help output.
/// Arguments for viewing the current state of a given shard
#[derive(Debug, Clone, clap::Parser)]
pub struct StateArgs {
    // Kept as the raw string; `StateArgs::shard_id` parses it and panics if it is not a valid
    // `ShardId`.
    /// Shard to view
    #[clap(long)]
    pub(crate) shard_id: String,

    // The two store fields below duplicate `StoreArgs` (see the TODO on that struct).
    // Settable via `--consensus-uri` or the `CONSENSUS_URI` environment variable.
    /// Consensus to use.
    ///
    /// When connecting to a deployed environment's consensus table, the Postgres/CRDB connection
    /// string must contain the database name and `options=--search_path=consensus`.
    ///
    /// When connecting to Cockroach Cloud, use the following format:
    ///
    /// ```text
    /// postgresql://<user>:$COCKROACH_PW@<hostname>:<port>/environment_<environment-id>
    ///   ?sslmode=verify-full
    ///   &sslrootcert=/path/to/cockroach-cloud/certs/cluster-ca.crt
    ///   &options=--search_path=consensus
    /// ```
    ///
    #[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
    pub(crate) consensus_uri: SensitiveUrl,

    // Settable via `--blob-uri` or the `BLOB_URI` environment variable.
    /// Blob to use
    ///
    /// When connecting to a deployed environment's blob, the necessary connection glue must be in
    /// place. e.g. for S3, sign into SSO, set AWS_PROFILE and AWS_REGION appropriately, with a blob
    /// URI scoped to the environment's bucket prefix.
    #[clap(long, env = "BLOB_URI")]
    pub(crate) blob_uri: SensitiveUrl,
}
96
/// BuildInfo with a larger version than any version we expect to see in prod,
/// to ensure that any data read is from a smaller version and does not trigger
/// alerts.
pub(crate) const READ_ALL_BUILD_INFO: BuildInfo = BuildInfo {
    version: "99.999.99+test",
    sha: "0000000000000000000000000000000000000000",
};

/// All `inspect` commands are read-only.
///
/// Passed as the `commit` argument to `make_consensus` and `make_blob`, which wrap the opened
/// stores in `ReadOnly` whenever `commit` is false.
pub(crate) const NO_COMMIT: bool = false;
107
108impl StateArgs {
109    pub(crate) fn shard_id(&self) -> ShardId {
110        ShardId::from_str(&self.shard_id).expect("invalid shard id")
111    }
112
113    pub(crate) async fn open(&self) -> Result<StateVersions, anyhow::Error> {
114        let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
115        let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
116        let consensus =
117            make_consensus(&cfg, &self.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
118        let blob = make_blob(&cfg, &self.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
119        Ok(StateVersions::new(cfg, consensus, blob, metrics))
120    }
121}
122
123pub(super) async fn make_consensus(
124    cfg: &PersistConfig,
125    consensus_uri: &SensitiveUrl,
126    commit: bool,
127    metrics: Arc<Metrics>,
128) -> anyhow::Result<Arc<dyn Consensus>> {
129    let consensus = ConsensusConfig::try_from(
130        consensus_uri,
131        Box::new(cfg.clone()),
132        metrics.postgres_consensus.clone(),
133    )?;
134    let consensus = consensus.clone().open().await?;
135    let consensus = if commit {
136        consensus
137    } else {
138        Arc::new(ReadOnly::new(consensus))
139    };
140    let consensus = Arc::new(MetricsConsensus::new(consensus, Arc::clone(&metrics)));
141    let consensus = Arc::new(Tasked(consensus));
142    Ok(consensus)
143}
144
145pub(super) async fn make_blob(
146    cfg: &PersistConfig,
147    blob_uri: &SensitiveUrl,
148    commit: bool,
149    metrics: Arc<Metrics>,
150) -> anyhow::Result<Arc<dyn Blob>> {
151    let blob = BlobConfig::try_from(
152        blob_uri,
153        Box::new(cfg.clone()),
154        metrics.s3_blob.clone(),
155        Arc::clone(&cfg.configs),
156    )
157    .await?;
158    let blob = blob.clone().open().await?;
159    let blob = if commit {
160        blob
161    } else {
162        Arc::new(ReadOnly::new(blob))
163    };
164    let blob = Arc::new(MetricsBlob::new(blob, Arc::clone(&metrics)));
165    let blob = Arc::new(Tasked(blob));
166    Ok(blob)
167}
168
/// Wraps a lower-level service (a `Blob` or `Consensus`) so that it cannot be written to.
///
/// Writes are dropped instead of being forwarded to `store`, and each dropped write sets a
/// flag that later reads consult so they can warn about possibly stale results. This is
/// probably not elaborate enough to work in general -- folks may expect to read their own
/// writes, among other things -- but it should handle the case of GC, where all reads finish
/// before the writes begin.
#[derive(Debug)]
struct ReadOnly<T> {
    /// The wrapped service that reads are forwarded to.
    store: T,
    /// Set by `ignoring_write`; nothing in this type ever resets it.
    ignored_write: AtomicBool,
}

impl<T> ReadOnly<T> {
    /// Wraps `store` with no ignored writes recorded yet.
    fn new(store: T) -> Self {
        let ignored_write = AtomicBool::new(false);
        ReadOnly {
            store,
            ignored_write,
        }
    }

    /// Returns whether any write has been ignored so far.
    fn ignored_write(&self) -> bool {
        let flag = &self.ignored_write;
        flag.load(Ordering::SeqCst)
    }

    /// Records that a write has just been ignored.
    fn ignoring_write(&self) {
        let flag = &self.ignored_write;
        flag.store(true, Ordering::SeqCst);
    }
}
195
196#[async_trait]
197impl Blob for ReadOnly<Arc<dyn Blob>> {
198    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
199        if self.ignored_write() {
200            warn!("potentially-invalid get({key}) after ignored write");
201        }
202        self.store.get(key).await
203    }
204
205    async fn list_keys_and_metadata(
206        &self,
207        key_prefix: &str,
208        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
209    ) -> Result<(), ExternalError> {
210        if self.ignored_write() {
211            warn!("potentially-invalid list_keys_and_metadata() after ignored write");
212        }
213        self.store.list_keys_and_metadata(key_prefix, f).await
214    }
215
216    async fn set(&self, key: &str, _value: Bytes) -> Result<(), ExternalError> {
217        warn!("ignoring set({key}) in read-only mode");
218        self.ignoring_write();
219        Ok(())
220    }
221
222    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
223        warn!("ignoring delete({key}) in read-only mode");
224        self.ignoring_write();
225        Ok(None)
226    }
227
228    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
229        warn!("ignoring restore({key}) in read-only mode");
230        self.ignoring_write();
231        Ok(())
232    }
233}
234
235#[async_trait]
236impl Consensus for ReadOnly<Arc<dyn Consensus>> {
237    fn list_keys(&self) -> ResultStream<String> {
238        if self.ignored_write() {
239            warn!("potentially-invalid list_keys() after ignored write");
240        }
241        self.store.list_keys()
242    }
243
244    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
245        if self.ignored_write() {
246            warn!("potentially-invalid head({key}) after ignored write");
247        }
248        self.store.head(key).await
249    }
250
251    async fn compare_and_set(
252        &self,
253        key: &str,
254        expected: Option<SeqNo>,
255        new: VersionedData,
256    ) -> Result<CaSResult, ExternalError> {
257        warn!(
258            "ignoring cas({key}) in read-only mode ({} bytes at seqno {expected:?})",
259            new.data.len(),
260        );
261        self.ignoring_write();
262        Ok(CaSResult::Committed)
263    }
264
265    async fn scan(
266        &self,
267        key: &str,
268        from: SeqNo,
269        limit: usize,
270    ) -> Result<Vec<VersionedData>, ExternalError> {
271        if self.ignored_write() {
272            warn!("potentially-invalid scan({key}) after ignored write");
273        }
274        self.store.scan(key, from, limit).await
275    }
276
277    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
278        warn!("ignoring truncate({key}) in read-only mode (to seqno {seqno})");
279        self.ignoring_write();
280        Ok(0)
281    }
282}