use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::logging::{
BatchEvent, BatcherEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
};
use mz_ore::cast::CastFrom;
use mz_repr::{Datum, Diff, Timestamp};
use mz_timely_util::buffer::ConsolidateBuffer;
use mz_timely_util::replay::MzReplay;
use timely::communication::Allocate;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::channels::pushers::Tee;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::operators::{Filter, InputCapability};
use timely::dataflow::Stream;
use crate::extensions::arrange::MzArrange;
use crate::logging::compute::ComputeEvent;
use crate::logging::{
DifferentialLog, EventQueue, LogCollection, LogVariant, PermutedRowPacker, SharedLoggingState,
};
use crate::typedefs::{KeyValSpine, RowRowSpine};
/// Constructs the differential-dataflow logging dataflow on this worker.
///
/// The dataflow replays `DifferentialEvent`s from `event_queue` and demultiplexes them into
/// one update stream per [`DifferentialLog`] variant. It then turns each stream into a
/// collection of rows `[operator id, worker id]`. Only the variants listed in
/// `config.index_logs` are arranged and returned.
///
/// Parameters:
/// * `worker`: the timely worker in which the dataflow is built.
/// * `config`: logging configuration. `interval`, `enable_logging` and `index_logs` are read.
/// * `event_queue`: source of the logged events. Its `link` is replayed, and its `activator`
///   is handed to the replay operator.
/// * `shared_state`: logging state shared with other logging dataflows. The demux reads its
///   compute logger and its arrangement-size activators.
///
/// Returns a map from each indexed [`LogVariant`] to its arranged [`LogCollection`].
pub(super) fn construct<A: Allocate>(
    worker: &mut timely::worker::Worker<A>,
    config: &mz_compute_client::logging::LoggingConfig,
    event_queue: EventQueue<DifferentialEvent>,
    shared_state: Rc<RefCell<SharedLoggingState>>,
) -> BTreeMap<LogVariant, LogCollection> {
    // Clamp to at least 1ms because `DemuxHandler::ts` divides by this value.
    let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
    let worker_id = worker.index();
    // Recorded in every returned `LogCollection`.
    let dataflow_index = worker.next_dataflow_index();
    worker.dataflow_named("Dataflow: differential logging", move |scope| {
        // Replay the logged events into this dataflow. `token` is cloned into every returned
        // `LogCollection`. Presumably it keeps the replay alive; confirm in `mz_replay`.
        let (mut logs, token) = Some(event_queue.link).mz_replay(
            scope,
            "differential logs",
            config.interval,
            event_queue.activator,
        );
        // With logging disabled, the dataflow is still built, but every event is discarded.
        if !config.enable_logging {
            logs = logs.filter(|_| false);
        }
        // Demux operator: one input, plus one output per differential log variant.
        let mut demux =
            OperatorBuilder::new("Differential Logging Demux".to_string(), scope.clone());
        let mut input = demux.new_input(&logs, Pipeline);
        let (mut batches_out, batches) = demux.new_output();
        let (mut records_out, records) = demux.new_output();
        let (mut sharing_out, sharing) = demux.new_output();
        let (mut batcher_records_out, batcher_records) = demux.new_output();
        let (mut batcher_size_out, batcher_size) = demux.new_output();
        let (mut batcher_capacity_out, batcher_capacity) = demux.new_output();
        let (mut batcher_allocations_out, batcher_allocations) = demux.new_output();
        // Reused across invocations to avoid reallocating the input buffer.
        let mut demux_buffer = Vec::new();
        // `DemuxState`, inferred from its use in `DemuxHandler`. It persists across invocations.
        let mut demux_state = Default::default();
        demux.build(move |_capability| {
            move |_frontiers| {
                let mut batches = batches_out.activate();
                let mut records = records_out.activate();
                let mut sharing = sharing_out.activate();
                let mut batcher_records = batcher_records_out.activate();
                let mut batcher_size = batcher_size_out.activate();
                let mut batcher_capacity = batcher_capacity_out.activate();
                let mut batcher_allocations = batcher_allocations_out.activate();
                // The second argument matches the order in which the outputs were created
                // above. Presumably it is the output port index; see `ConsolidateBuffer::new`.
                let mut output_buffers = DemuxOutput {
                    batches: ConsolidateBuffer::new(&mut batches, 0),
                    records: ConsolidateBuffer::new(&mut records, 1),
                    sharing: ConsolidateBuffer::new(&mut sharing, 2),
                    batcher_records: ConsolidateBuffer::new(&mut batcher_records, 3),
                    batcher_size: ConsolidateBuffer::new(&mut batcher_size, 4),
                    batcher_capacity: ConsolidateBuffer::new(&mut batcher_capacity, 5),
                    batcher_allocations: ConsolidateBuffer::new(&mut batcher_allocations, 6),
                };
                input.for_each(|cap, data| {
                    // Take the incoming events, reusing `demux_buffer`'s allocation.
                    data.swap(&mut demux_buffer);
                    for (time, logger_id, event) in demux_buffer.drain(..) {
                        // Every replayed event must have been logged by this worker.
                        assert_eq!(logger_id, worker_id);
                        // The `RefCell` borrow of `shared_state` lasts only for this statement,
                        // which handles one event.
                        DemuxHandler {
                            state: &mut demux_state,
                            output: &mut output_buffers,
                            logging_interval_ms,
                            time,
                            cap: &cap,
                            shared_state: &mut shared_state.borrow_mut(),
                        }
                        .handle(event);
                    }
                });
            }
        });
        // Pre-arranges an `(operator, ())` update stream, then packs each key into a row
        // holding the operator id and this worker's id as `UInt64` datums. The column
        // order is determined by `PermutedRowPacker` for `log`.
        let stream_to_collection = |input: Stream<_, ((usize, ()), Timestamp, Diff)>, log, name| {
            let mut packer = PermutedRowPacker::new(log);
            input
                .as_collection()
                .mz_arrange_core::<_, KeyValSpine<_, _, _, _>>(
                    Pipeline,
                    &format!("PreArrange Differential {name}"),
                )
                .as_collection(move |op, ()| {
                    packer.pack_slice(&[
                        Datum::UInt64(u64::cast_from(*op)),
                        Datum::UInt64(u64::cast_from(worker_id)),
                    ])
                })
        };
        let arrangement_batches = stream_to_collection(batches, ArrangementBatches, "batches");
        let arrangement_records = stream_to_collection(records, ArrangementRecords, "records");
        let sharing = stream_to_collection(sharing, Sharing, "sharing");
        let batcher_records =
            stream_to_collection(batcher_records, BatcherRecords, "batcher records");
        let batcher_size = stream_to_collection(batcher_size, BatcherSize, "batcher size");
        let batcher_capacity =
            stream_to_collection(batcher_capacity, BatcherCapacity, "batcher capacity");
        let batcher_allocations = stream_to_collection(
            batcher_allocations,
            BatcherAllocations,
            "batcher allocations",
        );
        // A block-scoped `use` applies to the whole block, including the variant names used
        // above.
        use DifferentialLog::*;
        let logs = [
            (ArrangementBatches, arrangement_batches),
            (ArrangementRecords, arrangement_records),
            (Sharing, sharing),
            (BatcherRecords, batcher_records),
            (BatcherSize, batcher_size),
            (BatcherCapacity, batcher_capacity),
            (BatcherAllocations, batcher_allocations),
        ];
        // Arrange and return only the variants requested in `config.index_logs`.
        // The other collections are dropped.
        let mut result = BTreeMap::new();
        for (variant, collection) in logs {
            let variant = LogVariant::Differential(variant);
            if config.index_logs.contains_key(&variant) {
                let trace = collection
                    .mz_arrange::<RowRowSpine<_, _>>(&format!("Arrange {variant:?}"))
                    .trace;
                let collection = LogCollection {
                    trace,
                    token: Rc::clone(&token),
                    dataflow_index,
                };
                result.insert(variant, collection);
            }
        }
        result
    })
}
/// Pusher type behind each output of the differential logging demux operator.
type Pusher<D> = Tee<Timestamp, Vec<(D, Timestamp, Diff)>>;
/// Consolidating buffer in front of one demux output, carrying `(D, Timestamp, Diff)` updates.
type OutputBuffer<'a, 'b, D> = ConsolidateBuffer<'a, 'b, Timestamp, D, Diff, Pusher<D>>;

/// Buffers for all outputs of the differential logging demux operator.
///
/// Every output carries updates keyed by `(operator id, ())`.
struct DemuxOutput<'a, 'b> {
    /// Batch-count changes: +1 per `Batch` event, and -1 per completed `Merge` or `Drop` event.
    batches: OutputBuffer<'a, 'b, (usize, ())>,
    /// Record-count changes derived from batch, merge, and drop lengths.
    records: OutputBuffer<'a, 'b, (usize, ())>,
    /// Trace-share count changes from `TraceShare` events.
    sharing: OutputBuffer<'a, 'b, (usize, ())>,
    /// `records_diff` values from `BatcherEvent`s.
    batcher_records: OutputBuffer<'a, 'b, (usize, ())>,
    /// `size_diff` values from `BatcherEvent`s.
    batcher_size: OutputBuffer<'a, 'b, (usize, ())>,
    /// `capacity_diff` values from `BatcherEvent`s.
    batcher_capacity: OutputBuffer<'a, 'b, (usize, ())>,
    /// `allocations_diff` values from `BatcherEvent`s.
    batcher_allocations: OutputBuffer<'a, 'b, (usize, ())>,
}
/// Mutable state of the demux operator that persists across events and invocations.
#[derive(Default)]
struct DemuxState {
    /// Current trace-share count per operator id.
    ///
    /// This map is only maintained while a compute logger is installed in the shared logging
    /// state. An entry is removed as soon as its count reaches zero.
    sharing: BTreeMap<usize, usize>,
}
/// Per-event context for the demux operator. It routes one `DifferentialEvent` to the
/// appropriate output buffers.
struct DemuxHandler<'a, 'b, 'c> {
    /// Demux state that persists across events.
    state: &'a mut DemuxState,
    /// Output buffers that receive the produced updates.
    output: &'a mut DemuxOutput<'b, 'c>,
    /// Logging interval in milliseconds, used to round event times.
    /// Must be nonzero, because `ts` divides by it.
    logging_interval_ms: u128,
    /// Time attached to the event being handled.
    time: Duration,
    /// Input capability under which all updates for this event are emitted.
    cap: &'a InputCapability<Timestamp>,
    /// Shared logging state: the compute logger and the arrangement-size activators.
    shared_state: &'a mut SharedLoggingState,
}
impl DemuxHandler<'_, '_, '_> {
fn ts(&self) -> Timestamp {
let time_ms = self.time.as_millis();
let interval = self.logging_interval_ms;
let rounded = (time_ms / interval + 1) * interval;
rounded.try_into().expect("must fit")
}
fn handle(&mut self, event: DifferentialEvent) {
use DifferentialEvent::*;
match event {
Batch(e) => self.handle_batch(e),
Merge(e) => self.handle_merge(e),
Drop(e) => self.handle_drop(e),
TraceShare(e) => self.handle_trace_share(e),
Batcher(e) => self.handle_batcher_event(e),
_ => (),
}
}
fn handle_batch(&mut self, event: BatchEvent) {
let ts = self.ts();
let op = event.operator;
self.output.batches.give(self.cap, ((op, ()), ts, 1));
let diff = Diff::try_from(event.length).expect("must fit");
self.output.records.give(self.cap, ((op, ()), ts, diff));
self.notify_arrangement_size(op);
}
fn handle_merge(&mut self, event: MergeEvent) {
let Some(done) = event.complete else { return };
let ts = self.ts();
let op = event.operator;
self.output.batches.give(self.cap, ((op, ()), ts, -1));
let diff = Diff::try_from(done).expect("must fit")
- Diff::try_from(event.length1 + event.length2).expect("must fit");
if diff != 0 {
self.output.records.give(self.cap, ((op, ()), ts, diff));
}
self.notify_arrangement_size(op);
}
fn handle_drop(&mut self, event: DropEvent) {
let ts = self.ts();
let op = event.operator;
self.output.batches.give(self.cap, ((op, ()), ts, -1));
let diff = -Diff::try_from(event.length).expect("must fit");
if diff != 0 {
self.output.records.give(self.cap, ((op, ()), ts, diff));
}
self.notify_arrangement_size(op);
}
fn handle_trace_share(&mut self, event: TraceShare) {
let ts = self.ts();
let op = event.operator;
let diff = Diff::cast_from(event.diff);
debug_assert_ne!(diff, 0);
self.output.sharing.give(self.cap, ((op, ()), ts, diff));
if let Some(logger) = &mut self.shared_state.compute_logger {
let sharing = self.state.sharing.entry(op).or_default();
*sharing = (i64::try_from(*sharing).expect("must fit") + diff)
.try_into()
.expect("under/overflow");
if *sharing == 0 {
self.state.sharing.remove(&op);
logger.log(ComputeEvent::ArrangementHeapSizeOperatorDrop { operator: op });
}
}
}
fn handle_batcher_event(&mut self, event: BatcherEvent) {
let ts = self.ts();
let op = event.operator;
let records_diff = Diff::cast_from(event.records_diff);
let size_diff = Diff::cast_from(event.size_diff);
let capacity_diff = Diff::cast_from(event.capacity_diff);
let allocations_diff = Diff::cast_from(event.allocations_diff);
self.output
.batcher_records
.give(self.cap, ((op, ()), ts, records_diff));
self.output
.batcher_size
.give(self.cap, ((op, ()), ts, size_diff));
self.output
.batcher_capacity
.give(self.cap, ((op, ()), ts, capacity_diff));
self.output
.batcher_allocations
.give(self.cap, ((op, ()), ts, allocations_diff));
}
fn notify_arrangement_size(&self, operator: usize) {
if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
activator.activate();
}
}
}