timely_communication/allocator/
generic.rs

1//! A generic allocator, wrapping known implementors of `Allocate`.
2//!
3//! This type is useful in settings where it is difficult to write code generic in `A: Allocate`,
4//! for example closures whose type arguments must be specified.
5
6use std::rc::Rc;
7use std::cell::RefCell;
8
9use crate::allocator::thread::ThreadBuilder;
10use crate::allocator::process::ProcessBuilder as TypedProcessBuilder;
11use crate::allocator::{Allocate, AllocateBuilder, Exchangeable, Thread, Process};
12use crate::allocator::zero_copy::allocator_process::{ProcessBuilder, ProcessAllocator};
13use crate::allocator::zero_copy::allocator::{TcpBuilder, TcpAllocator};
14
15use crate::{Push, Pull};
16
17/// Enumerates known implementors of `Allocate`.
18/// Passes trait method calls on to members.
19pub enum Generic {
20    /// Intra-thread allocator.
21    Thread(Thread),
22    /// Inter-thread, intra-process allocator.
23    Process(Process),
24    /// Inter-thread, intra-process serializing allocator.
25    ProcessBinary(ProcessAllocator),
26    /// Inter-process allocator.
27    ZeroCopy(TcpAllocator<Process>),
28    /// Inter-process allocator, intra-process serializing allocator.
29    ZeroCopyBinary(TcpAllocator<ProcessAllocator>),
30}
31
32impl Generic {
33    /// The index of the worker out of `(0..self.peers())`.
34    pub fn index(&self) -> usize {
35        match self {
36            Generic::Thread(t) => t.index(),
37            Generic::Process(p) => p.index(),
38            Generic::ProcessBinary(pb) => pb.index(),
39            Generic::ZeroCopy(z) => z.index(),
40            Generic::ZeroCopyBinary(z) => z.index(),
41        }
42    }
43    /// The number of workers.
44    pub fn peers(&self) -> usize {
45        match self {
46            Generic::Thread(t) => t.peers(),
47            Generic::Process(p) => p.peers(),
48            Generic::ProcessBinary(pb) => pb.peers(),
49            Generic::ZeroCopy(z) => z.peers(),
50            Generic::ZeroCopyBinary(z) => z.peers(),
51        }
52    }
53    /// Constructs several send endpoints and one receive endpoint.
54    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
55        match self {
56            Generic::Thread(t) => t.allocate(identifier),
57            Generic::Process(p) => p.allocate(identifier),
58            Generic::ProcessBinary(pb) => pb.allocate(identifier),
59            Generic::ZeroCopy(z) => z.allocate(identifier),
60            Generic::ZeroCopyBinary(z) => z.allocate(identifier),
61        }
62    }
63    /// Constructs several send endpoints and one receive endpoint.
64    fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
65        match self {
66            Generic::Thread(t) => t.broadcast(identifier),
67            Generic::Process(p) => p.broadcast(identifier),
68            Generic::ProcessBinary(pb) => pb.broadcast(identifier),
69            Generic::ZeroCopy(z) => z.broadcast(identifier),
70            Generic::ZeroCopyBinary(z) => z.broadcast(identifier),
71        }
72    }
73    /// Perform work before scheduling operators.
74    fn receive(&mut self) {
75        match self {
76            Generic::Thread(t) => t.receive(),
77            Generic::Process(p) => p.receive(),
78            Generic::ProcessBinary(pb) => pb.receive(),
79            Generic::ZeroCopy(z) => z.receive(),
80            Generic::ZeroCopyBinary(z) => z.receive(),
81        }
82    }
83    /// Perform work after scheduling operators.
84    pub fn release(&mut self) {
85        match self {
86            Generic::Thread(t) => t.release(),
87            Generic::Process(p) => p.release(),
88            Generic::ProcessBinary(pb) => pb.release(),
89            Generic::ZeroCopy(z) => z.release(),
90            Generic::ZeroCopyBinary(z) => z.release(),
91        }
92    }
93    fn events(&self) -> &Rc<RefCell<Vec<usize>>> {
94        match self {
95            Generic::Thread(ref t) => t.events(),
96            Generic::Process(ref p) => p.events(),
97            Generic::ProcessBinary(ref pb) => pb.events(),
98            Generic::ZeroCopy(ref z) => z.events(),
99            Generic::ZeroCopyBinary(ref z) => z.events(),
100        }
101    }
102}
103
104impl Allocate for Generic {
105    fn index(&self) -> usize { self.index() }
106    fn peers(&self) -> usize { self.peers() }
107    fn allocate<T: Exchangeable>(&mut self, identifier: usize) -> (Vec<Box<dyn Push<T>>>, Box<dyn Pull<T>>) {
108        self.allocate(identifier)
109    }
110    fn broadcast<T: Exchangeable+Clone>(&mut self, identifier: usize) -> (Box<dyn Push<T>>, Box<dyn Pull<T>>) {
111        self.broadcast(identifier)
112    }
113    fn receive(&mut self) { self.receive(); }
114    fn release(&mut self) { self.release(); }
115    fn events(&self) -> &Rc<RefCell<Vec<usize>>> { self.events() }
116    fn await_events(&self, _duration: Option<std::time::Duration>) {
117        match self {
118            Generic::Thread(t) => t.await_events(_duration),
119            Generic::Process(p) => p.await_events(_duration),
120            Generic::ProcessBinary(pb) => pb.await_events(_duration),
121            Generic::ZeroCopy(z) => z.await_events(_duration),
122            Generic::ZeroCopyBinary(z) => z.await_events(_duration),
123        }
124    }
125}
126
127
128/// Enumerations of constructable implementors of `Allocate`.
129///
130/// The builder variants are meant to be `Send`, so that they can be moved across threads,
131/// whereas the allocator they construct may not. As an example, the `ProcessBinary` type
132/// contains `Rc` wrapped state, and so cannot itself be moved across threads.
133pub enum GenericBuilder {
134    /// Builder for `Thread` allocator.
135    Thread(ThreadBuilder),
136    /// Builder for `Process` allocator.
137    Process(TypedProcessBuilder),
138    /// Builder for `ProcessBinary` allocator.
139    ProcessBinary(ProcessBuilder),
140    /// Builder for `ZeroCopy` allocator.
141    ZeroCopy(TcpBuilder<TypedProcessBuilder>),
142    /// Builder for `ZeroCopyBinary` allocator.
143    ZeroCopyBinary(TcpBuilder<ProcessBuilder>),
144}
145
146impl AllocateBuilder for GenericBuilder {
147    type Allocator = Generic;
148    fn build(self) -> Generic {
149        match self {
150            GenericBuilder::Thread(t) => Generic::Thread(t.build()),
151            GenericBuilder::Process(p) => Generic::Process(p.build()),
152            GenericBuilder::ProcessBinary(pb) => Generic::ProcessBinary(pb.build()),
153            GenericBuilder::ZeroCopy(z) => Generic::ZeroCopy(z.build()),
154            GenericBuilder::ZeroCopyBinary(z) => Generic::ZeroCopyBinary(z.build()),
155        }
156    }
157}