use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::{Duration, Instant};
use differential_dataflow::logging::DifferentialEvent;
use differential_dataflow::Collection;
use mz_compute_client::logging::{LogVariant, LoggingConfig};
use mz_repr::{Diff, Timestamp};
use mz_storage_types::errors::DataflowError;
use mz_timely_util::operator::CollectionExt;
use timely::communication::Allocate;
use timely::logging::{Logger, TimelyEvent};
use timely::progress::reachability::logging::TrackerEvent;
use crate::arrangement::manager::{SpecializedTraceHandle, TraceBundle};
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::logging::compute::ComputeEvent;
use crate::logging::reachability::ReachabilityEvent;
use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
/// Initializes the logging dataflows for a compute worker.
///
/// Returns a logger for compute events, plus — for each configured `LogVariant` — a trace
/// bundle for reading the log records together with the index of the dataflow producing it.
pub fn initialize<A: Allocate + 'static>(
    worker: &mut timely::worker::Worker<A>,
    config: &LoggingConfig,
) -> (
    super::compute::Logger,
    BTreeMap<LogVariant, (TraceBundle, usize)>,
) {
    // Clamp to at least 1 ms so a zero-duration interval config cannot yield a
    // zero-width logging interval.
    let interval_ms = std::cmp::max(1, config.interval.as_millis())
        .try_into()
        .expect("must fit");

    // Events are timestamped relative to `now`; `start_offset` records where `now`
    // sits relative to the Unix epoch so timestamps can be made absolute.
    let now = Instant::now();
    let start_offset = std::time::SystemTime::now()
        .duration_since(std::time::SystemTime::UNIX_EPOCH)
        .expect("Failed to get duration since Unix epoch");

    let mut context = LoggingContext {
        worker,
        config,
        interval_ms,
        now,
        start_offset,
        t_event_queue: EventQueue::new("t"),
        r_event_queue: EventQueue::new("r"),
        d_event_queue: EventQueue::new("d"),
        c_event_queue: EventQueue::new("c"),
        shared_state: Default::default(),
    };

    // The relative order decides whether the logging dataflows observe their own
    // construction: with `log_logging` the loggers are installed first (so dataflow
    // construction is itself logged); otherwise the dataflows are built before any
    // logger exists and remain invisible to logging.
    let traces = if config.log_logging {
        context.register_loggers();
        context.construct_dataflows()
    } else {
        let traces = context.construct_dataflows();
        context.register_loggers();
        traces
    };

    // `register_loggers` ran in both branches above, so the compute logger must exist.
    let logger = worker
        .log_register()
        .get("materialize/compute")
        .expect("compute logger should be registered by register_loggers");

    (logger, traces)
}
/// Bundle of state threaded through the construction of the individual logging dataflows
/// and the registration of the corresponding timely loggers.
struct LoggingContext<'a, A: Allocate> {
    /// The worker on which logging dataflows are built and loggers are registered.
    worker: &'a mut timely::worker::Worker<A>,
    /// The compute logging configuration.
    config: &'a LoggingConfig,
    /// Logging interval in milliseconds; clamped to at least 1 by `initialize`.
    interval_ms: u64,
    /// Common reference instant against which loggers timestamp events.
    now: Instant,
    /// Offset of `now` relative to the Unix epoch.
    start_offset: Duration,
    /// Queue feeding timely events to the timely logging dataflow.
    t_event_queue: EventQueue<TimelyEvent>,
    /// Queue feeding reachability events to the reachability logging dataflow.
    r_event_queue: EventQueue<ReachabilityEvent>,
    /// Queue feeding differential events to the differential logging dataflow.
    d_event_queue: EventQueue<DifferentialEvent>,
    /// Queue feeding compute events to the compute logging dataflow.
    c_event_queue: EventQueue<ComputeEvent>,
    /// State shared between the loggers and the logging dataflow operators.
    shared_state: Rc<RefCell<SharedLoggingState>>,
}
impl<A: Allocate + 'static> LoggingContext<'_, A> {
    /// Constructs all logging dataflows (timely, reachability, differential, compute) and
    /// returns, per `LogVariant`, a `TraceBundle` and the index of the producing dataflow.
    fn construct_dataflows(&mut self) -> BTreeMap<LogVariant, (TraceBundle, usize)> {
        let mut collections = BTreeMap::new();
        collections.extend(super::timely::construct(
            self.worker,
            self.config,
            self.t_event_queue.clone(),
            Rc::clone(&self.shared_state),
        ));
        collections.extend(super::reachability::construct(
            self.worker,
            self.config,
            self.r_event_queue.clone(),
        ));
        collections.extend(super::differential::construct(
            self.worker,
            self.config,
            self.d_event_queue.clone(),
            Rc::clone(&self.shared_state),
        ));
        collections.extend(super::compute::construct(
            self.worker,
            self.config,
            self.c_event_queue.clone(),
            Rc::clone(&self.shared_state),
        ));

        // A single empty error arrangement, shared (via `clone`) by all bundles below:
        // logging collections don't produce `DataflowError`s, but `TraceBundle::new`
        // requires an errors trace.
        let errs = self
            .worker
            .dataflow_named("Dataflow: logging errors", |scope| {
                let collection: KeyCollection<_, DataflowError, Diff> =
                    Collection::empty(scope).into();
                collection.mz_arrange("Arrange logging err").trace
            });

        collections
            .into_iter()
            .map(|(log, collection)| {
                // Attach the collection's token to the bundle so the logging dataflow
                // stays alive for as long as its traces are in use.
                let bundle = TraceBundle::new(
                    SpecializedTraceHandle::RowRow(collection.trace),
                    errs.clone(),
                )
                .with_drop(collection.token);
                (log, (bundle, collection.dataflow_index))
            })
            .collect()
    }

    /// Registers one logger per event category with the worker's log register, routing
    /// events into the corresponding event queues.
    fn register_loggers(&self) {
        let t_logger = self.simple_logger(self.t_event_queue.clone());
        let r_logger = self.reachability_logger();
        let d_logger = self.simple_logger(self.d_event_queue.clone());
        let c_logger = self.simple_logger(self.c_event_queue.clone());

        let mut register = self.worker.log_register();
        register.insert_logger("timely", t_logger);
        register.insert_logger("timely/reachability", r_logger);
        register.insert_logger("differential/arrange", d_logger);
        register.insert_logger("materialize/compute", c_logger.clone());

        // Also stash the compute logger in the shared state, so logging dataflow
        // operators can emit compute events themselves.
        self.shared_state.borrow_mut().compute_logger = Some(c_logger);
    }

    /// Creates a logger that forwards batches of `E` events into `event_queue` and wakes
    /// the queue's consumer after each batch.
    ///
    /// Batching goes through a `BatchLogger` configured with `interval_ms`; presumably it
    /// quantizes/advances event times at that granularity — confirm in `BatchLogger`.
    fn simple_logger<E: 'static>(&self, event_queue: EventQueue<E>) -> Logger<E> {
        let mut logger = BatchLogger::new(event_queue.link, self.interval_ms);
        Logger::new(
            self.now,
            self.start_offset,
            self.worker.index(),
            move |time, data| {
                logger.publish_batch(time, data);
                // Wake the dataflow operator reading from this queue.
                event_queue.activator.activate();
            },
        )
    }

    /// Creates the logger for timely reachability (`TrackerEvent`) updates.
    ///
    /// Unlike `simple_logger`, this converts each tracker update into flat
    /// `(node, port, is_source, time, diff)` tuples before batching, since the raw
    /// `TrackerEvent` carries type-erased timestamps.
    fn reachability_logger(&self) -> Logger<TrackerEvent> {
        let event_queue = self.r_event_queue.clone();
        let mut logger = BatchLogger::new(event_queue.link, self.interval_ms);
        Logger::new(
            self.now,
            self.start_offset,
            self.worker.index(),
            move |time, data| {
                let mut converted_updates = Vec::new();
                for event in data.drain(..) {
                    // `event` is `(duration, worker_index, TrackerEvent)`; the two arms
                    // below differ only in the `is_source` flag.
                    match event.2 {
                        TrackerEvent::SourceUpdate(update) => {
                            let massaged: Vec<_> = update
                                .updates
                                .iter()
                                .map(|(node, port, time, diff)| {
                                    // Downcast the type-erased timestamp to our concrete
                                    // `Timestamp`; yields `None` if the downcast fails.
                                    let ts = time.as_any().downcast_ref::<Timestamp>().copied();
                                    let is_source = true;
                                    (*node, *port, is_source, ts, *diff)
                                })
                                .collect();
                            converted_updates.push((
                                event.0,
                                event.1,
                                (update.tracker_id, massaged),
                            ));
                        }
                        TrackerEvent::TargetUpdate(update) => {
                            let massaged: Vec<_> = update
                                .updates
                                .iter()
                                .map(|(node, port, time, diff)| {
                                    let ts = time.as_any().downcast_ref::<Timestamp>().copied();
                                    let is_source = false;
                                    (*node, *port, is_source, ts, *diff)
                                })
                                .collect();
                            converted_updates.push((
                                event.0,
                                event.1,
                                (update.tracker_id, massaged),
                            ));
                        }
                    }
                }
                logger.publish_batch(time, &mut converted_updates);
                // Wake the dataflow operator reading from this queue.
                event_queue.activator.activate();
            },
        )
    }
}