//! Logging dataflows for events generated by differential dataflow.

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;
use differential_dataflow::consolidation::ConsolidatingContainerBuilder;
use differential_dataflow::logging::{
BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
};
use differential_dataflow::AsCollection;
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::operator::consolidate_pact;
use mz_timely_util::replay::MzReplay;
use timely::communication::Allocate;
use timely::container::CapacityContainerBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::buffer::Session;
use timely::dataflow::channels::pushers::{Counter, Tee};
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::Filter;
use timely::dataflow::Stream;
use crate::extensions::arrange::MzArrange;
use crate::logging::compute::ComputeEvent;
use crate::logging::{
DifferentialLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState,
};
use crate::row_spine::{RowRowBatcher, RowRowBuilder};
use crate::typedefs::{KeyBatcher, RowRowSpine};
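/// Constructs the logging dataflow for differential logs.
///
/// Params
/// * `worker`: The Timely worker hosting the log analysis dataflow.
/// * `config`: Logging configuration.
/// * `event_queue`: The source to read differential log events from.
/// * `shared_state`: State shared between logging dataflows.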
pub(super) fn construct<A: Allocate>(
worker: &mut timely::worker::Worker<A>,
config: &mz_compute_client::logging::LoggingConfig,
event_queue: EventQueue<Vec<(Duration, DifferentialEvent)>>,
shared_state: Rc<RefCell<SharedLoggingState>>,
) -> BTreeMap<LogVariant, LogCollection> {
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
let worker_id = worker.index();
let dataflow_index = worker.next_dataflow_index();
worker.dataflow_named("Dataflow: differential logging", move |scope| {
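        // Replay the differential event queue into this dataflow at the configured interval.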
let (mut logs, token) = Some(event_queue.link)
.mz_replay::<_, CapacityContainerBuilder<_>, _>(
scope,
"differential logs",
config.interval,
event_queue.activator,
|mut session, data| session.give_iterator(data.iter()),
);
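        // If logging is disabled, we still install the indexes, but leave them empty. We do so by
        // immediately filtering out all log events.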
if !config.enable_logging {
logs = logs.filter(|_| false);
}
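        // Build a demux operator that splits the replayed event stream up into the individual
        // logging streams.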
let mut demux =
OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
let mut input = demux.new_input(&logs, Pipeline);
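        // One demux output per logged collection.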
let (mut batches_out, batches) = demux.new_output();
let (mut records_out, records) = demux.new_output();
let (mut sharing_out, sharing) = demux.new_output();
let (mut batcher_records_out, batcher_records) = demux.new_output();
let (mut batcher_size_out, batcher_size) = demux.new_output();
let (mut batcher_capacity_out, batcher_capacity) = demux.new_output();
let (mut batcher_allocations_out, batcher_allocations) = demux.new_output();
let mut demux_state = Default::default();
demux.build(move |_capability| {
move |_frontiers| {
let mut batches = batches_out.activate();
let mut records = records_out.activate();
let mut sharing = sharing_out.activate();
let mut batcher_records = batcher_records_out.activate();
let mut batcher_size = batcher_size_out.activate();
let mut batcher_capacity = batcher_capacity_out.activate();
let mut batcher_allocations = batcher_allocations_out.activate();
input.for_each(|cap, data| {
let mut output_buffers = DemuxOutput {
batches: batches.session_with_builder(&cap),
records: records.session_with_builder(&cap),
sharing: sharing.session_with_builder(&cap),
batcher_records: batcher_records.session_with_builder(&cap),
batcher_size: batcher_size.session_with_builder(&cap),
batcher_capacity: batcher_capacity.session_with_builder(&cap),
batcher_allocations: batcher_allocations.session_with_builder(&cap),
};
for (time, event) in data.drain(..) {
DemuxHandler {
state: &mut demux_state,
output: &mut output_buffers,
logging_interval_ms,
time,
shared_state: &mut shared_state.borrow_mut(),
}
.handle(event);
}
});
}
});
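        // Consolidate each demuxed stream and encode its updates in the expected `Row` format.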
let stream_to_collection = |input: Stream<_, ((usize, ()), Timestamp, Diff)>, log, name| {
            let mut packer = PermutedRowPacker::new(log);
consolidate_pact::<KeyBatcher<_, _, _>, _, _, _, _>(
&input,
Pipeline,
&format!("Consolidate Differential {name}"),
)
.as_collection()
.map(move |(op, ())| {
packer.pack_slice(&[
Datum::UInt64(u64::cast_from(op)),
Datum::UInt64(u64::cast_from(worker_id)),
])
})
};
        use DifferentialLog::*;
        let arrangement_batches = stream_to_collection(batches, ArrangementBatches, "batches");
let arrangement_records = stream_to_collection(records, ArrangementRecords, "records");
let sharing = stream_to_collection(sharing, Sharing, "sharing");
let batcher_records =
stream_to_collection(batcher_records, BatcherRecords, "batcher records");
let batcher_size = stream_to_collection(batcher_size, BatcherSize, "batcher size");
let batcher_capacity =
stream_to_collection(batcher_capacity, BatcherCapacity, "batcher capacity");
let batcher_allocations = stream_to_collection(
batcher_allocations,
BatcherAllocations,
"batcher allocations",
);
let logs = [
(ArrangementBatches, arrangement_batches),
(ArrangementRecords, arrangement_records),
(Sharing, sharing),
(BatcherRecords, batcher_records),
(BatcherSize, batcher_size),
(BatcherCapacity, batcher_capacity),
(BatcherAllocations, batcher_allocations),
];
let mut result = BTreeMap::new();
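        // Arrange the logs for which an index was requested in the logging configuration.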
for (variant, collection) in logs {
let variant = LogVariant::Differential(variant);
if config.index_logs.contains_key(&variant) {
let trace = collection
.mz_arrange::<RowRowBatcher<_, _>, RowRowBuilder<_, _>, RowRowSpine<_, _>>(
&format!("Arrange {variant:?}"),
)
.trace;
let collection = LogCollection {
trace,
token: Rc::clone(&token),
dataflow_index,
};
result.insert(variant, collection);
}
}
result
})
}
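/// Pusher for containers of update triples.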
type Pusher<D> =
Counter<Timestamp, Vec<(D, Timestamp, Diff)>, Tee<Timestamp, Vec<(D, Timestamp, Diff)>>>;
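/// Output session for the demux operator, backed by a consolidating container builder.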
type OutputSession<'a, D> =
Session<'a, Timestamp, ConsolidatingContainerBuilder<Vec<(D, Timestamp, Diff)>>, Pusher<D>>;
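/// Bundled output sessions used by the demux operator.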
struct DemuxOutput<'a> {
batches: OutputSession<'a, (usize, ())>,
records: OutputSession<'a, (usize, ())>,
sharing: OutputSession<'a, (usize, ())>,
batcher_records: OutputSession<'a, (usize, ())>,
batcher_size: OutputSession<'a, (usize, ())>,
batcher_capacity: OutputSession<'a, (usize, ())>,
batcher_allocations: OutputSession<'a, (usize, ())>,
}
/// State maintained by the demux operator.
#[derive(Default)]
struct DemuxState {
    /// Arrangement trace sharing counts, keyed by operator ID.
    sharing: BTreeMap<usize, usize>,
}
/// Event handler of the demux operator.
struct DemuxHandler<'a, 'b> {
    /// State kept by the demux operator.
    state: &'a mut DemuxState,
    /// Demux output sessions.
    output: &'a mut DemuxOutput<'b>,
    /// The logging interval specifying the time granularity for the updates.
    logging_interval_ms: u128,
    /// The current event time.
    time: Duration,
    /// State shared across log receivers.
    shared_state: &'a mut SharedLoggingState,
}
impl DemuxHandler<'_, '_> {
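    /// Return the timestamp associated with the current event, based on the event time and the
    /// logging interval.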
fn ts(&self) -> Timestamp {
let time_ms = self.time.as_millis();
let interval = self.logging_interval_ms;
let rounded = (time_ms / interval + 1) * interval;
rounded.try_into().expect("must fit")
}
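    /// Handle the given differential event.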
fn handle(&mut self, event: DifferentialEvent) {
use DifferentialEvent::*;
match event {
Batch(e) => self.handle_batch(e),
Merge(e) => self.handle_merge(e),
Drop(e) => self.handle_drop(e),
TraceShare(e) => self.handle_trace_share(e),
Batcher(e) => self.handle_batcher_event(e),
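            // Ignore remaining event kinds.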
_ => (),
}
}
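    /// Handle a batch creation event by adding the new batch and its records to the counts of the
    /// affected operator.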
fn handle_batch(&mut self, event: BatchEvent) {
let ts = self.ts();
let op = event.operator;
self.output.batches.give(((op, ()), ts, 1));
let diff = Diff::try_from(event.length).expect("must fit");
self.output.records.give(((op, ()), ts, diff));
self.notify_arrangement_size(op);
}
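    /// Handle a completed merge event by updating the batch and record counts of the affected
    /// operator.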
fn handle_merge(&mut self, event: MergeEvent) {
let Some(done) = event.complete else { return };
let ts = self.ts();
let op = event.operator;
self.output.batches.give(((op, ()), ts, -1));
let diff = Diff::try_from(done).expect("must fit")
- Diff::try_from(event.length1 + event.length2).expect("must fit");
if diff != 0 {
self.output.records.give(((op, ()), ts, diff));
}
self.notify_arrangement_size(op);
}
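    /// Handle a drop event by retracting the dropped batch and its records.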
fn handle_drop(&mut self, event: DropEvent) {
let ts = self.ts();
let op = event.operator;
self.output.batches.give(((op, ()), ts, -1));
let diff = -Diff::try_from(event.length).expect("must fit");
if diff != 0 {
self.output.records.give(((op, ()), ts, diff));
}
self.notify_arrangement_size(op);
}
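    /// Handle a trace share event by updating the operator's sharing count. When the count drops
    /// to zero, emit an `ArrangementHeapSizeOperatorDrop` event to the compute logger.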
fn handle_trace_share(&mut self, event: TraceShare) {
let ts = self.ts();
let op = event.operator;
let diff = Diff::cast_from(event.diff);
debug_assert_ne!(diff, 0);
self.output.sharing.give(((op, ()), ts, diff));
if let Some(logger) = &mut self.shared_state.compute_logger {
let sharing = self.state.sharing.entry(op).or_default();
*sharing = (i64::try_from(*sharing).expect("must fit") + diff)
.try_into()
.expect("under/overflow");
if *sharing == 0 {
self.state.sharing.remove(&op);
logger.log(ComputeEvent::ArrangementHeapSizeOperatorDrop { operator: op });
}
}
}
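    /// Handle a batcher event by recording the changes to the batcher's records, size, capacity,
    /// and allocations.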
fn handle_batcher_event(&mut self, event: BatcherEvent) {
let ts = self.ts();
let op = event.operator;
let records_diff = Diff::cast_from(event.records_diff);
let size_diff = Diff::cast_from(event.size_diff);
let capacity_diff = Diff::cast_from(event.capacity_diff);
let allocations_diff = Diff::cast_from(event.allocations_diff);
self.output
.batcher_records
.give(((op, ()), ts, records_diff));
self.output.batcher_size.give(((op, ()), ts, size_diff));
self.output
.batcher_capacity
.give(((op, ()), ts, capacity_diff));
self.output
.batcher_allocations
.give(((op, ()), ts, allocations_diff));
}
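    /// Activate the arrangement size logging operator for `operator`, if one is registered.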
fn notify_arrangement_size(&self, operator: usize) {
if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
activator.activate();
}
}
}