timely/
logging.rs

1//! Traits, implementations, and macros related to logging timely events.
2
3/// Type alias for logging timely events.
4pub type WorkerIdentifier = usize;
5/// Container builder for timely dataflow system events.
6pub type TimelyEventBuilder = CapacityContainerBuilder<Vec<(Duration, TimelyEvent)>>;
7/// Logger for timely dataflow system events.
8pub type TimelyLogger = crate::logging_core::TypedLogger<TimelyEventBuilder, TimelyEvent>;
9/// Container builder for timely dataflow progress events.
10pub type TimelyProgressEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TimelyProgressEvent<T>)>>;
11/// Logger for timely dataflow progress events (the "timely/progress/*" log streams).
12pub type TimelyProgressLogger<T> = crate::logging_core::Logger<TimelyProgressEventBuilder<T>>;
13/// Container builder for timely dataflow operator summary events.
14pub type TimelySummaryEventBuilder<TS> = CapacityContainerBuilder<Vec<(Duration, OperatesSummaryEvent<TS>)>>;
15/// Logger for timely dataflow operator summary events (the "timely/summary/*" log streams).
16pub type TimelySummaryLogger<TS> = crate::logging_core::Logger<TimelySummaryEventBuilder<TS>>;
17
18use std::time::Duration;
19use columnar::Columnar;
20use serde::{Deserialize, Serialize};
21
22use crate::Container;
23use crate::container::CapacityContainerBuilder;
24use crate::dataflow::operators::capture::{Event, EventPusher};
25use crate::progress::operate::Connectivity;
26
27/// Logs events as a timely stream, with progress statements.
28pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
29    time: Duration,
30    event_pusher: P,
31    _phantom: ::std::marker::PhantomData<C>,
32}
33
34impl<P, C> BatchLogger<P, C> where P: EventPusher<Duration, C>, C: Container {
35    /// Creates a new batch logger.
36    pub fn new(event_pusher: P) -> Self {
37        BatchLogger {
38            time: Default::default(),
39            event_pusher,
40            _phantom: ::std::marker::PhantomData,
41        }
42    }
43    /// Publishes a batch of logged events and advances the capability.
44    pub fn publish_batch(&mut self, &time: &Duration, data: &mut Option<C>) {
45        if let Some(data) = data {
46            self.event_pusher.push(Event::Messages(self.time, std::mem::take(data)));
47        }
48        if self.time < time {
49            let new_frontier = time;
50            let old_frontier = self.time;
51            self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)]));
52        }
53        self.time = time;
54    }
55}
56impl<P, C> Drop for BatchLogger<P, C> where P: EventPusher<Duration, C> {
57    fn drop(&mut self) {
58        self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
59    }
60}
61
62#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
63/// The creation of an `Operate` implementor.
64pub struct OperatesEvent {
65    /// Worker-unique identifier for the operator.
66    pub id: usize,
67    /// Sequence of nested scope identifiers indicating the path from the root to this instance.
68    pub addr: Vec<usize>,
69    /// A helpful name.
70    pub name: String,
71}
72
73
74#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Eq, PartialEq)]
75/// The summary of internal connectivity of an `Operate` implementor.
76pub struct OperatesSummaryEvent<TS> {
77    /// Worker-unique identifier for the operator.
78    pub id: usize,
79    /// Timestamp action summaries for (input, output) pairs.
80    pub summary: Connectivity<TS>,
81}
82
83#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
84/// The creation of a channel between operators.
85pub struct ChannelsEvent {
86    /// Worker-unique identifier for the channel
87    pub id: usize,
88    /// Sequence of nested scope identifiers indicating the path from the root to this instance.
89    pub scope_addr: Vec<usize>,
90    /// Source descriptor, indicating operator index and output port.
91    pub source: (usize, usize),
92    /// Target descriptor, indicating operator index and input port.
93    pub target: (usize, usize),
94    /// The type of data on the channel, as a string.
95    pub typ: String,
96}
97
98#[derive(Debug, Clone)]
99/// Send or receive of progress information.
100pub struct TimelyProgressEvent<T> {
101    /// `true` if the event is a send, and `false` if it is a receive.
102    pub is_send: bool,
103    /// Source worker index.
104    pub source: usize,
105    /// Communication channel identifier
106    pub channel: usize,
107    /// Message sequence number.
108    pub seq_no: usize,
109    /// Global identifier of the operator reporting progress.
110    pub identifier: usize,
111    /// List of message updates, containing Target descriptor, timestamp as string, and delta.
112    pub messages: Vec<(usize, usize, T, i64)>,
113    /// List of capability updates, containing Source descriptor, timestamp as string, and delta.
114    pub internal: Vec<(usize, usize, T, i64)>,
115}
116
117#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
118/// External progress pushed onto an operator
119pub struct PushProgressEvent {
120    /// Worker-unique operator identifier
121    pub op_id: usize,
122}
123
124#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
125/// Message send or receive event
126pub struct MessagesEvent {
127    /// `true` if send event, `false` if receive event.
128    pub is_send: bool,
129    /// Channel identifier
130    pub channel: usize,
131    /// Source worker index.
132    pub source: usize,
133    /// Target worker index.
134    pub target: usize,
135    /// Message sequence number.
136    pub seq_no: usize,
137    /// Number of typed records in the message.
138    pub length: usize,
139}
140
141/// Records the starting and stopping of an operator.
142#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
143pub enum StartStop {
144    /// Operator starts.
145    Start,
146    /// Operator stops.
147    Stop,
148}
149
150#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
151/// Operator start or stop.
152pub struct ScheduleEvent {
153    /// Worker-unique identifier for the operator, linkable to the identifiers in [`OperatesEvent`].
154    pub id: usize,
155    /// `Start` if the operator is starting, `Stop` if it is stopping.
156    /// activity is true if it looks like some useful work was performed during this call (data was
157    /// read or written, notifications were requested / delivered)
158    pub start_stop: StartStop,
159}
160
161impl ScheduleEvent {
162    /// Creates a new start scheduling event.
163    pub fn start(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Start } }
164    /// Creates a new stop scheduling event and reports whether work occurred.
165    pub fn stop(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Stop } }
166}
167
168#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
169/// Operator shutdown.
170pub struct ShutdownEvent {
171    /// Worker-unique identifier for the operator, linkable to the identifiers in [`OperatesEvent`].
172    pub id: usize,
173}
174
175#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
176/// Application-defined code start or stop
177pub struct ApplicationEvent {
178    /// Unique event type identifier
179    pub id: usize,
180    /// `true` when activity begins, `false` when it stops
181    pub is_start: bool,
182}
183
184#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
185/// Application-defined code start or stop
186pub struct GuardedMessageEvent {
187    /// `true` when activity begins, `false` when it stops
188    pub is_start: bool,
189}
190
191#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
192/// Application-defined code start or stop
193pub struct GuardedProgressEvent {
194    /// `true` when activity begins, `false` when it stops
195    pub is_start: bool,
196}
197
198#[derive(Serialize, Deserialize, Columnar, Debug, PartialEq, Eq, Hash, Clone, Copy)]
199/// Identifier of the worker that generated a log line
200pub struct TimelySetup {
201    /// Worker index
202    pub index: usize,
203}
204
205#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
206/// Kind of communication channel
207pub enum CommChannelKind {
208    /// Communication channel carrying progress information
209    Progress,
210    /// Communication channel carrying data
211    Data,
212}
213
214#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
215/// Event on a communication channel
216pub struct CommChannelsEvent {
217    /// Communication channel identifier
218    pub identifier: usize,
219    /// Kind of communication channel (progress / data)
220    pub kind: CommChannelKind,
221}
222
223#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
224/// Input logic start/stop
225pub struct InputEvent {
226    /// True when activity begins, false when it stops
227    pub start_stop: StartStop,
228}
229
230/// Records the starting and stopping of an operator.
231#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
232pub enum ParkEvent {
233    /// Worker parks.
234    Park(Option<Duration>),
235    /// Worker unparks.
236    Unpark,
237}
238
239impl ParkEvent {
240    /// Creates a new park event from the supplied duration.
241    pub fn park(duration: Option<Duration>) -> Self { ParkEvent::Park(duration) }
242    /// Creates a new unpark event.
243    pub fn unpark() -> Self { ParkEvent::Unpark }
244}
245
246#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
247/// An event in a timely worker
248pub enum TimelyEvent {
249    /// Operator creation.
250    Operates(OperatesEvent),
251    /// Channel creation.
252    Channels(ChannelsEvent),
253    /// Progress propagation (reasoning).
254    PushProgress(PushProgressEvent),
255    /// Message send or receive.
256    Messages(MessagesEvent),
257    /// Operator start or stop.
258    Schedule(ScheduleEvent),
259    /// Operator shutdown.
260    Shutdown(ShutdownEvent),
261    /// No clue.
262    Application(ApplicationEvent),
263    /// Per-message computation.
264    GuardedMessage(GuardedMessageEvent),
265    /// Per-notification computation.
266    GuardedProgress(GuardedProgressEvent),
267    /// Communication channel event.
268    CommChannels(CommChannelsEvent),
269    /// Input event.
270    Input(InputEvent),
271    /// Park event.
272    Park(ParkEvent),
273    /// Unstructured event.
274    Text(String),
275}
276
277impl From<OperatesEvent> for TimelyEvent {
278    fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) }
279}
280
281impl From<ChannelsEvent> for TimelyEvent {
282    fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
283}
284
285impl From<PushProgressEvent> for TimelyEvent {
286    fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
287}
288
289impl From<MessagesEvent> for TimelyEvent {
290    fn from(v: MessagesEvent) -> TimelyEvent { TimelyEvent::Messages(v) }
291}
292
293impl From<ScheduleEvent> for TimelyEvent {
294    fn from(v: ScheduleEvent) -> TimelyEvent { TimelyEvent::Schedule(v) }
295}
296
297impl From<ShutdownEvent> for TimelyEvent {
298    fn from(v: ShutdownEvent) -> TimelyEvent { TimelyEvent::Shutdown(v) }
299}
300
301impl From<ApplicationEvent> for TimelyEvent {
302    fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) }
303}
304
305impl From<GuardedMessageEvent> for TimelyEvent {
306    fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) }
307}
308
309impl From<GuardedProgressEvent> for TimelyEvent {
310    fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) }
311}
312
313impl From<CommChannelsEvent> for TimelyEvent {
314    fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) }
315}
316
317impl From<InputEvent> for TimelyEvent {
318    fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) }
319}
320
321impl From<ParkEvent> for TimelyEvent {
322    fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) }
323}