1use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::cast::CastLossy;
21use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
22use mz_sql::plan::QueryWhen;
23use mz_sql::session::vars::IsolationLevel;
24use mz_storage_types::sources::Timeline;
25use serde::{Deserialize, Serialize};
26use timely::progress::{Antichain, Timestamp as _};
27
28use crate::AdapterError;
29use crate::catalog::CatalogState;
30use crate::coord::Coordinator;
31use crate::coord::id_bundle::CollectionIdBundle;
32use crate::coord::read_policy::ReadHolds;
33use crate::coord::timeline::TimelineContext;
34use crate::session::Session;
35
36#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
38pub enum TimestampContext {
39 TimelineTimestamp {
41 timeline: Timeline,
42 chosen_ts: Timestamp,
49 oracle_ts: Option<Timestamp>,
53 },
54 NoTimestamp,
56}
57
58impl TimestampContext {
59 pub fn from_timeline_context(
61 chosen_ts: Timestamp,
62 oracle_ts: Option<Timestamp>,
63 transaction_timeline: Option<Timeline>,
64 timeline_context: &TimelineContext,
65 ) -> TimestampContext {
66 match timeline_context {
67 TimelineContext::TimelineDependent(timeline) => {
68 if let Some(transaction_timeline) = transaction_timeline {
69 assert_eq!(timeline, &transaction_timeline);
70 }
71 Self::TimelineTimestamp {
72 timeline: timeline.clone(),
73 chosen_ts,
74 oracle_ts,
75 }
76 }
77 TimelineContext::TimestampDependent => {
78 Self::TimelineTimestamp {
80 timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
81 chosen_ts,
82 oracle_ts,
83 }
84 }
85 TimelineContext::TimestampIndependent => Self::NoTimestamp,
86 }
87 }
88
89 pub fn timeline(&self) -> Option<&Timeline> {
91 self.timeline_timestamp().map(|tt| tt.0)
92 }
93
94 pub fn timestamp(&self) -> Option<&Timestamp> {
96 self.timeline_timestamp().map(|tt| tt.1)
97 }
98
99 pub fn timeline_timestamp(&self) -> Option<(&Timeline, &Timestamp)> {
101 match self {
102 Self::TimelineTimestamp {
103 timeline,
104 chosen_ts,
105 ..
106 } => Some((timeline, chosen_ts)),
107 Self::NoTimestamp => None,
108 }
109 }
110
111 pub fn timestamp_or_default(&self) -> Timestamp {
113 match self {
114 Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
115 Self::NoTimestamp => Timestamp::maximum(),
119 }
120 }
121
122 pub fn contains_timestamp(&self) -> bool {
124 self.timestamp().is_some()
125 }
126
127 pub fn antichain(&self) -> Antichain<Timestamp> {
129 Antichain::from_elem(self.timestamp_or_default())
130 }
131}
132
133#[async_trait(?Send)]
134impl TimestampProvider for Coordinator {
135 fn compute_read_frontier(
137 &self,
138 instance: ComputeInstanceId,
139 id: GlobalId,
140 ) -> Antichain<Timestamp> {
141 self.controller
142 .compute
143 .collection_frontiers(id, Some(instance))
144 .expect("id does not exist")
145 .read_frontier
146 }
147
148 fn compute_write_frontier(
150 &self,
151 instance: ComputeInstanceId,
152 id: GlobalId,
153 ) -> Antichain<Timestamp> {
154 self.controller
155 .compute
156 .collection_frontiers(id, Some(instance))
157 .expect("id does not exist")
158 .write_frontier
159 }
160
161 fn storage_frontiers(
162 &self,
163 ids: Vec<GlobalId>,
164 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
165 self.controller
166 .storage
167 .collections_frontiers(ids)
168 .expect("missing collections")
169 }
170
171 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds {
172 self.acquire_read_holds(id_bundle)
173 }
174
175 fn catalog_state(&self) -> &CatalogState {
176 self.catalog().state()
177 }
178}
179
180#[derive(Debug, Clone, Serialize, Deserialize)]
183pub struct RawTimestampDetermination {
184 pub timestamp: Timestamp,
185 pub constraints: Constraints,
186 pub session_oracle_read_ts: Option<Timestamp>,
187}
188
189#[async_trait(?Send)]
190pub trait TimestampProvider {
191 fn compute_read_frontier(
192 &self,
193 instance: ComputeInstanceId,
194 id: GlobalId,
195 ) -> Antichain<Timestamp>;
196 fn compute_write_frontier(
197 &self,
198 instance: ComputeInstanceId,
199 id: GlobalId,
200 ) -> Antichain<Timestamp>;
201
202 fn storage_frontiers(
205 &self,
206 ids: Vec<GlobalId>,
207 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
208
209 fn catalog_state(&self) -> &CatalogState;
210
211 fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
212 let timeline = match timeline_context {
213 TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
214 TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
216 TimelineContext::TimestampIndependent => None,
217 };
218
219 timeline
220 }
221
222 fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
229 when.must_advance_to_timeline_ts()
235 || (when.can_advance_to_timeline_ts()
236 && matches!(
237 isolation_level,
238 IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
239 ))
240 }
241
242 fn determine_timestamp_via_constraints(
246 session: &Session,
247 read_holds: &ReadHolds,
248 id_bundle: &CollectionIdBundle,
249 when: &QueryWhen,
250 oracle_read_ts: Option<Timestamp>,
251 real_time_recency_ts: Option<Timestamp>,
252 isolation_level: &IsolationLevel,
253 timeline: &Option<Timeline>,
254 largest_not_in_advance_of_upper: Timestamp,
255 ) -> Result<RawTimestampDetermination, AdapterError> {
256 use constraints::{Constraints, Preference, Reason};
257
258 let mut session_oracle_read_ts = None;
259 let constraints = {
263 let mut constraints = Constraints::default();
265
266 let since = read_holds.least_valid_read();
272 let storage = id_bundle
273 .storage_ids
274 .iter()
275 .cloned()
276 .collect::<Vec<GlobalId>>();
277 if !storage.is_empty() {
278 constraints
279 .lower
280 .push((since.clone(), Reason::StorageInput(storage)));
281 }
282 let compute = id_bundle
283 .compute_ids
284 .iter()
285 .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
286 .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
287 if !compute.is_empty() {
288 constraints
289 .lower
290 .push((since.clone(), Reason::ComputeInput(compute)));
291 }
292
293 if let Some(ts) = when.advance_to_timestamp() {
295 constraints
296 .lower
297 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
298 if when.constrains_upper() {
300 constraints
301 .upper
302 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
303 }
304 }
305
306 if let Some(timestamp) = &oracle_read_ts {
311 if isolation_level != &IsolationLevel::StrongSessionSerializable
312 || when.must_advance_to_timeline_ts()
313 {
314 constraints.lower.push((
317 Antichain::from_elem(*timestamp),
318 Reason::IsolationLevel(*isolation_level),
319 ));
320 }
321 }
322
323 if let Some(real_time_recency_ts) = real_time_recency_ts {
325 assert!(
326 session.vars().real_time_recency()
327 && isolation_level == &IsolationLevel::StrictSerializable,
328 "real time recency timestamp should only be supplied when real time recency \
329 is enabled and the isolation level is strict serializable"
330 );
331 constraints.lower.push((
332 Antichain::from_elem(real_time_recency_ts),
333 Reason::RealTimeRecency,
334 ));
335 }
336
337 if isolation_level == &IsolationLevel::StrongSessionSerializable {
339 if let Some(timeline) = &timeline {
340 if let Some(oracle) = session.get_timestamp_oracle(timeline) {
341 let session_ts = oracle.read_ts();
342 constraints.lower.push((
343 Antichain::from_elem(session_ts),
344 Reason::IsolationLevel(*isolation_level),
345 ));
346 session_oracle_read_ts = Some(session_ts);
347 }
348
349 if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
359 let mut advance_to = largest_not_in_advance_of_upper;
360 if let Some(oracle_read_ts) = oracle_read_ts {
361 advance_to = std::cmp::min(advance_to, oracle_read_ts);
362 }
363 constraints.lower.push((
364 Antichain::from_elem(advance_to),
365 Reason::IsolationLevel(*isolation_level),
366 ));
367 }
368 }
369 }
370
371 constraints.minimize();
372 constraints
373 };
374
375 let preferences = {
380 if when.can_advance_to_upper()
385 && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
386 {
387 Preference::FreshestAvailable
388 } else {
389 Preference::StalestValid
390 }
391
392 };
396
397 let constraint_candidate = {
399 let mut candidate = Timestamp::minimum();
400 candidate.advance_by(constraints.lower_bound().borrow());
402 if let Preference::FreshestAvailable = preferences {
405 let mut upper_bound = constraints.upper_bound();
406 upper_bound.insert(largest_not_in_advance_of_upper);
407 candidate.advance_by(upper_bound.borrow());
408 }
409 if !constraints.lower_bound().less_equal(&candidate)
413 || constraints.upper_bound().less_than(&candidate)
414 {
415 return Err(AdapterError::ImpossibleTimestampConstraints {
416 constraints: constraints.display(timeline.as_ref()).to_string(),
417 });
418 } else {
419 candidate
420 }
421 };
422
423 Ok(RawTimestampDetermination {
424 timestamp: constraint_candidate,
425 constraints,
426 session_oracle_read_ts,
427 })
428 }
429
430 fn determine_timestamp_for(
438 &self,
439 session: &Session,
440 id_bundle: &CollectionIdBundle,
441 when: &QueryWhen,
442 timeline_context: &TimelineContext,
443 oracle_read_ts: Option<Timestamp>,
444 real_time_recency_ts: Option<Timestamp>,
445 isolation_level: &IsolationLevel,
446 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
447 let read_holds = self.acquire_read_holds(id_bundle);
450
451 let upper = self.least_valid_write(id_bundle);
452
453 Self::determine_timestamp_for_inner(
454 session,
455 id_bundle,
456 when,
457 timeline_context,
458 oracle_read_ts,
459 real_time_recency_ts,
460 isolation_level,
461 read_holds,
462 upper,
463 )
464 }
465
466 fn determine_timestamp_for_inner(
468 session: &Session,
469 id_bundle: &CollectionIdBundle,
470 when: &QueryWhen,
471 timeline_context: &TimelineContext,
472 oracle_read_ts: Option<Timestamp>,
473 real_time_recency_ts: Option<Timestamp>,
474 isolation_level: &IsolationLevel,
475 read_holds: ReadHolds,
476 upper: Antichain<Timestamp>,
477 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
478 let timeline = Self::get_timeline(timeline_context);
479 let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
480 let since = read_holds.least_valid_read();
481
482 if since.is_empty() {
486 let mut unreadable_collections = Vec::new();
488 for (coll_id, hold) in read_holds.storage_holds {
489 if hold.since().is_empty() {
490 unreadable_collections.push(coll_id);
491 }
492 }
493 for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
494 if hold.since().is_empty() {
495 unreadable_collections.push(coll_id);
496 }
497 }
498 return Err(AdapterError::CollectionUnreadable {
499 id: unreadable_collections.into_iter().join(", "),
500 });
501 }
502
503 let raw_determination = Self::determine_timestamp_via_constraints(
504 session,
505 &read_holds,
506 id_bundle,
507 when,
508 oracle_read_ts,
509 real_time_recency_ts,
510 isolation_level,
511 &timeline,
512 largest_not_in_advance_of_upper,
513 )?;
514
515 let timestamp_context = TimestampContext::from_timeline_context(
516 raw_determination.timestamp,
517 oracle_read_ts,
518 timeline,
519 timeline_context,
520 );
521
522 let determination = TimestampDetermination {
523 timestamp_context,
524 since,
525 upper,
526 largest_not_in_advance_of_upper,
527 oracle_read_ts,
528 session_oracle_read_ts: raw_determination.session_oracle_read_ts,
529 real_time_recency_ts,
530 constraints: raw_determination.constraints,
531 };
532
533 Ok((determination, read_holds))
534 }
535
536 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds;
539
540 fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
545 let mut upper = Antichain::new();
546 {
547 for (_id, _since, collection_upper) in
548 self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
549 {
550 upper.extend(collection_upper);
551 }
552 }
553 {
554 for (instance, compute_ids) in &id_bundle.compute_ids {
555 for id in compute_ids.iter() {
556 upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
557 }
558 }
559 }
560 upper
561 }
562
563 fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
566 let mut frontier = Antichain::new();
567 for t in self.least_valid_write(id_bundle) {
568 frontier.insert(t.step_back().unwrap_or(t));
569 }
570 frontier
571 }
572}
573
574impl Coordinator {
575 pub(crate) async fn oracle_read_ts(
576 &self,
577 session: &Session,
578 timeline_ctx: &TimelineContext,
579 when: &QueryWhen,
580 ) -> Option<Timestamp> {
581 let isolation_level = session.vars().transaction_isolation().clone();
582 let timeline = Coordinator::get_timeline(timeline_ctx);
583 let needs_linearized_read_ts =
584 Coordinator::needs_linearized_read_ts(&isolation_level, when);
585
586 let oracle_read_ts = match timeline {
587 Some(timeline) if needs_linearized_read_ts => {
588 let timestamp_oracle = self.get_timestamp_oracle(&timeline);
589 Some(timestamp_oracle.read_ts().await)
590 }
591 Some(_) | None => None,
592 };
593
594 oracle_read_ts
595 }
596
597 #[mz_ore::instrument(level = "debug")]
601 pub(crate) fn determine_timestamp(
602 &self,
603 session: &Session,
604 id_bundle: &CollectionIdBundle,
605 when: &QueryWhen,
606 compute_instance: ComputeInstanceId,
607 timeline_context: &TimelineContext,
608 oracle_read_ts: Option<Timestamp>,
609 real_time_recency_ts: Option<mz_repr::Timestamp>,
610 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
611 let isolation_level = session.vars().transaction_isolation();
612 let (det, read_holds) = self.determine_timestamp_for(
613 session,
614 id_bundle,
615 when,
616 timeline_context,
617 oracle_read_ts,
618 real_time_recency_ts,
619 isolation_level,
620 )?;
621 self.metrics
622 .determine_timestamp
623 .with_label_values(&[
624 match det.respond_immediately() {
625 true => "true",
626 false => "false",
627 },
628 isolation_level.as_str(),
629 &compute_instance.to_string(),
630 ])
631 .inc();
632 if !det.respond_immediately()
633 && isolation_level == &IsolationLevel::StrictSerializable
634 && real_time_recency_ts.is_none()
635 {
636 if let Some(strict) = det.timestamp_context.timestamp() {
638 let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
639 session,
640 id_bundle,
641 when,
642 timeline_context,
643 oracle_read_ts,
644 real_time_recency_ts,
645 &IsolationLevel::Serializable,
646 )?;
647
648 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
649 self.metrics
650 .timestamp_difference_for_strict_serializable_ms
651 .with_label_values(&[compute_instance.to_string().as_str()])
652 .observe(f64::cast_lossy(u64::from(
653 strict.saturating_sub(*serializable),
654 )));
655 }
656 }
657 }
658 Ok((det, read_holds))
659 }
660
661 pub(crate) fn largest_not_in_advance_of_upper(
666 upper: &Antichain<mz_repr::Timestamp>,
667 ) -> mz_repr::Timestamp {
668 if let Some(upper) = upper.as_option() {
673 upper.step_back().unwrap_or_else(Timestamp::minimum)
674 } else {
675 Timestamp::MAX
680 }
681 }
682}
683
684#[derive(Serialize, Deserialize, Debug, Clone)]
686pub struct TimestampDetermination {
687 pub timestamp_context: TimestampContext,
689 pub since: Antichain<Timestamp>,
691 pub upper: Antichain<Timestamp>,
693 pub largest_not_in_advance_of_upper: Timestamp,
695 pub oracle_read_ts: Option<Timestamp>,
697 pub session_oracle_read_ts: Option<Timestamp>,
699 pub real_time_recency_ts: Option<Timestamp>,
701 pub constraints: Constraints,
704}
705
706impl TimestampDetermination {
707 pub fn respond_immediately(&self) -> bool {
708 match &self.timestamp_context {
709 TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
710 !self.upper.less_equal(chosen_ts)
711 }
712 TimestampContext::NoTimestamp => true,
713 }
714 }
715}
716
717#[derive(Clone, Debug, Serialize, Deserialize)]
719pub struct TimestampExplanation {
720 pub determination: TimestampDetermination,
722 pub sources: Vec<TimestampSource>,
724 pub session_wall_time: DateTime<Utc>,
726 pub respond_immediately: bool,
728}
729
730#[derive(Clone, Debug, Serialize, Deserialize)]
731pub struct TimestampSource {
732 pub name: String,
733 pub read_frontier: Vec<Timestamp>,
734 pub write_frontier: Vec<Timestamp>,
735}
736
737pub trait DisplayableInTimeline {
738 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
739 fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
740 DisplayInTimeline { t: self, timeline }
741 }
742}
743
744impl DisplayableInTimeline for mz_repr::Timestamp {
745 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
746 if let Some(Timeline::EpochMilliseconds) = timeline {
747 let ts_ms: u64 = self.into();
748 if let Ok(ts_ms) = i64::try_from(ts_ms) {
749 if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
750 return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
751 }
752 }
753 }
754 write!(f, "{:13}", self)
755 }
756}
757
758pub struct DisplayInTimeline<'a, T: ?Sized> {
759 t: &'a T,
760 timeline: Option<&'a Timeline>,
761}
762impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
763where
764 T: DisplayableInTimeline,
765{
766 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
767 self.t.fmt(self.timeline, f)
768 }
769}
770
771impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
772where
773 T: DisplayableInTimeline,
774{
775 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
776 fmt::Display::fmt(&self, f)
777 }
778}
779
780impl fmt::Display for TimestampExplanation {
781 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
782 let timeline = self.determination.timestamp_context.timeline();
783 writeln!(
784 f,
785 " query timestamp: {}",
786 self.determination
787 .timestamp_context
788 .timestamp_or_default()
789 .display(timeline)
790 )?;
791 if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
792 writeln!(
793 f,
794 " oracle read timestamp: {}",
795 oracle_read_ts.display(timeline)
796 )?;
797 }
798 if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
799 writeln!(
800 f,
801 " session oracle read timestamp: {}",
802 session_oracle_read_ts.display(timeline)
803 )?;
804 }
805 if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
806 writeln!(
807 f,
808 " real time recency timestamp: {}",
809 real_time_recency_ts.display(timeline)
810 )?;
811 }
812 writeln!(
813 f,
814 "largest not in advance of upper: {}",
815 self.determination
816 .largest_not_in_advance_of_upper
817 .display(timeline),
818 )?;
819 writeln!(
820 f,
821 " upper:{:?}",
822 self.determination
823 .upper
824 .iter()
825 .map(|t| t.display(timeline))
826 .collect::<Vec<_>>()
827 )?;
828 writeln!(
829 f,
830 " since:{:?}",
831 self.determination
832 .since
833 .iter()
834 .map(|t| t.display(timeline))
835 .collect::<Vec<_>>()
836 )?;
837 writeln!(
838 f,
839 " can respond immediately: {}",
840 self.respond_immediately
841 )?;
842 writeln!(f, " timeline: {:?}", &timeline)?;
843 writeln!(
844 f,
845 " session wall time: {:13} ({})",
846 self.session_wall_time.timestamp_millis(),
847 self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
848 )?;
849
850 for source in &self.sources {
851 writeln!(f, "")?;
852 writeln!(f, "source {}:", source.name)?;
853 writeln!(
854 f,
855 " read frontier:{:?}",
856 source
857 .read_frontier
858 .iter()
859 .map(|t| t.display(timeline))
860 .collect::<Vec<_>>()
861 )?;
862 writeln!(
863 f,
864 " write frontier:{:?}",
865 source
866 .write_frontier
867 .iter()
868 .map(|t| t.display(timeline))
869 .collect::<Vec<_>>()
870 )?;
871 }
872
873 writeln!(f, "")?;
874 writeln!(f, "binding constraints:")?;
875 write!(f, "{}", self.determination.constraints.display(timeline))?;
876
877 Ok(())
878 }
879}
880
881mod constraints {
883
884 use core::fmt;
885 use std::fmt::Debug;
886
887 use differential_dataflow::lattice::Lattice;
888 use mz_storage_types::sources::Timeline;
889 use serde::{Deserialize, Serialize};
890 use timely::progress::{Antichain, Timestamp};
891
892 use mz_compute_types::ComputeInstanceId;
893 use mz_repr::GlobalId;
894 use mz_sql::session::vars::IsolationLevel;
895
896 use super::DisplayableInTimeline;
897
898 #[derive(Default, Serialize, Deserialize, Clone)]
911 pub struct Constraints {
912 pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
914 pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
916 }
917
918 impl DisplayableInTimeline for Constraints {
919 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
920 if !self.lower.is_empty() {
921 writeln!(f, "lower:")?;
922 for (ts, reason) in &self.lower {
923 let ts: Vec<_> = ts
924 .iter()
925 .map(|t| format!("{}", t.display(timeline)))
926 .collect();
927 writeln!(f, " ({}): [{}]", reason, ts.join(", "))?;
928 }
929 }
930 if !self.upper.is_empty() {
931 writeln!(f, "upper:")?;
932 for (ts, reason) in &self.upper {
933 let ts: Vec<_> = ts
934 .iter()
935 .map(|t| format!("{}", t.display(timeline)))
936 .collect();
937 writeln!(f, " ({}): [{}]", reason, ts.join(", "))?;
938 }
939 }
940 Ok(())
941 }
942 }
943
944 impl Debug for Constraints {
945 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
946 self.display(None).fmt(f)?;
947 Ok(())
948 }
949 }
950
951 impl Constraints {
952 pub fn minimize(&mut self) {
962 let lower_frontier = self.lower_bound();
964 self.lower.retain(|(anti, _)| {
966 anti.iter()
967 .any(|time| lower_frontier.elements().contains(time))
968 });
969
970 let upper_frontier = self.upper_bound();
972 self.upper.retain(|(anti, _)| {
974 anti.iter()
975 .any(|time| upper_frontier.elements().contains(time))
976 });
977 }
978
979 pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
981 let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
982 for (anti, _) in self.lower.iter() {
983 lower = lower.join(anti);
984 }
985 lower
986 }
987 pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
989 self.upper
990 .iter()
991 .flat_map(|(anti, _)| anti.iter())
992 .cloned()
993 .collect()
994 }
995 }
996
997 #[derive(Serialize, Deserialize, Clone)]
999 pub enum Reason {
1000 ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1004 StorageInput(Vec<GlobalId>),
1006 IsolationLevel(IsolationLevel),
1008 RealTimeRecency,
1010 QueryAsOf,
1012 }
1013
1014 impl fmt::Display for Reason {
1015 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1016 match self {
1017 Reason::ComputeInput(ids) => {
1018 let formatted: Vec<_> =
1019 ids.iter().map(|(c, g)| format!("({}, {})", c, g)).collect();
1020 write!(f, "Indexed inputs: [{}]", formatted.join(", "))
1021 }
1022 Reason::StorageInput(ids) => {
1023 let formatted: Vec<_> = ids.iter().map(|g| format!("{}", g)).collect();
1024 write!(f, "Storage inputs: [{}]", formatted.join(", "))
1025 }
1026 Reason::IsolationLevel(level) => {
1027 write!(f, "Isolation level: {:?}", level)
1028 }
1029 Reason::RealTimeRecency => {
1030 write!(f, "Real-time recency")
1031 }
1032 Reason::QueryAsOf => {
1033 write!(f, "Query's AS OF")
1034 }
1035 }
1036 }
1037 }
1038
1039 pub enum Preference {
1042 FreshestAvailable,
1055 StalestValid,
1061 }
1062}