use crate::cfg::PersistConfig;
use crate::internal::metrics::{MetricsBlob, MetricsConsensus};
use crate::internal::state_versions::StateVersions;
use crate::metrics::Metrics;
use crate::ShardId;
use async_trait::async_trait;
use bytes::Bytes;
use mz_build_info::BuildInfo;
use mz_ore::bytes::SegmentedBytes;
use mz_ore::metrics::MetricsRegistry;
use mz_ore::now::SYSTEM_TIME;
use mz_ore::url::SensitiveUrl;
use mz_persist::cfg::{BlobConfig, ConsensusConfig};
use mz_persist::location::{
Blob, BlobMetadata, CaSResult, Consensus, ExternalError, ResultStream, SeqNo, Tasked,
VersionedData,
};
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use tracing::warn;
// Command-line arguments naming the two backing stores: a consensus URI and a
// blob URI. Each flag may also be supplied through the environment variable
// named in its `env` attribute.
//
// NOTE(review): these are deliberately plain `//` comments. `///` doc comments
// on a `clap::Parser` derive are picked up as help text, which would change the
// CLI output; promote them on purpose if help text is wanted.
#[derive(Debug, Clone, clap::Parser)]
pub struct StoreArgs {
// URI of the consensus store (long flag, or the `CONSENSUS_URI` env var).
#[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
pub(crate) consensus_uri: SensitiveUrl,
// URI of the blob store (long flag, or the `BLOB_URI` env var).
#[clap(long, env = "BLOB_URI")]
pub(crate) blob_uri: SensitiveUrl,
}
// Command-line arguments identifying a single shard plus the two backing
// stores (consensus and blob) that hold its state.
//
// NOTE(review): these are deliberately plain `//` comments. `///` doc comments
// on a `clap::Parser` derive are picked up as help text, which would change the
// CLI output; promote them on purpose if help text is wanted.
#[derive(Debug, Clone, clap::Parser)]
pub struct StateArgs {
// Shard id as typed by the user. It is kept as a raw string here and only
// parsed into a `ShardId` by `StateArgs::shard_id`.
#[clap(long)]
pub(crate) shard_id: String,
// URI of the consensus store (long flag, or the `CONSENSUS_URI` env var).
#[clap(long, verbatim_doc_comment, env = "CONSENSUS_URI")]
pub(crate) consensus_uri: SensitiveUrl,
// URI of the blob store (long flag, or the `BLOB_URI` env var).
#[clap(long, env = "BLOB_URI")]
pub(crate) blob_uri: SensitiveUrl,
}
/// Placeholder build info handed to `PersistConfig::new_default_configs` by
/// `StateArgs::open`.
///
/// The version is an artificial, very high number, the sha is all zeros and
/// the build time is empty.
///
/// NOTE(review): the name and the high version suggest the intent is to be
/// able to read state written by any real release -- confirm against the
/// persist version-compatibility checks.
pub(crate) const READ_ALL_BUILD_INFO: BuildInfo = BuildInfo {
version: "99.999.99+test",
sha: "0000000000000000000000000000000000000000",
time: "",
};
/// Value to pass as the `commit` argument of `make_consensus` / `make_blob`
/// when the stores must not be modified: with `commit == false` both helpers
/// wrap the opened store in `ReadOnly`.
pub(crate) const NO_COMMIT: bool = false;
impl StateArgs {
pub(crate) fn shard_id(&self) -> ShardId {
ShardId::from_str(&self.shard_id).expect("invalid shard id")
}
pub(crate) async fn open(&self) -> Result<StateVersions, anyhow::Error> {
let cfg = PersistConfig::new_default_configs(&READ_ALL_BUILD_INFO, SYSTEM_TIME.clone());
let metrics = Arc::new(Metrics::new(&cfg, &MetricsRegistry::new()));
let consensus =
make_consensus(&cfg, &self.consensus_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
let blob = make_blob(&cfg, &self.blob_uri, NO_COMMIT, Arc::clone(&metrics)).await?;
Ok(StateVersions::new(cfg, consensus, blob, metrics))
}
}
/// Opens the consensus store at `consensus_uri` and wraps it for use by the
/// CLI.
///
/// The layers, innermost first, are:
/// 1. the store opened from the parsed `ConsensusConfig`;
/// 2. a `ReadOnly` wrapper, added only when `commit` is `false`;
/// 3. `MetricsConsensus`, recording into `metrics`;
/// 4. `Tasked`.
///
/// # Errors
///
/// Returns an error if the URI cannot be turned into a `ConsensusConfig` or
/// if opening the store fails.
pub(super) async fn make_consensus(
    cfg: &PersistConfig,
    consensus_uri: &SensitiveUrl,
    commit: bool,
    metrics: Arc<Metrics>,
) -> anyhow::Result<Arc<dyn Consensus>> {
    let consensus_cfg = ConsensusConfig::try_from(
        consensus_uri,
        Box::new(cfg.clone()),
        metrics.postgres_consensus.clone(),
    )?;
    // The config is not used again after opening, so there is no need to
    // clone it first.
    let consensus = consensus_cfg.open().await?;
    let consensus = if commit {
        consensus
    } else {
        Arc::new(ReadOnly::new(consensus))
    };
    // Last use of `metrics`: hand over the owned `Arc` instead of bumping the
    // refcount and dropping the original right after.
    let consensus = Arc::new(MetricsConsensus::new(consensus, metrics));
    let consensus = Arc::new(Tasked(consensus));
    Ok(consensus)
}
/// Opens the blob store at `blob_uri` and wraps it for use by the CLI.
///
/// The layers, innermost first, are:
/// 1. the store opened from the parsed `BlobConfig`;
/// 2. a `ReadOnly` wrapper, added only when `commit` is `false`;
/// 3. `MetricsBlob`, recording into `metrics`;
/// 4. `Tasked`.
///
/// # Errors
///
/// Returns an error if the URI cannot be turned into a `BlobConfig` or if
/// opening the store fails.
pub(super) async fn make_blob(
    cfg: &PersistConfig,
    blob_uri: &SensitiveUrl,
    commit: bool,
    metrics: Arc<Metrics>,
) -> anyhow::Result<Arc<dyn Blob>> {
    let blob_cfg = BlobConfig::try_from(
        blob_uri,
        Box::new(cfg.clone()),
        metrics.s3_blob.clone(),
        Arc::clone(&cfg.configs),
    )
    .await?;
    // The config is not used again after opening, so there is no need to
    // clone it first.
    let blob = blob_cfg.open().await?;
    let blob = if commit {
        blob
    } else {
        Arc::new(ReadOnly::new(blob))
    };
    // Last use of `metrics`: hand over the owned `Arc` instead of bumping the
    // refcount and dropping the original right after.
    let blob = Arc::new(MetricsBlob::new(blob, metrics));
    let blob = Arc::new(Tasked(blob));
    Ok(blob)
}
/// Wrapper around a store that remembers whether a write has been ignored.
///
/// The `Blob` and `Consensus` impls for this type forward reads to `store`
/// and drop writes, setting the flag each time they do so.
#[derive(Debug)]
struct ReadOnly<T> {
    /// The wrapped store.
    store: T,
    /// `true` once at least one write has been ignored. Nothing in the
    /// inherent impl resets it.
    ignored_write: AtomicBool,
}

impl<T> ReadOnly<T> {
    /// Wraps `store`, starting with no ignored writes recorded.
    fn new(store: T) -> Self {
        let ignored_write = AtomicBool::new(false);
        Self {
            store,
            ignored_write,
        }
    }

    /// Returns whether any write has been ignored so far.
    fn ignored_write(&self) -> bool {
        let flag = &self.ignored_write;
        flag.load(Ordering::SeqCst)
    }

    /// Records that a write is being ignored.
    fn ignoring_write(&self) {
        let flag = &self.ignored_write;
        flag.store(true, Ordering::SeqCst);
    }
}
/// Read-only `Blob`: reads go to the wrapped store, writes are logged,
/// recorded via `ignoring_write`, and reported as successful without touching
/// the store.
#[async_trait]
impl Blob for ReadOnly<Arc<dyn Blob>> {
    /// Forwards to the wrapped store. Warns first if a write has already been
    /// ignored, because the result may not reflect that write.
    async fn get(&self, key: &str) -> Result<Option<SegmentedBytes>, ExternalError> {
        let write_was_ignored = self.ignored_write();
        if write_was_ignored {
            warn!("potentially-invalid get({key}) after ignored write");
        }
        self.store.get(key).await
    }

    /// Forwards to the wrapped store. Warns first if a write has already been
    /// ignored, because the listing may not reflect that write.
    async fn list_keys_and_metadata(
        &self,
        key_prefix: &str,
        f: &mut (dyn FnMut(BlobMetadata) + Send + Sync),
    ) -> Result<(), ExternalError> {
        let write_was_ignored = self.ignored_write();
        if write_was_ignored {
            warn!("potentially-invalid list_keys_and_metadata() after ignored write");
        }
        self.store.list_keys_and_metadata(key_prefix, f).await
    }

    /// Drops the write: logs it, records that a write was ignored, and
    /// returns `Ok(())`.
    async fn set(&self, key: &str, _value: Bytes) -> Result<(), ExternalError> {
        warn!("ignoring set({key}) in read-only mode");
        self.ignoring_write();
        // The wrapped store is intentionally not called.
        Ok(())
    }

    /// Drops the delete: logs it, records that a write was ignored, and
    /// returns `Ok(None)`.
    async fn delete(&self, key: &str) -> Result<Option<usize>, ExternalError> {
        warn!("ignoring delete({key}) in read-only mode");
        self.ignoring_write();
        // The wrapped store is intentionally not called.
        let nothing_deleted = None;
        Ok(nothing_deleted)
    }

    /// Drops the restore: logs it, records that a write was ignored, and
    /// returns `Ok(())`.
    async fn restore(&self, key: &str) -> Result<(), ExternalError> {
        warn!("ignoring restore({key}) in read-only mode");
        self.ignoring_write();
        // The wrapped store is intentionally not called.
        Ok(())
    }
}
/// Read-only `Consensus`: reads go to the wrapped store, writes are logged,
/// recorded via `ignoring_write`, and reported as successful without touching
/// the store.
#[async_trait]
impl Consensus for ReadOnly<Arc<dyn Consensus>> {
    /// Forwards to the wrapped store. Warns first if a write has already been
    /// ignored, because the listing may not reflect that write.
    fn list_keys(&self) -> ResultStream<String> {
        let write_was_ignored = self.ignored_write();
        if write_was_ignored {
            warn!("potentially-invalid list_keys() after ignored write");
        }
        self.store.list_keys()
    }

    /// Forwards to the wrapped store. Warns first if a write has already been
    /// ignored, because the result may not reflect that write.
    async fn head(&self, key: &str) -> Result<Option<VersionedData>, ExternalError> {
        let write_was_ignored = self.ignored_write();
        if write_was_ignored {
            warn!("potentially-invalid head({key}) after ignored write");
        }
        self.store.head(key).await
    }

    /// Drops the compare-and-set: logs the payload size and expected seqno,
    /// records that a write was ignored, and reports `CaSResult::Committed`
    /// even though nothing was written.
    async fn compare_and_set(
        &self,
        key: &str,
        expected: Option<SeqNo>,
        new: VersionedData,
    ) -> Result<CaSResult, ExternalError> {
        let payload_len = new.data.len();
        warn!(
            "ignoring cas({key}) in read-only mode ({} bytes at seqno {expected:?})",
            payload_len,
        );
        self.ignoring_write();
        // The wrapped store is intentionally not called.
        let pretended_outcome = CaSResult::Committed;
        Ok(pretended_outcome)
    }

    /// Forwards to the wrapped store. Warns first if a write has already been
    /// ignored, because the result may not reflect that write.
    async fn scan(
        &self,
        key: &str,
        from: SeqNo,
        limit: usize,
    ) -> Result<Vec<VersionedData>, ExternalError> {
        let write_was_ignored = self.ignored_write();
        if write_was_ignored {
            warn!("potentially-invalid scan({key}) after ignored write");
        }
        self.store.scan(key, from, limit).await
    }

    /// Drops the truncate: logs it, records that a write was ignored, and
    /// returns `Ok(0)`.
    async fn truncate(&self, key: &str, seqno: SeqNo) -> Result<usize, ExternalError> {
        warn!("ignoring truncate({key}) in read-only mode (to seqno {seqno})");
        self.ignoring_write();
        // The wrapped store is intentionally not called.
        let removed = 0;
        Ok(removed)
    }
}