Skip to main content

timely_communication/allocator/
generic.rs

1//! A generic allocator, wrapping known implementors of `Allocate`.
2//!
3//! This type is useful in settings where it is difficult to write code generic in `A: Allocate`,
4//! for example closures whose type arguments must be specified.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::allocator::thread::ThreadBuilder;
10use crate::allocator::{Allocate, AllocateBuilder, Exchangeable, Thread, Process, ProcessBuilder};
11use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};
12
13use crate::{Push, Pull};
14
15/// Enumerates known implementors of `Allocate`.
16/// Passes trait method calls on to members.
17pub enum Allocator {
18    /// Intra-thread allocator.
19    Thread(Thread),
20    /// Inter-thread, intra-process allocator (in either of two flavors, see `Process`).
21    Process(Process),
22    /// Inter-process allocator (TCP-based, with a `Process` as its intra-process inner).
23    Tcp(TcpAllocator),
24}
25
26impl Allocator {
27    /// The index of the worker out of `(0..self.peers())`.
28    pub fn index(&self) -> usize {
29        match self {
30            Allocator::Thread(t) => t.index(),
31            Allocator::Process(p) => p.index(),
32            Allocator::Tcp(z) => z.index(),
33        }
34    }
35    /// The number of workers.
36    pub fn peers(&self) -> usize {
37        match self {
38            Allocator::Thread(t) => t.peers(),
39            Allocator::Process(p) => p.peers(),
40            Allocator::Tcp(z) => z.peers(),
41        }
42    }
43    /// Constructs several send endpoints and one receive endpoint.
44    pub fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
45        match self {
46            Allocator::Thread(t) => t.allocate(identifier),
47            Allocator::Process(p) => p.allocate(identifier),
48            Allocator::Tcp(z) => z.allocate(identifier),
49        }
50    }
51    /// Constructs several send endpoints and one receive endpoint.
52    pub fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
53        match self {
54            Allocator::Thread(t) => t.broadcast(identifier),
55            Allocator::Process(p) => p.broadcast(identifier),
56            Allocator::Tcp(z) => z.broadcast(identifier),
57        }
58    }
59    /// Perform work before scheduling operators.
60    pub fn receive(&mut self) {
61        match self {
62            Allocator::Thread(t) => t.receive(),
63            Allocator::Process(p) => p.receive(),
64            Allocator::Tcp(z) => z.receive(),
65        }
66    }
67    /// Perform work after scheduling operators.
68    pub fn release(&mut self) {
69        match self {
70            Allocator::Thread(t) => t.release(),
71            Allocator::Process(p) => p.release(),
72            Allocator::Tcp(z) => z.release(),
73        }
74    }
75    /// Provides access to the shared event queue.
76    pub fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
77        match self {
78            Allocator::Thread(ref t) => t.events(),
79            Allocator::Process(ref p) => p.events(),
80            Allocator::Tcp(ref z) => z.events(),
81        }
82    }
83
84    /// Awaits communication events.
85    pub fn await_events(&self, duration: Option<std::time::Duration>) {
86        match self {
87            Allocator::Thread(t) => t.await_events(duration),
88            Allocator::Process(p) => p.await_events(duration),
89            Allocator::Tcp(z) => z.await_events(duration),
90        }
91    }
92
93    /// Constructs a pipeline channel from the worker to itself.
94    ///
95    /// By default, this method uses the thread-local channel constructor
96    /// based on a shared `VecDeque` which updates the event queue.
97    pub fn pipeline<T: 'static>(&mut self, identifier: usize) ->
98        (crate::allocator::thread::ThreadPusher<T>,
99         crate::allocator::thread::ThreadPuller<T>)
100    {
101        crate::allocator::thread::Thread::new_from(identifier, Rc::clone(self.events()))
102    }
103}
104
105impl Allocate for Allocator {
106    fn index(&self) -> usize { self.index() }
107    fn peers(&self) -> usize { self.peers() }
108    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
109        self.allocate(identifier)
110    }
111    fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
112        self.broadcast(identifier)
113    }
114    fn receive(&mut self) { self.receive(); }
115    fn release(&mut self) { self.release(); }
116    fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
117    fn await_events(&self, _duration: Option<std::time::Duration>) {
118        match self {
119            Allocator::Thread(t) => t.await_events(_duration),
120            Allocator::Process(p) => p.await_events(_duration),
121            Allocator::Tcp(z) => z.await_events(_duration),
122        }
123    }
124}
125
126
127/// Enumerations of constructable implementors of `Allocate`.
128///
129/// The builder variants are meant to be `Send`, so that they can be moved across threads,
130/// whereas the allocator they construct may not. As an example, the binary `Process` type
131/// contains `Rc` wrapped state, and so cannot itself be moved across threads.
132pub enum AllocatorBuilder {
133    /// Builder for the `Thread` allocator.
134    Thread(ThreadBuilder),
135    /// Builder for a `Process` allocator (in either of two flavors, see `ProcessBuilder`).
136    Process(ProcessBuilder),
137    /// Builder for the `Tcp` (inter-process) allocator.
138    Tcp(TcpBuilder),
139}
140
141impl AllocateBuilder for AllocatorBuilder {
142    type Allocator = Allocator;
143    fn build(self) -> Allocator {
144        match self {
145            AllocatorBuilder::Thread(t) => Allocator::Thread(t.build()),
146            AllocatorBuilder::Process(p) => Allocator::Process(p.build()),
147            AllocatorBuilder::Tcp(z) => Allocator::Tcp(z.build()),
148        }
149    }
150}