timely/dataflow/operators/
capability.rs1use std::{borrow, error::Error, fmt::Display, ops::Deref};
25use std::rc::Rc;
26use std::cell::RefCell;
27use std::fmt::{self, Debug};
28
29use crate::order::PartialOrder;
30use crate::progress::Timestamp;
31use crate::progress::ChangeBatch;
32use crate::progress::operate::PortConnectivity;
33use crate::scheduling::Activations;
34use crate::dataflow::channels::pullers::counter::ConsumedGuard;
35
36pub trait CapabilityTrait<T: Timestamp> {
38 fn time(&self) -> &T;
40 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool;
41}
42
43impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
44 fn time(&self) -> &T { (**self).time() }
45 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
46 (**self).valid_for_output(query_buffer)
47 }
48}
49impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
50 fn time(&self) -> &T { (**self).time() }
51 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
52 (**self).valid_for_output(query_buffer)
53 }
54}
55
56pub struct Capability<T: Timestamp> {
63 time: T,
64 internal: Rc<RefCell<ChangeBatch<T>>>,
65}
66
67impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
68 fn time(&self) -> &T { &self.time }
69 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
70 Rc::ptr_eq(&self.internal, query_buffer)
71 }
72}
73
74impl<T: Timestamp> Capability<T> {
75 pub(crate) fn new(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Self {
78 internal.borrow_mut().update(time.clone(), 1);
79
80 Self {
81 time,
82 internal,
83 }
84 }
85
86 pub fn time(&self) -> &T {
88 &self.time
89 }
90
91 pub fn delayed(&self, new_time: &T) -> Capability<T> {
96 #[cold]
100 #[inline(never)]
101 fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
102 panic!(
105 "Attempted to delay {:?} to {:?}, which is not beyond the capability's time.",
106 capability,
107 invalid_time,
108 )
109 }
110
111 self.try_delayed(new_time)
112 .unwrap_or_else(|| delayed_panic(self, new_time))
113 }
114
115 pub fn try_delayed(&self, new_time: &T) -> Option<Capability<T>> {
120 if self.time.less_equal(new_time) {
121 Some(Self::new(new_time.clone(), Rc::clone(&self.internal)))
122 } else {
123 None
124 }
125 }
126
127 pub fn downgrade(&mut self, new_time: &T) {
131 #[cold]
135 #[inline(never)]
136 fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
137 panic!(
140 "Attempted to downgrade {:?} to {:?}, which is not beyond the capability's time.",
141 capability,
142 invalid_time,
143 )
144 }
145
146 self.try_downgrade(new_time)
147 .unwrap_or_else(|_| downgrade_panic(self, new_time))
148 }
149
150 pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> {
154 if let Some(new_capability) = self.try_delayed(new_time) {
155 *self = new_capability;
156 Ok(())
157 } else {
158 Err(DowngradeError(()))
159 }
160 }
161}
162
163impl<T: Timestamp> Drop for Capability<T> {
167 fn drop(&mut self) {
168 self.internal.borrow_mut().update(self.time.clone(), -1);
169 }
170}
171
172impl<T: Timestamp> Clone for Capability<T> {
173 fn clone(&self) -> Capability<T> {
174 Self::new(self.time.clone(), Rc::clone(&self.internal))
175 }
176}
177
178impl<T: Timestamp> Deref for Capability<T> {
179 type Target = T;
180
181 fn deref(&self) -> &T {
182 &self.time
183 }
184}
185
186impl<T: Timestamp> Debug for Capability<T> {
187 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
188 f.debug_struct("Capability")
189 .field("time", &self.time)
190 .field("internal", &"...")
191 .finish()
192 }
193}
194
195impl<T: Timestamp> PartialEq for Capability<T> {
196 fn eq(&self, other: &Self) -> bool {
197 self.time() == other.time() && Rc::ptr_eq(&self.internal, &other.internal)
198 }
199}
200impl<T: Timestamp> Eq for Capability<T> { }
201
202impl<T: Timestamp> PartialOrder for Capability<T> {
203 fn less_equal(&self, other: &Self) -> bool {
204 self.time().less_equal(other.time()) && Rc::ptr_eq(&self.internal, &other.internal)
205 }
206}
207
208impl<T: Timestamp> ::std::hash::Hash for Capability<T> {
209 fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
210 self.time.hash(state);
211 }
212}
213
/// Error returned when a capability (or capability set) cannot be downgraded
/// to the requested time.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DowngradeError(());

impl Display for DowngradeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not downgrade the given capability")
    }
}

impl Error for DowngradeError {}
226
227type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
229
230pub struct InputCapability<T: Timestamp> {
238 internal: CapabilityUpdates<T>,
240 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
242 consumed_guard: ConsumedGuard<T>,
244}
245
246impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
247 fn time(&self) -> &T { self.time() }
248 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
249 let summaries_borrow = self.summaries.borrow();
250 let internal_borrow = self.internal.borrow();
251 let result = summaries_borrow.iter_ports().any(|(port, path)| {
253 Rc::ptr_eq(&internal_borrow[port], query_buffer) && path.len() == 1 && path[0] == Default::default()
254 });
255 result
256 }
257}
258
259impl<T: Timestamp> InputCapability<T> {
260 pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
263 InputCapability {
264 internal,
265 summaries,
266 consumed_guard: guard,
267 }
268 }
269
270 pub fn time(&self) -> &T {
272 self.consumed_guard.time()
273 }
274
275 pub fn delayed(&self, new_time: &T) -> Capability<T> {
280 self.delayed_for_output(new_time, 0)
281 }
282
283 pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
285 use crate::progress::timestamp::PathSummary;
286 if let Some(path) = self.summaries.borrow().get(output_port) {
287 if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
288 Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
289 } else {
290 panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
291 }
292 }
293 else {
294 panic!("Attempted to delay a capability for a disconnected output");
295 }
296 }
297
298 pub fn retain(self) -> Capability<T> {
306 self.retain_for_output(0)
307 }
308
309 pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
313 use crate::progress::timestamp::PathSummary;
314 let self_time = self.time().clone();
315 if let Some(path) = self.summaries.borrow().get(output_port) {
316 if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
317 Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
318 }
319 else {
320 panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
321 }
322 }
323 else {
324 panic!("Attempted to retain a capability for a disconnected output");
325 }
326 }
327}
328
329impl<T: Timestamp> Deref for InputCapability<T> {
330 type Target = T;
331
332 fn deref(&self) -> &T {
333 self.time()
334 }
335}
336
337impl<T: Timestamp> Debug for InputCapability<T> {
338 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
339 f.debug_struct("InputCapability")
340 .field("time", self.time())
341 .field("internal", &"...")
342 .finish()
343 }
344}
345
346#[derive(Clone, Debug)]
348pub struct ActivateCapability<T: Timestamp> {
349 pub(crate) capability: Capability<T>,
350 pub(crate) address: Rc<[usize]>,
351 pub(crate) activations: Rc<RefCell<Activations>>,
352}
353
354impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
355 fn time(&self) -> &T { self.capability.time() }
356 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>) -> bool {
357 self.capability.valid_for_output(query_buffer)
358 }
359}
360
361impl<T: Timestamp> ActivateCapability<T> {
362 pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
364 Self {
365 capability,
366 address,
367 activations,
368 }
369 }
370
371 pub fn time(&self) -> &T {
373 self.capability.time()
374 }
375
376 pub fn delayed(&self, time: &T) -> Self {
378 ActivateCapability {
379 capability: self.capability.delayed(time),
380 address: Rc::clone(&self.address),
381 activations: Rc::clone(&self.activations),
382 }
383 }
384
385 pub fn downgrade(&mut self, time: &T) {
387 self.capability.downgrade(time);
388 self.activations.borrow_mut().activate(&self.address);
389 }
390}
391
392impl<T: Timestamp> Drop for ActivateCapability<T> {
393 fn drop(&mut self) {
394 self.activations.borrow_mut().activate(&self.address);
395 }
396}
397
398#[derive(Clone, Debug)]
400pub struct CapabilitySet<T: Timestamp> {
401 elements: Vec<Capability<T>>,
402}
403
404impl<T: Timestamp> CapabilitySet<T> {
405
406 pub fn new() -> Self {
408 Self { elements: Vec::new() }
409 }
410
411 pub fn with_capacity(capacity: usize) -> Self {
413 Self { elements: Vec::with_capacity(capacity) }
414 }
415
416 pub fn from_elem(cap: Capability<T>) -> Self {
445 Self { elements: vec![cap] }
446 }
447
448 pub fn insert(&mut self, capability: Capability<T>) {
450 if !self.elements.iter().any(|c| c.less_equal(&capability)) {
451 self.elements.retain(|c| !capability.less_equal(c));
452 self.elements.push(capability);
453 }
454 }
455
456 pub fn delayed(&self, time: &T) -> Capability<T> {
460 #[cold]
464 #[inline(never)]
465 fn delayed_panic(invalid_time: &dyn Debug) -> ! {
466 panic!(
469 "failed to create a delayed capability, the current set does not \
470 have an element less than or equal to {:?}",
471 invalid_time,
472 )
473 }
474
475 self.try_delayed(time)
476 .unwrap_or_else(|| delayed_panic(time))
477 }
478
479 pub fn try_delayed(&self, time: &T) -> Option<Capability<T>> {
483 self.elements
484 .iter()
485 .find(|capability| capability.time().less_equal(time))
486 .and_then(|capability| capability.try_delayed(time))
487 }
488
489 pub fn downgrade<B, F>(&mut self, frontier: F)
493 where
494 B: borrow::Borrow<T>,
495 F: IntoIterator<Item = B>,
496 {
497 #[cold]
501 #[inline(never)]
502 fn downgrade_panic() -> ! {
503 panic!(
506 "Attempted to downgrade a CapabilitySet with a frontier containing an element \
507 that was not beyond an element within the set"
508 )
509 }
510
511 self.try_downgrade(frontier)
512 .unwrap_or_else(|_| downgrade_panic())
513 }
514
515 pub fn try_downgrade<B, F>(&mut self, frontier: F) -> Result<(), DowngradeError>
523 where
524 B: borrow::Borrow<T>,
525 F: IntoIterator<Item = B>,
526 {
527 let count = self.elements.len();
528 for time in frontier.into_iter() {
529 let capability = self.try_delayed(time.borrow()).ok_or(DowngradeError(()))?;
530 self.elements.push(capability);
531 }
532 self.elements.drain(..count);
533
534 Ok(())
535 }
536}
537
538impl<T> From<Vec<Capability<T>>> for CapabilitySet<T>
539where
540 T: Timestamp,
541{
542 fn from(capabilities: Vec<Capability<T>>) -> Self {
543 let mut this = Self::with_capacity(capabilities.len());
544 for capability in capabilities {
545 this.insert(capability);
546 }
547
548 this
549 }
550}
551
552impl<T: Timestamp> Default for CapabilitySet<T> {
553 fn default() -> Self {
554 Self::new()
555 }
556}
557
558impl<T: Timestamp> Deref for CapabilitySet<T> {
559 type Target=[Capability<T>];
560
561 fn deref(&self) -> &[Capability<T>] {
562 &self.elements
563 }
564}