use std::collections::BTreeMap;
use std::convert::TryInto;
use std::rc::Rc;
use std::time::Duration;
use columnar::Index;
use mz_compute_client::logging::LoggingConfig;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Row, Timestamp};
use mz_timely_util::containers::{columnar_exchange, Col2ValBatcher, Column, ColumnBuilder};
use mz_timely_util::replay::MzReplay;
use timely::dataflow::channels::pact::ExchangeCore;
use timely::dataflow::Scope;
use timely::Container;
use crate::extensions::arrange::MzArrangeCore;
use crate::logging::initialize::ReachabilityEvent;
use crate::logging::{consolidate_and_pack, EventQueue, LogCollection, LogVariant, TimelyLog};
use crate::row_spine::RowRowBuilder;
use crate::typedefs::RowRowSpine;
pub(super) struct Return {
pub collections: BTreeMap<LogVariant, LogCollection>,
}
pub(super) fn construct<G: Scope<Timestamp = Timestamp>>(
mut scope: G,
config: &LoggingConfig,
event_queue: EventQueue<Column<(Duration, ReachabilityEvent)>, 3>,
) -> Return {
let collections = scope.scoped("timely reachability logging", move |scope| {
let enable_logging = config.enable_logging;
let interval_ms = std::cmp::max(1, config.interval.as_millis());
type UpdatesKey = (bool, usize, usize, usize, Timestamp);
type CB = ColumnBuilder<((UpdatesKey, ()), Timestamp, Diff)>;
let (updates, token) = event_queue.links.mz_replay::<_, CB, _>(
scope,
"reachability logs",
config.interval,
event_queue.activator,
move |mut session, data| {
if !enable_logging {
return;
}
for (time, (operator_id, massaged)) in data.iter() {
let time_ms = ((time.as_millis() / interval_ms) + 1) * interval_ms;
let time_ms: Timestamp = time_ms.try_into().expect("must fit");
for (source, port, update_type, ts, diff) in massaged.into_iter() {
let datum = (update_type, operator_id, source, port, ts);
session.give(((datum, ()), time_ms, diff));
}
}
},
);
let logs_active = [LogVariant::Timely(TimelyLog::Reachability)];
let worker_id = scope.index();
let updates = consolidate_and_pack::<_, Col2ValBatcher<UpdatesKey, _, _, _>, ColumnBuilder<_>, _, _>(
&updates,
TimelyLog::Reachability,
move |((datum, ()), time, diff), packer, session| {
let (update_type, operator_id, source, port, ts) = datum;
let update_type = if *update_type { "source" } else { "target" };
let data = packer.pack_slice(&[
Datum::UInt64(u64::cast_from(*operator_id)),
Datum::UInt64(u64::cast_from(worker_id)),
Datum::UInt64(u64::cast_from(*source)),
Datum::UInt64(u64::cast_from(*port)),
Datum::String(update_type),
Datum::from(*ts),
]);
session.give((data, time, diff));
}
);
let mut result = BTreeMap::new();
for variant in logs_active {
if config.index_logs.contains_key(&variant) {
let trace = updates
.mz_arrange_core::<_, Col2ValBatcher<_, _, _, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
ExchangeCore::<ColumnBuilder<_>, _>::new_core(columnar_exchange::<Row, Row, Timestamp, Diff>),
&format!("Arrange {variant:?}"),
)
.trace;
let collection = LogCollection {
trace,
token: Rc::clone(&token),
};
result.insert(variant, collection);
}
}
result
});
Return { collections }
}