use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use chrono::{DateTime, Utc};
use differential_dataflow::lattice::Lattice;
use mz_ore::now::EpochMillis;
use mz_persist_types::Codec64;
use mz_repr::{Datum, Diff, GlobalId, Row, TimestampManipulation};
use timely::progress::Timestamp;
use crate::collection_mgmt::CollectionManager;
use crate::IntrospectionType;
pub use mz_storage_client::healthcheck::*;
#[derive(Clone)]
pub struct RawStatusUpdate {
pub id: GlobalId,
pub ts: DateTime<Utc>,
pub status_name: String,
pub error: Option<String>,
pub hints: BTreeSet<String>,
pub namespaced_errors: BTreeMap<String, String>,
}
pub fn pack_status_row(update: RawStatusUpdate) -> Row {
let timestamp = Datum::TimestampTz(update.ts.try_into().expect("must fit"));
let id = update.id.to_string();
let id = Datum::String(&id);
let status = Datum::String(&update.status_name);
let error = update.error.as_deref().into();
let mut row = Row::default();
let mut packer = row.packer();
packer.extend([timestamp, id, status, error]);
if !update.hints.is_empty() || !update.namespaced_errors.is_empty() {
packer.push_dict_with(|dict_packer| {
if !update.hints.is_empty() {
dict_packer.push(Datum::String("hints"));
dict_packer.push_list(update.hints.iter().map(|s| Datum::String(s)));
}
if !update.namespaced_errors.is_empty() {
dict_packer.push(Datum::String("namespaced"));
dict_packer.push_dict(
update
.namespaced_errors
.iter()
.map(|(k, v)| (k.as_str(), Datum::String(v))),
);
}
});
} else {
packer.push(Datum::Null);
}
row
}
#[derive(Debug, Clone)]
pub struct CollectionStatusManager<T>
where
T: Timestamp + Lattice + Codec64 + TimestampManipulation,
{
collection_manager: CollectionManager<T>,
introspection_ids: Arc<std::sync::Mutex<BTreeMap<IntrospectionType, GlobalId>>>,
previous_statuses: BTreeMap<GlobalId, String>,
}
impl<T> CollectionStatusManager<T>
where
T: Timestamp + Lattice + Codec64 + TimestampManipulation + From<EpochMillis>,
{
pub fn new(
collection_manager: CollectionManager<T>,
introspection_ids: Arc<std::sync::Mutex<BTreeMap<IntrospectionType, GlobalId>>>,
) -> Self {
Self {
collection_manager,
introspection_ids,
previous_statuses: Default::default(),
}
}
fn pack_status_updates(updates: Vec<RawStatusUpdate>) -> Vec<(Row, Diff)> {
updates
.into_iter()
.map(|update| (pack_status_row(update), 1))
.collect()
}
async fn append_updates(&mut self, updates: Vec<RawStatusUpdate>, type_: IntrospectionType) {
let source_status_history_id = *self
.introspection_ids
.lock()
.expect("poisoned lock")
.get(&type_)
.unwrap_or_else(|| panic!("{:?} status history collection to be registered", type_));
let new: Vec<_> = updates
.iter()
.filter(
|r| match (&r.status_name, self.previous_statuses.get(&r.id).as_deref()) {
(new, Some(prev)) if new == "paused" && prev == "stalled" => false,
(new, Some(prev)) if new == "paused" && prev == "paused" => false,
_ => true,
},
)
.cloned()
.collect();
self.collection_manager
.append_to_collection(
source_status_history_id,
Self::pack_status_updates(new.clone()),
)
.await;
self.previous_statuses
.extend(new.into_iter().map(|r| (r.id, r.status_name)));
}
pub async fn append_source_updates(&mut self, updates: Vec<RawStatusUpdate>) {
self.append_updates(updates, IntrospectionType::SourceStatusHistory)
.await;
}
pub async fn append_sink_updates(&mut self, updates: Vec<RawStatusUpdate>) {
self.append_updates(updates, IntrospectionType::SinkStatusHistory)
.await;
}
async fn drop_items(
&mut self,
ids: Vec<GlobalId>,
ts: DateTime<Utc>,
type_: IntrospectionType,
) {
self.append_updates(
ids.iter()
.map(|id| RawStatusUpdate {
id: *id,
ts,
status_name: "dropped".to_string(),
error: None,
hints: Default::default(),
namespaced_errors: Default::default(),
})
.collect(),
type_,
)
.await;
}
pub async fn drop_sources(&mut self, ids: Vec<GlobalId>, ts: DateTime<Utc>) {
self.drop_items(ids, ts, IntrospectionType::SourceStatusHistory)
.await;
}
pub async fn drop_sinks(&mut self, ids: Vec<GlobalId>, ts: DateTime<Utc>) {
self.drop_items(ids, ts, IntrospectionType::SinkStatusHistory)
.await;
}
}
#[cfg(test)]
mod tests {
use super::*;
#[mz_ore::test]
fn test_row() {
let error_message = "error message";
let hint = "hint message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(RawStatusUpdate {
id,
ts: chrono::offset::Utc::now(),
status_name: status.to_string(),
error: Some(error_message.to_string()),
hints: BTreeSet::from([hint.to_string()]),
namespaced_errors: Default::default(),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 1);
let hint_datum = &details[0];
assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
}
#[mz_ore::test]
fn test_row_without_hint() {
let error_message = "error message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(RawStatusUpdate {
id,
ts: chrono::offset::Utc::now(),
status_name: status.to_string(),
error: Some(error_message.to_string()),
hints: Default::default(),
namespaced_errors: Default::default(),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
assert_eq!(row.iter().nth(4).unwrap(), Datum::Null);
}
#[mz_ore::test]
fn test_row_without_error() {
let id = GlobalId::User(1);
let status = "dropped";
let hint = "hint message";
let row = pack_status_row(RawStatusUpdate {
id,
ts: chrono::offset::Utc::now(),
status_name: status.to_string(),
error: None,
hints: BTreeSet::from([hint.to_string()]),
namespaced_errors: Default::default(),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::Null);
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 1);
let hint_datum = &details[0];
assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
}
#[mz_ore::test]
fn test_row_with_namespaced() {
let error_message = "error message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(RawStatusUpdate {
id,
ts: chrono::offset::Utc::now(),
status_name: status.to_string(),
error: Some(error_message.to_string()),
hints: Default::default(),
namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 1);
let ns_datum = &details[0];
assert_eq!(ns_datum.0, "namespaced");
assert_eq!(
ns_datum.1.unwrap_map().iter().next().unwrap(),
("thing", Datum::String("error"))
);
}
#[mz_ore::test]
fn test_row_with_everything() {
let error_message = "error message";
let hint = "hint message";
let id = GlobalId::User(1);
let status = "dropped";
let row = pack_status_row(RawStatusUpdate {
id,
ts: chrono::offset::Utc::now(),
status_name: status.to_string(),
error: Some(error_message.to_string()),
hints: BTreeSet::from([hint.to_string()]),
namespaced_errors: BTreeMap::from([("thing".to_string(), "error".to_string())]),
});
for (datum, column_type) in row.iter().zip(MZ_SINK_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
for (datum, column_type) in row.iter().zip(MZ_SOURCE_STATUS_HISTORY_DESC.iter_types()) {
assert!(datum.is_instance_of(column_type));
}
assert_eq!(row.iter().nth(1).unwrap(), Datum::String(&id.to_string()));
assert_eq!(row.iter().nth(2).unwrap(), Datum::String(status));
assert_eq!(row.iter().nth(3).unwrap(), Datum::String(error_message));
let details = row
.iter()
.nth(4)
.unwrap()
.unwrap_map()
.iter()
.collect::<Vec<_>>();
assert_eq!(details.len(), 2);
let hint_datum = &details[0];
let ns_datum = &details[1];
assert_eq!(hint_datum.0, "hints");
assert_eq!(
hint_datum.1.unwrap_list().iter().next().unwrap(),
Datum::String(hint)
);
assert_eq!(ns_datum.0, "namespaced");
assert_eq!(
ns_datum.1.unwrap_map().iter().next().unwrap(),
("thing", Datum::String("error"))
);
}
}