1pub type WorkerIdentifier = usize;
5pub type TimelyEventBuilder = CapacityContainerBuilder<Vec<(Duration, TimelyEvent)>>;
7pub type TimelyLogger = crate::logging_core::TypedLogger<TimelyEventBuilder, TimelyEvent>;
9pub type TimelyProgressEventBuilder<T> = CapacityContainerBuilder<Vec<(Duration, TimelyProgressEvent<T>)>>;
11pub type TimelyProgressLogger<T> = crate::logging_core::Logger<TimelyProgressEventBuilder<T>>;
13pub type TimelySummaryEventBuilder<TS> = CapacityContainerBuilder<Vec<(Duration, OperatesSummaryEvent<TS>)>>;
15pub type TimelySummaryLogger<TS> = crate::logging_core::Logger<TimelySummaryEventBuilder<TS>>;
17
18use std::time::Duration;
19use columnar::Columnar;
20use serde::{Deserialize, Serialize};
21
22use crate::Container;
23use crate::container::CapacityContainerBuilder;
24use crate::dataflow::operators::capture::{Event, EventPusher};
25use crate::progress::operate::Connectivity;
26
27pub struct BatchLogger<P, C> where P: EventPusher<Duration, C> {
29 time: Duration,
30 event_pusher: P,
31 _phantom: ::std::marker::PhantomData<C>,
32}
33
34impl<P, C> BatchLogger<P, C> where P: EventPusher<Duration, C>, C: Container {
35 pub fn new(event_pusher: P) -> Self {
37 BatchLogger {
38 time: Default::default(),
39 event_pusher,
40 _phantom: ::std::marker::PhantomData,
41 }
42 }
43 pub fn publish_batch(&mut self, &time: &Duration, data: &mut Option<C>) {
45 if let Some(data) = data {
46 self.event_pusher.push(Event::Messages(self.time, std::mem::take(data)));
47 }
48 if self.time < time {
49 let new_frontier = time;
50 let old_frontier = self.time;
51 self.event_pusher.push(Event::Progress(vec![(new_frontier, 1), (old_frontier, -1)]));
52 }
53 self.time = time;
54 }
55}
56impl<P, C> Drop for BatchLogger<P, C> where P: EventPusher<Duration, C> {
57 fn drop(&mut self) {
58 self.event_pusher.push(Event::Progress(vec![(self.time, -1)]));
59 }
60}
61
62#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
63pub struct OperatesEvent {
65 pub id: usize,
67 pub addr: Vec<usize>,
69 pub name: String,
71}
72
73
74#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Eq, PartialEq)]
75pub struct OperatesSummaryEvent<TS> {
77 pub id: usize,
79 pub summary: Connectivity<TS>,
81}
82
83#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
84pub struct ChannelsEvent {
86 pub id: usize,
88 pub scope_addr: Vec<usize>,
90 pub source: (usize, usize),
92 pub target: (usize, usize),
94 pub typ: String,
96}
97
98#[derive(Debug, Clone)]
99pub struct TimelyProgressEvent<T> {
101 pub is_send: bool,
103 pub source: usize,
105 pub channel: usize,
107 pub seq_no: usize,
109 pub identifier: usize,
111 pub messages: Vec<(usize, usize, T, i64)>,
113 pub internal: Vec<(usize, usize, T, i64)>,
115}
116
117#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
118pub struct PushProgressEvent {
120 pub op_id: usize,
122}
123
124#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
125pub struct MessagesEvent {
127 pub is_send: bool,
129 pub channel: usize,
131 pub source: usize,
133 pub target: usize,
135 pub seq_no: usize,
137 pub length: usize,
139}
140
141#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
143pub enum StartStop {
144 Start,
146 Stop,
148}
149
150#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
151pub struct ScheduleEvent {
153 pub id: usize,
155 pub start_stop: StartStop,
159}
160
161impl ScheduleEvent {
162 pub fn start(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Start } }
164 pub fn stop(id: usize) -> Self { ScheduleEvent { id, start_stop: StartStop::Stop } }
166}
167
168#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
169pub struct ShutdownEvent {
171 pub id: usize,
173}
174
175#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
176pub struct ApplicationEvent {
178 pub id: usize,
180 pub is_start: bool,
182}
183
184#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
185pub struct GuardedMessageEvent {
187 pub is_start: bool,
189}
190
191#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
192pub struct GuardedProgressEvent {
194 pub is_start: bool,
196}
197
198#[derive(Serialize, Deserialize, Columnar, Debug, PartialEq, Eq, Hash, Clone, Copy)]
199pub struct TimelySetup {
201 pub index: usize,
203}
204
205#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
206pub enum CommChannelKind {
208 Progress,
210 Data,
212}
213
214#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
215pub struct CommChannelsEvent {
217 pub identifier: usize,
219 pub kind: CommChannelKind,
221}
222
223#[derive(Serialize, Deserialize, Columnar, Debug, Copy, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
224pub struct InputEvent {
226 pub start_stop: StartStop,
228}
229
230#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd)]
232pub enum ParkEvent {
233 Park(Option<Duration>),
235 Unpark,
237}
238
239impl ParkEvent {
240 pub fn park(duration: Option<Duration>) -> Self { ParkEvent::Park(duration) }
242 pub fn unpark() -> Self { ParkEvent::Unpark }
244}
245
246#[derive(Serialize, Deserialize, Columnar, Debug, Clone, Hash, Eq, PartialEq, Ord, PartialOrd)]
247pub enum TimelyEvent {
249 Operates(OperatesEvent),
251 Channels(ChannelsEvent),
253 PushProgress(PushProgressEvent),
255 Messages(MessagesEvent),
257 Schedule(ScheduleEvent),
259 Shutdown(ShutdownEvent),
261 Application(ApplicationEvent),
263 GuardedMessage(GuardedMessageEvent),
265 GuardedProgress(GuardedProgressEvent),
267 CommChannels(CommChannelsEvent),
269 Input(InputEvent),
271 Park(ParkEvent),
273 Text(String),
275}
276
277impl From<OperatesEvent> for TimelyEvent {
278 fn from(v: OperatesEvent) -> TimelyEvent { TimelyEvent::Operates(v) }
279}
280
281impl From<ChannelsEvent> for TimelyEvent {
282 fn from(v: ChannelsEvent) -> TimelyEvent { TimelyEvent::Channels(v) }
283}
284
285impl From<PushProgressEvent> for TimelyEvent {
286 fn from(v: PushProgressEvent) -> TimelyEvent { TimelyEvent::PushProgress(v) }
287}
288
289impl From<MessagesEvent> for TimelyEvent {
290 fn from(v: MessagesEvent) -> TimelyEvent { TimelyEvent::Messages(v) }
291}
292
293impl From<ScheduleEvent> for TimelyEvent {
294 fn from(v: ScheduleEvent) -> TimelyEvent { TimelyEvent::Schedule(v) }
295}
296
297impl From<ShutdownEvent> for TimelyEvent {
298 fn from(v: ShutdownEvent) -> TimelyEvent { TimelyEvent::Shutdown(v) }
299}
300
301impl From<ApplicationEvent> for TimelyEvent {
302 fn from(v: ApplicationEvent) -> TimelyEvent { TimelyEvent::Application(v) }
303}
304
305impl From<GuardedMessageEvent> for TimelyEvent {
306 fn from(v: GuardedMessageEvent) -> TimelyEvent { TimelyEvent::GuardedMessage(v) }
307}
308
309impl From<GuardedProgressEvent> for TimelyEvent {
310 fn from(v: GuardedProgressEvent) -> TimelyEvent { TimelyEvent::GuardedProgress(v) }
311}
312
313impl From<CommChannelsEvent> for TimelyEvent {
314 fn from(v: CommChannelsEvent) -> TimelyEvent { TimelyEvent::CommChannels(v) }
315}
316
317impl From<InputEvent> for TimelyEvent {
318 fn from(v: InputEvent) -> TimelyEvent { TimelyEvent::Input(v) }
319}
320
321impl From<ParkEvent> for TimelyEvent {
322 fn from(v: ParkEvent) -> TimelyEvent { TimelyEvent::Park(v) }
323}