1use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_compute_types::ComputeInstanceId;
20use mz_ore::cast::CastLossy;
21use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
22use mz_sql::plan::QueryWhen;
23use mz_sql::session::vars::IsolationLevel;
24use mz_storage_types::sources::Timeline;
25use serde::{Deserialize, Serialize};
26use timely::progress::{Antichain, Timestamp as _};
27
28use crate::AdapterError;
29use crate::catalog::CatalogState;
30use crate::coord::Coordinator;
31use crate::coord::id_bundle::CollectionIdBundle;
32use crate::coord::read_policy::ReadHolds;
33use crate::coord::timeline::TimelineContext;
34use crate::session::Session;
35
/// The timeline and timestamp (if any) that a read has been assigned.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext {
    /// The read is associated with a timeline and a concrete timestamp.
    TimelineTimestamp {
        /// The timeline the read is associated with.
        timeline: Timeline,
        /// The timestamp selected for the read. This is the value exposed by
        /// `timestamp()` and `timestamp_or_default()`.
        chosen_ts: Timestamp,
        /// The oracle timestamp that was supplied when this context was
        /// constructed, if there was one.
        oracle_ts: Option<Timestamp>,
    },
    /// The read carries no timeline and no timestamp.
    NoTimestamp,
}
57
58impl TimestampContext {
59 pub fn from_timeline_context(
61 chosen_ts: Timestamp,
62 oracle_ts: Option<Timestamp>,
63 transaction_timeline: Option<Timeline>,
64 timeline_context: &TimelineContext,
65 ) -> TimestampContext {
66 match timeline_context {
67 TimelineContext::TimelineDependent(timeline) => {
68 if let Some(transaction_timeline) = transaction_timeline {
69 assert_eq!(timeline, &transaction_timeline);
70 }
71 Self::TimelineTimestamp {
72 timeline: timeline.clone(),
73 chosen_ts,
74 oracle_ts,
75 }
76 }
77 TimelineContext::TimestampDependent => {
78 Self::TimelineTimestamp {
80 timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
81 chosen_ts,
82 oracle_ts,
83 }
84 }
85 TimelineContext::TimestampIndependent => Self::NoTimestamp,
86 }
87 }
88
89 pub fn timeline(&self) -> Option<&Timeline> {
91 self.timeline_timestamp().map(|tt| tt.0)
92 }
93
94 pub fn timestamp(&self) -> Option<&Timestamp> {
96 self.timeline_timestamp().map(|tt| tt.1)
97 }
98
99 pub fn timeline_timestamp(&self) -> Option<(&Timeline, &Timestamp)> {
101 match self {
102 Self::TimelineTimestamp {
103 timeline,
104 chosen_ts,
105 ..
106 } => Some((timeline, chosen_ts)),
107 Self::NoTimestamp => None,
108 }
109 }
110
111 pub fn timestamp_or_default(&self) -> Timestamp {
113 match self {
114 Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
115 Self::NoTimestamp => Timestamp::maximum(),
119 }
120 }
121
122 pub fn contains_timestamp(&self) -> bool {
124 self.timestamp().is_some()
125 }
126
127 pub fn antichain(&self) -> Antichain<Timestamp> {
129 Antichain::from_elem(self.timestamp_or_default())
130 }
131}
132
133#[async_trait(?Send)]
134impl TimestampProvider for Coordinator {
135 fn compute_read_frontier(
137 &self,
138 instance: ComputeInstanceId,
139 id: GlobalId,
140 ) -> Antichain<Timestamp> {
141 self.controller
142 .compute
143 .collection_frontiers(id, Some(instance))
144 .expect("id does not exist")
145 .read_frontier
146 }
147
148 fn compute_write_frontier(
150 &self,
151 instance: ComputeInstanceId,
152 id: GlobalId,
153 ) -> Antichain<Timestamp> {
154 self.controller
155 .compute
156 .collection_frontiers(id, Some(instance))
157 .expect("id does not exist")
158 .write_frontier
159 }
160
161 fn storage_frontiers(
162 &self,
163 ids: Vec<GlobalId>,
164 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
165 self.controller
166 .storage
167 .collections_frontiers(ids)
168 .expect("missing collections")
169 }
170
171 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds {
172 self.acquire_read_holds(id_bundle)
173 }
174
175 fn catalog_state(&self) -> &CatalogState {
176 self.catalog().state()
177 }
178}
179
/// The result of `determine_timestamp_via_constraints`: the selected
/// timestamp together with the constraints that led to it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination {
    /// The timestamp that was selected.
    pub timestamp: Timestamp,
    /// The (minimized) lower and upper bound constraints that were considered.
    pub constraints: Constraints,
    /// The read timestamp of the session-local timestamp oracle, if one was
    /// consulted. It is only populated under
    /// `IsolationLevel::StrongSessionSerializable`.
    pub session_oracle_read_ts: Option<Timestamp>,
}
188
189#[async_trait(?Send)]
190pub trait TimestampProvider {
    /// Returns the read frontier of compute collection `id` on `instance`.
    fn compute_read_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
    /// Returns the write frontier of compute collection `id` on `instance`.
    fn compute_write_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;

    /// Returns one `(id, frontier, frontier)` triple per requested storage
    /// collection. `least_valid_write` treats the second element as the since
    /// and the third as the upper (write frontier).
    fn storage_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;

    /// Returns the catalog state this provider operates on.
    fn catalog_state(&self) -> &CatalogState;
210
211 fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
212 let timeline = match timeline_context {
213 TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
214 TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
216 TimelineContext::TimestampIndependent => None,
217 };
218
219 timeline
220 }
221
222 fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
229 when.must_advance_to_timeline_ts()
237 || (when.can_advance_to_timeline_ts()
238 && matches!(
239 isolation_level,
240 IsolationLevel::StrictSerializable
241 | IsolationLevel::StrongSessionSerializable
242 | IsolationLevel::BoundedStaleness(_)
243 ))
244 }
245
    /// Selects a query timestamp by first collecting lower- and upper-bound
    /// constraints and then choosing a candidate according to a freshness
    /// preference.
    ///
    /// Lower bounds come from the read holds' since, an `AS OF` timestamp, the
    /// oracle read timestamp, the real-time-recency timestamp, the bounded
    /// staleness window, and the session oracle. Upper bounds come from an
    /// `AS OF` that constrains the upper and from bounded staleness. The
    /// collected constraints are minimized before a candidate is chosen.
    ///
    /// Returns the chosen timestamp, the minimized constraints, and the
    /// session oracle read timestamp if one was consulted.
    ///
    /// # Errors
    ///
    /// * `AdapterError::BoundedStalenessExceeded` when the constraints cannot
    ///   be satisfied under bounded staleness, an oracle read timestamp is
    ///   available, and the staleness lower bound lies beyond
    ///   `largest_not_in_advance_of_upper`.
    /// * `AdapterError::ImpossibleTimestampConstraints` for any other
    ///   unsatisfiable combination of constraints.
    ///
    /// # Panics
    ///
    /// Panics if `real_time_recency_ts` is supplied while real time recency is
    /// disabled for the session or the isolation level is not strict
    /// serializable.
    fn determine_timestamp_via_constraints(
        session: &Session,
        read_holds: &ReadHolds,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        oracle_read_ts: Option<Timestamp>,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
        timeline: &Option<Timeline>,
        largest_not_in_advance_of_upper: Timestamp,
    ) -> Result<RawTimestampDetermination, AdapterError> {
        use constraints::{Constraints, Preference, Reason};

        // Populated only when the session-local oracle is consulted below.
        let mut session_oracle_read_ts = None;
        let constraints = {
            // Constraints are accumulated here, one source at a time.
            let mut constraints = Constraints::default();

            // The inputs must be readable: the since of the read holds is a
            // lower bound, recorded once for storage inputs and once for
            // compute inputs (when either set is non-empty).
            let since = read_holds.least_valid_read();
            let storage = id_bundle
                .storage_ids
                .iter()
                .cloned()
                .collect::<Vec<GlobalId>>();
            if !storage.is_empty() {
                constraints
                    .lower
                    .push((since.clone(), Reason::StorageInput(storage)));
            }
            let compute = id_bundle
                .compute_ids
                .iter()
                .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
                .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
            if !compute.is_empty() {
                constraints
                    .lower
                    .push((since.clone(), Reason::ComputeInput(compute)));
            }

            // A timestamp requested by the query is a lower bound, and also an
            // upper bound when `when` says it constrains the upper.
            if let Some(ts) = when.advance_to_timestamp() {
                constraints
                    .lower
                    .push((Antichain::from_elem(ts), Reason::QueryAsOf));
                if when.constrains_upper() {
                    constraints
                        .upper
                        .push((Antichain::from_elem(ts), Reason::QueryAsOf));
                }
            }

            // The oracle read timestamp is a lower bound unless the isolation
            // level opts out (strong session serializable, bounded staleness),
            // in which case it only applies if `when` must advance to it.
            if let Some(timestamp) = &oracle_read_ts {
                let hard_lower_bound = match isolation_level {
                    IsolationLevel::StrongSessionSerializable
                    | IsolationLevel::BoundedStaleness(_) => false,
                    IsolationLevel::ReadUncommitted
                    | IsolationLevel::ReadCommitted
                    | IsolationLevel::RepeatableRead
                    | IsolationLevel::Serializable
                    | IsolationLevel::StrictSerializable => true,
                };
                if hard_lower_bound || when.must_advance_to_timeline_ts() {
                    constraints.lower.push((
                        Antichain::from_elem(*timestamp),
                        Reason::IsolationLevel(*isolation_level),
                    ));
                }
            }

            // A real-time-recency timestamp, when supplied, is a lower bound.
            if let Some(real_time_recency_ts) = real_time_recency_ts {
                assert!(
                    session.vars().real_time_recency()
                        && isolation_level == &IsolationLevel::StrictSerializable,
                    "real time recency timestamp should only be supplied when real time recency \
                     is enabled and the isolation level is strict serializable"
                );
                constraints.lower.push((
                    Antichain::from_elem(real_time_recency_ts),
                    Reason::RealTimeRecency,
                ));
            }

            // Bounded staleness: the result may be at most `d` older than the
            // oracle read timestamp (the anchor), and may not be later than
            // `largest_not_in_advance_of_upper`.
            if let IsolationLevel::BoundedStaleness(d) = isolation_level {
                if let Some(anchor) = oracle_read_ts {
                    // Durations too large for `u64` milliseconds saturate.
                    let bound_ms = u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
                    let lower = anchor.saturating_sub(bound_ms);
                    constraints.lower.push((
                        Antichain::from_elem(lower),
                        Reason::IsolationLevel(*isolation_level),
                    ));
                    constraints.upper.push((
                        Antichain::from_elem(largest_not_in_advance_of_upper),
                        Reason::IsolationLevel(*isolation_level),
                    ));
                }
            }

            // Strong session serializable: use the session-local oracle (if the
            // session has one for this timeline) as a lower bound, and remember
            // its reading for the caller.
            if isolation_level == &IsolationLevel::StrongSessionSerializable {
                if let Some(timeline) = &timeline {
                    if let Some(oracle) = session.get_timestamp_oracle(timeline) {
                        let session_ts = oracle.read_ts();
                        constraints.lower.push((
                            Antichain::from_elem(session_ts),
                            Reason::IsolationLevel(*isolation_level),
                        ));
                        session_oracle_read_ts = Some(session_ts);
                    }

                    // When allowed to advance, move forward to the smaller of
                    // `largest_not_in_advance_of_upper` and the oracle read
                    // timestamp (when one is available).
                    if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
                        let mut advance_to = largest_not_in_advance_of_upper;
                        if let Some(oracle_read_ts) = oracle_read_ts {
                            advance_to = std::cmp::min(advance_to, oracle_read_ts);
                        }
                        constraints.lower.push((
                            Antichain::from_elem(advance_to),
                            Reason::IsolationLevel(*isolation_level),
                        ));
                    }
                }
            }

            // Drop constraints that do not contribute to the final bounds.
            constraints.minimize();
            constraints
        };

        // Prefer the freshest available timestamp only when `when` allows
        // advancing to the upper and either the isolation level is serializable
        // or bounded staleness, or there is no timeline. Otherwise prefer the
        // stalest valid timestamp.
        let preferences = {
            if when.can_advance_to_upper()
                && (isolation_level == &IsolationLevel::Serializable
                    || matches!(isolation_level, IsolationLevel::BoundedStaleness(_))
                    || timeline.is_none())
            {
                Preference::FreshestAvailable
            } else {
                Preference::StalestValid
            }

        };

        let constraint_candidate = {
            // Start from the minimum timestamp and advance to the lower bound.
            let mut candidate = Timestamp::minimum();
            candidate.advance_by(constraints.lower_bound().borrow());
            // For the freshest preference, additionally advance by the upper
            // bound constraints combined with `largest_not_in_advance_of_upper`.
            if let Preference::FreshestAvailable = preferences {
                let mut upper_bound = constraints.upper_bound();
                upper_bound.insert(largest_not_in_advance_of_upper);
                candidate.advance_by(upper_bound.borrow());
            }
            // The candidate is invalid if it is not at or beyond the lower
            // bound, or if it is strictly beyond the upper bound.
            if !constraints.lower_bound().less_equal(&candidate)
                || constraints.upper_bound().less_than(&candidate)
            {
                // Under bounded staleness, report a dedicated error when the
                // staleness lower bound is ahead of what is currently readable.
                if let IsolationLevel::BoundedStaleness(d) = isolation_level {
                    if let Some(anchor) = oracle_read_ts {
                        let bound_ms = u64::try_from(d.as_millis()).unwrap_or(u64::MAX);
                        let bs_lower: u64 = anchor.saturating_sub(bound_ms).into();
                        let upper: u64 = largest_not_in_advance_of_upper.into();
                        let gap = bs_lower.saturating_sub(upper);
                        if gap > 0 {
                            return Err(AdapterError::BoundedStalenessExceeded {
                                bound: *d,
                                gap_ms: gap,
                                slowest_input: None,
                            });
                        }
                    }
                }
                return Err(AdapterError::ImpossibleTimestampConstraints {
                    constraints: constraints.display(timeline.as_ref()).to_string(),
                });
            } else {
                candidate
            }
        };

        Ok(RawTimestampDetermination {
            timestamp: constraint_candidate,
            constraints,
            session_oracle_read_ts,
        })
    }
492
493 fn determine_timestamp_for(
501 &self,
502 session: &Session,
503 id_bundle: &CollectionIdBundle,
504 when: &QueryWhen,
505 timeline_context: &TimelineContext,
506 oracle_read_ts: Option<Timestamp>,
507 real_time_recency_ts: Option<Timestamp>,
508 isolation_level: &IsolationLevel,
509 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
510 let read_holds = self.acquire_read_holds(id_bundle);
513
514 let upper = self.least_valid_write(id_bundle);
515
516 Self::determine_timestamp_for_inner(
517 session,
518 id_bundle,
519 when,
520 timeline_context,
521 oracle_read_ts,
522 real_time_recency_ts,
523 isolation_level,
524 read_holds,
525 upper,
526 )
527 }
528
529 fn determine_timestamp_for_inner(
531 session: &Session,
532 id_bundle: &CollectionIdBundle,
533 when: &QueryWhen,
534 timeline_context: &TimelineContext,
535 oracle_read_ts: Option<Timestamp>,
536 real_time_recency_ts: Option<Timestamp>,
537 isolation_level: &IsolationLevel,
538 read_holds: ReadHolds,
539 upper: Antichain<Timestamp>,
540 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
541 let timeline = Self::get_timeline(timeline_context);
542 let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
543 let since = read_holds.least_valid_read();
544
545 if since.is_empty() {
549 let mut unreadable_collections = Vec::new();
551 for (coll_id, hold) in read_holds.storage_holds {
552 if hold.since().is_empty() {
553 unreadable_collections.push(coll_id);
554 }
555 }
556 for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
557 if hold.since().is_empty() {
558 unreadable_collections.push(coll_id);
559 }
560 }
561 return Err(AdapterError::CollectionUnreadable {
562 id: unreadable_collections.into_iter().join(", "),
563 });
564 }
565
566 if isolation_level.is_bounded_staleness()
571 && matches!(&timeline, Some(t) if *t != Timeline::EpochMilliseconds)
572 {
573 return Err(AdapterError::BoundedStalenessTimelineUnsupported);
574 }
575
576 let raw_determination = Self::determine_timestamp_via_constraints(
577 session,
578 &read_holds,
579 id_bundle,
580 when,
581 oracle_read_ts,
582 real_time_recency_ts,
583 isolation_level,
584 &timeline,
585 largest_not_in_advance_of_upper,
586 )?;
587
588 let timestamp_context = TimestampContext::from_timeline_context(
589 raw_determination.timestamp,
590 oracle_read_ts,
591 timeline,
592 timeline_context,
593 );
594
595 let determination = TimestampDetermination {
596 timestamp_context,
597 since,
598 upper,
599 largest_not_in_advance_of_upper,
600 oracle_read_ts,
601 session_oracle_read_ts: raw_determination.session_oracle_read_ts,
602 real_time_recency_ts,
603 constraints: raw_determination.constraints,
604 };
605
606 Ok((determination, read_holds))
607 }
608
    /// Acquires read holds for the collections in `id_bundle`.
    /// `determine_timestamp_for` calls this before sampling the write
    /// frontier and returns the holds to its caller.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds;
612
613 fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
618 let mut upper = Antichain::new();
619 {
620 for (_id, _since, collection_upper) in
621 self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
622 {
623 upper.extend(collection_upper);
624 }
625 }
626 {
627 for (instance, compute_ids) in &id_bundle.compute_ids {
628 for id in compute_ids.iter() {
629 upper.extend(self.compute_write_frontier(*instance, *id));
630 }
631 }
632 }
633 upper
634 }
635
636 fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
639 let mut frontier = Antichain::new();
640 for t in self.least_valid_write(id_bundle) {
641 frontier.insert(t.step_back().unwrap_or(t));
642 }
643 frontier
644 }
645}
646
647impl Coordinator {
648 pub(crate) async fn oracle_read_ts(
649 &self,
650 session: &Session,
651 timeline_ctx: &TimelineContext,
652 when: &QueryWhen,
653 ) -> Option<Timestamp> {
654 let isolation_level = session.vars().transaction_isolation().clone();
655 let timeline = Coordinator::get_timeline(timeline_ctx);
656 let needs_linearized_read_ts =
657 Coordinator::needs_linearized_read_ts(&isolation_level, when);
658
659 let oracle_read_ts = match timeline {
660 Some(timeline) if needs_linearized_read_ts => {
661 let timestamp_oracle = self.get_timestamp_oracle(&timeline);
662 Some(timestamp_oracle.read_ts().await)
663 }
664 Some(_) | None => None,
665 };
666
667 oracle_read_ts
668 }
669
670 #[mz_ore::instrument(level = "debug")]
674 pub(crate) fn determine_timestamp(
675 &self,
676 session: &Session,
677 id_bundle: &CollectionIdBundle,
678 when: &QueryWhen,
679 compute_instance: ComputeInstanceId,
680 timeline_context: &TimelineContext,
681 oracle_read_ts: Option<Timestamp>,
682 real_time_recency_ts: Option<mz_repr::Timestamp>,
683 ) -> Result<(TimestampDetermination, ReadHolds), AdapterError> {
684 let isolation_level = session.vars().transaction_isolation();
685 let (det, read_holds) = self.determine_timestamp_for(
686 session,
687 id_bundle,
688 when,
689 timeline_context,
690 oracle_read_ts,
691 real_time_recency_ts,
692 isolation_level,
693 )?;
694 self.metrics
695 .determine_timestamp
696 .with_label_values(&[
697 match det.respond_immediately() {
698 true => "true",
699 false => "false",
700 },
701 isolation_level.as_variant_str(),
702 &compute_instance.to_string(),
703 ])
704 .inc();
705 if !det.respond_immediately()
706 && isolation_level == &IsolationLevel::StrictSerializable
707 && real_time_recency_ts.is_none()
708 {
709 if let Some(strict) = det.timestamp_context.timestamp() {
711 let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
712 session,
713 id_bundle,
714 when,
715 timeline_context,
716 oracle_read_ts,
717 real_time_recency_ts,
718 &IsolationLevel::Serializable,
719 )?;
720
721 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
722 self.metrics
723 .timestamp_difference_for_strict_serializable_ms
724 .with_label_values(&[compute_instance.to_string().as_str()])
725 .observe(f64::cast_lossy(u64::from(
726 strict.saturating_sub(*serializable),
727 )));
728 }
729 }
730 }
731 if !det.respond_immediately()
732 && isolation_level.is_bounded_staleness()
733 && real_time_recency_ts.is_none()
734 {
735 if let Some(bs_ts) = det.timestamp_context.timestamp() {
737 let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
738 session,
739 id_bundle,
740 when,
741 timeline_context,
742 oracle_read_ts,
743 real_time_recency_ts,
744 &IsolationLevel::Serializable,
745 )?;
746 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
747 self.metrics
748 .timestamp_difference_for_bounded_staleness_ms
749 .with_label_values(&[compute_instance.to_string().as_str()])
750 .observe(f64::cast_lossy(u64::from(
751 serializable.saturating_sub(*bs_ts),
752 )));
753 }
754 }
755 }
756 Ok((det, read_holds))
757 }
758
759 pub(crate) fn largest_not_in_advance_of_upper(
764 upper: &Antichain<mz_repr::Timestamp>,
765 ) -> mz_repr::Timestamp {
766 if let Some(upper) = upper.as_option() {
771 upper.step_back().unwrap_or_else(Timestamp::minimum)
772 } else {
773 Timestamp::MAX
778 }
779 }
780}
781
/// Everything that went into, and came out of, choosing a timestamp for a
/// read.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination {
    /// The chosen timestamp and its timeline, or `NoTimestamp`.
    pub timestamp_context: TimestampContext,
    /// The combined since of the read holds on the inputs
    /// (`ReadHolds::least_valid_read`).
    pub since: Antichain<Timestamp>,
    /// The combined write frontier of the inputs that was used for the
    /// determination.
    pub upper: Antichain<Timestamp>,
    /// The result of `Coordinator::largest_not_in_advance_of_upper` for
    /// `upper`.
    pub largest_not_in_advance_of_upper: Timestamp,
    /// The oracle read timestamp supplied by the caller, if any.
    pub oracle_read_ts: Option<Timestamp>,
    /// The session-local oracle's read timestamp, if it was consulted.
    pub session_oracle_read_ts: Option<Timestamp>,
    /// The real-time-recency timestamp supplied by the caller, if any.
    pub real_time_recency_ts: Option<Timestamp>,
    /// The minimized constraints that bounded the chosen timestamp.
    pub constraints: Constraints,
}
803
804impl TimestampDetermination {
805 pub fn respond_immediately(&self) -> bool {
806 match &self.timestamp_context {
807 TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
808 !self.upper.less_equal(chosen_ts)
809 }
810 TimestampContext::NoTimestamp => true,
811 }
812 }
813}
814
/// A human-readable explanation of a timestamp determination; its `Display`
/// implementation renders all fields as a multi-line report.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation {
    /// The determination being explained.
    pub determination: TimestampDetermination,
    /// Per-source frontier details, rendered one section per source.
    pub sources: Vec<TimestampSource>,
    /// The session's wall-clock time, rendered both as milliseconds and as a
    /// formatted date.
    pub session_wall_time: DateTime<Utc>,
    /// Whether the read can respond immediately, as rendered in the report.
    pub respond_immediately: bool,
}
827
/// The frontiers of a single named source, as shown in a
/// `TimestampExplanation`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource {
    /// The name printed in the source's section header.
    pub name: String,
    /// The elements of the source's read frontier.
    pub read_frontier: Vec<Timestamp>,
    /// The elements of the source's write frontier.
    pub write_frontier: Vec<Timestamp>,
}
834
/// Types whose textual form may depend on the timeline they are shown in.
pub trait DisplayableInTimeline {
    /// Formats `self` into `f`, optionally taking `timeline` into account.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Wraps `self` and `timeline` in a [`DisplayInTimeline`], which
    /// implements `fmt::Display` and `fmt::Debug` by calling
    /// [`DisplayableInTimeline::fmt`].
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
841
842impl DisplayableInTimeline for mz_repr::Timestamp {
843 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
844 if let Some(Timeline::EpochMilliseconds) = timeline {
845 let ts_ms: u64 = self.into();
846 if let Ok(ts_ms) = i64::try_from(ts_ms) {
847 if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
848 return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
849 }
850 }
851 }
852 write!(f, "{:13}", self)
853 }
854}
855
/// Adapter pairing a value with an optional timeline so that it can be used
/// with the standard formatting traits.
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to format.
    t: &'a T,
    /// The timeline passed to the value's `DisplayableInTimeline::fmt`.
    timeline: Option<&'a Timeline>,
}
860impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
861where
862 T: DisplayableInTimeline,
863{
864 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
865 self.t.fmt(self.timeline, f)
866 }
867}
868
869impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
870where
871 T: DisplayableInTimeline,
872{
873 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
874 fmt::Display::fmt(&self, f)
875 }
876}
877
878impl fmt::Display for TimestampExplanation {
879 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
880 let timeline = self.determination.timestamp_context.timeline();
881 writeln!(
882 f,
883 " query timestamp: {}",
884 self.determination
885 .timestamp_context
886 .timestamp_or_default()
887 .display(timeline)
888 )?;
889 if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
890 writeln!(
891 f,
892 " oracle read timestamp: {}",
893 oracle_read_ts.display(timeline)
894 )?;
895 }
896 if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
897 writeln!(
898 f,
899 " session oracle read timestamp: {}",
900 session_oracle_read_ts.display(timeline)
901 )?;
902 }
903 if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
904 writeln!(
905 f,
906 " real time recency timestamp: {}",
907 real_time_recency_ts.display(timeline)
908 )?;
909 }
910 writeln!(
911 f,
912 "largest not in advance of upper: {}",
913 self.determination
914 .largest_not_in_advance_of_upper
915 .display(timeline),
916 )?;
917 writeln!(
918 f,
919 " upper:{:?}",
920 self.determination
921 .upper
922 .iter()
923 .map(|t| t.display(timeline))
924 .collect::<Vec<_>>()
925 )?;
926 writeln!(
927 f,
928 " since:{:?}",
929 self.determination
930 .since
931 .iter()
932 .map(|t| t.display(timeline))
933 .collect::<Vec<_>>()
934 )?;
935 writeln!(
936 f,
937 " can respond immediately: {}",
938 self.respond_immediately
939 )?;
940 writeln!(f, " timeline: {:?}", &timeline)?;
941 writeln!(
942 f,
943 " session wall time: {:13} ({})",
944 self.session_wall_time.timestamp_millis(),
945 self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
946 )?;
947
948 for source in &self.sources {
949 writeln!(f, "")?;
950 writeln!(f, "source {}:", source.name)?;
951 writeln!(
952 f,
953 " read frontier:{:?}",
954 source
955 .read_frontier
956 .iter()
957 .map(|t| t.display(timeline))
958 .collect::<Vec<_>>()
959 )?;
960 writeln!(
961 f,
962 " write frontier:{:?}",
963 source
964 .write_frontier
965 .iter()
966 .map(|t| t.display(timeline))
967 .collect::<Vec<_>>()
968 )?;
969 }
970
971 writeln!(f, "")?;
972 writeln!(f, "binding constraints:")?;
973 write!(f, "{}", self.determination.constraints.display(timeline))?;
974
975 Ok(())
976 }
977}
978
979mod constraints {
981
982 use core::fmt;
983 use std::fmt::Debug;
984
985 use differential_dataflow::lattice::Lattice;
986 use mz_storage_types::sources::Timeline;
987 use serde::{Deserialize, Serialize};
988 use timely::progress::{Antichain, Timestamp};
989
990 use mz_compute_types::ComputeInstanceId;
991 use mz_repr::GlobalId;
992 use mz_sql::session::vars::IsolationLevel;
993
994 use super::DisplayableInTimeline;
995
    /// Lower and upper bound constraints on a timestamp, each tagged with the
    /// [`Reason`] it was imposed.
    #[derive(Default, Serialize, Deserialize, Clone)]
    pub struct Constraints {
        /// Frontiers the timestamp must not be less than, with their reasons.
        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
        /// Frontiers the timestamp must not be greater than, with their
        /// reasons.
        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
    }
1015
1016 impl DisplayableInTimeline for Constraints {
1017 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1018 if !self.lower.is_empty() {
1019 writeln!(f, "lower:")?;
1020 for (ts, reason) in &self.lower {
1021 let ts: Vec<_> = ts
1022 .iter()
1023 .map(|t| format!("{}", t.display(timeline)))
1024 .collect();
1025 writeln!(f, " ({}): [{}]", reason, ts.join(", "))?;
1026 }
1027 }
1028 if !self.upper.is_empty() {
1029 writeln!(f, "upper:")?;
1030 for (ts, reason) in &self.upper {
1031 let ts: Vec<_> = ts
1032 .iter()
1033 .map(|t| format!("{}", t.display(timeline)))
1034 .collect();
1035 writeln!(f, " ({}): [{}]", reason, ts.join(", "))?;
1036 }
1037 }
1038 Ok(())
1039 }
1040 }
1041
1042 impl Debug for Constraints {
1043 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1044 self.display(None).fmt(f)?;
1045 Ok(())
1046 }
1047 }
1048
1049 impl Constraints {
1050 pub fn minimize(&mut self) {
1060 let lower_frontier = self.lower_bound();
1062 self.lower.retain(|(anti, _)| {
1064 anti.iter()
1065 .any(|time| lower_frontier.elements().contains(time))
1066 });
1067
1068 let upper_frontier = self.upper_bound();
1070 self.upper.retain(|(anti, _)| {
1072 anti.iter()
1073 .any(|time| upper_frontier.elements().contains(time))
1074 });
1075 }
1076
1077 pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1079 let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1080 for (anti, _) in self.lower.iter() {
1081 lower = lower.join(anti);
1082 }
1083 lower
1084 }
1085 pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1087 self.upper
1088 .iter()
1089 .flat_map(|(anti, _)| anti.iter())
1090 .cloned()
1091 .collect()
1092 }
1093 }
1094
    /// Why a constraint was imposed on the timestamp.
    #[derive(Serialize, Deserialize, Clone)]
    pub enum Reason {
        /// The listed compute collections, as `(instance, id)` pairs, are
        /// inputs to the read. Displayed as "Indexed inputs".
        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
        /// The listed storage collections are inputs to the read.
        StorageInput(Vec<GlobalId>),
        /// The constraint follows from the given isolation level.
        IsolationLevel(IsolationLevel),
        /// The constraint follows from a real-time-recency timestamp.
        RealTimeRecency,
        /// The constraint follows from a timestamp requested by the query
        /// (`AS OF`).
        QueryAsOf,
    }
1111
1112 impl fmt::Display for Reason {
1113 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1114 match self {
1115 Reason::ComputeInput(ids) => {
1116 let formatted: Vec<_> =
1117 ids.iter().map(|(c, g)| format!("({}, {})", c, g)).collect();
1118 write!(f, "Indexed inputs: [{}]", formatted.join(", "))
1119 }
1120 Reason::StorageInput(ids) => {
1121 let formatted: Vec<_> = ids.iter().map(|g| format!("{}", g)).collect();
1122 write!(f, "Storage inputs: [{}]", formatted.join(", "))
1123 }
1124 Reason::IsolationLevel(level) => {
1125 write!(f, "Isolation level: {:?}", level)
1126 }
1127 Reason::RealTimeRecency => {
1128 write!(f, "Real-time recency")
1129 }
1130 Reason::QueryAsOf => {
1131 write!(f, "Query's AS OF")
1132 }
1133 }
1134 }
1135 }
1136
1137 pub enum Preference {
1140 FreshestAvailable,
1153 StalestValid,
1159 }
1160}