timely/progress/
broadcast.rs

1//! Broadcasts progress information among workers.
2
3use std::rc::Rc;
4use crate::progress::{ChangeBatch, Timestamp};
5use crate::progress::{Location, Port};
6use crate::communication::{Push, Pull};
7use crate::logging::TimelyLogger as Logger;
8use crate::logging::TimelyProgressLogger as ProgressLogger;
9use crate::Bincode;
10
11/// A progress update message consisting of source worker id, sequence number and lists of
12/// message and internal updates
13pub type ProgressMsg<T> = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>;
14
15/// Manages broadcasting of progress updates to and receiving updates from workers.
16pub struct Progcaster<T:Timestamp> {
17    /// Pusher into which we send progress updates.
18    pusher: Box<dyn Push<ProgressMsg<T>>>,
19    /// Puller from which we recv progress updates.
20    puller: Box<dyn Pull<ProgressMsg<T>>>,
21    /// Source worker index
22    source: usize,
23    /// Sequence number counter
24    counter: usize,
25    /// Global identifier of the scope that owns this `Progcaster`.
26    identifier: usize,
27    /// Communication channel identifier
28    channel_identifier: usize,
29    /// An optional logger to record progress messages.
30    progress_logging: Option<ProgressLogger<T>>,
31}
32
33impl<T:Timestamp+Send> Progcaster<T> {
34    /// Creates a new `Progcaster` using a channel from the supplied worker.
35    pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger<T>>) -> Progcaster<T> {
36
37        let channel_identifier = worker.new_identifier();
38        let (pusher, puller) = worker.broadcast(channel_identifier, addr);
39        logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
40            identifier: channel_identifier,
41            kind: crate::logging::CommChannelKind::Progress,
42        }));
43        let worker_index = worker.index();
44        Progcaster {
45            pusher,
46            puller,
47            source: worker_index,
48            counter: 0,
49            identifier,
50            channel_identifier,
51            progress_logging,
52        }
53    }
54
55    /// Sends pointstamp changes to all workers.
56    pub fn send(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
57
58        changes.compact();
59        if !changes.is_empty() {
60
61            self.progress_logging.as_ref().map(|l| {
62
63                // Pre-allocate enough space; we transfer ownership, so there is not
64                // an opportunity to re-use allocations (w/o changing the logging
65                // interface to accept references).
66                let mut messages = Vec::with_capacity(changes.len());
67                let mut internal = Vec::with_capacity(changes.len());
68
69                for ((location, time), diff) in changes.iter() {
70                    match location.port {
71                        Port::Target(port) => {
72                            messages.push((location.node, port, time.clone(), *diff))
73                        },
74                        Port::Source(port) => {
75                            internal.push((location.node, port, time.clone(), *diff))
76                        }
77                    }
78                }
79
80                l.log(crate::logging::TimelyProgressEvent {
81                    is_send: true,
82                    source: self.source,
83                    channel: self.channel_identifier,
84                    seq_no: self.counter,
85                    identifier: self.identifier,
86                    messages,
87                    internal,
88                });
89            });
90
91            let payload = (self.source, self.counter, std::mem::take(changes));
92            let mut to_push = Some(Bincode { payload });
93            self.pusher.push(&mut to_push);
94            self.pusher.done();
95
96            if let Some(pushed) = to_push {
97                *changes = pushed.payload.2;
98                changes.clear();
99            }
100
101            self.counter += 1;
102        }
103    }
104
105    /// Receives pointstamp changes from all workers.
106    pub fn recv(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
107
108        while let Some(message) = self.puller.pull() {
109
110            let source = message.0;
111            let counter = message.1;
112            let recv_changes = &mut message.2;
113
114            let channel = self.channel_identifier;
115
116            // See comments above about the relatively high cost of this logging, and our
117            // options for improving it if performance limits users who want other logging.
118            self.progress_logging.as_ref().map(|l| {
119
120                let mut messages = Vec::with_capacity(changes.len());
121                let mut internal = Vec::with_capacity(changes.len());
122
123                for ((location, time), diff) in recv_changes.iter() {
124
125                    match location.port {
126                        Port::Target(port) => {
127                            messages.push((location.node, port, time.clone(), *diff))
128                        },
129                        Port::Source(port) => {
130                            internal.push((location.node, port, time.clone(), *diff))
131                        }
132                    }
133                }
134
135                l.log(crate::logging::TimelyProgressEvent {
136                    is_send: false,
137                    source,
138                    seq_no: counter,
139                    channel,
140                    identifier: self.identifier,
141                    messages,
142                    internal,
143                });
144            });
145
146            // We clone rather than drain to avoid deserialization.
147            for &(ref update, delta) in recv_changes.iter() {
148                changes.update(update.clone(), delta);
149            }
150        }
151
152    }
153}