differential_dataflow/dynamic/
pointstamp.rs

//! A timestamp type as in Naiad, where vectors of timestamps of different lengths are comparable.
2//!
3//! This type compares using "standard" tuple logic as if each timestamp were extended indefinitely with minimal elements.
4//!
5//! The path summary for this type allows *run-time* rather than *type-driven* iterative scopes.
6//! Each summary represents some journey within and out of some number of scopes, followed by entry
7//! into and iteration within some other number of scopes.
8//!
9//! As a result, summaries describe some number of trailing coordinates to truncate, and some increments
10//! to the resulting vector. Structurally, the increments can only be to one non-truncated coordinate
11//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
12//! default coordinates (which is effectively just *setting* the coordinate).
13
14use columnar::Columnar;
15use serde::{Deserialize, Serialize};
16
/// A sequence of timestamps, partially ordered by the product order.
///
/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
/// Sequences are guaranteed to be "minimal", and may not end with `T::minimum()` entries.
///
/// `PointStamp::new` establishes minimality by popping trailing `T::minimum()` entries.
/// The field is private, so code outside this module cannot assign to it directly.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Columnar)]
#[columnar(derive(Eq, PartialEq, Ord, PartialOrd))]
pub struct PointStamp<T> {
    /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
    ///
    /// Coordinates past the end of the vector are implicitly `T::minimum()`.
    vector: Vec<T>,
}
27
28impl<T: Timestamp> PartialEq<[T]> for PointStamp<T> {
29    fn eq(&self, other: &[T]) -> bool {
30        self.vector.iter()
31            .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
32            .all(|(t1, t2)| t1.eq(t2))
33    }
34}
35
36impl<T: Timestamp> PartialEq<PointStamp<T>> for [T] {
37    fn eq(&self, other: &PointStamp<T>) -> bool {
38        self.iter()
39            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
40            .all(|(t1, t2)| t1.eq(t2))
41    }
42}
43
44impl<T: Timestamp> PartialOrder<[T]> for PointStamp<T> {
45    fn less_equal(&self, other: &[T]) -> bool {
46        self.vector.iter()
47            .zip(other.iter().chain(std::iter::repeat(&T::minimum())))
48            .all(|(t1, t2)| t1.less_equal(t2))
49    }
50}
51
52impl<T: Timestamp> PartialOrder<PointStamp<T>> for [T] {
53    fn less_equal(&self, other: &PointStamp<T>) -> bool {
54        self.iter()
55            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
56            .all(|(t1, t2)| t1.less_equal(t2))
57    }
58}
59
60impl<T: Timestamp> PointStamp<T> {
61    /// Create a new sequence.
62    ///
63    /// This method will modify `vector` to ensure it does not end with `T::minimum()`.
64    pub fn new(mut vector: Vec<T>) -> Self {
65        while vector.last() == Some(&T::minimum()) {
66            vector.pop();
67        }
68        PointStamp { vector }
69    }
70    /// Returns the wrapped vector.
71    ///
72    /// This method is the support way to mutate the contents of `self`, by extracting
73    /// the vector and then re-introducing it with `PointStamp::new` to re-establish
74    /// the invariant that the vector not end with `T::minimum`.
75    pub fn into_vec(self) -> Vec<T> {
76        self.vector
77    }
78}
79
80impl<T> std::ops::Deref for PointStamp<T> {
81    type Target = [T];
82    fn deref(&self) -> &Self::Target {
83        &self.vector
84    }
85}
86
87// Implement timely dataflow's `PartialOrder` trait.
88use timely::order::PartialOrder;
89impl<T: PartialOrder + Timestamp> PartialOrder for PointStamp<T> {
90    fn less_equal(&self, other: &Self) -> bool {
91        // Every present coordinate must be less-equal the corresponding coordinate,
92        // where absent corresponding coordinates are `T::minimum()`. Coordinates
93        // absent from `self.vector` are themselves `T::minimum()` and are less-equal
94        // any corresponding coordinate in `other.vector`.
95        self.vector
96            .iter()
97            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
98            .all(|(t1, t2)| t1.less_equal(t2))
99    }
100}
101
102use timely::progress::timestamp::Refines;
103impl<T: Timestamp> Refines<()> for PointStamp<T> {
104    fn to_inner(_outer: ()) -> Self {
105        Self { vector: Vec::new() }
106    }
107    fn to_outer(self) -> () {
108        ()
109    }
110    fn summarize(_summary: <Self>::Summary) -> () {
111        ()
112    }
113}
114
115// Implement timely dataflow's `PathSummary` trait.
116// This is preparation for the `Timestamp` implementation below.
117use timely::progress::PathSummary;
118
/// Describes an action on a `PointStamp`: truncation to at most `retain` leading
/// coordinates, followed by the coordinate-wise application of `actions`.
#[derive(Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize)]
pub struct PointStampSummary<TS> {
    /// Number of leading coordinates to retain.
    ///
    /// A `None` value indicates that all coordinates should be retained.
    /// A value at least as large as the timestamp's length also retains all of them.
    pub retain: Option<usize>,
    /// Summary actions to apply position-wise to the leading coordinates.
    ///
    /// Retained coordinates beyond `actions.len()` are carried over unchanged.
    /// If `actions.len()` is greater than the number of retained coordinates, the
    /// timestamp is extended by `T::minimum()` so that the remaining actions apply.
    pub actions: Vec<TS>,
}
132
133impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
134    fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
135        // Get a slice of timestamp coordinates appropriate for consideration.
136        let timestamps = if let Some(retain) = self.retain {
137            if retain < timestamp.vector.len() {
138                &timestamp.vector[..retain]
139            } else {
140                &timestamp.vector[..]
141            }
142        } else {
143            &timestamp.vector[..]
144        };
145
146        let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
147        // Introduce elements where both timestamp and action exist.
148        let min_len = std::cmp::min(timestamps.len(), self.actions.len());
149        for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
150            vector.push(action.results_in(timestamp)?);
151        }
152        // Any remaining timestamps should be copied in.
153        for timestamp in timestamps.iter().skip(min_len) {
154            vector.push(timestamp.clone());
155        }
156        // Any remaining actions should be applied to the empty timestamp.
157        for action in self.actions.iter().skip(min_len) {
158            vector.push(action.results_in(&T::minimum())?);
159        }
160
161        Some(PointStamp::new(vector))
162    }
163    fn followed_by(&self, other: &Self) -> Option<Self> {
164        // The output `retain` will be the minimum of the two inputs.
165        let retain = match (self.retain, other.retain) {
166            (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
167            (Some(x), None) => Some(x),
168            (None, Some(y)) => Some(y),
169            (None, None) => None,
170        };
171
172        // The output `actions` will depend on the relative sizes of the input `retain`s.
173        let self_actions = if let Some(retain) = other.retain {
174            if retain < self.actions.len() {
175                &self.actions[..retain]
176            } else {
177                &self.actions[..]
178            }
179        } else {
180            &self.actions[..]
181        };
182
183        let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
184        // Introduce actions where both input actions apply.
185        let min_len = std::cmp::min(self_actions.len(), other.actions.len());
186        for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
187            actions.push(action1.followed_by(action2)?);
188        }
189        // Append any remaining self actions.
190        actions.extend(self_actions.iter().skip(min_len).cloned());
191        // Append any remaining other actions.
192        actions.extend(other.actions.iter().skip(min_len).cloned());
193
194        Some(Self { retain, actions })
195    }
196}
197
198impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
199    fn less_equal(&self, other: &Self) -> bool {
200        // If the `retain`s are not the same, there is some coordinate which
201        // could either be bigger or smaller as the timestamp or the replacement.
202        // In principle, a `T::minimum()` extension could break this rule, and
203        // we could tighten this logic if needed; I think it is fine not to though.
204        self.retain == other.retain
205            && self.actions.len() <= other.actions.len()
206            && self
207                .actions
208                .iter()
209                .zip(other.actions.iter())
210                .all(|(t1, t2)| t1.less_equal(t2))
211    }
212}
213
214// Implement timely dataflow's `Timestamp` trait.
215use timely::progress::Timestamp;
216impl<T: Timestamp> Timestamp for PointStamp<T> {
217    fn minimum() -> Self {
218        Self::new(Vec::new())
219    }
220    type Summary = PointStampSummary<T::Summary>;
221}
222
223// Implement differential dataflow's `Lattice` trait.
224// This extends the `PartialOrder` implementation with additional structure.
225use crate::lattice::Lattice;
226impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
227    fn join(&self, other: &Self) -> Self {
228        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
229        let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
230        let mut vector = Vec::with_capacity(max_len);
231        // For coordinates in both inputs, apply `join` to the pair.
232        for index in 0..min_len {
233            vector.push(self.vector[index].join(&other.vector[index]));
234        }
235        // Only one of the two vectors will have remaining elements; copy them.
236        for time in &self.vector[min_len..] {
237            vector.push(time.clone());
238        }
239        for time in &other.vector[min_len..] {
240            vector.push(time.clone());
241        }
242        Self::new(vector)
243    }
244    fn meet(&self, other: &Self) -> Self {
245        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
246        let mut vector = Vec::with_capacity(min_len);
247        // For coordinates in both inputs, apply `meet` to the pair.
248        for index in 0..min_len {
249            vector.push(self.vector[index].meet(&other.vector[index]));
250        }
251        // Remaining coordinates are `T::minimum()` in one input, and so in the output.
252        Self::new(vector)
253    }
254}
255
256mod columnation {
257    use columnation::{Columnation, Region};
258
259    use crate::dynamic::pointstamp::PointStamp;
260
261    impl<T: Columnation> Columnation for PointStamp<T> {
262        type InnerRegion = PointStampStack<T::InnerRegion>;
263    }
264
265    /// Stack for PointStamp. Part of Columnation implementation.
266    pub struct PointStampStack<R: Region>(<Vec<R::Item> as Columnation>::InnerRegion)
267    where
268        <R as Region>::Item: Columnation;
269
270    impl<R: Region> Default for PointStampStack<R>
271    where
272        <R as Region>::Item: Columnation
273    {
274        #[inline]
275        fn default() -> Self {
276            Self(Default::default())
277        }
278    }
279
280    impl<R: Region> Region for PointStampStack<R>
281    where
282        <R as Region>::Item: Columnation
283    {
284        type Item = PointStamp<R::Item>;
285
286        #[inline]
287        unsafe fn copy(&mut self, item: &Self::Item) -> Self::Item {
288            Self::Item { vector: self.0.copy(&item.vector) }
289        }
290
291        fn clear(&mut self) {
292            self.0.clear();
293        }
294
295        fn reserve_items<'a, I>(&mut self, items: I) where Self: 'a, I: Iterator<Item=&'a Self::Item> + Clone {
296            self.0.reserve_items(items.map(|x| &x.vector));
297        }
298
299        fn reserve_regions<'a, I>(&mut self, regions: I) where Self: 'a, I: Iterator<Item=&'a Self> + Clone {
300            self.0.reserve_regions(regions.map(|r| &r.0));
301        }
302
303        fn heap_size(&self, callback: impl FnMut(usize, usize)) {
304            self.0.heap_size(callback);
305        }
306    }
307}