differential_dataflow/
logging.rs

1//! Loggers and logging events for differential dataflow.
2
3use columnar::Columnar;
4use serde::{Deserialize, Serialize};
5
6/// Container builder for differential log events.
7pub type DifferentialEventBuilder = timely::container::CapacityContainerBuilder<Vec<(std::time::Duration, DifferentialEvent)>>;
8
9/// Logger for differential dataflow events.
10pub type Logger = ::timely::logging_core::TypedLogger<DifferentialEventBuilder, DifferentialEvent>;
11
12/// Enables logging of differential dataflow events.
13pub fn enable<A, W>(worker: &mut timely::worker::Worker<A>, writer: W) -> Option<Box<dyn std::any::Any+'static>>
14where
15    A: timely::communication::Allocate,
16    W: std::io::Write+'static,
17{
18    let writer = ::timely::dataflow::operators::capture::EventWriter::new(writer);
19    let mut logger = ::timely::logging::BatchLogger::new(writer);
20    worker
21        .log_register()
22        .insert::<DifferentialEventBuilder,_>("differential/arrange", move |time, data| logger.publish_batch(time, data))
23}
24
25/// Possible different differential events.
26#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
27pub enum DifferentialEvent {
28    /// Batch creation.
29    Batch(BatchEvent),
30    /// Merge start and stop events.
31    Merge(MergeEvent),
32    /// Batch dropped when trace dropped.
33    Drop(DropEvent),
34    /// A merge failed to complete in time.
35    MergeShortfall(MergeShortfall),
36    /// Trace sharing event.
37    TraceShare(TraceShare),
38    /// Batcher size event
39    Batcher(BatcherEvent),
40}
41
42/// Either the start or end of a merge event.
43#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
44pub struct BatchEvent {
45    /// Operator identifier.
46    pub operator: usize,
47    /// Which order of magnitude.
48    pub length: usize,
49}
50
51impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }
52
53
54/// Either the start or end of a merge event.
55#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
56pub struct BatcherEvent {
57    /// Operator identifier.
58    pub operator: usize,
59    /// Change in records.
60    pub records_diff: isize,
61    /// Change in used size.
62    pub size_diff: isize,
63    /// Change in capacity.
64    pub capacity_diff: isize,
65    /// Change in number of allocations.
66    pub allocations_diff: isize,
67}
68
69impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }
70
71/// Either the start or end of a merge event.
72#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
73pub struct DropEvent {
74    /// Operator identifier.
75    pub operator: usize,
76    /// Which order of magnitude.
77    pub length: usize,
78}
79
80impl From<DropEvent> for DifferentialEvent { fn from(e: DropEvent) -> Self { DifferentialEvent::Drop(e) } }
81
82/// Either the start or end of a merge event.
83#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
84pub struct MergeEvent {
85    /// Operator identifier.
86    pub operator: usize,
87    /// Which order of magnitude.
88    pub scale: usize,
89    /// Length of first trace.
90    pub length1: usize,
91    /// Length of second trace.
92    pub length2: usize,
93    /// None implies a start.
94    pub complete: Option<usize>,
95}
96
97impl From<MergeEvent> for DifferentialEvent { fn from(e: MergeEvent) -> Self { DifferentialEvent::Merge(e) } }
98
99/// A merge failed to complete in time.
100#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
101pub struct MergeShortfall {
102    /// Operator identifier.
103    pub operator: usize,
104    /// Which order of magnitude.
105    pub scale: usize,
106    /// By how much were we short.
107    pub shortfall: usize,
108}
109
110impl From<MergeShortfall> for DifferentialEvent { fn from(e: MergeShortfall) -> Self { DifferentialEvent::MergeShortfall(e) } }
111
112/// Either the start or end of a merge event.
113#[derive(Debug, Clone, Ord, PartialOrd, Eq, PartialEq, Serialize, Deserialize, Columnar)]
114pub struct TraceShare {
115    /// Operator identifier.
116    pub operator: usize,
117    /// Change in number of shares.
118    pub diff: isize,
119}
120
121impl From<TraceShare> for DifferentialEvent { fn from(e: TraceShare) -> Self { DifferentialEvent::TraceShare(e) } }