Skip to main content

timely_communication/allocator/zero_copy/
spill.rs

1//! Spill strategies and policies for `MergeQueue` entries under memory pressure.
2//!
3//! Three traits compose here:
4//!
5//! - [`SpillPolicy`] decides *whether and how* a queue should be reshaped at
6//!   each `extend`. It is handed the raw `VecDeque<QueueEntry>` under the
7//!   queue's mutex and may replace entries freely.
8//! - [`BytesSpill`] decides *where* bytes go when a policy chooses to spill.
9//!   Pluggable: file, object store, mlock pool, in-memory mock for tests.
10//! - [`BytesFetch`] is the handle returned by a `BytesSpill`; it reads the
11//!   spilled bytes back, consuming itself in the process.
12//!
//! The shipped [`threshold::Threshold`] pairs a `BytesSpill` strategy with
//! threshold and head-reserve knobs and encodes the "spill the middle of the
//! queue when resident bytes get too large" heuristic; its reader-side
//! counterpart, [`prefetch::PrefetchPolicy`], pages entries near the front of
//! the queue back in up to a byte budget. Other policies can make entirely
//! different decisions (memory-pressure-driven, periodic, manual trigger,
//! adaptive) using the same strategies.
18
19use std::collections::VecDeque;
20use std::sync::Arc;
21
22use timely_bytes::arc::Bytes;
23
24use super::bytes_exchange::QueueEntry;
25
26/// A function that produces pairs of writer and reader [`SpillPolicy`]s.
27///
28/// This type is the entry point to spilling, and the two returned policies
29/// contain the opinions about how to handle excess data for an instance of
30/// a `MergeQueue`.
31pub type SpillPolicyFn = Arc<dyn Fn() -> (Box<dyn SpillPolicy>, Box<dyn SpillPolicy>) + Send + Sync>;
32
33/// Inspects and optionally rewrites a `MergeQueue`'s entries.
34pub trait SpillPolicy: Send {
35    /// Optionally transforms some (ranges of) queue entries.
36    ///
37    /// This trait is used both for spilling and rehydrating, and just acts
38    /// on the list of queue entries, rewriting ranges of them as it sees fit.
39    /// This is intented for spilling data to secondary storage, but can also
40    /// be used for compression, or other mechanisms to reduce resource load.
41    fn apply(&mut self, queue: &mut VecDeque<QueueEntry>);
42}
43
44/// A type that can convert runs of bytes into runs of boxed bytes retrieval.
45pub trait BytesSpill: Send {
46    /// Move entries from `chunks` into `handles`, spilling each to backing storage.
47    ///
48    /// The implementor should drain from `chunks` and push to `handles`as it goes;
49    /// on failure it may stop partway, leaving the data in a consistent state that
50    /// will be retried in the future. If it cannot leave the lists in a consistent
51    /// state it should panic.
52    fn spill(&mut self, chunks: &mut Vec<Bytes>, handles: &mut Vec<Box<dyn BytesFetch>>);
53}
54
55/// A consume-once handle to bytes previously written via a [`BytesSpill`].
56pub trait BytesFetch: Send {
57    /// Consume the handle and return the spilled payload as `Bytes`.
58    ///
59    /// On failure, the handle is returned so the caller can retry later.
60    fn fetch(self: Box<Self>) -> Result<Vec<Bytes>, Box<dyn BytesFetch>>;
61}
62
63/// Writer-side spill policy: threshold-based, spills the middle of the queue.
64pub mod threshold {
65    use super::*;
66
67    /// A threshold-based [`SpillPolicy`]: when a queue's resident-byte count
68    /// exceeds `head_reserve_bytes + threshold_bytes`, spill all entries past
69    /// the head reserve (except the last entry, which stays as the `try_merge`
70    /// target).
71    pub struct Threshold {
72        strategy: Box<dyn BytesSpill>,
73        /// Spillable surplus: spill is considered when resident bytes exceed
74        /// `head_reserve_bytes + threshold_bytes`.
75        pub threshold_bytes: usize,
76        /// Bytes near the head of the queue stay resident, protecting the
77        /// consumer from an immediate page-in stall.
78        pub head_reserve_bytes: usize,
79    }
80
81    impl Threshold {
82        /// Create a new threshold policy with default knobs, dispatching spills
83        /// through `strategy`.
84        pub fn new(strategy: Box<dyn BytesSpill>) -> Self {
85            Threshold {
86                strategy,
87                threshold_bytes:    256 << 20,  // 256 MB
88                head_reserve_bytes:  64 << 20,  //  64 MB
89            }
90        }
91    }
92
93    impl SpillPolicy for Threshold {
94        fn apply(&mut self, queue: &mut VecDeque<QueueEntry>) {
95            let resident: usize = queue.iter().map(|e| match e {
96                QueueEntry::Bytes(b) => b.len(),
97                QueueEntry::Paged(_) => 0,
98            }).sum();
99            if resident <= self.head_reserve_bytes + self.threshold_bytes {
100                return;
101            }
102
103            let head_reserve = self.head_reserve_bytes;
104
105            let mut cumulative: usize = 0;
106            let last_index = queue.len().saturating_sub(1);
107            let mut target_indices: Vec<usize> = Vec::new();
108            let mut target_bytes: Vec<Bytes> = Vec::new();
109            for (i, entry) in queue.iter().enumerate() {
110                if i == last_index { break; }
111                match entry {
112                    QueueEntry::Bytes(b) => {
113                        if cumulative >= head_reserve {
114                            target_indices.push(i);
115                            target_bytes.push(b.clone());
116                        }
117                        cumulative += b.len();
118                    }
119                    QueueEntry::Paged(_) => {}
120                }
121            }
122
123            if target_bytes.is_empty() {
124                return;
125            }
126
127            let mut handles: Vec<Box<dyn BytesFetch>> = Vec::new();
128            self.strategy.spill(&mut target_bytes, &mut handles);
129            // Replace queue entries for however many chunks were spilled.
130            for (i, handle) in target_indices.into_iter().zip(handles) {
131                queue[i] = QueueEntry::Paged(handle);
132            }
133            // Remaining entries in target_bytes (if any) stay Resident.
134        }
135    }
136}
137
138/// Reader-side policy: materializes `Paged` entries near the front.
139pub mod prefetch {
140    use super::*;
141
142    /// A reader-side [`SpillPolicy`] that materializes `Paged` entries near
143    /// the front of the queue up to a byte budget. The writer-side dual of
144    /// [`super::threshold::Threshold`]: the writer pages data out, the reader
145    /// pages data back in — both through `SpillPolicy::apply`.
146    pub struct PrefetchPolicy {
147        /// Maximum bytes to materialize per `apply` invocation.
148        pub budget: usize,
149    }
150
151    impl PrefetchPolicy {
152        /// Create a prefetch policy with the given byte budget.
153        pub fn new(budget: usize) -> Self {
154            PrefetchPolicy { budget }
155        }
156    }
157
158    impl SpillPolicy for PrefetchPolicy {
159        fn apply(&mut self, queue: &mut VecDeque<QueueEntry>) {
160            let mut resident_head = 0;
161            let mut i = 0;
162            while i < queue.len() && resident_head < self.budget {
163                match &queue[i] {
164                    QueueEntry::Bytes(b) => {
165                        resident_head += b.len();
166                        i += 1;
167                    }
168                    QueueEntry::Paged(_) => {
169                        let entry = queue.remove(i).expect("index valid");
170                        if let QueueEntry::Paged(h) = entry {
171                            match h.fetch() {
172                                Ok(fetched) => {
173                                    let n = fetched.len();
174                                    for (j, b) in fetched.into_iter().enumerate() {
175                                        resident_head += b.len();
176                                        queue.insert(i + j, QueueEntry::Bytes(b));
177                                    }
178                                    i += n;
179                                }
180                                Err(h) => {
181                                    // Fetch failed; put the handle back and stop.
182                                    queue.insert(i, QueueEntry::Paged(h));
183                                    break;
184                                }
185                            }
186                        }
187                    }
188                }
189            }
190        }
191    }
192}
193
194// Re-export the key types at the spill module level.
195pub use threshold::Threshold;
196pub use prefetch::PrefetchPolicy;
197
#[cfg(test)]
mod tests {
    use super::*;

    /// Copies `data` into a fresh, frozen `Bytes`.
    fn bytes_of(data: &[u8]) -> Bytes {
        timely_bytes::arc::BytesMut::from(data.to_vec()).freeze()
    }

    /// Spill strategy that parks every chunk in an in-memory [`MockHandle`].
    struct MockStrategy;
    /// In-memory spill handle; fetching always succeeds.
    struct MockHandle { data: Bytes }
    impl BytesSpill for MockStrategy {
        fn spill(&mut self, chunks: &mut Vec<Bytes>, handles: &mut Vec<Box<dyn BytesFetch>>) {
            handles.extend(chunks.drain(..)
                .map(|b| Box::new(MockHandle { data: b }) as Box<dyn BytesFetch>));
        }
    }
    impl BytesFetch for MockHandle {
        fn fetch(self: Box<Self>) -> Result<Vec<Bytes>, Box<dyn BytesFetch>> { Ok(vec![self.data]) }
    }

    /// Spill handle whose fetch always fails, handing itself back for a retry.
    struct FailingHandle;
    impl BytesFetch for FailingHandle {
        fn fetch(self: Box<Self>) -> Result<Vec<Bytes>, Box<dyn BytesFetch>> {
            Err(self as Box<dyn BytesFetch>)
        }
    }

    /// A queue of `count` resident entries; entry `i` is `len` copies of `i as u8`.
    fn resident_queue(count: usize, len: usize) -> VecDeque<QueueEntry> {
        (0..count).map(|i| QueueEntry::Bytes(bytes_of(&vec![i as u8; len]))).collect()
    }

    /// Renders each entry as `B` (resident) or `P` (paged) for compact asserts.
    fn shape(queue: &VecDeque<QueueEntry>) -> String {
        queue.iter().map(|entry| match entry {
            QueueEntry::Bytes(_) => 'B',
            QueueEntry::Paged(_) => 'P',
        }).collect()
    }

    #[test]
    fn eager_policy_moves_middle_entries() {
        struct EagerPolicy { strategy: Box<dyn BytesSpill> }
        impl SpillPolicy for EagerPolicy {
            fn apply(&mut self, queue: &mut VecDeque<QueueEntry>) {
                let last = queue.len().saturating_sub(1);
                let mut indices = Vec::new();
                let mut bytes = Vec::new();
                for (i, entry) in queue.iter().enumerate() {
                    if i == last { break; }
                    if let QueueEntry::Bytes(b) = entry {
                        indices.push(i);
                        bytes.push(b.clone());
                    }
                }
                if bytes.is_empty() { return; }
                let mut handles = Vec::new();
                self.strategy.spill(&mut bytes, &mut handles);
                for (i, h) in indices.into_iter().zip(handles) {
                    queue[i] = QueueEntry::Paged(h);
                }
            }
        }

        let mut p = EagerPolicy { strategy: Box::new(MockStrategy) };
        let mut queue: VecDeque<QueueEntry> = VecDeque::new();
        for i in 0..4 {
            queue.push_back(QueueEntry::Bytes(bytes_of(&[i as u8; 8])));
        }
        p.apply(&mut queue);
        assert!(matches!(queue[0], QueueEntry::Paged(_)));
        assert!(matches!(queue[1], QueueEntry::Paged(_)));
        assert!(matches!(queue[2], QueueEntry::Paged(_)));
        assert!(matches!(queue[3], QueueEntry::Bytes(_)));
    }

    #[test]
    fn threshold_is_noop_at_limit() {
        let mut tp = Threshold::new(Box::new(MockStrategy));
        tp.head_reserve_bytes = 16;
        tp.threshold_bytes = 16;
        // Exactly 16 + 16 resident bytes: not above the limit, so nothing moves.
        let mut queue = resident_queue(4, 8);
        tp.apply(&mut queue);
        assert_eq!(shape(&queue), "BBBB");
    }

    #[test]
    fn threshold_keeps_head_reserve_and_tail_resident() {
        let mut tp = Threshold::new(Box::new(MockStrategy));
        tp.head_reserve_bytes = 16;
        tp.threshold_bytes = 16;
        // 48 resident bytes > 32: entries starting at byte >= 16 spill, except
        // the last entry, which stays as the merge target.
        let mut queue = resident_queue(6, 8);
        tp.apply(&mut queue);
        assert_eq!(shape(&queue), "BBPPPB");

        // Paging everything back in restores the original data, in order.
        PrefetchPolicy::new(usize::MAX).apply(&mut queue);
        assert_eq!(shape(&queue), "BBBBBB");
        for (i, entry) in queue.iter().enumerate() {
            match entry {
                QueueEntry::Bytes(b) => {
                    assert_eq!(b.len(), 8);
                    assert!(b.iter().all(|&x| x == i as u8));
                }
                QueueEntry::Paged(_) => panic!("entry {} still paged", i),
            }
        }
    }

    #[test]
    fn prefetch_stops_at_budget_and_on_fetch_failure() {
        let mut queue: VecDeque<QueueEntry> = VecDeque::new();
        queue.push_back(QueueEntry::Bytes(bytes_of(&[1; 8])));
        queue.push_back(QueueEntry::Paged(Box::new(MockHandle { data: bytes_of(&[2; 8]) })));
        // The resident head already meets the budget, so nothing is fetched.
        PrefetchPolicy::new(8).apply(&mut queue);
        assert_eq!(shape(&queue), "BP");

        // A failing fetch is put back in place and halts further prefetching.
        queue.push_back(QueueEntry::Paged(Box::new(FailingHandle)));
        queue.push_back(QueueEntry::Paged(Box::new(MockHandle { data: bytes_of(&[3; 8]) })));
        PrefetchPolicy::new(usize::MAX).apply(&mut queue);
        assert_eq!(shape(&queue), "BBPP");
    }

    #[test]
    fn merge_queue_spill_roundtrip_mock() {
        use super::super::bytes_exchange::{MergeQueue, BytesPush, BytesPull};

        let head_reserve = 128;
        let mut tp = Threshold::new(Box::new(MockStrategy));
        tp.threshold_bytes = 512;
        tp.head_reserve_bytes = head_reserve;
        let writer_policy: Box<dyn SpillPolicy> = Box::new(tp);
        let reader_policy: Box<dyn SpillPolicy> = Box::new(PrefetchPolicy::new(head_reserve));

        let buzzer = crate::buzzer::Buzzer::default();
        let (mut writer, mut reader) =
            MergeQueue::new_pair(buzzer, Some(writer_policy), Some(reader_policy));

        let mut expected: Vec<Vec<u8>> = Vec::new();
        for i in 0..100 {
            let data = vec![(i % 251) as u8; 64];
            expected.push(data.clone());
            writer.extend(Some(bytes_of(&data)));
        }

        let mut received: Vec<Bytes> = Vec::new();
        loop {
            let before = received.len();
            reader.drain_into(&mut received);
            if received.len() == before { break; }
        }

        let expected_flat: Vec<u8> = expected.into_iter().flatten().collect();
        let received_flat: Vec<u8> = received.iter().flat_map(|b| b.iter().copied()).collect();
        assert_eq!(expected_flat, received_flat);
    }
}