timely_communication/allocator/
generic.rs1use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::allocator::thread::ThreadBuilder;
10use crate::allocator::{Allocate, AllocateBuilder, Exchangeable, Thread, Process, ProcessBuilder};
11use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};
12
13use crate::{Push, Pull};
14
15pub enum Allocator {
18 Thread(Thread),
20 Process(Process),
22 Tcp(TcpAllocator),
24}
25
26impl Allocator {
27 pub fn index(&self) -> usize {
29 match self {
30 Allocator::Thread(t) => t.index(),
31 Allocator::Process(p) => p.index(),
32 Allocator::Tcp(z) => z.index(),
33 }
34 }
35 pub fn peers(&self) -> usize {
37 match self {
38 Allocator::Thread(t) => t.peers(),
39 Allocator::Process(p) => p.peers(),
40 Allocator::Tcp(z) => z.peers(),
41 }
42 }
43 pub fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
45 match self {
46 Allocator::Thread(t) => t.allocate(identifier),
47 Allocator::Process(p) => p.allocate(identifier),
48 Allocator::Tcp(z) => z.allocate(identifier),
49 }
50 }
51 pub fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
53 match self {
54 Allocator::Thread(t) => t.broadcast(identifier),
55 Allocator::Process(p) => p.broadcast(identifier),
56 Allocator::Tcp(z) => z.broadcast(identifier),
57 }
58 }
59 pub fn receive(&mut self) {
61 match self {
62 Allocator::Thread(t) => t.receive(),
63 Allocator::Process(p) => p.receive(),
64 Allocator::Tcp(z) => z.receive(),
65 }
66 }
67 pub fn release(&mut self) {
69 match self {
70 Allocator::Thread(t) => t.release(),
71 Allocator::Process(p) => p.release(),
72 Allocator::Tcp(z) => z.release(),
73 }
74 }
75 pub fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
77 match self {
78 Allocator::Thread(ref t) => t.events(),
79 Allocator::Process(ref p) => p.events(),
80 Allocator::Tcp(ref z) => z.events(),
81 }
82 }
83
84 pub fn await_events(&self, duration: Option<std::time::Duration>) {
86 match self {
87 Allocator::Thread(t) => t.await_events(duration),
88 Allocator::Process(p) => p.await_events(duration),
89 Allocator::Tcp(z) => z.await_events(duration),
90 }
91 }
92
93 pub fn pipeline<T: 'static>(&mut self, identifier: usize) ->
98 (crate::allocator::thread::ThreadPusher<T>,
99 crate::allocator::thread::ThreadPuller<T>)
100 {
101 crate::allocator::thread::Thread::new_from(identifier, Rc::clone(self.events()))
102 }
103}
104
105impl Allocate for Allocator {
106 fn index(&self) -> usize { self.index() }
107 fn peers(&self) -> usize { self.peers() }
108 fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
109 self.allocate(identifier)
110 }
111 fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
112 self.broadcast(identifier)
113 }
114 fn receive(&mut self) { self.receive(); }
115 fn release(&mut self) { self.release(); }
116 fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
117 fn await_events(&self, _duration: Option<std::time::Duration>) {
118 match self {
119 Allocator::Thread(t) => t.await_events(_duration),
120 Allocator::Process(p) => p.await_events(_duration),
121 Allocator::Tcp(z) => z.await_events(_duration),
122 }
123 }
124}
125
126
127pub enum AllocatorBuilder {
133 Thread(ThreadBuilder),
135 Process(ProcessBuilder),
137 Tcp(TcpBuilder),
139}
140
141impl AllocateBuilder for AllocatorBuilder {
142 type Allocator = Allocator;
143 fn build(self) -> Allocator {
144 match self {
145 AllocatorBuilder::Thread(t) => Allocator::Thread(t.build()),
146 AllocatorBuilder::Process(p) => Allocator::Process(p.build()),
147 AllocatorBuilder::Tcp(z) => Allocator::Tcp(z.build()),
148 }
149 }
150}