1use columnar::Columnar;
4use serde::{Deserialize, Serialize};
5
6pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
8
9pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
11
12pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
14where
15 A: timely::communication::Allocate,
16 W: std::io::Write+'static,
17{
18 let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
19 let mut logger = ::timely::logging::BatchLogger::new(writer);
20 worker
21 .log_register()
22 .insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
23}
24
25#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
27pub enum DifferentialEvent {
28 Batch(BatchEvent),
30 Merge(MergeEvent),
32 Drop(DropEvent),
34 MergeShortfall(MergeShortfall),
36 TraceShare(TraceShare),
38 Batcher(BatcherEvent),
40}
41
42#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
44pub struct BatchEvent {
45 pub operator: usize,
47 pub length: usize,
49}
50
51impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
52
53
54#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
56pub struct BatcherEvent {
57 pub operator: usize,
59 pub records_diff: isize,
61 pub size_diff: isize,
63 pub capacity_diff: isize,
65 pub allocations_diff: isize,
67}
68
69impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
70
71#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
73pub struct DropEvent {
74 pub operator: usize,
76 pub length: usize,
78}
79
80impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
81
82#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
84pub struct MergeEvent {
85 pub operator: usize,
87 pub scale: usize,
89 pub length1: usize,
91 pub length2: usize,
93 pub complete: Option<usize>,
95}
96
97impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
98
99#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
101pub struct MergeShortfall {
102 pub operator: usize,
104 pub scale: usize,
106 pub shortfall: usize,
108}
109
110impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
111
112#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
114pub struct TraceShare {
115 pub operator: usize,
117 pub diff: isize,
119}
120
121impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }