use std::collections::BTreeMap;
use std::marker::PhantomData;
use std::sync::atomic::Ordering::SeqCst;
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, AtomicUsize};
use std::sync::{Arc, RwLock};
use std::time::Duration;
use tracing::error;
use mz_proto::{ProtoType, RustType};
include!(concat!(env!("OUT_DIR"), "/mz_dyncfg.rs"));
/// A typed description of a dynamically updatable config.
///
/// A `Config` holds only static metadata (name, description, default). The
/// live value is not stored here: it is looked up by `name` in a [`ConfigSet`]
/// each time it is read.
#[derive(Clone, Debug)]
pub struct Config<D: ConfigDefault> {
/// Key under which this config is registered in, and looked up from, a
/// [`ConfigSet`].
name: &'static str,
/// Human-readable description of the config.
desc: &'static str,
/// The default, converted to `D::ConfigType` when the config is added to a
/// [`ConfigSet`].
default: D,
}
impl<D: ConfigDefault> Config<D> {
    /// Constructs a config description from its name, default, and description.
    ///
    /// This is a `const fn`, so configs can be declared as `const` items.
    pub const fn new(name: &'static str, default: D, desc: &'static str) -> Self {
        Config {
            name,
            default,
            desc,
        }
    }

    /// The name of this config.
    ///
    /// Returns the stored `&'static str` directly (matching
    /// [`ConfigEntry::name`]) so the result is not tied to the borrow of `self`.
    pub fn name(&self) -> &'static str {
        self.name
    }

    /// The description of this config.
    ///
    /// Returns the stored `&'static str` directly (matching
    /// [`ConfigEntry::desc`]) so the result is not tied to the borrow of `self`.
    pub fn desc(&self) -> &'static str {
        self.desc
    }

    /// The default value of this config, in its declared (unconverted) form.
    pub fn default(&self) -> &D {
        &self.default
    }

    /// Returns the current value of this config in `set`.
    ///
    /// Every call performs a lookup by name; use [`Config::handle`] to pay for
    /// the lookup only once.
    ///
    /// # Panics
    ///
    /// Panics if no config with this name has been added to `set`. The
    /// `from_val` implementations in this file also panic if the entry
    /// registered under this name holds a different kind of value.
    pub fn get(&self, set: &ConfigSet) -> D::ConfigType {
        let current = self.shared(set).load();
        D::ConfigType::from_val(current)
    }

    /// Returns a handle that shares this config's value storage in `set`.
    ///
    /// The handle holds a clone of the entry's storage, so reading through it
    /// does not repeat the lookup by name.
    ///
    /// # Panics
    ///
    /// Panics if no config with this name has been added to `set`.
    pub fn handle(&self, set: &ConfigSet) -> ConfigValHandle<D::ConfigType> {
        ConfigValHandle {
            val: self.shared(set).clone(),
            _type: PhantomData,
        }
    }

    /// Looks up the value storage registered under this config's name.
    ///
    /// # Panics
    ///
    /// Panics if the name is not present in `set`.
    fn shared<'a>(&self, set: &'a ConfigSet) -> &'a ConfigValAtomic {
        &set.configs
            .get(self.name)
            .unwrap_or_else(|| panic!("config {} should be registered to set", self.name))
            .val
    }

    /// Parses `val` as this config's value type and wraps it in a [`ConfigVal`].
    ///
    /// # Errors
    ///
    /// Returns the error string produced by [`ConfigType::parse`] when `val`
    /// is not a valid representation of the type.
    pub fn parse_val(&self, val: &str) -> Result<ConfigVal, String> {
        let parsed = D::ConfigType::parse(val)?;
        Ok(parsed.into())
    }
}
/// A type usable as the value of a [`Config`].
///
/// Implementors convert into a [`ConfigVal`] (via the `Into` supertrait) and
/// back again (via `from_val`).
pub trait ConfigType: Into<ConfigVal> + Clone + Sized {
/// Converts a type-erased [`ConfigVal`] back into this type.
///
/// The implementations in this file panic when `val` is not the variant
/// corresponding to `Self`.
fn from_val(val: ConfigVal) -> Self;
/// Parses a value of this type from its string form, returning a
/// human-readable error message on failure.
fn parse(s: &str) -> Result<Self, String>;
}
/// A value from which the default of a [`Config`] can be produced.
///
/// This indirection lets a default be declared in a form other than the
/// config's value type itself; the impls in this file cover the value type
/// itself, `fn() -> T`, and `&str` (for `String` configs).
pub trait ConfigDefault: Clone {
/// The value type of a config declared with this default.
type ConfigType: ConfigType;
/// Converts this default into the config's value type.
fn into_config_type(self) -> Self::ConfigType;
}
/// Every [`ConfigType`] is its own default: the conversion is the identity.
impl<T: ConfigType> ConfigDefault for T {
type ConfigType = T;
fn into_config_type(self) -> T {
// Already the value type; nothing to convert.
self
}
}
/// A function pointer returning a [`ConfigType`] can serve as a default; the
/// function is invoked whenever the default value is needed.
impl<T: ConfigType> ConfigDefault for fn() -> T {
    type ConfigType = T;

    /// Calls the function pointer to produce the default value.
    fn into_config_type(self) -> T {
        let make_default = self;
        make_default()
    }
}
/// A set of registered configs and their current values, keyed by name.
///
/// Note on `Clone`: each entry's value storage (`ConfigValAtomic`) wraps its
/// data in an `Arc`, so the derived clone shares the underlying values with
/// the original rather than copying them.
#[derive(Clone, Default)]
pub struct ConfigSet {
/// Registered entries, keyed by config name.
configs: BTreeMap<String, ConfigEntry>,
}
impl ConfigSet {
    /// Registers `config` in this set, with its value initialized to the
    /// config's default.
    ///
    /// Takes and returns the set by value so registrations can be chained.
    ///
    /// # Panics
    ///
    /// Panics if a config with the same name was already registered.
    pub fn add<D: ConfigDefault>(mut self, config: &Config<D>) -> Self {
        let initial: ConfigVal = config.default.clone().into_config_type().into();
        let entry = ConfigEntry {
            name: config.name,
            desc: config.desc,
            // The live value starts out equal to the default.
            val: ConfigValAtomic::from(initial.clone()),
            default: initial,
        };
        let key = entry.name.to_owned();
        if let Some(prev) = self.configs.insert(key, entry) {
            panic!("{} registered twice", prev.name);
        }
        self
    }

    /// Iterates over all registered entries, in the map's key (name) order.
    pub fn entries(&self) -> impl Iterator<Item = &ConfigEntry> {
        self.configs.values()
    }

    /// Returns the entry registered under `name`, if there is one.
    pub fn entry(&self, name: &str) -> Option<&ConfigEntry> {
        self.configs.get(name)
    }
}
/// A type-erased entry of a [`ConfigSet`]: a config's metadata, default, and
/// current value.
#[derive(Clone, Debug)]
pub struct ConfigEntry {
/// The config's name.
name: &'static str,
/// The config's description.
desc: &'static str,
/// The default value captured when the config was added to the set.
default: ConfigVal,
/// Storage for the current value; updated in place through `&self`.
val: ConfigValAtomic,
}
impl ConfigEntry {
/// The name of this config.
pub fn name(&self) -> &'static str {
self.name
}
/// The description of this config.
pub fn desc(&self) -> &'static str {
self.desc
}
/// The default value of this config.
pub fn default(&self) -> &ConfigVal {
&self.default
}
/// A snapshot of the current value of this config, loaded from its storage.
pub fn val(&self) -> ConfigVal {
self.val.load()
}
}
/// A typed handle to one config's value storage, obtained from
/// [`Config::handle`].
///
/// Reading through the handle goes straight to the storage, without a lookup
/// by name in the [`ConfigSet`].
#[derive(Debug, Clone)]
pub struct ConfigValHandle<T> {
/// The value storage, a clone of the one held by the set's entry.
val: ConfigValAtomic,
/// Records the value type `T` without storing a `T`.
_type: PhantomData<T>,
}
impl<T: ConfigType> ConfigValHandle<T> {
    /// Returns the current value of the config this handle refers to.
    ///
    /// Loads a snapshot from the value storage and converts it to `T` with
    /// [`ConfigType::from_val`].
    pub fn get(&self) -> T {
        let snapshot = self.val.load();
        T::from_val(snapshot)
    }
}
/// A type-erased config value: one variant per supported value type.
#[derive(Clone, Debug, PartialEq)]
pub enum ConfigVal {
/// A `bool` value.
Bool(bool),
/// A `u32` value.
U32(u32),
/// A `usize` value.
Usize(usize),
/// An optional `usize` value.
OptUsize(Option<usize>),
/// An `f64` value.
F64(f64),
/// A `String` value.
String(String),
/// A `Duration` value.
Duration(Duration),
/// An arbitrary JSON value.
Json(serde_json::Value),
}
/// Shared storage for a [`ConfigVal`] that can be updated through `&self`.
///
/// Every variant wraps its data in an `Arc`, so cloning yields another
/// reference to the same value. Variants mirror those of [`ConfigVal`]:
/// `bool`, `u32`, `usize` and `f64` use atomics; the rest use an `RwLock`.
#[derive(Clone, Debug)]
enum ConfigValAtomic {
Bool(Arc<AtomicBool>),
U32(Arc<AtomicU32>),
Usize(Arc<AtomicUsize>),
OptUsize(Arc<RwLock<Option<usize>>>),
// Holds the `f64`'s bit pattern (`to_bits` / `from_bits`) in an `AtomicU64`.
F64(Arc<AtomicU64>),
String(Arc<RwLock<String>>),
Duration(Arc<RwLock<Duration>>),
Json(Arc<RwLock<serde_json::Value>>),
}
/// Builds fresh (unshared) storage initialized to the given value.
impl From<ConfigVal> for ConfigValAtomic {
    fn from(val: ConfigVal) -> ConfigValAtomic {
        match val {
            ConfigVal::Bool(b) => Self::Bool(Arc::new(AtomicBool::new(b))),
            ConfigVal::U32(n) => Self::U32(Arc::new(AtomicU32::new(n))),
            ConfigVal::Usize(n) => Self::Usize(Arc::new(AtomicUsize::new(n))),
            ConfigVal::OptUsize(n) => Self::OptUsize(Arc::new(RwLock::new(n))),
            ConfigVal::F64(f) => {
                // Floats are kept as their raw bit pattern in an `AtomicU64`.
                let bits = f.to_bits();
                Self::F64(Arc::new(AtomicU64::new(bits)))
            }
            ConfigVal::String(s) => Self::String(Arc::new(RwLock::new(s))),
            ConfigVal::Duration(d) => Self::Duration(Arc::new(RwLock::new(d))),
            ConfigVal::Json(j) => Self::Json(Arc::new(RwLock::new(j))),
        }
    }
}
impl ConfigValAtomic {
fn load(&self) -> ConfigVal {
match self {
ConfigValAtomic::Bool(x) => ConfigVal::Bool(x.load(SeqCst)),
ConfigValAtomic::U32(x) => ConfigVal::U32(x.load(SeqCst)),
ConfigValAtomic::Usize(x) => ConfigVal::Usize(x.load(SeqCst)),
ConfigValAtomic::OptUsize(x) => ConfigVal::OptUsize(*x.read().expect("lock poisoned")),
ConfigValAtomic::F64(x) => ConfigVal::F64(f64::from_bits(x.load(SeqCst))),
ConfigValAtomic::String(x) => {
ConfigVal::String(x.read().expect("lock poisoned").clone())
}
ConfigValAtomic::Duration(x) => ConfigVal::Duration(*x.read().expect("lock poisoned")),
ConfigValAtomic::Json(x) => ConfigVal::Json(x.read().expect("lock poisoned").clone()),
}
}
fn store(&self, val: ConfigVal) {
match (self, val) {
(ConfigValAtomic::Bool(x), ConfigVal::Bool(val)) => x.store(val, SeqCst),
(ConfigValAtomic::U32(x), ConfigVal::U32(val)) => x.store(val, SeqCst),
(ConfigValAtomic::Usize(x), ConfigVal::Usize(val)) => x.store(val, SeqCst),
(ConfigValAtomic::OptUsize(x), ConfigVal::OptUsize(val)) => {
*x.write().expect("lock poisoned") = val
}
(ConfigValAtomic::F64(x), ConfigVal::F64(val)) => x.store(val.to_bits(), SeqCst),
(ConfigValAtomic::String(x), ConfigVal::String(val)) => {
*x.write().expect("lock poisoned") = val
}
(ConfigValAtomic::Duration(x), ConfigVal::Duration(val)) => {
*x.write().expect("lock poisoned") = val
}
(ConfigValAtomic::Json(x), ConfigVal::Json(val)) => {
*x.write().expect("lock poisoned") = val
}
(ConfigValAtomic::Bool(_), val)
| (ConfigValAtomic::U32(_), val)
| (ConfigValAtomic::Usize(_), val)
| (ConfigValAtomic::OptUsize(_), val)
| (ConfigValAtomic::F64(_), val)
| (ConfigValAtomic::String(_), val)
| (ConfigValAtomic::Duration(_), val)
| (ConfigValAtomic::Json(_), val) => {
panic!("attempted to store {val:?} value in {self:?} parameter")
}
}
}
}
impl ConfigUpdates {
pub fn add<T, U>(&mut self, config: &Config<T>, val: U)
where
T: ConfigDefault,
U: ConfigDefault<ConfigType = T::ConfigType>,
{
self.add_dynamic(config.name, val.into_config_type().into());
}
pub fn add_dynamic(&mut self, name: &str, val: ConfigVal) {
self.updates.insert(
name.to_owned(),
ProtoConfigVal {
val: val.into_proto(),
},
);
}
pub fn extend(&mut self, mut other: Self) {
self.updates.append(&mut other.updates)
}
pub fn apply(&self, set: &ConfigSet) {
for (name, ProtoConfigVal { val }) in self.updates.iter() {
let Some(config) = set.configs.get(name) else {
error!("config update {} {:?} not known set: {:?}", name, val, set);
continue;
};
let val = match (val.clone()).into_rust() {
Ok(x) => x,
Err(err) => {
error!("config update {} decode error: {}", name, err);
continue;
}
};
config.val.store(val);
}
}
}
mod impls {
use std::num::{ParseFloatError, ParseIntError};
use std::str::ParseBoolError;
use std::time::Duration;
use mz_ore::cast::CastFrom;
use mz_proto::{ProtoType, RustType, TryFromProtoError};
use crate::{
proto_config_val, ConfigDefault, ConfigSet, ConfigType, ConfigVal, ProtoOptionU64,
};
impl ConfigType for bool {
    /// Unwraps a [`ConfigVal::Bool`]; panics on any other variant.
    fn from_val(val: ConfigVal) -> Self {
        if let ConfigVal::Bool(x) = val {
            x
        } else {
            panic!("expected bool value got {:?}", val)
        }
    }

    /// Parses with `str::parse::<bool>`.
    fn parse(s: &str) -> Result<Self, String> {
        s.parse::<bool>()
            .map_err(|err: ParseBoolError| err.to_string())
    }
}

impl From<bool> for ConfigVal {
    fn from(val: bool) -> Self {
        Self::Bool(val)
    }
}
impl ConfigType for u32 {
    /// Unwraps a [`ConfigVal::U32`]; panics on any other variant.
    fn from_val(val: ConfigVal) -> Self {
        if let ConfigVal::U32(x) = val {
            x
        } else {
            panic!("expected u32 value got {:?}", val)
        }
    }

    /// Parses with `str::parse::<u32>`.
    fn parse(s: &str) -> Result<Self, String> {
        s.parse::<u32>()
            .map_err(|err: ParseIntError| err.to_string())
    }
}

impl From<u32> for ConfigVal {
    fn from(val: u32) -> Self {
        Self::U32(val)
    }
}
impl ConfigType for usize {
    /// Unwraps a [`ConfigVal::Usize`]; panics on any other variant.
    fn from_val(val: ConfigVal) -> Self {
        if let ConfigVal::Usize(x) = val {
            x
        } else {
            panic!("expected usize value got {:?}", val)
        }
    }

    /// Parses with `str::parse::<usize>`.
    fn parse(s: &str) -> Result<Self, String> {
        s.parse::<usize>()
            .map_err(|err: ParseIntError| err.to_string())
    }
}

impl From<usize> for ConfigVal {
    fn from(val: usize) -> Self {
        Self::Usize(val)
    }
}
impl ConfigType for Option<usize> {
    /// Unwraps a [`ConfigVal::OptUsize`].
    ///
    /// # Panics
    ///
    /// Panics on any other variant. The message names `Option<usize>` so that
    /// being handed a plain `ConfigVal::Usize` does not yield the
    /// contradictory "expected usize value got Usize(..)".
    fn from_val(val: ConfigVal) -> Self {
        match val {
            ConfigVal::OptUsize(x) => x,
            x => panic!("expected Option<usize> value got {:?}", x),
        }
    }

    /// Parses the empty string as `None` and anything else as `Some(usize)`.
    ///
    /// # Errors
    ///
    /// Returns the integer parse error message for a non-empty string that is
    /// not a valid `usize`.
    fn parse(s: &str) -> Result<Self, String> {
        if s.is_empty() {
            return Ok(None);
        }
        s.parse::<usize>()
            .map(Some)
            .map_err(|e: ParseIntError| e.to_string())
    }
}

impl From<Option<usize>> for ConfigVal {
    fn from(val: Option<usize>) -> ConfigVal {
        ConfigVal::OptUsize(val)
    }
}
impl ConfigType for f64 {
    /// Unwraps a [`ConfigVal::F64`]; panics on any other variant.
    fn from_val(val: ConfigVal) -> Self {
        if let ConfigVal::F64(x) = val {
            x
        } else {
            panic!("expected f64 value got {:?}", val)
        }
    }

    /// Parses with `str::parse::<f64>`.
    fn parse(s: &str) -> Result<Self, String> {
        s.parse::<f64>()
            .map_err(|err: ParseFloatError| err.to_string())
    }
}

impl From<f64> for ConfigVal {
    fn from(val: f64) -> Self {
        Self::F64(val)
    }
}
impl ConfigType for String {
    /// Unwraps a [`ConfigVal::String`]; panics on any other variant.
    fn from_val(val: ConfigVal) -> Self {
        if let ConfigVal::String(x) = val {
            x
        } else {
            panic!("expected String value got {:?}", val)
        }
    }

    /// Accepts any input verbatim; never fails.
    fn parse(s: &str) -> Result<Self, String> {
        Ok(String::from(s))
    }
}

impl From<String> for ConfigVal {
    fn from(val: String) -> Self {
        Self::String(val)
    }
}
/// Lets `String`-valued configs declare their default as a string slice
/// (e.g. a literal in a `const`).
impl ConfigDefault for &str {
    type ConfigType = String;

    /// Copies the slice into an owned `String`.
    fn into_config_type(self) -> String {
        String::from(self)
    }
}
impl ConfigType for Duration {
    /// Unwraps a [`ConfigVal::Duration`]; panics on any other variant.
    fn from_val(val: ConfigVal) -> Self {
        if let ConfigVal::Duration(x) = val {
            x
        } else {
            panic!("expected Duration value got {:?}", val)
        }
    }

    /// Parses with `humantime::parse_duration` (e.g. `"5 s"`).
    fn parse(s: &str) -> Result<Self, String> {
        match humantime::parse_duration(s) {
            Ok(parsed) => Ok(parsed),
            Err(err) => Err(err.to_string()),
        }
    }
}

impl From<Duration> for ConfigVal {
    fn from(val: Duration) -> Self {
        Self::Duration(val)
    }
}
impl ConfigType for serde_json::Value {
    /// Unwraps a [`ConfigVal::Json`]; panics on any other variant.
    fn from_val(val: ConfigVal) -> Self {
        if let ConfigVal::Json(x) = val {
            x
        } else {
            panic!("expected JSON value got {:?}", val)
        }
    }

    /// Parses the input as a JSON document with `serde_json::from_str`.
    fn parse(s: &str) -> Result<Self, String> {
        match serde_json::from_str(s) {
            Ok(parsed) => Ok(parsed),
            Err(err) => Err(err.to_string()),
        }
    }
}

impl From<serde_json::Value> for ConfigVal {
    fn from(val: serde_json::Value) -> Self {
        Self::Json(val)
    }
}
impl RustType<Option<proto_config_val::Val>> for ConfigVal {
fn into_proto(&self) -> Option<proto_config_val::Val> {
use crate::proto_config_val::Val;
let val = match self {
ConfigVal::Bool(x) => Val::Bool(*x),
ConfigVal::U32(x) => Val::U32(*x),
ConfigVal::Usize(x) => Val::Usize(u64::cast_from(*x)),
ConfigVal::OptUsize(x) => Val::OptUsize(ProtoOptionU64 {
val: x.map(u64::cast_from),
}),
ConfigVal::F64(x) => Val::F64(*x),
ConfigVal::String(x) => Val::String(x.into_proto()),
ConfigVal::Duration(x) => Val::Duration(x.into_proto()),
ConfigVal::Json(x) => Val::Json(x.to_string()),
};
Some(val)
}
fn from_proto(proto: Option<proto_config_val::Val>) -> Result<Self, TryFromProtoError> {
let val = match proto {
Some(proto_config_val::Val::Bool(x)) => ConfigVal::Bool(x),
Some(proto_config_val::Val::U32(x)) => ConfigVal::U32(x),
Some(proto_config_val::Val::Usize(x)) => ConfigVal::Usize(usize::cast_from(x)),
Some(proto_config_val::Val::OptUsize(ProtoOptionU64 { val })) => {
ConfigVal::OptUsize(val.map(usize::cast_from))
}
Some(proto_config_val::Val::F64(x)) => ConfigVal::F64(x),
Some(proto_config_val::Val::String(x)) => ConfigVal::String(x),
Some(proto_config_val::Val::Duration(x)) => ConfigVal::Duration(x.into_rust()?),
Some(proto_config_val::Val::Json(x)) => ConfigVal::Json(serde_json::from_str(&x)?),
None => {
return Err(TryFromProtoError::unknown_enum_variant(
"ProtoConfigVal::Val",
))
}
};
Ok(val)
}
}
/// Formats the set as a map from config name to current value.
impl std::fmt::Debug for ConfigSet {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that adding a field to `ConfigSet` forces this impl
        // to be revisited.
        let ConfigSet { configs } = self;
        let mut map = f.debug_map();
        for (name, entry) in configs.iter() {
            map.entry(name, &entry.val());
        }
        map.finish()
    }
}
}
#[cfg(test)]
mod tests {
use super::*;
use mz_ore::assert_err;
// Shared fixtures: one config per supported value type, each with a distinct
// name so they can all be registered in the same `ConfigSet`.
const BOOL: Config<bool> = Config::new("bool", true, "");
const U32: Config<u32> = Config::new("u32", 4, "");
const USIZE: Config<usize> = Config::new("usize", 1, "");
const OPT_USIZE: Config<Option<usize>> = Config::new("opt_usize", Some(2), "");
const F64: Config<f64> = Config::new("f64", 5.0, "");
// Declared with a `&str` default; its value type is `String`.
const STRING: Config<&str> = Config::new("string", "a", "");
const DURATION: Config<Duration> = Config::new("duration", Duration::from_nanos(3), "");
// Declared with a `fn() -> Value` default, produced by calling the function.
const JSON: Config<fn() -> serde_json::Value> =
Config::new("json", || serde_json::json!({}), "");
// Exercises every supported value type: each config starts at its default and
// reflects the new value after an update is applied.
#[mz_ore::test]
fn all_types() {
let configs = ConfigSet::default()
.add(&BOOL)
.add(&USIZE)
.add(&U32)
.add(&OPT_USIZE)
.add(&F64)
.add(&STRING)
.add(&DURATION)
.add(&JSON);
// Before any update, every config reads back its default.
assert_eq!(BOOL.get(&configs), true);
assert_eq!(U32.get(&configs), 4);
assert_eq!(USIZE.get(&configs), 1);
assert_eq!(OPT_USIZE.get(&configs), Some(2));
assert_eq!(F64.get(&configs), 5.0);
assert_eq!(STRING.get(&configs), "a");
assert_eq!(DURATION.get(&configs), Duration::from_nanos(3));
assert_eq!(JSON.get(&configs), serde_json::json!({}));
let mut updates = ConfigUpdates::default();
updates.add(&BOOL, false);
updates.add(&U32, 7);
updates.add(&USIZE, 2);
updates.add(&OPT_USIZE, None);
updates.add(&F64, 8.0);
updates.add(&STRING, "b");
updates.add(&DURATION, Duration::from_nanos(4));
updates.add(&JSON, serde_json::json!({"a": 1}));
updates.apply(&configs);
// After applying, every config reads back its updated value.
assert_eq!(BOOL.get(&configs), false);
assert_eq!(U32.get(&configs), 7);
assert_eq!(USIZE.get(&configs), 2);
assert_eq!(OPT_USIZE.get(&configs), None);
assert_eq!(F64.get(&configs), 8.0);
assert_eq!(STRING.get(&configs), "b");
assert_eq!(DURATION.get(&configs), Duration::from_nanos(4));
assert_eq!(JSON.get(&configs), serde_json::json!({"a": 1}));
}
// Defaults declared as `fn() -> T` are evaluated when the config is added to a
// set.
#[mz_ore::test]
fn fn_default() {
const BOOL_FN_DEFAULT: Config<fn() -> bool> = Config::new("bool", || !true, "");
const STRING_FN_DEFAULT: Config<fn() -> String> =
Config::new("string", || "x".repeat(3), "");
let configs = ConfigSet::default()
.add(&BOOL_FN_DEFAULT)
.add(&STRING_FN_DEFAULT);
assert_eq!(BOOL_FN_DEFAULT.get(&configs), false);
assert_eq!(STRING_FN_DEFAULT.get(&configs), "xxx");
}
// Independently built sets hold independent values, and the values of one set
// can be copied to another via `entries` + `add_dynamic`.
#[mz_ore::test]
fn config_set() {
let c0 = ConfigSet::default().add(&USIZE);
assert_eq!(USIZE.get(&c0), 1);
let mut updates = ConfigUpdates::default();
updates.add(&USIZE, 2);
updates.apply(&c0);
assert_eq!(USIZE.get(&c0), 2);
// A second, separately constructed set starts from the default again.
let c1 = ConfigSet::default().add(&USIZE);
assert_eq!(USIZE.get(&c1), 1);
let mut updates = ConfigUpdates::default();
updates.add(&USIZE, 3);
updates.apply(&c1);
assert_eq!(USIZE.get(&c1), 3);
// Updating c1 did not affect c0.
assert_eq!(USIZE.get(&c0), 2);
// Snapshot c0's current values as updates and apply them to c1.
let mut updates = ConfigUpdates::default();
for e in c0.entries() {
updates.add_dynamic(e.name, e.val());
}
assert_eq!(USIZE.get(&c1), 3);
updates.apply(&c1);
assert_eq!(USIZE.get(&c1), 2);
}
// `ConfigUpdates::extend` merges two update sets; for a name present in both,
// the value from the argument is the one that ends up applied.
#[mz_ore::test]
fn config_updates_extend() {
// u1: defaults of USIZE (1) and STRING.
let mut u1 = {
let c = ConfigSet::default().add(&USIZE).add(&STRING);
let mut x = ConfigUpdates::default();
for e in c.entries() {
x.add_dynamic(e.name(), e.val());
}
x
};
// u2: USIZE updated to 2, plus the default of DURATION.
let u2 = {
let c = ConfigSet::default().add(&USIZE).add(&DURATION);
let mut updates = ConfigUpdates::default();
updates.add(&USIZE, 2);
updates.apply(&c);
let mut x = ConfigUpdates::default();
for e in c.entries() {
x.add_dynamic(e.name(), e.val());
}
x
};
assert_eq!(u1.updates.len(), 2);
assert_eq!(u2.updates.len(), 2);
u1.extend(u2);
// USIZE appears in both, so the merged set has three distinct names.
assert_eq!(u1.updates.len(), 3);
// Applying to a set that only knows USIZE: the other names are not in the
// set, and USIZE takes u2's value.
let c = ConfigSet::default().add(&USIZE);
u1.apply(&c);
assert_eq!(USIZE.get(&c), 2);
}
// Checks `Config::parse_val` for every value type against the same seven
// inputs: "true", "false", "42", "66.6", "farragut", "" and "5 s".
#[mz_ore::test]
fn config_parse() {
// bool: only "true" / "false" parse.
assert_eq!(BOOL.parse_val("true"), Ok(ConfigVal::Bool(true)));
assert_eq!(BOOL.parse_val("false"), Ok(ConfigVal::Bool(false)));
assert_err!(BOOL.parse_val("42"));
assert_err!(BOOL.parse_val("66.6"));
assert_err!(BOOL.parse_val("farragut"));
assert_err!(BOOL.parse_val(""));
assert_err!(BOOL.parse_val("5 s"));
// u32: only the integer parses.
assert_err!(U32.parse_val("true"));
assert_err!(U32.parse_val("false"));
assert_eq!(U32.parse_val("42"), Ok(ConfigVal::U32(42)));
assert_err!(U32.parse_val("66.6"));
assert_err!(U32.parse_val("farragut"));
assert_err!(U32.parse_val(""));
assert_err!(U32.parse_val("5 s"));
// usize: only the integer parses.
assert_err!(USIZE.parse_val("true"));
assert_err!(USIZE.parse_val("false"));
assert_eq!(USIZE.parse_val("42"), Ok(ConfigVal::Usize(42)));
assert_err!(USIZE.parse_val("66.6"));
assert_err!(USIZE.parse_val("farragut"));
assert_err!(USIZE.parse_val(""));
assert_err!(USIZE.parse_val("5 s"));
// Option<usize>: the integer parses to `Some`, the empty string to `None`.
assert_err!(OPT_USIZE.parse_val("true"));
assert_err!(OPT_USIZE.parse_val("false"));
assert_eq!(OPT_USIZE.parse_val("42"), Ok(ConfigVal::OptUsize(Some(42))));
assert_err!(OPT_USIZE.parse_val("66.6"));
assert_err!(OPT_USIZE.parse_val("farragut"));
assert_eq!(OPT_USIZE.parse_val(""), Ok(ConfigVal::OptUsize(None)));
assert_err!(OPT_USIZE.parse_val("5 s"));
// f64: both numeric inputs parse.
assert_err!(F64.parse_val("true"));
assert_err!(F64.parse_val("false"));
assert_eq!(F64.parse_val("42"), Ok(ConfigVal::F64(42.0)));
assert_eq!(F64.parse_val("66.6"), Ok(ConfigVal::F64(66.6)));
assert_err!(F64.parse_val("farragut"));
assert_err!(F64.parse_val(""));
assert_err!(F64.parse_val("5 s"));
// String: every input is accepted verbatim.
assert_eq!(
STRING.parse_val("true"),
Ok(ConfigVal::String("true".to_string()))
);
assert_eq!(
STRING.parse_val("false"),
Ok(ConfigVal::String("false".to_string()))
);
assert_eq!(
STRING.parse_val("66.6"),
Ok(ConfigVal::String("66.6".to_string()))
);
assert_eq!(
STRING.parse_val("42"),
Ok(ConfigVal::String("42".to_string()))
);
assert_eq!(
STRING.parse_val("farragut"),
Ok(ConfigVal::String("farragut".to_string()))
);
assert_eq!(STRING.parse_val(""), Ok(ConfigVal::String("".to_string())));
assert_eq!(
STRING.parse_val("5 s"),
Ok(ConfigVal::String("5 s".to_string()))
);
// Duration: only the input with a unit parses; bare numbers are rejected.
assert_err!(DURATION.parse_val("true"));
assert_err!(DURATION.parse_val("false"));
assert_err!(DURATION.parse_val("42"));
assert_err!(DURATION.parse_val("66.6"));
assert_err!(DURATION.parse_val("farragut"));
assert_err!(DURATION.parse_val(""));
assert_eq!(
DURATION.parse_val("5 s"),
Ok(ConfigVal::Duration(Duration::from_secs(5)))
);
// JSON: any valid JSON document parses, including bare scalars.
assert_eq!(
JSON.parse_val("true"),
Ok(ConfigVal::Json(serde_json::json!(true)))
);
assert_eq!(
JSON.parse_val("false"),
Ok(ConfigVal::Json(serde_json::json!(false)))
);
assert_eq!(
JSON.parse_val("42"),
Ok(ConfigVal::Json(serde_json::json!(42)))
);
assert_eq!(
JSON.parse_val("66.6"),
Ok(ConfigVal::Json(serde_json::json!(66.6)))
);
assert_err!(JSON.parse_val("farragut"));
assert_err!(JSON.parse_val(""));
assert_err!(JSON.parse_val("5 s"));
assert_eq!(
JSON.parse_val("{\"joe\": \"developer\"}"),
Ok(ConfigVal::Json(serde_json::json!({"joe": "developer"})))
);
}
}