1use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::cast::CastLossy;
21use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
22use mz_sql::plan::QueryWhen;
23use mz_sql::session::vars::IsolationLevel;
24use mz_storage_types::sources::Timeline;
25use serde::{Deserialize, Serialize};
26use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
27
28use crate::AdapterError;
29use crate::catalog::CatalogState;
30use crate::coord::Coordinator;
31use crate::coord::id_bundle::CollectionIdBundle;
32use crate::coord::read_policy::ReadHolds;
33use crate::coord::timeline::TimelineContext;
34use crate::session::Session;
35
/// The timeline and timestamp context of a read.
///
/// A read either happens at a concrete timestamp within a timeline
/// ([`TimestampContext::TimelineTimestamp`]) or has no timestamp at all
/// ([`TimestampContext::NoTimestamp`]).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext<T> {
    /// A read that happens at a timestamp within a specific timeline.
    TimelineTimestamp {
        /// The timeline the read belongs to.
        timeline: Timeline,
        /// The timestamp that was chosen for the read.
        chosen_ts: T,
        /// The oracle timestamp supplied when the context was built, if any.
        oracle_ts: Option<T>,
    },
    /// A read that is not tied to any timeline or timestamp.
    NoTimestamp,
}
57
58impl<T: TimestampManipulation> TimestampContext<T> {
59 pub fn from_timeline_context(
61 chosen_ts: T,
62 oracle_ts: Option<T>,
63 transaction_timeline: Option<Timeline>,
64 timeline_context: &TimelineContext,
65 ) -> TimestampContext<T> {
66 match timeline_context {
67 TimelineContext::TimelineDependent(timeline) => {
68 if let Some(transaction_timeline) = transaction_timeline {
69 assert_eq!(timeline, &transaction_timeline);
70 }
71 Self::TimelineTimestamp {
72 timeline: timeline.clone(),
73 chosen_ts,
74 oracle_ts,
75 }
76 }
77 TimelineContext::TimestampDependent => {
78 Self::TimelineTimestamp {
80 timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
81 chosen_ts,
82 oracle_ts,
83 }
84 }
85 TimelineContext::TimestampIndependent => Self::NoTimestamp,
86 }
87 }
88
89 pub fn timeline(&self) -> Option<&Timeline> {
91 self.timeline_timestamp().map(|tt| tt.0)
92 }
93
94 pub fn timestamp(&self) -> Option<&T> {
96 self.timeline_timestamp().map(|tt| tt.1)
97 }
98
99 pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
101 match self {
102 Self::TimelineTimestamp {
103 timeline,
104 chosen_ts,
105 ..
106 } => Some((timeline, chosen_ts)),
107 Self::NoTimestamp => None,
108 }
109 }
110
111 pub fn timestamp_or_default(&self) -> T {
113 match self {
114 Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
115 Self::NoTimestamp => T::maximum(),
119 }
120 }
121
122 pub fn contains_timestamp(&self) -> bool {
124 self.timestamp().is_some()
125 }
126
127 pub fn antichain(&self) -> Antichain<T> {
129 Antichain::from_elem(self.timestamp_or_default())
130 }
131}
132
133#[async_trait(?Send)]
134impl TimestampProvider for Coordinator {
135 fn compute_read_frontier(
137 &self,
138 instance: ComputeInstanceId,
139 id: GlobalId,
140 ) -> Antichain<Timestamp> {
141 self.controller
142 .compute
143 .collection_frontiers(id, Some(instance))
144 .expect("id does not exist")
145 .read_frontier
146 }
147
148 fn compute_write_frontier(
150 &self,
151 instance: ComputeInstanceId,
152 id: GlobalId,
153 ) -> Antichain<Timestamp> {
154 self.controller
155 .compute
156 .collection_frontiers(id, Some(instance))
157 .expect("id does not exist")
158 .write_frontier
159 }
160
161 fn storage_frontiers(
162 &self,
163 ids: Vec<GlobalId>,
164 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
165 self.controller
166 .storage
167 .collections_frontiers(ids)
168 .expect("missing collections")
169 }
170
171 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
172 self.acquire_read_holds(id_bundle)
173 }
174
175 fn catalog_state(&self) -> &CatalogState {
176 self.catalog().state()
177 }
178}
179
/// The result of constraint-based timestamp selection, before it is wrapped into a
/// [`TimestampDetermination`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination<T> {
    /// The timestamp that was selected.
    pub timestamp: T,
    /// The constraints the timestamp was selected under.
    pub constraints: Constraints,
    /// The read timestamp obtained from the session-local timestamp oracle, if one was
    /// consulted.
    pub session_oracle_read_ts: Option<T>,
}
188
/// Access to collection frontiers and read holds, plus the timestamp selection logic built
/// on top of them.
///
/// Implementors supply the frontier accessors, `catalog_state`, and `acquire_read_holds`;
/// every other method has a default implementation.
#[async_trait(?Send)]
pub trait TimestampProvider {
    /// Returns the read frontier of compute collection `id` on `instance`.
    fn compute_read_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
    /// Returns the write frontier of compute collection `id` on `instance`.
    fn compute_write_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;

    /// Returns one `(id, frontier, frontier)` tuple per storage collection in `ids`.
    ///
    /// `least_valid_write` destructures the tuple as `(id, since, upper)`.
    fn storage_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;

    /// Returns the catalog state.
    fn catalog_state(&self) -> &CatalogState;

    /// Maps a [`TimelineContext`] to the timeline it implies.
    ///
    /// A timeline-dependent context yields its own timeline, a timestamp-dependent context
    /// yields `Timeline::EpochMilliseconds`, and a timestamp-independent context yields `None`.
    fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
        let timeline = match timeline_context {
            TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
            TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
            TimelineContext::TimestampIndependent => None,
        };

        timeline
    }

    /// Reports whether a read needs a linearized read timestamp.
    ///
    /// This is the case when `when` must advance to the timeline timestamp, or when it can
    /// advance to it and the isolation level is `StrictSerializable` or
    /// `StrongSessionSerializable`.
    fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
        when.must_advance_to_timeline_ts()
            || (when.can_advance_to_timeline_ts()
                && matches!(
                    isolation_level,
                    IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
                ))
    }

    /// Selects a timestamp by collecting lower- and upper-bound constraints and then picking
    /// a candidate according to a freshness preference.
    ///
    /// # Errors
    ///
    /// Returns `AdapterError::ImpossibleTimestampConstraints` when the candidate does not
    /// satisfy the collected lower bound, or when the collected upper bound is strictly less
    /// than the candidate.
    ///
    /// # Panics
    ///
    /// Panics if `real_time_recency_ts` is supplied while real time recency is disabled in
    /// the session or the isolation level is not `StrictSerializable`.
    fn determine_timestamp_via_constraints(
        session: &Session,
        read_holds: &ReadHolds<Timestamp>,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
        timeline: &Option<Timeline>,
        largest_not_in_advance_of_upper: Timestamp,
    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
        use constraints::{Constraints, Preference, Reason};

        // Set only when the session-local oracle is consulted below.
        let mut session_oracle_read_ts = None;
        let constraints = {
            // Constraints are pushed in a fixed order; `minimize` retains that order, so the
            // order is visible when the constraints are displayed.
            let mut constraints = Constraints::default();

            // The read holds' least valid read is a lower bound, attributed separately to
            // the storage inputs and the compute inputs when each is non-empty.
            let since = read_holds.least_valid_read();
            let storage = id_bundle
                .storage_ids
                .iter()
                .cloned()
                .collect::<Vec<GlobalId>>();
            if !storage.is_empty() {
                constraints
                    .lower
                    .push((since.clone(), Reason::StorageInput(storage)));
            }
            let compute = id_bundle
                .compute_ids
                .iter()
                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
            if !compute.is_empty() {
                constraints
                    .lower
                    .push((since.clone(), Reason::ComputeInput(compute)));
            }

            // A timestamp requested by the query is a lower bound, and also an upper bound
            // when `when` constrains the upper.
            if let Some(ts) = when.advance_to_timestamp() {
                constraints
                    .lower
                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
                if when.constrains_upper() {
                    constraints
                        .upper
                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
                }
            }

            // The oracle read timestamp is a lower bound, except under strong session
            // serializable isolation when `when` is not required to advance to the timeline
            // timestamp.
            if let Some(timestamp) = &oracle_read_ts {
                if isolation_level != &IsolationLevel::StrongSessionSerializable
                    || when.must_advance_to_timeline_ts()
                {
                    constraints.lower.push((
                        Antichain::from_elem(*timestamp),
                        Reason::IsolationLevel(*isolation_level),
                    ));
                }
            }

            // A real time recency timestamp is a lower bound; supplying one in any other
            // configuration is treated as a caller bug.
            if let Some(real_time_recency_ts) = real_time_recency_ts {
                assert!(
                    session.vars().real_time_recency()
                        && isolation_level == &IsolationLevel::StrictSerializable,
                    "real time recency timestamp should only be supplied when real time recency \
                    is enabled and the isolation level is strict serializable"
                );
                constraints.lower.push((
                    Antichain::from_elem(real_time_recency_ts),
                    Reason::RealTimeRecency,
                ));
            }

            if isolation_level == &IsolationLevel::StrongSessionSerializable {
                if let Some(timeline) = &timeline {
                    // The session-local oracle's read timestamp, when the session has an
                    // oracle for this timeline, is a lower bound and is reported back to the
                    // caller.
                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
                        let session_ts = oracle.read_ts();
                        constraints.lower.push((
                            Antichain::from_elem(session_ts),
                            Reason::IsolationLevel(*isolation_level),
                        ));
                        session_oracle_read_ts = Some(session_ts);
                    }

                    // When the query may advance to both the upper and the timeline
                    // timestamp, additionally require at least
                    // `largest_not_in_advance_of_upper`, capped at the oracle read timestamp
                    // when one was supplied.
                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
                        let mut advance_to = largest_not_in_advance_of_upper;
                        if let Some(oracle_read_ts) = oracle_read_ts {
                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
                        }
                        constraints.lower.push((
                            Antichain::from_elem(advance_to),
                            Reason::IsolationLevel(*isolation_level),
                        ));
                    }
                }
            }

            // Drop constraints that do not contribute to the resulting bounds.
            constraints.minimize();
            constraints
        };

        // Prefer the freshest available timestamp only when the query can advance to the
        // upper and either the isolation level is serializable or there is no timeline.
        let preferences = {
            if when.can_advance_to_upper()
                && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
            {
                Preference::FreshestAvailable
            } else {
                Preference::StalestValid
            }
        };

        let constraint_candidate = {
            // Start from the minimum timestamp and advance by the lower bound.
            let mut candidate = Timestamp::minimum();
            candidate.advance_by(constraints.lower_bound().borrow());
            // For the freshest-available preference, further advance by the frontier formed
            // from the upper-bound constraints plus `largest_not_in_advance_of_upper`.
            if let Preference::FreshestAvailable = preferences {
                let mut upper_bound = constraints.upper_bound();
                upper_bound.insert(largest_not_in_advance_of_upper);
                candidate.advance_by(upper_bound.borrow());
            }
            // Reject a candidate that falls outside the collected bounds.
            if !constraints.lower_bound().less_equal(&candidate)
                || constraints.upper_bound().less_than(&candidate)
            {
                return Err(AdapterError::ImpossibleTimestampConstraints {
                    constraints: constraints.display(timeline.as_ref()).to_string(),
                });
            } else {
                candidate
            }
        };

        Ok(RawTimestampDetermination {
            timestamp: constraint_candidate,
            constraints,
            session_oracle_read_ts,
        })
    }

    /// Determines the timestamp for a read of `id_bundle`.
    ///
    /// Acquires read holds for the bundle, computes the bundle's least valid write, and
    /// delegates to [`Self::determine_timestamp_for_inner`]. The acquired read holds are
    /// returned alongside the determination.
    ///
    /// # Errors
    ///
    /// Propagates any error from [`Self::determine_timestamp_for_inner`].
    fn determine_timestamp_for(
        &self,
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        // The read holds are acquired before the write frontier is inspected.
        let read_holds = self.acquire_read_holds(id_bundle);

        let upper = self.least_valid_write(id_bundle);

        Self::determine_timestamp_for_inner(
            session,
            id_bundle,
            when,
            timeline_context,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            read_holds,
            upper,
        )
    }

    /// Determines the timestamp for a read, given already-acquired `read_holds` and the
    /// `upper` of the inputs.
    ///
    /// # Errors
    ///
    /// Returns `AdapterError::CollectionUnreadable` when the read holds' least valid read is
    /// the empty frontier; the error lists every collection whose hold has an empty since.
    /// Otherwise propagates any error from [`Self::determine_timestamp_via_constraints`].
    fn determine_timestamp_for_inner(
        session: &Session,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        timeline_context: &TimelineContext,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
        read_holds: ReadHolds<Timestamp>,
        upper: Antichain<Timestamp>,
    ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
        let timeline = Self::get_timeline(timeline_context);
        let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
        let since = read_holds.least_valid_read();

        if since.is_empty() {
            // Collect the ids of all holds with an empty since for the error message. This
            // consumes the read holds, which is fine because we return right after.
            let mut unreadable_collections = Vec::new();
            for (coll_id, hold) in read_holds.storage_holds {
                if hold.since().is_empty() {
                    unreadable_collections.push(coll_id);
                }
            }
            for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
                if hold.since().is_empty() {
                    unreadable_collections.push(coll_id);
                }
            }
            return Err(AdapterError::CollectionUnreadable {
                id: unreadable_collections.into_iter().join(", "),
            });
        }

        let raw_determination = Self::determine_timestamp_via_constraints(
            session,
            &read_holds,
            id_bundle,
            when,
            oracle_read_ts,
            real_time_recency_ts,
            isolation_level,
            &timeline,
            largest_not_in_advance_of_upper,
        )?;

        let timestamp_context = TimestampContext::from_timeline_context(
            raw_determination.timestamp,
            oracle_read_ts,
            timeline,
            timeline_context,
        );

        let determination = TimestampDetermination {
            timestamp_context,
            since,
            upper,
            largest_not_in_advance_of_upper,
            oracle_read_ts,
            session_oracle_read_ts: raw_determination.session_oracle_read_ts,
            real_time_recency_ts,
            constraints: raw_determination.constraints,
        };

        Ok((determination, read_holds))
    }

    /// Acquires read holds for every collection in `id_bundle`.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;

    /// Returns an antichain built from the write frontiers of all storage and compute
    /// collections in `id_bundle`.
    fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
        let mut upper = Antichain::new();
        {
            // Storage collections: only the third tuple element (the upper) is used.
            for (_id, _since, collection_upper) in
                self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
            {
                upper.extend(collection_upper);
            }
        }
        {
            // Compute collections, per instance.
            for (instance, compute_ids) in &id_bundle.compute_ids {
                for id in compute_ids.iter() {
                    upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
                }
            }
        }
        upper
    }

    /// Returns the frontier obtained by stepping back every element of
    /// [`Self::least_valid_write`].
    ///
    /// An element that cannot be stepped back is inserted unchanged.
    fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
        let mut frontier = Antichain::new();
        for t in self.least_valid_write(id_bundle) {
            frontier.insert(t.step_back().unwrap_or(t));
        }
        frontier
    }
}
573
574impl Coordinator {
575 pub(crate) async fn oracle_read_ts(
576 &self,
577 session: &Session,
578 timeline_ctx: &TimelineContext,
579 when: &QueryWhen,
580 ) -> Option<Timestamp> {
581 let isolation_level = session.vars().transaction_isolation().clone();
582 let timeline = Coordinator::get_timeline(timeline_ctx);
583 let needs_linearized_read_ts =
584 Coordinator::needs_linearized_read_ts(&isolation_level, when);
585
586 let oracle_read_ts = match timeline {
587 Some(timeline) if needs_linearized_read_ts => {
588 let timestamp_oracle = self.get_timestamp_oracle(&timeline);
589 Some(timestamp_oracle.read_ts().await)
590 }
591 Some(_) | None => None,
592 };
593
594 oracle_read_ts
595 }
596
597 #[mz_ore::instrument(level = "debug")]
601 pub(crate) fn determine_timestamp(
602 &self,
603 session: &Session,
604 id_bundle: &CollectionIdBundle,
605 when: &QueryWhen,
606 compute_instance: ComputeInstanceId,
607 timeline_context: &TimelineContext,
608 oracle_read_ts: Option<Timestamp>,
609 real_time_recency_ts: Option<mz_repr::Timestamp>,
610 ) -> Result<
611 (
612 TimestampDetermination<mz_repr::Timestamp>,
613 ReadHolds<mz_repr::Timestamp>,
614 ),
615 AdapterError,
616 > {
617 let isolation_level = session.vars().transaction_isolation();
618 let (det, read_holds) = self.determine_timestamp_for(
619 session,
620 id_bundle,
621 when,
622 timeline_context,
623 oracle_read_ts,
624 real_time_recency_ts,
625 isolation_level,
626 )?;
627 self.metrics
628 .determine_timestamp
629 .with_label_values(&[
630 match det.respond_immediately() {
631 true => "true",
632 false => "false",
633 },
634 isolation_level.as_str(),
635 &compute_instance.to_string(),
636 ])
637 .inc();
638 if !det.respond_immediately()
639 && isolation_level == &IsolationLevel::StrictSerializable
640 && real_time_recency_ts.is_none()
641 {
642 if let Some(strict) = det.timestamp_context.timestamp() {
644 let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
645 session,
646 id_bundle,
647 when,
648 timeline_context,
649 oracle_read_ts,
650 real_time_recency_ts,
651 &IsolationLevel::Serializable,
652 )?;
653
654 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
655 self.metrics
656 .timestamp_difference_for_strict_serializable_ms
657 .with_label_values(&[compute_instance.to_string().as_str()])
658 .observe(f64::cast_lossy(u64::from(
659 strict.saturating_sub(*serializable),
660 )));
661 }
662 }
663 }
664 Ok((det, read_holds))
665 }
666
667 pub(crate) fn largest_not_in_advance_of_upper(
672 upper: &Antichain<mz_repr::Timestamp>,
673 ) -> mz_repr::Timestamp {
674 if let Some(upper) = upper.as_option() {
679 upper.step_back().unwrap_or_else(Timestamp::minimum)
680 } else {
681 Timestamp::MAX
686 }
687 }
688}
689
/// Everything that went into, and came out of, determining the timestamp for a read.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp together with its timeline, if any.
    pub timestamp_context: TimestampContext<T>,
    /// The least valid read frontier of the read holds on the inputs.
    pub since: Antichain<T>,
    /// The write frontier of the inputs.
    pub upper: Antichain<T>,
    /// The timestamp one step before `upper` (see
    /// `Coordinator::largest_not_in_advance_of_upper`).
    pub largest_not_in_advance_of_upper: T,
    /// The oracle read timestamp that was supplied, if any.
    pub oracle_read_ts: Option<T>,
    /// The read timestamp of the session-local oracle, if it was consulted.
    pub session_oracle_read_ts: Option<T>,
    /// The real time recency timestamp that was supplied, if any.
    pub real_time_recency_ts: Option<T>,
    /// The constraints the timestamp was selected under.
    pub constraints: Constraints,
}
711
712impl<T: TimestampManipulation> TimestampDetermination<T> {
713 pub fn respond_immediately(&self) -> bool {
714 match &self.timestamp_context {
715 TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
716 !self.upper.less_equal(chosen_ts)
717 }
718 TimestampContext::NoTimestamp => true,
719 }
720 }
721}
722
/// A human-readable explanation of a timestamp determination; rendered by its
/// `fmt::Display` implementation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
    /// The determination being explained.
    pub determination: TimestampDetermination<T>,
    /// The frontiers of each source that is listed in the explanation.
    pub sources: Vec<TimestampSource<T>>,
    /// The session wall time, printed both as milliseconds and as a formatted date.
    pub session_wall_time: DateTime<Utc>,
    /// Printed as "can respond immediately".
    pub respond_immediately: bool,
}
735
/// The name and frontiers of a single source, as listed in a [`TimestampExplanation`].
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
    /// The name printed in the source's heading.
    pub name: String,
    /// The elements of the source's read frontier.
    pub read_frontier: Vec<T>,
    /// The elements of the source's write frontier.
    pub write_frontier: Vec<T>,
}
742
743pub trait DisplayableInTimeline {
744 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
745 fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
746 DisplayInTimeline { t: self, timeline }
747 }
748}
749
750impl DisplayableInTimeline for mz_repr::Timestamp {
751 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
752 if let Some(Timeline::EpochMilliseconds) = timeline {
753 let ts_ms: u64 = self.into();
754 if let Ok(ts_ms) = i64::try_from(ts_ms) {
755 if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
756 return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
757 }
758 }
759 }
760 write!(f, "{:13}", self)
761 }
762}
763
/// A value paired with an optional timeline; created by `DisplayableInTimeline::display`.
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to display.
    t: &'a T,
    /// The timeline to display the value in, if any.
    timeline: Option<&'a Timeline>,
}
768impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
769where
770 T: DisplayableInTimeline,
771{
772 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
773 self.t.fmt(self.timeline, f)
774 }
775}
776
777impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
778where
779 T: DisplayableInTimeline,
780{
781 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
782 fmt::Display::fmt(&self, f)
783 }
784}
785
786impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
787 for TimestampExplanation<T>
788{
789 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
790 let timeline = self.determination.timestamp_context.timeline();
791 writeln!(
792 f,
793 " query timestamp: {}",
794 self.determination
795 .timestamp_context
796 .timestamp_or_default()
797 .display(timeline)
798 )?;
799 if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
800 writeln!(
801 f,
802 " oracle read timestamp: {}",
803 oracle_read_ts.display(timeline)
804 )?;
805 }
806 if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
807 writeln!(
808 f,
809 " session oracle read timestamp: {}",
810 session_oracle_read_ts.display(timeline)
811 )?;
812 }
813 if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
814 writeln!(
815 f,
816 " real time recency timestamp: {}",
817 real_time_recency_ts.display(timeline)
818 )?;
819 }
820 writeln!(
821 f,
822 "largest not in advance of upper: {}",
823 self.determination
824 .largest_not_in_advance_of_upper
825 .display(timeline),
826 )?;
827 writeln!(
828 f,
829 " upper:{:?}",
830 self.determination
831 .upper
832 .iter()
833 .map(|t| t.display(timeline))
834 .collect::<Vec<_>>()
835 )?;
836 writeln!(
837 f,
838 " since:{:?}",
839 self.determination
840 .since
841 .iter()
842 .map(|t| t.display(timeline))
843 .collect::<Vec<_>>()
844 )?;
845 writeln!(
846 f,
847 " can respond immediately: {}",
848 self.respond_immediately
849 )?;
850 writeln!(f, " timeline: {:?}", &timeline)?;
851 writeln!(
852 f,
853 " session wall time: {:13} ({})",
854 self.session_wall_time.timestamp_millis(),
855 self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
856 )?;
857
858 for source in &self.sources {
859 writeln!(f, "")?;
860 writeln!(f, "source {}:", source.name)?;
861 writeln!(
862 f,
863 " read frontier:{:?}",
864 source
865 .read_frontier
866 .iter()
867 .map(|t| t.display(timeline))
868 .collect::<Vec<_>>()
869 )?;
870 writeln!(
871 f,
872 " write frontier:{:?}",
873 source
874 .write_frontier
875 .iter()
876 .map(|t| t.display(timeline))
877 .collect::<Vec<_>>()
878 )?;
879 }
880
881 writeln!(f, "")?;
882 writeln!(f, "binding constraints:")?;
883 write!(f, "{}", self.determination.constraints.display(timeline))?;
884
885 Ok(())
886 }
887}
888
889mod constraints {
891
892 use core::fmt;
893 use std::fmt::Debug;
894
895 use differential_dataflow::lattice::Lattice;
896 use mz_storage_types::sources::Timeline;
897 use serde::{Deserialize, Serialize};
898 use timely::progress::{Antichain, Timestamp};
899
900 use mz_compute_types::ComputeInstanceId;
901 use mz_repr::GlobalId;
902 use mz_sql::session::vars::IsolationLevel;
903
904 use super::DisplayableInTimeline;
905
906 #[derive(Default, Serialize, Deserialize, Clone)]
919 pub struct Constraints {
920 pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
922 pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
924 }
925
926 impl DisplayableInTimeline for Constraints {
927 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
928 if !self.lower.is_empty() {
929 writeln!(f, "lower:")?;
930 for (ts, reason) in &self.lower {
931 let ts: Vec<_> = ts
932 .iter()
933 .map(|t| format!("{}", t.display(timeline)))
934 .collect();
935 writeln!(f, " ({}): [{}]", reason, ts.join(", "))?;
936 }
937 }
938 if !self.upper.is_empty() {
939 writeln!(f, "upper:")?;
940 for (ts, reason) in &self.upper {
941 let ts: Vec<_> = ts
942 .iter()
943 .map(|t| format!("{}", t.display(timeline)))
944 .collect();
945 writeln!(f, " ({}): [{}]", reason, ts.join(", "))?;
946 }
947 }
948 Ok(())
949 }
950 }
951
952 impl Debug for Constraints {
953 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
954 self.display(None).fmt(f)?;
955 Ok(())
956 }
957 }
958
959 impl Constraints {
960 pub fn minimize(&mut self) {
970 let lower_frontier = self.lower_bound();
972 self.lower.retain(|(anti, _)| {
974 anti.iter()
975 .any(|time| lower_frontier.elements().contains(time))
976 });
977
978 let upper_frontier = self.upper_bound();
980 self.upper.retain(|(anti, _)| {
982 anti.iter()
983 .any(|time| upper_frontier.elements().contains(time))
984 });
985 }
986
987 pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
989 let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
990 for (anti, _) in self.lower.iter() {
991 lower = lower.join(anti);
992 }
993 lower
994 }
995 pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
997 self.upper
998 .iter()
999 .flat_map(|(anti, _)| anti.iter())
1000 .cloned()
1001 .collect()
1002 }
1003 }
1004
1005 #[derive(Serialize, Deserialize, Clone)]
1007 pub enum Reason {
1008 ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1012 StorageInput(Vec<GlobalId>),
1014 IsolationLevel(IsolationLevel),
1016 RealTimeRecency,
1018 QueryAsOf,
1020 }
1021
1022 impl fmt::Display for Reason {
1023 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1024 match self {
1025 Reason::ComputeInput(ids) => {
1026 let formatted: Vec<_> =
1027 ids.iter().map(|(c, g)| format!("({}, {})", c, g)).collect();
1028 write!(f, "Indexed inputs: [{}]", formatted.join(", "))
1029 }
1030 Reason::StorageInput(ids) => {
1031 let formatted: Vec<_> = ids.iter().map(|g| format!("{}", g)).collect();
1032 write!(f, "Storage inputs: [{}]", formatted.join(", "))
1033 }
1034 Reason::IsolationLevel(level) => {
1035 write!(f, "Isolation level: {:?}", level)
1036 }
1037 Reason::RealTimeRecency => {
1038 write!(f, "Real-time recency")
1039 }
1040 Reason::QueryAsOf => {
1041 write!(f, "Query's AS OF")
1042 }
1043 }
1044 }
1045 }
1046
1047 pub enum Preference {
1050 FreshestAvailable,
1063 StalestValid,
1069 }
1070}