use serde::{Deserialize, Serialize};
/// Timely logger handle specialized to carry [`DifferentialEvent`] records.
pub type Logger = ::timely::logging::Logger<DifferentialEvent>;
/// Registers a differential dataflow event logger on `worker` that writes to `writer`.
///
/// `writer` is wrapped in a timely `EventWriter`, which is in turn wrapped in a
/// `BatchLogger`. A closure forwarding every `(time, data)` batch to
/// `BatchLogger::publish_batch` is then inserted into the worker's log registry
/// under the name `"differential/arrange"` for [`DifferentialEvent`] records.
///
/// Returns whatever the registry's `insert` call returns.
/// NOTE(review): presumably this is the logger previously registered under the
/// same name, if any — confirm against timely's log registry documentation.
pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
where
    A: timely::communication::Allocate,
    W: std::io::Write+'static,
{
    let mut batch_logger = ::timely::logging::BatchLogger::new(
        ::timely::dataflow::operators::capture::EventWriter::new(writer),
    );

    // The closure is kept inline in the `insert` call so that its argument
    // types are inferred from the signature the registry expects.
    worker.log_register().insert::<DifferentialEvent, _>(
        "differential/arrange",
        move |when, batch| batch_logger.publish_batch(when, batch),
    )
}
/// The kinds of event that can be recorded on the differential dataflow log.
///
/// `enable` registers a logger for this type under the name
/// `"differential/arrange"`. Each variant wraps one payload struct, and the
/// `From` impls in this file build the matching variant from its payload.
///
/// Variant order is significant: the derived `Ord`/`PartialOrd` rank variants
/// by declaration order, and serialized forms may depend on it as well.
///
/// NOTE(review): the per-variant descriptions below are inferred from the
/// variant and payload names only — confirm against the code emitting them.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub enum DifferentialEvent {
/// Carries a [`BatchEvent`]; presumably reports a batch being created.
Batch(BatchEvent),
/// Carries a [`MergeEvent`]; presumably reports a merge starting or completing.
Merge(MergeEvent),
/// Carries a [`DropEvent`]; presumably reports a batch being dropped.
Drop(DropEvent),
/// Carries a [`MergeShortfall`]; presumably reports a merge that fell short of its work target.
MergeShortfall(MergeShortfall),
/// Carries a [`TraceShare`]; presumably reports a change in how many handles share a trace.
TraceShare(TraceShare),
/// Carries a [`BatcherEvent`]; presumably reports a change in a batcher's resource usage.
Batcher(BatcherEvent),
}
/// Payload of [`DifferentialEvent::Batch`].
///
/// Field order is significant: the derived `Ord`/`PartialOrd` compare fields
/// in declaration order.
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct BatchEvent {
/// Presumably the identifier of the operator the event concerns — TODO confirm.
pub operator: usize,
/// Presumably the length of the batch — TODO confirm the unit.
pub length: usize,
}
impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
/// Payload of [`DifferentialEvent::Batcher`].
///
/// All `*_diff` fields are signed, so they can express increases and decreases.
/// Field order is significant: the derived `Ord`/`PartialOrd` compare fields
/// in declaration order.
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct BatcherEvent {
/// Presumably the identifier of the operator the event concerns — TODO confirm.
pub operator: usize,
/// Presumably the change in the number of records held — TODO confirm.
pub records_diff: isize,
/// Presumably the change in used size — TODO confirm the unit.
pub size_diff: isize,
/// Presumably the change in allocated capacity — TODO confirm the unit.
pub capacity_diff: isize,
/// Presumably the change in the number of allocations — TODO confirm.
pub allocations_diff: isize,
}
impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
/// Payload of [`DifferentialEvent::Drop`].
///
/// Field order is significant: the derived `Ord`/`PartialOrd` compare fields
/// in declaration order.
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct DropEvent {
/// Presumably the identifier of the operator the event concerns — TODO confirm.
pub operator: usize,
/// Presumably the length of the dropped batch — TODO confirm the unit.
pub length: usize,
}
impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
/// Payload of [`DifferentialEvent::Merge`].
///
/// Field order is significant: the derived `Ord`/`PartialOrd` compare fields
/// in declaration order.
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct MergeEvent {
/// Presumably the identifier of the operator the event concerns — TODO confirm.
pub operator: usize,
/// Presumably the scale (size class) at which the merge happens — TODO confirm.
pub scale: usize,
/// Presumably the length of the first merge input — TODO confirm.
pub length1: usize,
/// Presumably the length of the second merge input — TODO confirm.
pub length2: usize,
/// Optional value; presumably `None` marks a merge start and `Some(len)` a completion — TODO confirm.
pub complete: Option<usize>,
}
impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
/// Payload of [`DifferentialEvent::MergeShortfall`].
///
/// Field order is significant: the derived `Ord`/`PartialOrd` compare fields
/// in declaration order.
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct MergeShortfall {
/// Presumably the identifier of the operator the event concerns — TODO confirm.
pub operator: usize,
/// Presumably the scale (size class) of the affected merge — TODO confirm.
pub scale: usize,
/// Presumably the amount by which the merge fell short — TODO confirm the unit.
pub shortfall: usize,
}
impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
/// Payload of [`DifferentialEvent::TraceShare`].
///
/// Field order is significant: the derived `Ord`/`PartialOrd` compare fields
/// in declaration order.
///
/// NOTE(review): field meanings are inferred from their names — confirm
/// against the code that emits this event.
#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize)]
pub struct TraceShare {
/// Presumably the identifier of the operator the event concerns — TODO confirm.
pub operator: usize,
/// Signed change; presumably in the number of shares of the trace — TODO confirm.
pub diff: isize,
}
impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }