timely/dataflow/operators/
capability.rs1use std::{borrow, error::Error, fmt::Display, ops::Deref};
25use std::rc::Rc;
26use std::cell::RefCell;
27use std::fmt::{self, Debug};
28
29use crate::order::PartialOrder;
30use crate::progress::Timestamp;
31use crate::progress::ChangeBatch;
32use crate::progress::operate::PortConnectivity;
33use crate::scheduling::Activations;
34use crate::dataflow::channels::pullers::counter::ConsumedGuard;
35
36pub trait CapabilityTrait<T: Timestamp> {
38 fn time(&self) -> &T;
40 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool;
42}
43
44impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &C {
45 fn time(&self) -> &T { (**self).time() }
46 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
47 (**self).valid_for_output(query_buffer, port)
48 }
49}
50impl<T: Timestamp, C: CapabilityTrait<T>> CapabilityTrait<T> for &mut C {
51 fn time(&self) -> &T { (**self).time() }
52 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
53 (**self).valid_for_output(query_buffer, port)
54 }
55}
56
57pub struct Capability<T: Timestamp> {
64 time: T,
65 internal: Rc<RefCell<ChangeBatch<T>>>,
66}
67
68impl<T: Timestamp> CapabilityTrait<T> for Capability<T> {
69 fn time(&self) -> &T { &self.time }
70 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, _port: usize) -> bool {
71 Rc::ptr_eq(&self.internal, query_buffer)
72 }
73}
74
75impl<T: Timestamp> Capability<T> {
76 pub(crate) fn new(time: T, internal: Rc<RefCell<ChangeBatch<T>>>) -> Self {
79 internal.borrow_mut().update(time.clone(), 1);
80
81 Self {
82 time,
83 internal,
84 }
85 }
86
87 pub fn time(&self) -> &T {
89 &self.time
90 }
91
92 pub fn delayed(&self, new_time: &T) -> Capability<T> {
97 #[cold]
101 #[inline(never)]
102 fn delayed_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
103 panic!(
106 "Attempted to delay {:?} to {:?}, which is not beyond the capability's time.",
107 capability,
108 invalid_time,
109 )
110 }
111
112 self.try_delayed(new_time)
113 .unwrap_or_else(|| delayed_panic(self, new_time))
114 }
115
116 pub fn try_delayed(&self, new_time: &T) -> Option<Capability<T>> {
121 if self.time.less_equal(new_time) {
122 Some(Self::new(new_time.clone(), Rc::clone(&self.internal)))
123 } else {
124 None
125 }
126 }
127
128 pub fn downgrade(&mut self, new_time: &T) {
132 #[cold]
136 #[inline(never)]
137 fn downgrade_panic(capability: &dyn Debug, invalid_time: &dyn Debug) -> ! {
138 panic!(
141 "Attempted to downgrade {:?} to {:?}, which is not beyond the capability's time.",
142 capability,
143 invalid_time,
144 )
145 }
146
147 self.try_downgrade(new_time)
148 .unwrap_or_else(|_| downgrade_panic(self, new_time))
149 }
150
151 pub fn try_downgrade(&mut self, new_time: &T) -> Result<(), DowngradeError> {
155 if let Some(new_capability) = self.try_delayed(new_time) {
156 *self = new_capability;
157 Ok(())
158 } else {
159 Err(DowngradeError(()))
160 }
161 }
162}
163
164impl<T: Timestamp> Drop for Capability<T> {
168 fn drop(&mut self) {
169 self.internal.borrow_mut().update(self.time.clone(), -1);
170 }
171}
172
173impl<T: Timestamp> Clone for Capability<T> {
174 fn clone(&self) -> Capability<T> {
175 Self::new(self.time.clone(), Rc::clone(&self.internal))
176 }
177}
178
179impl<T: Timestamp> Deref for Capability<T> {
180 type Target = T;
181
182 fn deref(&self) -> &T {
183 &self.time
184 }
185}
186
187impl<T: Timestamp> Debug for Capability<T> {
188 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
189 f.debug_struct("Capability")
190 .field("time", &self.time)
191 .field("internal", &"...")
192 .finish()
193 }
194}
195
196impl<T: Timestamp> PartialEq for Capability<T> {
197 fn eq(&self, other: &Self) -> bool {
198 self.time() == other.time() && Rc::ptr_eq(&self.internal, &other.internal)
199 }
200}
201impl<T: Timestamp> Eq for Capability<T> { }
202
203impl<T: Timestamp> PartialOrder for Capability<T> {
204 fn less_equal(&self, other: &Self) -> bool {
205 self.time().less_equal(other.time()) && Rc::ptr_eq(&self.internal, &other.internal)
206 }
207}
208
209impl<T: Timestamp+::std::hash::Hash> ::std::hash::Hash for Capability<T> {
210 fn hash<H: ::std::hash::Hasher>(&self, state: &mut H) {
211 self.time.hash(state);
212 }
213}
214
/// Error returned when a capability, or a set of capabilities, cannot be downgraded.
///
/// The private unit field prevents construction outside this module.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct DowngradeError(());

impl Display for DowngradeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "could not downgrade the given capability")
    }
}

impl Error for DowngradeError {}
227
228type CapabilityUpdates<T> = Rc<RefCell<Vec<Rc<RefCell<ChangeBatch<T>>>>>>;
230
231pub struct InputCapability<T: Timestamp> {
239 internal: CapabilityUpdates<T>,
241 summaries: Rc<RefCell<PortConnectivity<T::Summary>>>,
243 consumed_guard: ConsumedGuard<T>,
245}
246
247impl<T: Timestamp> CapabilityTrait<T> for InputCapability<T> {
248 fn time(&self) -> &T { self.time() }
249 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
250 let summaries_borrow = self.summaries.borrow();
251 let internal_borrow = self.internal.borrow();
252 Rc::ptr_eq(&internal_borrow[port], query_buffer) &&
254 summaries_borrow.get(port).map_or(false, |path| path.elements() == [Default::default()])
255 }
256}
257
258impl<T: Timestamp> InputCapability<T> {
259 pub(crate) fn new(internal: CapabilityUpdates<T>, summaries: Rc<RefCell<PortConnectivity<T::Summary>>>, guard: ConsumedGuard<T>) -> Self {
262 InputCapability {
263 internal,
264 summaries,
265 consumed_guard: guard,
266 }
267 }
268
269 pub fn time(&self) -> &T {
271 self.consumed_guard.time()
272 }
273
274 pub fn delayed(&self, new_time: &T) -> Capability<T> {
279 self.delayed_for_output(new_time, 0)
280 }
281
282 pub fn delayed_for_output(&self, new_time: &T, output_port: usize) -> Capability<T> {
284 use crate::progress::timestamp::PathSummary;
285 if let Some(path) = self.summaries.borrow().get(output_port) {
286 if path.iter().flat_map(|summary| summary.results_in(self.time())).any(|time| time.less_equal(new_time)) {
287 Capability::new(new_time.clone(), Rc::clone(&self.internal.borrow()[output_port]))
288 } else {
289 panic!("Attempted to delay to a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", new_time, path, self.time());
290 }
291 }
292 else {
293 panic!("Attempted to delay a capability for a disconnected output");
294 }
295 }
296
297 pub fn retain(self) -> Capability<T> {
305 self.retain_for_output(0)
306 }
307
308 pub fn retain_for_output(self, output_port: usize) -> Capability<T> {
312 use crate::progress::timestamp::PathSummary;
313 let self_time = self.time().clone();
314 if let Some(path) = self.summaries.borrow().get(output_port) {
315 if path.iter().flat_map(|summary| summary.results_in(&self_time)).any(|time| time.less_equal(&self_time)) {
316 Capability::new(self_time, Rc::clone(&self.internal.borrow()[output_port]))
317 }
318 else {
319 panic!("Attempted to retain a time ({:?}) not greater or equal to the operators input-output summary ({:?}) applied to the capabilities time ({:?})", self_time, path, self_time);
320 }
321 }
322 else {
323 panic!("Attempted to retain a capability for a disconnected output");
324 }
325 }
326}
327
328impl<T: Timestamp> Deref for InputCapability<T> {
329 type Target = T;
330
331 fn deref(&self) -> &T {
332 self.time()
333 }
334}
335
336impl<T: Timestamp> Debug for InputCapability<T> {
337 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
338 f.debug_struct("InputCapability")
339 .field("time", self.time())
340 .field("internal", &"...")
341 .finish()
342 }
343}
344
345#[derive(Clone, Debug)]
347pub struct ActivateCapability<T: Timestamp> {
348 pub(crate) capability: Capability<T>,
349 pub(crate) address: Rc<[usize]>,
350 pub(crate) activations: Rc<RefCell<Activations>>,
351}
352
353impl<T: Timestamp> CapabilityTrait<T> for ActivateCapability<T> {
354 fn time(&self) -> &T { self.capability.time() }
355 fn valid_for_output(&self, query_buffer: &Rc<RefCell<ChangeBatch<T>>>, port: usize) -> bool {
356 self.capability.valid_for_output(query_buffer, port)
357 }
358}
359
360impl<T: Timestamp> ActivateCapability<T> {
361 pub fn new(capability: Capability<T>, address: Rc<[usize]>, activations: Rc<RefCell<Activations>>) -> Self {
363 Self {
364 capability,
365 address,
366 activations,
367 }
368 }
369
370 pub fn time(&self) -> &T {
372 self.capability.time()
373 }
374
375 pub fn delayed(&self, time: &T) -> Self {
377 ActivateCapability {
378 capability: self.capability.delayed(time),
379 address: Rc::clone(&self.address),
380 activations: Rc::clone(&self.activations),
381 }
382 }
383
384 pub fn downgrade(&mut self, time: &T) {
386 self.capability.downgrade(time);
387 self.activations.borrow_mut().activate(&self.address);
388 }
389}
390
391impl<T: Timestamp> Drop for ActivateCapability<T> {
392 fn drop(&mut self) {
393 self.activations.borrow_mut().activate(&self.address);
394 }
395}
396
397#[derive(Clone, Debug)]
399pub struct CapabilitySet<T: Timestamp> {
400 elements: Vec<Capability<T>>,
401}
402
403impl<T: Timestamp> CapabilitySet<T> {
404
405 pub fn new() -> Self {
407 Self { elements: Vec::new() }
408 }
409
410 pub fn with_capacity(capacity: usize) -> Self {
412 Self { elements: Vec::with_capacity(capacity) }
413 }
414
415 pub fn from_elem(cap: Capability<T>) -> Self {
443 Self { elements: vec![cap] }
444 }
445
446 pub fn insert(&mut self, capability: Capability<T>) {
448 if !self.elements.iter().any(|c| c.less_equal(&capability)) {
449 self.elements.retain(|c| !capability.less_equal(c));
450 self.elements.push(capability);
451 }
452 }
453
454 pub fn delayed(&self, time: &T) -> Capability<T> {
458 #[cold]
462 #[inline(never)]
463 fn delayed_panic(invalid_time: &dyn Debug) -> ! {
464 panic!(
467 "failed to create a delayed capability, the current set does not \
468 have an element less than or equal to {:?}",
469 invalid_time,
470 )
471 }
472
473 self.try_delayed(time)
474 .unwrap_or_else(|| delayed_panic(time))
475 }
476
477 pub fn try_delayed(&self, time: &T) -> Option<Capability<T>> {
481 self.elements
482 .iter()
483 .find(|capability| capability.time().less_equal(time))
484 .and_then(|capability| capability.try_delayed(time))
485 }
486
487 pub fn downgrade<B, F>(&mut self, frontier: F)
491 where
492 B: borrow::Borrow<T>,
493 F: IntoIterator<Item = B>,
494 {
495 #[cold]
499 #[inline(never)]
500 fn downgrade_panic() -> ! {
501 panic!(
504 "Attempted to downgrade a CapabilitySet with a frontier containing an element \
505 that was not beyond an element within the set"
506 )
507 }
508
509 self.try_downgrade(frontier)
510 .unwrap_or_else(|_| downgrade_panic())
511 }
512
513 pub fn try_downgrade<B, F>(&mut self, frontier: F) -> Result<(), DowngradeError>
521 where
522 B: borrow::Borrow<T>,
523 F: IntoIterator<Item = B>,
524 {
525 let count = self.elements.len();
526 for time in frontier.into_iter() {
527 let capability = self.try_delayed(time.borrow()).ok_or(DowngradeError(()))?;
528 self.elements.push(capability);
529 }
530 self.elements.drain(..count);
531
532 Ok(())
533 }
534}
535
536impl<T> From<Vec<Capability<T>>> for CapabilitySet<T>
537where
538 T: Timestamp,
539{
540 fn from(capabilities: Vec<Capability<T>>) -> Self {
541 let mut this = Self::with_capacity(capabilities.len());
542 for capability in capabilities {
543 this.insert(capability);
544 }
545
546 this
547 }
548}
549
550impl<T: Timestamp> Default for CapabilitySet<T> {
551 fn default() -> Self {
552 Self::new()
553 }
554}
555
556impl<T: Timestamp> Deref for CapabilitySet<T> {
557 type Target=[Capability<T>];
558
559 fn deref(&self) -> &[Capability<T>] {
560 &self.elements
561 }
562}