timely/progress/
broadcast.rs
1use std::rc::Rc;
4use crate::progress::{ChangeBatch, Timestamp};
5use crate::progress::{Location, Port};
6use crate::communication::{Push, Pull};
7use crate::logging::TimelyLogger as Logger;
8use crate::logging::TimelyProgressLogger as ProgressLogger;
9use crate::Bincode;
10
11pub type ProgressMsg<T> = Bincode<(usize, usize, ChangeBatch<(Location, T)>)>;
14
15pub struct Progcaster<T:Timestamp> {
17 pusher: Box<dyn Push<ProgressMsg<T>>>,
19 puller: Box<dyn Pull<ProgressMsg<T>>>,
21 source: usize,
23 counter: usize,
25 identifier: usize,
27 channel_identifier: usize,
29 progress_logging: Option<ProgressLogger<T>>,
31}
32
33impl<T:Timestamp+Send> Progcaster<T> {
34 pub fn new<A: crate::worker::AsWorker>(worker: &mut A, addr: Rc<[usize]>, identifier: usize, mut logging: Option<Logger>, progress_logging: Option<ProgressLogger<T>>) -> Progcaster<T> {
36
37 let channel_identifier = worker.new_identifier();
38 let (pusher, puller) = worker.broadcast(channel_identifier, addr);
39 logging.as_mut().map(|l| l.log(crate::logging::CommChannelsEvent {
40 identifier: channel_identifier,
41 kind: crate::logging::CommChannelKind::Progress,
42 }));
43 let worker_index = worker.index();
44 Progcaster {
45 pusher,
46 puller,
47 source: worker_index,
48 counter: 0,
49 identifier,
50 channel_identifier,
51 progress_logging,
52 }
53 }
54
55 pub fn send(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
57
58 changes.compact();
59 if !changes.is_empty() {
60
61 self.progress_logging.as_ref().map(|l| {
62
63 let mut messages = Vec::with_capacity(changes.len());
67 let mut internal = Vec::with_capacity(changes.len());
68
69 for ((location, time), diff) in changes.iter() {
70 match location.port {
71 Port::Target(port) => {
72 messages.push((location.node, port, time.clone(), *diff))
73 },
74 Port::Source(port) => {
75 internal.push((location.node, port, time.clone(), *diff))
76 }
77 }
78 }
79
80 l.log(crate::logging::TimelyProgressEvent {
81 is_send: true,
82 source: self.source,
83 channel: self.channel_identifier,
84 seq_no: self.counter,
85 identifier: self.identifier,
86 messages,
87 internal,
88 });
89 });
90
91 let payload = (self.source, self.counter, std::mem::take(changes));
92 let mut to_push = Some(Bincode { payload });
93 self.pusher.push(&mut to_push);
94 self.pusher.done();
95
96 if let Some(pushed) = to_push {
97 *changes = pushed.payload.2;
98 changes.clear();
99 }
100
101 self.counter += 1;
102 }
103 }
104
105 pub fn recv(&mut self, changes: &mut ChangeBatch<(Location, T)>) {
107
108 while let Some(message) = self.puller.pull() {
109
110 let source = message.0;
111 let counter = message.1;
112 let recv_changes = &mut message.2;
113
114 let channel = self.channel_identifier;
115
116 self.progress_logging.as_ref().map(|l| {
119
120 let mut messages = Vec::with_capacity(changes.len());
121 let mut internal = Vec::with_capacity(changes.len());
122
123 for ((location, time), diff) in recv_changes.iter() {
124
125 match location.port {
126 Port::Target(port) => {
127 messages.push((location.node, port, time.clone(), *diff))
128 },
129 Port::Source(port) => {
130 internal.push((location.node, port, time.clone(), *diff))
131 }
132 }
133 }
134
135 l.log(crate::logging::TimelyProgressEvent {
136 is_send: false,
137 source,
138 seq_no: counter,
139 channel,
140 identifier: self.identifier,
141 messages,
142 internal,
143 });
144 });
145
146 for &(ref update, delta) in recv_changes.iter() {
148 changes.update(update.clone(), delta);
149 }
150 }
151
152 }
153}