timely_communication/allocator/counters.rs
1//! Push and Pull wrappers to maintain counts of messages in channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::sync::mpsc::Sender;
6
7use crate::{Push, Pull};
8
9/// The push half of an intra-thread channel.
10pub struct Pusher<T, P: Push<T>> {
11 index: usize,
12 // count: usize,
13 events: Rc<RefCell<Vec<usize>>>,
14 pusher: P,
15 phantom: ::std::marker::PhantomData<T>,
16}
17
18impl<T, P: Push<T>> Pusher<T, P> {
19 /// Wraps a pusher with a message counter.
20 pub fn new(pusher: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
21 Pusher {
22 index,
23 // count: 0,
24 events,
25 pusher,
26 phantom: ::std::marker::PhantomData,
27 }
28 }
29}
30
31impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
32 #[inline]
33 fn push(&mut self, element: &mut Option<T>) {
34 // if element.is_none() {
35 // if self.count != 0 {
36 // self.events
37 // .borrow_mut()
38 // .push_back(self.index);
39 // self.count = 0;
40 // }
41 // }
42 // else {
43 // self.count += 1;
44 // }
45 // TODO: Version above is less chatty, but can be a bit late in
46 // moving information along. Better, but needs cooperation.
47 self.events
48 .borrow_mut()
49 .push(self.index);
50
51 self.pusher.push(element)
52 }
53}
54
55/// The push half of an intra-thread channel.
56pub struct ArcPusher<T, P: Push<T>> {
57 index: usize,
58 // count: usize,
59 events: Sender<usize>,
60 pusher: P,
61 phantom: ::std::marker::PhantomData<T>,
62 buzzer: crate::buzzer::Buzzer,
63}
64
65impl<T, P: Push<T>> ArcPusher<T, P> {
66 /// Wraps a pusher with a message counter.
67 pub fn new(pusher: P, index: usize, events: Sender<usize>, buzzer: crate::buzzer::Buzzer) -> Self {
68 ArcPusher {
69 index,
70 // count: 0,
71 events,
72 pusher,
73 phantom: ::std::marker::PhantomData,
74 buzzer,
75 }
76 }
77}
78
79impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
80 #[inline]
81 fn push(&mut self, element: &mut Option<T>) {
82 // if element.is_none() {
83 // if self.count != 0 {
84 // self.events
85 // .send((self.index, Event::Pushed(self.count)))
86 // .expect("Failed to send message count");
87 // self.count = 0;
88 // }
89 // }
90 // else {
91 // self.count += 1;
92 // }
93
94 // These three calls should happen in this order, to ensure that
95 // we first enqueue data, second enqueue interest in the channel,
96 // and finally awaken the thread. Other orders are defective when
97 // multiple threads are involved.
98 self.pusher.push(element);
99 let _ = self.events.send(self.index);
100 // TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
101 // .expect("Failed to send message count");
102 self.buzzer.buzz();
103 }
104}
105
106/// The pull half of an intra-thread channel.
107pub struct Puller<T, P: Pull<T>> {
108 index: usize,
109 count: usize,
110 events: Rc<RefCell<Vec<usize>>>,
111 puller: P,
112 phantom: ::std::marker::PhantomData<T>,
113}
114
115impl<T, P: Pull<T>> Puller<T, P> {
116 /// Wraps a puller with a message counter.
117 pub fn new(puller: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
118 Puller {
119 index,
120 count: 0,
121 events,
122 puller,
123 phantom: ::std::marker::PhantomData,
124 }
125 }
126}
127impl<T, P: Pull<T>> Pull<T> for Puller<T, P> {
128 #[inline]
129 fn pull(&mut self) -> &mut Option<T> {
130 let result = self.puller.pull();
131 if result.is_none() {
132 if self.count != 0 {
133 self.events
134 .borrow_mut()
135 .push(self.index);
136 self.count = 0;
137 }
138 }
139 else {
140 self.count += 1;
141 }
142
143 result
144 }
145}