use std::collections::BTreeMap;
use std::sync::Arc;
use differential_dataflow::lattice::Lattice;
use mz_ore::now::EpochMillis;
use mz_persist_types::Codec64;
use mz_repr::{GlobalId, Row, TimestampManipulation};
use mz_storage_client::client::{Status, StatusUpdate};
use timely::progress::Timestamp;
use crate::collection_mgmt::CollectionManager;
use crate::IntrospectionType;
pub use mz_storage_client::healthcheck::*;
#[derive(Debug, Clone)]
pub struct CollectionStatusManager<T>
where
T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
collection_manager: CollectionManager<T>,
introspection_ids: Arc<std::sync::Mutex<BTreeMap<IntrospectionType, GlobalId>>>,
previous_statuses: BTreeMap<GlobalId, Status>,
}
impl<T> CollectionStatusManager<T>
where
T: Timestamp + Lattice + Codec64 + TimestampManipulation + From<EpochMillis>,
{
pub fn new(
collection_manager: CollectionManager<T>,
introspection_ids: Arc<std::sync::Mutex<BTreeMap<IntrospectionType, GlobalId>>>,
) -> Self {
Self {
collection_manager,
introspection_ids,
previous_statuses: Default::default(),
}
}
pub fn extend_previous_statuses<I>(&mut self, previous_statuses: I)
where
I: IntoIterator<Item = (GlobalId, Status)>,
{
self.previous_statuses.extend(previous_statuses)
}
pub(super) fn append_updates(&mut self, updates: Vec<StatusUpdate>, type_: IntrospectionType) {
let source_status_history_id = *self
.introspection_ids
.lock()
.expect("poisoned lock")
.get(&type_)
.unwrap_or_else(|| panic!("{:?} status history collection to be registered", type_));
let new: Vec<_> = updates
.into_iter()
.filter(
|r| match (self.previous_statuses.get(&r.id).as_deref(), &r.status) {
(None, _) => true,
(Some(old), new) => old.superseded_by(*new),
},
)
.collect();
self.previous_statuses
.extend(new.iter().map(|r| (r.id, r.status)));
if !new.is_empty() {
self.collection_manager.blind_write(
source_status_history_id,
new.into_iter()
.map(|update| (Row::from(update), 1))
.collect(),
);
}
}
}
#[cfg(test)]
mod tests {
use std::collections::BTreeSet;
use mz_repr::Datum;
use super::*;
#[mz_ore::test]
fn test_row() {
let error_message = "error message";
let hint = "hint message";
let id = GlobalId::User(1);
let status = Status::Dropped;
let row = Row::from(StatusUpdate {
id,
timestamp: chrono::offset::Utc::now(),
status,
error: Some(error_message.to_string()),
hints: BTreeSet::from([hint.to_string()]),
namespaced_errors: Default::default(),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 1);
let hint_datum = &details[0];
assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
}
#[mz_ore::test]
fn test_row_without_hint() {
let error_message = "error message";
let id = GlobalId::User(1);
let status = Status::Dropped;
let row = Row::from(StatusUpdate {
id,
timestamp: chrono::offset::Utc::now(),
status,
error: Some(error_message.to_string()),
hints: Default::default(),
namespaced_errors: Default::default(),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
}
#[mz_ore::test]
fn test_row_without_error() {
let id = GlobalId::User(1);
let status = Status::Dropped;
let hint = "hint message";
let row = Row::from(StatusUpdate {
id,
timestamp: chrono::offset::Utc::now(),
status,
error: None,
hints: BTreeSet::from([hint.to_string()]),
namespaced_errors: Default::default(),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
assert_eq!(row.iter().nth(3).unwrap(), Datum::Null);
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 1);
let hint_datum = &details[0];
assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
}
#[mz_ore::test]
fn test_row_with_namespaced() {
let error_message = "error message";
let id = GlobalId::User(1);
let status = Status::Dropped;
let row = Row::from(StatusUpdate {
id,
timestamp: chrono::offset::Utc::now(),
status,
error: Some(error_message.to_string()),
hints: Default::default(),
namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 1);
let ns_datum = &details[0];
assert_eq!(ns_datum.0, "namespaced");
assert_eq!(
ns_datum.1.unwrap_map().iter().next().unwrap(),
("thing", Datum::String("error"))
);
}
#[mz_ore::test]
fn test_row_with_everything() {
let error_message = "error message";
let hint = "hint message";
let id = GlobalId::User(1);
let status = Status::Dropped;
let row = Row::from(StatusUpdate {
id,
timestamp: chrono::offset::Utc::now(),
status,
error: Some(error_message.to_string()),
hints: BTreeSet::from([hint.to_string()]),
namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status.to_str()));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 2);
let hint_datum = &details[0];
let ns_datum = &details[1];
assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
assert_eq!(ns_datum.0, "namespaced");
assert_eq!(
ns_datum.1.unwrap_map().iter().next().unwrap(),
("thing", Datum::String("error"))
);
}
}