timely_communication/allocator/
counters.rs

1//! Push and Pull wrappers to maintain counts of messages in channels.
2
3use std::rc::Rc;
4use std::cell::RefCell;
5use std::sync::mpsc::Sender;
6
7use crate::{Push, Pull};
8
9/// The push half of an intra-thread channel.
10pub struct Pusher<T, P: Push<T>> {
11    index: usize,
12    // count: usize,
13    events: Rc<RefCell<Vec<usize>>>,
14    pusher: P,
15    phantom: ::std::marker::PhantomData<T>,
16}
17
18impl<T, P: Push<T>>  Pusher<T, P> {
19    /// Wraps a pusher with a message counter.
20    pub fn new(pusher: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
21        Pusher {
22            index,
23            // count: 0,
24            events,
25            pusher,
26            phantom: ::std::marker::PhantomData,
27        }
28    }
29}
30
31impl<T, P: Push<T>> Push<T> for Pusher<T, P> {
32    #[inline]
33    fn push(&mut self, element: &mut Option<T>) {
34        // if element.is_none() {
35        //     if self.count != 0 {
36        //         self.events
37        //             .borrow_mut()
38        //             .push_back(self.index);
39        //         self.count = 0;
40        //     }
41        // }
42        // else {
43        //     self.count += 1;
44        // }
45        // TODO: Version above is less chatty, but can be a bit late in
46        //       moving information along. Better, but needs cooperation.
47        self.events
48            .borrow_mut()
49            .push(self.index);
50
51        self.pusher.push(element)
52    }
53}
54
55/// The push half of an intra-thread channel.
56pub struct ArcPusher<T, P: Push<T>> {
57    index: usize,
58    // count: usize,
59    events: Sender<usize>,
60    pusher: P,
61    phantom: ::std::marker::PhantomData<T>,
62    buzzer: crate::buzzer::Buzzer,
63}
64
65impl<T, P: Push<T>>  ArcPusher<T, P> {
66    /// Wraps a pusher with a message counter.
67    pub fn new(pusher: P, index: usize, events: Sender<usize>, buzzer: crate::buzzer::Buzzer) -> Self {
68        ArcPusher {
69            index,
70            // count: 0,
71            events,
72            pusher,
73            phantom: ::std::marker::PhantomData,
74            buzzer,
75        }
76    }
77}
78
79impl<T, P: Push<T>> Push<T> for ArcPusher<T, P> {
80    #[inline]
81    fn push(&mut self, element: &mut Option<T>) {
82        // if element.is_none() {
83        //     if self.count != 0 {
84        //         self.events
85        //             .send((self.index, Event::Pushed(self.count)))
86        //             .expect("Failed to send message count");
87        //         self.count = 0;
88        //     }
89        // }
90        // else {
91        //     self.count += 1;
92        // }
93
94        // These three calls should happen in this order, to ensure that
95        // we first enqueue data, second enqueue interest in the channel,
96        // and finally awaken the thread. Other orders are defective when
97        // multiple threads are involved.
98        self.pusher.push(element);
99        let _ = self.events.send(self.index);
100            // TODO : Perhaps this shouldn't be a fatal error (e.g. in shutdown).
101            // .expect("Failed to send message count");
102        self.buzzer.buzz();
103    }
104}
105
106/// The pull half of an intra-thread channel.
107pub struct Puller<T, P: Pull<T>> {
108    index: usize,
109    count: usize,
110    events: Rc<RefCell<Vec<usize>>>,
111    puller: P,
112    phantom: ::std::marker::PhantomData<T>,
113}
114
115impl<T, P: Pull<T>>  Puller<T, P> {
116    /// Wraps a puller with a message counter.
117    pub fn new(puller: P, index: usize, events: Rc<RefCell<Vec<usize>>>) -> Self {
118        Puller {
119            index,
120            count: 0,
121            events,
122            puller,
123            phantom: ::std::marker::PhantomData,
124        }
125    }
126}
127impl<T, P: Pull<T>> Pull<T> for Puller<T, P> {
128    #[inline]
129    fn pull(&mut self) -> &mut Option<T> {
130        let result = self.puller.pull();
131        if result.is_none() {
132            if self.count != 0 {
133                self.events
134                    .borrow_mut()
135                    .push(self.index);
136                self.count = 0;
137            }
138        }
139        else {
140            self.count += 1;
141        }
142
143        result
144    }
145}