// differential_dataflow/dynamic/pointstamp.rs
use columnar::Columnar;
15use serde::{Deserialize, Serialize};
16
17#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Columnar)]
22#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
23pub struct PointStamp<T> {
24 vector: Vec<T>,
26}
27
28impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
29 fn eq(&self, other: &[T]) -> bool {
30 self.vector.iter()
31 .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
32 .all(|(t1, t2)| t1.eq(t2))
33 }
34}
35
36impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
37 fn eq(&self, other: &PointStamp<T>) -> bool {
38 self.iter()
39 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
40 .all(|(t1, t2)| t1.eq(t2))
41 }
42}
43
44impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
45 fn less_equal(&self, other: &[T]) -> bool {
46 self.vector.iter()
47 .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
48 .all(|(t1, t2)| t1.less_equal(t2))
49 }
50}
51
52impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
53 fn less_equal(&self, other: &PointStamp<T>) -> bool {
54 self.iter()
55 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
56 .all(|(t1, t2)| t1.less_equal(t2))
57 }
58}
59
60impl<T: Timestamp> PointStamp<T> {
61 pub fn new(mut vector: Vec<T>) -> Self {
65 while vector.last() == Some(&T::minimum()) {
66 vector.pop();
67 }
68 PointStamp { vector }
69 }
70 pub fn into_vec(self) -> Vec<T> {
76 self.vector
77 }
78}
79
80impl<T> std::ops::Deref for PointStamp<T> {
81 type Target = [T];
82 fn deref(&self) -> &Self::Target {
83 &self.vector
84 }
85}
86
87use timely::order::PartialOrder;
89impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
90 fn less_equal(&self, other: &Self) -> bool {
91 self.vector
96 .iter()
97 .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
98 .all(|(t1, t2)| t1.less_equal(t2))
99 }
100}
101
102use timely::progress::timestamp::Refines;
103impl<T: Timestamp> Refines<()> for PointStamp<T> {
104 fn to_inner(_outer: ()) -> Self {
105 Self { vector: Vec::new() }
106 }
107 fn to_outer(self) -> () {
108 ()
109 }
110 fn summarize(_summary: <Self>::Summary) -> () {
111 ()
112 }
113}
114
115use timely::progress::PathSummary;
118
119#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
121pub struct PointStampSummary<TS> {
122 pub retain: Option<usize>,
126 pub actions: Vec<TS>,
131}
132
133impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
134 fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
135 let timestamps = if let Some(retain) = self.retain {
137 if retain < timestamp.vector.len() {
138 ×tamp.vector[..retain]
139 } else {
140 ×tamp.vector[..]
141 }
142 } else {
143 ×tamp.vector[..]
144 };
145
146 let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
147 let min_len = std::cmp::min(timestamps.len(), self.actions.len());
149 for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
150 vector.push(action.results_in(timestamp)?);
151 }
152 for timestamp in timestamps.iter().skip(min_len) {
154 vector.push(timestamp.clone());
155 }
156 for action in self.actions.iter().skip(min_len) {
158 vector.push(action.results_in(&T::minimum())?);
159 }
160
161 Some(PointStamp::new(vector))
162 }
163 fn followed_by(&self, other: &Self) -> Option<Self> {
164 let retain = match (self.retain, other.retain) {
166 (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
167 (Some(x), None) => Some(x),
168 (None, Some(y)) => Some(y),
169 (None, None) => None,
170 };
171
172 let self_actions = if let Some(retain) = other.retain {
174 if retain < self.actions.len() {
175 &self.actions[..retain]
176 } else {
177 &self.actions[..]
178 }
179 } else {
180 &self.actions[..]
181 };
182
183 let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
184 let min_len = std::cmp::min(self_actions.len(), other.actions.len());
186 for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
187 actions.push(action1.followed_by(action2)?);
188 }
189 actions.extend(self_actions.iter().skip(min_len).cloned());
191 actions.extend(other.actions.iter().skip(min_len).cloned());
193
194 Some(Self { retain, actions })
195 }
196}
197
198impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
199 fn less_equal(&self, other: &Self) -> bool {
200 self.retain == other.retain
205 && self.actions.len() <= other.actions.len()
206 && self
207 .actions
208 .iter()
209 .zip(other.actions.iter())
210 .all(|(t1, t2)| t1.less_equal(t2))
211 }
212}
213
214use timely::progress::Timestamp;
216impl<T: Timestamp> Timestamp for PointStamp<T> {
217 fn minimum() -> Self {
218 Self::new(Vec::new())
219 }
220 type Summary = PointStampSummary<T::Summary>;
221}
222
223use crate::lattice::Lattice;
226impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
227 fn join(&self, other: &Self) -> Self {
228 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
229 let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
230 let mut vector = Vec::with_capacity(max_len);
231 for index in 0..min_len {
233 vector.push(self.vector[index].join(&other.vector[index]));
234 }
235 for time in &self.vector[min_len..] {
237 vector.push(time.clone());
238 }
239 for time in &other.vector[min_len..] {
240 vector.push(time.clone());
241 }
242 Self::new(vector)
243 }
244 fn meet(&self, other: &Self) -> Self {
245 let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
246 let mut vector = Vec::with_capacity(min_len);
247 for index in 0..min_len {
249 vector.push(self.vector[index].meet(&other.vector[index]));
250 }
251 Self::new(vector)
253 }
254}
255
256mod columnation {
257 use columnation::{Columnation, Region};
258
259 use crate::dynamic::pointstamp::PointStamp;
260
261 impl<T: Columnation> Columnation for PointStamp<T> {
262 type InnerRegion = PointStampStack<T::InnerRegion>;
263 }
264
265 pub struct PointStampStack<R: Region>(<Vec<R::Item> as Columnation>::InnerRegion)
267 where
268 <R as Region>::Item: Columnation;
269
270 impl<R: Region> Default for PointStampStack<R>
271 where
272 <R as Region>::Item: Columnation
273 {
274 #[inline]
275 fn default() -> Self {
276 Self(Default::default())
277 }
278 }
279
280 impl<R: Region> Region for PointStampStack<R>
281 where
282 <R as Region>::Item: Columnation
283 {
284 type Item = PointStamp<R::Item>;
285
286 #[inline]
287 unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
288 Self::Item { vector: self.0.copy(&item.vector) }
289 }
290
291 fn clear(&mut self) {
292 self.0.clear();
293 }
294
295 fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator<Item=&'a Self::Item> + Clone {
296 self.0.reserve_items(items.map(|x| &x.vector));
297 }
298
299 fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
300 self.0.reserve_regions(regions.map(|r| &r.0));
301 }
302
303 fn heap_size(&self, callback: impl FnMut(usize, usize)) {
304 self.0.heap_size(callback);
305 }
306 }
307}