1use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
20use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
21use mz_compute_types::ComputeInstanceId;
22use mz_ore::cast::CastLossy;
23use mz_ore::soft_assert_eq_or_log;
24use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
25use mz_sql::plan::QueryWhen;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_sql::session::vars::IsolationLevel;
28use mz_storage_types::sources::Timeline;
29use serde::{Deserialize, Serialize};
30use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
31use tracing::{Level, event};
32
33use crate::AdapterError;
34use crate::catalog::CatalogState;
35use crate::coord::Coordinator;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::read_policy::ReadHolds;
38use crate::coord::timeline::TimelineContext;
39use crate::session::Session;
40
41#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
43pub enum TimestampContext<T> {
44 TimelineTimestamp {
46 timeline: Timeline,
47 chosen_ts: T,
54 oracle_ts: Option<T>,
58 },
59 NoTimestamp,
61}
62
63impl<T: TimestampManipulation> TimestampContext<T> {
64 pub fn from_timeline_context(
66 chosen_ts: T,
67 oracle_ts: Option<T>,
68 transaction_timeline: Option<Timeline>,
69 timeline_context: &TimelineContext,
70 ) -> TimestampContext<T> {
71 match timeline_context {
72 TimelineContext::TimelineDependent(timeline) => {
73 if let Some(transaction_timeline) = transaction_timeline {
74 assert_eq!(timeline, &transaction_timeline);
75 }
76 Self::TimelineTimestamp {
77 timeline: timeline.clone(),
78 chosen_ts,
79 oracle_ts,
80 }
81 }
82 TimelineContext::TimestampDependent => {
83 Self::TimelineTimestamp {
85 timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
86 chosen_ts,
87 oracle_ts,
88 }
89 }
90 TimelineContext::TimestampIndependent => Self::NoTimestamp,
91 }
92 }
93
94 pub fn timeline(&self) -> Option<&Timeline> {
96 self.timeline_timestamp().map(|tt| tt.0)
97 }
98
99 pub fn timestamp(&self) -> Option<&T> {
101 self.timeline_timestamp().map(|tt| tt.1)
102 }
103
104 pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
106 match self {
107 Self::TimelineTimestamp {
108 timeline,
109 chosen_ts,
110 ..
111 } => Some((timeline, chosen_ts)),
112 Self::NoTimestamp => None,
113 }
114 }
115
116 pub fn timestamp_or_default(&self) -> T {
118 match self {
119 Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
120 Self::NoTimestamp => T::maximum(),
124 }
125 }
126
127 pub fn contains_timestamp(&self) -> bool {
129 self.timestamp().is_some()
130 }
131
132 pub fn antichain(&self) -> Antichain<T> {
134 Antichain::from_elem(self.timestamp_or_default())
135 }
136}
137
138#[async_trait(?Send)]
139impl TimestampProvider for Coordinator {
140 fn compute_read_frontier(
142 &self,
143 instance: ComputeInstanceId,
144 id: GlobalId,
145 ) -> Antichain<Timestamp> {
146 self.controller
147 .compute
148 .collection_frontiers(id, Some(instance))
149 .expect("id does not exist")
150 .read_frontier
151 }
152
153 fn compute_write_frontier(
155 &self,
156 instance: ComputeInstanceId,
157 id: GlobalId,
158 ) -> Antichain<Timestamp> {
159 self.controller
160 .compute
161 .collection_frontiers(id, Some(instance))
162 .expect("id does not exist")
163 .write_frontier
164 }
165
166 fn storage_frontiers(
167 &self,
168 ids: Vec<GlobalId>,
169 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
170 self.controller
171 .storage
172 .collections_frontiers(ids)
173 .expect("missing collections")
174 }
175
176 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
177 self.acquire_read_holds(id_bundle)
178 }
179
180 fn catalog_state(&self) -> &CatalogState {
181 self.catalog().state()
182 }
183}
184
185#[derive(Debug, Clone, Serialize, Deserialize)]
188pub struct RawTimestampDetermination<T> {
189 pub timestamp: T,
190 pub constraints: Option<Constraints>,
191 pub session_oracle_read_ts: Option<T>,
192}
193
194#[async_trait(?Send)]
195pub trait TimestampProvider {
196 fn compute_read_frontier(
197 &self,
198 instance: ComputeInstanceId,
199 id: GlobalId,
200 ) -> Antichain<Timestamp>;
201 fn compute_write_frontier(
202 &self,
203 instance: ComputeInstanceId,
204 id: GlobalId,
205 ) -> Antichain<Timestamp>;
206
207 fn storage_frontiers(
210 &self,
211 ids: Vec<GlobalId>,
212 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
213
214 fn catalog_state(&self) -> &CatalogState;
215
216 fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
217 let timeline = match timeline_context {
218 TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
219 TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
221 TimelineContext::TimestampIndependent => None,
222 };
223
224 timeline
225 }
226
227 fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
234 when.must_advance_to_timeline_ts()
240 || (when.can_advance_to_timeline_ts()
241 && matches!(
242 isolation_level,
243 IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
244 ))
245 }
246
247 fn determine_timestamp_classical(
249 session: &Session,
250 read_holds: &ReadHolds<Timestamp>,
251 id_bundle: &CollectionIdBundle,
252 when: &QueryWhen,
253 oracle_read_ts: Option<Timestamp>,
254 compute_instance: ComputeInstanceId,
255 real_time_recency_ts: Option<Timestamp>,
256 isolation_level: &IsolationLevel,
257 timeline: &Option<Timeline>,
258 largest_not_in_advance_of_upper: Timestamp,
259 since: &Antichain<Timestamp>,
260 ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
261 let mut session_oracle_read_ts = None;
262 {
275 if timeline.is_some() && Self::needs_linearized_read_ts(isolation_level, when) {
286 assert!(
287 oracle_read_ts.is_some(),
288 "should get a timestamp from the oracle for linearized timeline {:?} but didn't",
289 timeline
290 );
291 }
292 }
293
294 let mut candidate = Timestamp::minimum();
296
297 if let Some(ts) = when.advance_to_timestamp() {
298 candidate.join_assign(&ts);
299 }
300
301 if when.advance_to_since() {
302 candidate.advance_by(since.borrow());
303 }
304
305 if let Some(timestamp) = &oracle_read_ts {
310 if isolation_level != &IsolationLevel::StrongSessionSerializable
311 || when.must_advance_to_timeline_ts()
312 {
313 candidate.join_assign(timestamp);
314 }
315 }
316
317 if when.can_advance_to_upper()
324 && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
325 {
326 candidate.join_assign(&largest_not_in_advance_of_upper);
327 }
328
329 if let Some(real_time_recency_ts) = real_time_recency_ts {
330 if !(session.vars().real_time_recency()
331 && isolation_level == &IsolationLevel::StrictSerializable)
332 {
333 coord_bail!(
337 "real time recency timestamp should only be supplied when real time recency \
338 is enabled and the isolation level is strict serializable"
339 );
340 }
341 candidate.join_assign(&real_time_recency_ts);
342 }
343
344 if isolation_level == &IsolationLevel::StrongSessionSerializable {
345 if let Some(timeline) = &timeline {
346 if let Some(oracle) = session.get_timestamp_oracle(timeline) {
347 let session_ts = oracle.read_ts();
348 candidate.join_assign(&session_ts);
349 session_oracle_read_ts = Some(session_ts);
350 }
351 }
352
353 if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
363 let mut advance_to = largest_not_in_advance_of_upper;
364 if let Some(oracle_read_ts) = oracle_read_ts {
365 advance_to = std::cmp::min(advance_to, oracle_read_ts);
366 }
367 candidate.join_assign(&advance_to);
368 }
369 }
370
371 let timestamp = if since.less_equal(&candidate) {
379 event!(
380 Level::DEBUG,
381 conn_id = format!("{}", session.conn_id()),
382 since = format!("{since:?}"),
383 largest_not_in_advance_of_upper = format!("{largest_not_in_advance_of_upper}"),
384 timestamp = format!("{candidate}")
385 );
386 candidate
387 } else {
388 coord_bail!(generate_timestamp_not_valid_error_msg(
389 id_bundle,
390 compute_instance,
391 read_holds,
392 candidate
393 ));
394 };
395 Ok(RawTimestampDetermination {
396 timestamp,
397 constraints: None,
398 session_oracle_read_ts,
399 })
400 }
401
402 fn determine_timestamp_via_constraints(
406 session: &Session,
407 read_holds: &ReadHolds<Timestamp>,
408 id_bundle: &CollectionIdBundle,
409 when: &QueryWhen,
410 oracle_read_ts: Option<Timestamp>,
411 compute_instance: ComputeInstanceId,
412 real_time_recency_ts: Option<Timestamp>,
413 isolation_level: &IsolationLevel,
414 timeline: &Option<Timeline>,
415 largest_not_in_advance_of_upper: Timestamp,
416 ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
417 use constraints::{Constraints, Preference, Reason};
418
419 let mut session_oracle_read_ts = None;
420 let constraints = {
424 let mut constraints = Constraints::default();
426
427 let since = read_holds.least_valid_read();
433 let storage = id_bundle
434 .storage_ids
435 .iter()
436 .cloned()
437 .collect::<Vec<GlobalId>>();
438 if !storage.is_empty() {
439 constraints
440 .lower
441 .push((since.clone(), Reason::StorageInput(storage)));
442 }
443 let compute = id_bundle
444 .compute_ids
445 .iter()
446 .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
447 .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
448 if !compute.is_empty() {
449 constraints
450 .lower
451 .push((since.clone(), Reason::ComputeInput(compute)));
452 }
453
454 if let Some(ts) = when.advance_to_timestamp() {
456 constraints
457 .lower
458 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
459 if when.constrains_upper() {
461 constraints
462 .upper
463 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
464 }
465 }
466
467 if let Some(timestamp) = &oracle_read_ts {
472 if isolation_level != &IsolationLevel::StrongSessionSerializable
473 || when.must_advance_to_timeline_ts()
474 {
475 constraints.lower.push((
478 Antichain::from_elem(*timestamp),
479 Reason::IsolationLevel(*isolation_level),
480 ));
481 }
482 }
483
484 if let Some(real_time_recency_ts) = real_time_recency_ts {
486 assert!(
487 session.vars().real_time_recency()
488 && isolation_level == &IsolationLevel::StrictSerializable,
489 "real time recency timestamp should only be supplied when real time recency \
490 is enabled and the isolation level is strict serializable"
491 );
492 constraints.lower.push((
493 Antichain::from_elem(real_time_recency_ts),
494 Reason::RealTimeRecency,
495 ));
496 }
497
498 if isolation_level == &IsolationLevel::StrongSessionSerializable {
500 if let Some(timeline) = &timeline {
501 if let Some(oracle) = session.get_timestamp_oracle(timeline) {
502 let session_ts = oracle.read_ts();
503 constraints.lower.push((
504 Antichain::from_elem(session_ts),
505 Reason::IsolationLevel(*isolation_level),
506 ));
507 session_oracle_read_ts = Some(session_ts);
508 }
509
510 if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
520 let mut advance_to = largest_not_in_advance_of_upper;
521 if let Some(oracle_read_ts) = oracle_read_ts {
522 advance_to = std::cmp::min(advance_to, oracle_read_ts);
523 }
524 constraints.lower.push((
525 Antichain::from_elem(advance_to),
526 Reason::IsolationLevel(*isolation_level),
527 ));
528 }
529 }
530 }
531
532 constraints.minimize();
533 constraints
534 };
535
536 let preferences = {
541 if when.can_advance_to_upper()
546 && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
547 {
548 Preference::FreshestAvailable
549 } else {
550 Preference::StalestValid
551 }
552
553 };
557
558 let constraint_candidate = {
560 let mut candidate = Timestamp::minimum();
561 candidate.advance_by(constraints.lower_bound().borrow());
562 if let Preference::FreshestAvailable = preferences {
565 let mut upper_bound = constraints.upper_bound();
566 upper_bound.insert(largest_not_in_advance_of_upper);
567 candidate.advance_by(upper_bound.borrow());
568 }
569 if constraints.upper_bound().less_than(&candidate) {
571 coord_bail!(generate_timestamp_not_valid_error_msg(
572 id_bundle,
573 compute_instance,
574 read_holds,
575 candidate
576 ));
577 } else {
578 candidate
579 }
580 };
581
582 Ok(RawTimestampDetermination {
583 timestamp: constraint_candidate,
584 constraints: Some(constraints),
585 session_oracle_read_ts,
586 })
587 }
588
589 fn determine_timestamp_for(
597 &self,
598 session: &Session,
599 id_bundle: &CollectionIdBundle,
600 when: &QueryWhen,
601 compute_instance: ComputeInstanceId,
602 timeline_context: &TimelineContext,
603 oracle_read_ts: Option<Timestamp>,
604 real_time_recency_ts: Option<Timestamp>,
605 isolation_level: &IsolationLevel,
606 constraint_based: &ConstraintBasedTimestampSelection,
607 ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
608 let read_holds = self.acquire_read_holds(id_bundle);
611
612 let upper = self.least_valid_write(id_bundle);
613
614 Self::determine_timestamp_for_inner(
615 session,
616 id_bundle,
617 when,
618 compute_instance,
619 timeline_context,
620 oracle_read_ts,
621 real_time_recency_ts,
622 isolation_level,
623 constraint_based,
624 read_holds,
625 upper,
626 )
627 }
628
629 fn determine_timestamp_for_inner(
631 session: &Session,
632 id_bundle: &CollectionIdBundle,
633 when: &QueryWhen,
634 compute_instance: ComputeInstanceId,
635 timeline_context: &TimelineContext,
636 oracle_read_ts: Option<Timestamp>,
637 real_time_recency_ts: Option<Timestamp>,
638 isolation_level: &IsolationLevel,
639 constraint_based: &ConstraintBasedTimestampSelection,
640 read_holds: ReadHolds<Timestamp>,
641 upper: Antichain<Timestamp>,
642 ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
643 let timeline = Self::get_timeline(timeline_context);
644 let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
645 let since = read_holds.least_valid_read();
646
647 let raw_determination = match constraint_based {
648 ConstraintBasedTimestampSelection::Disabled => Self::determine_timestamp_classical(
649 session,
650 &read_holds,
651 id_bundle,
652 when,
653 oracle_read_ts,
654 compute_instance,
655 real_time_recency_ts,
656 isolation_level,
657 &timeline,
658 largest_not_in_advance_of_upper,
659 &since,
660 )?,
661 ConstraintBasedTimestampSelection::Enabled => {
662 Self::determine_timestamp_via_constraints(
663 session,
664 &read_holds,
665 id_bundle,
666 when,
667 oracle_read_ts,
668 compute_instance,
669 real_time_recency_ts,
670 isolation_level,
671 &timeline,
672 largest_not_in_advance_of_upper,
673 )?
674 }
675 ConstraintBasedTimestampSelection::Verify => {
676 let classical_determination = Self::determine_timestamp_classical(
677 session,
678 &read_holds,
679 id_bundle,
680 when,
681 oracle_read_ts,
682 compute_instance,
683 real_time_recency_ts,
684 isolation_level,
685 &timeline,
686 largest_not_in_advance_of_upper,
687 &since,
688 )?;
689
690 match Self::determine_timestamp_via_constraints(
691 session,
692 &read_holds,
693 id_bundle,
694 when,
695 oracle_read_ts,
696 compute_instance,
697 real_time_recency_ts,
698 isolation_level,
699 &timeline,
700 largest_not_in_advance_of_upper,
701 ) {
702 Ok(constraint_determination) => {
703 soft_assert_eq_or_log!(
704 classical_determination.timestamp,
705 constraint_determination.timestamp,
706 "timestamp determination mismatch"
707 );
708 if classical_determination.timestamp != constraint_determination.timestamp {
709 tracing::info!(
710 "timestamp constrains: {:?}",
711 constraint_determination.constraints
712 );
713 }
714 RawTimestampDetermination {
715 timestamp: classical_determination.timestamp,
716 constraints: constraint_determination.constraints,
717 session_oracle_read_ts: classical_determination.session_oracle_read_ts,
718 }
719 }
720 Err(e) => {
721 event!(Level::ERROR, error = ?e, "constraint-based timestamp determination failed");
722 RawTimestampDetermination {
723 timestamp: classical_determination.timestamp,
724 constraints: classical_determination.constraints,
725 session_oracle_read_ts: classical_determination.session_oracle_read_ts,
726 }
727 }
728 }
729 }
730 };
731
732 let timestamp_context = TimestampContext::from_timeline_context(
733 raw_determination.timestamp,
734 oracle_read_ts,
735 timeline,
736 timeline_context,
737 );
738
739 let determination = TimestampDetermination {
740 timestamp_context,
741 since,
742 upper,
743 largest_not_in_advance_of_upper,
744 oracle_read_ts,
745 session_oracle_read_ts: raw_determination.session_oracle_read_ts,
746 real_time_recency_ts,
747 constraints: raw_determination.constraints,
748 };
749
750 Ok((determination, read_holds))
751 }
752
753 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
756
757 fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
762 let mut upper = Antichain::new();
763 {
764 for (_id, _since, collection_upper) in
765 self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
766 {
767 upper.extend(collection_upper);
768 }
769 }
770 {
771 for (instance, compute_ids) in &id_bundle.compute_ids {
772 for id in compute_ids.iter() {
773 upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
774 }
775 }
776 }
777 upper
778 }
779
780 fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
783 let mut frontier = Antichain::new();
784 for t in self.least_valid_write(id_bundle) {
785 frontier.insert(t.step_back().unwrap_or(t));
786 }
787 frontier
788 }
789}
790
791fn generate_timestamp_not_valid_error_msg(
792 id_bundle: &CollectionIdBundle,
793 compute_instance: ComputeInstanceId,
794 read_holds: &ReadHolds<mz_repr::Timestamp>,
795 candidate: mz_repr::Timestamp,
796) -> String {
797 let mut invalid = Vec::new();
798
799 if let Some(compute_ids) = id_bundle.compute_ids.get(&compute_instance) {
800 for id in compute_ids {
801 let since = read_holds.since(id);
802 if !since.less_equal(&candidate) {
803 invalid.push((*id, since));
804 }
805 }
806 }
807
808 for id in id_bundle.storage_ids.iter() {
809 let since = read_holds.since(id);
810 if !since.less_equal(&candidate) {
811 invalid.push((*id, since));
812 }
813 }
814
815 format!(
816 "Timestamp ({}) is not valid for all inputs: {:?}",
817 candidate, invalid,
818 )
819}
820
821impl Coordinator {
822 pub(crate) async fn oracle_read_ts(
823 &self,
824 session: &Session,
825 timeline_ctx: &TimelineContext,
826 when: &QueryWhen,
827 ) -> Option<Timestamp> {
828 let isolation_level = session.vars().transaction_isolation().clone();
829 let timeline = Coordinator::get_timeline(timeline_ctx);
830 let needs_linearized_read_ts =
831 Coordinator::needs_linearized_read_ts(&isolation_level, when);
832
833 let oracle_read_ts = match timeline {
834 Some(timeline) if needs_linearized_read_ts => {
835 let timestamp_oracle = self.get_timestamp_oracle(&timeline);
836 Some(timestamp_oracle.read_ts().await)
837 }
838 Some(_) | None => None,
839 };
840
841 oracle_read_ts
842 }
843
844 #[mz_ore::instrument(level = "debug")]
848 pub(crate) fn determine_timestamp(
849 &self,
850 session: &Session,
851 id_bundle: &CollectionIdBundle,
852 when: &QueryWhen,
853 compute_instance: ComputeInstanceId,
854 timeline_context: &TimelineContext,
855 oracle_read_ts: Option<Timestamp>,
856 real_time_recency_ts: Option<mz_repr::Timestamp>,
857 ) -> Result<
858 (
859 TimestampDetermination<mz_repr::Timestamp>,
860 ReadHolds<mz_repr::Timestamp>,
861 ),
862 AdapterError,
863 > {
864 let constraint_based = ConstraintBasedTimestampSelection::from_str(
865 &CONSTRAINT_BASED_TIMESTAMP_SELECTION
866 .get(self.catalog_state().system_config().dyncfgs()),
867 );
868
869 let isolation_level = session.vars().transaction_isolation();
870 let (det, read_holds) = self.determine_timestamp_for(
871 session,
872 id_bundle,
873 when,
874 compute_instance,
875 timeline_context,
876 oracle_read_ts,
877 real_time_recency_ts,
878 isolation_level,
879 &constraint_based,
880 )?;
881 self.metrics
882 .determine_timestamp
883 .with_label_values(&[
884 match det.respond_immediately() {
885 true => "true",
886 false => "false",
887 },
888 isolation_level.as_str(),
889 &compute_instance.to_string(),
890 constraint_based.as_str(),
891 ])
892 .inc();
893 if !det.respond_immediately()
894 && isolation_level == &IsolationLevel::StrictSerializable
895 && real_time_recency_ts.is_none()
896 {
897 if let Some(strict) = det.timestamp_context.timestamp() {
898 let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
899 session,
900 id_bundle,
901 when,
902 compute_instance,
903 timeline_context,
904 oracle_read_ts,
905 real_time_recency_ts,
906 &IsolationLevel::Serializable,
907 &constraint_based,
908 )?;
909
910 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
911 self.metrics
912 .timestamp_difference_for_strict_serializable_ms
913 .with_label_values(&[
914 &compute_instance.to_string(),
915 constraint_based.as_str(),
916 ])
917 .observe(f64::cast_lossy(u64::from(
918 strict.saturating_sub(*serializable),
919 )));
920 }
921 }
922 }
923 Ok((det, read_holds))
924 }
925
926 pub(crate) fn largest_not_in_advance_of_upper(
931 upper: &Antichain<mz_repr::Timestamp>,
932 ) -> mz_repr::Timestamp {
933 if let Some(upper) = upper.as_option() {
938 upper.step_back().unwrap_or_else(Timestamp::minimum)
939 } else {
940 Timestamp::MAX
945 }
946 }
947}
948
949#[derive(Serialize, Deserialize, Debug, Clone)]
951pub struct TimestampDetermination<T> {
952 pub timestamp_context: TimestampContext<T>,
954 pub since: Antichain<T>,
956 pub upper: Antichain<T>,
958 pub largest_not_in_advance_of_upper: T,
960 pub oracle_read_ts: Option<T>,
962 pub session_oracle_read_ts: Option<T>,
964 pub real_time_recency_ts: Option<T>,
966 pub constraints: Option<Constraints>,
969}
970
971impl<T: TimestampManipulation> TimestampDetermination<T> {
972 pub fn respond_immediately(&self) -> bool {
973 match &self.timestamp_context {
974 TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
975 !self.upper.less_equal(chosen_ts)
976 }
977 TimestampContext::NoTimestamp => true,
978 }
979 }
980}
981
982#[derive(Clone, Debug, Serialize, Deserialize)]
984pub struct TimestampExplanation<T> {
985 pub determination: TimestampDetermination<T>,
987 pub sources: Vec<TimestampSource<T>>,
989 pub session_wall_time: DateTime<Utc>,
991 pub respond_immediately: bool,
993}
994
995#[derive(Clone, Debug, Serialize, Deserialize)]
996pub struct TimestampSource<T> {
997 pub name: String,
998 pub read_frontier: Vec<T>,
999 pub write_frontier: Vec<T>,
1000}
1001
1002pub trait DisplayableInTimeline {
1003 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
1004 fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
1005 DisplayInTimeline { t: self, timeline }
1006 }
1007}
1008
1009impl DisplayableInTimeline for mz_repr::Timestamp {
1010 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1011 if let Some(Timeline::EpochMilliseconds) = timeline {
1012 let ts_ms: u64 = self.into();
1013 if let Ok(ts_ms) = i64::try_from(ts_ms) {
1014 if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
1015 return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
1016 }
1017 }
1018 }
1019 write!(f, "{:13}", self)
1020 }
1021}
1022
1023pub struct DisplayInTimeline<'a, T: ?Sized> {
1024 t: &'a T,
1025 timeline: Option<&'a Timeline>,
1026}
1027impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
1028where
1029 T: DisplayableInTimeline,
1030{
1031 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1032 self.t.fmt(self.timeline, f)
1033 }
1034}
1035
1036impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
1037where
1038 T: DisplayableInTimeline,
1039{
1040 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1041 fmt::Display::fmt(&self, f)
1042 }
1043}
1044
1045impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
1046 for TimestampExplanation<T>
1047{
1048 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1049 let timeline = self.determination.timestamp_context.timeline();
1050 writeln!(
1051 f,
1052 " query timestamp: {}",
1053 self.determination
1054 .timestamp_context
1055 .timestamp_or_default()
1056 .display(timeline)
1057 )?;
1058 if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
1059 writeln!(
1060 f,
1061 " oracle read timestamp: {}",
1062 oracle_read_ts.display(timeline)
1063 )?;
1064 }
1065 if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
1066 writeln!(
1067 f,
1068 " session oracle read timestamp: {}",
1069 session_oracle_read_ts.display(timeline)
1070 )?;
1071 }
1072 if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
1073 writeln!(
1074 f,
1075 " real time recency timestamp: {}",
1076 real_time_recency_ts.display(timeline)
1077 )?;
1078 }
1079 writeln!(
1080 f,
1081 "largest not in advance of upper: {}",
1082 self.determination
1083 .largest_not_in_advance_of_upper
1084 .display(timeline),
1085 )?;
1086 writeln!(
1087 f,
1088 " upper:{:?}",
1089 self.determination
1090 .upper
1091 .iter()
1092 .map(|t| t.display(timeline))
1093 .collect::<Vec<_>>()
1094 )?;
1095 writeln!(
1096 f,
1097 " since:{:?}",
1098 self.determination
1099 .since
1100 .iter()
1101 .map(|t| t.display(timeline))
1102 .collect::<Vec<_>>()
1103 )?;
1104 writeln!(
1105 f,
1106 " can respond immediately: {}",
1107 self.respond_immediately
1108 )?;
1109 writeln!(f, " timeline: {:?}", &timeline)?;
1110 writeln!(
1111 f,
1112 " session wall time: {:13} ({})",
1113 self.session_wall_time.timestamp_millis(),
1114 self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
1115 )?;
1116
1117 for source in &self.sources {
1118 writeln!(f, "")?;
1119 writeln!(f, "source {}:", source.name)?;
1120 writeln!(
1121 f,
1122 " read frontier:{:?}",
1123 source
1124 .read_frontier
1125 .iter()
1126 .map(|t| t.display(timeline))
1127 .collect::<Vec<_>>()
1128 )?;
1129 writeln!(
1130 f,
1131 " write frontier:{:?}",
1132 source
1133 .write_frontier
1134 .iter()
1135 .map(|t| t.display(timeline))
1136 .collect::<Vec<_>>()
1137 )?;
1138 }
1139
1140 if let Some(constraints) = &self.determination.constraints {
1141 writeln!(f, "")?;
1142 writeln!(f, "binding constraints:")?;
1143 write!(f, "{}", constraints.display(timeline))?;
1144 }
1145
1146 Ok(())
1147 }
1148}
1149
1150mod constraints {
1152
1153 use core::fmt;
1154 use std::fmt::Debug;
1155
1156 use differential_dataflow::lattice::Lattice;
1157 use mz_storage_types::sources::Timeline;
1158 use serde::{Deserialize, Serialize};
1159 use timely::progress::{Antichain, Timestamp};
1160
1161 use mz_compute_types::ComputeInstanceId;
1162 use mz_repr::GlobalId;
1163 use mz_sql::session::vars::IsolationLevel;
1164
1165 use super::DisplayableInTimeline;
1166
1167 #[derive(Default, Serialize, Deserialize, Clone)]
1180 pub struct Constraints {
1181 pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1183 pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1185 }
1186
1187 impl DisplayableInTimeline for Constraints {
1188 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1189 if !self.lower.is_empty() {
1190 writeln!(f, "lower:")?;
1191 for (ts, reason) in &self.lower {
1192 let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1193 writeln!(f, " ({:?}): {:?}", reason, ts)?;
1194 }
1195 }
1196 if !self.upper.is_empty() {
1197 writeln!(f, "upper:")?;
1198 for (ts, reason) in &self.upper {
1199 let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1200 writeln!(f, " ({:?}): {:?}", reason, ts)?;
1201 }
1202 }
1203 Ok(())
1204 }
1205 }
1206
1207 impl Debug for Constraints {
1208 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1209 self.display(None).fmt(f)?;
1210 Ok(())
1211 }
1212 }
1213
1214 impl Constraints {
1215 pub fn minimize(&mut self) {
1225 let lower_frontier = self.lower_bound();
1227 self.lower.retain(|(anti, _)| {
1229 anti.iter()
1230 .any(|time| lower_frontier.elements().contains(time))
1231 });
1232
1233 let upper_frontier = self.upper_bound();
1235 self.upper.retain(|(anti, _)| {
1237 anti.iter()
1238 .any(|time| upper_frontier.elements().contains(time))
1239 });
1240 }
1241
1242 pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1244 let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1245 for (anti, _) in self.lower.iter() {
1246 lower = lower.join(anti);
1247 }
1248 lower
1249 }
1250 pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1252 self.upper
1253 .iter()
1254 .flat_map(|(anti, _)| anti.iter())
1255 .cloned()
1256 .collect()
1257 }
1258 }
1259
1260 #[derive(Serialize, Deserialize, Clone)]
1262 pub enum Reason {
1263 ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1267 StorageInput(Vec<GlobalId>),
1269 IsolationLevel(IsolationLevel),
1271 RealTimeRecency,
1273 QueryAsOf,
1275 }
1276
1277 impl Debug for Reason {
1278 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1279 match self {
1280 Reason::ComputeInput(ids) => write_split_ids(f, "ComputeInput", ids),
1281 Reason::StorageInput(ids) => write_split_ids(f, "StorageInput", ids),
1282 Reason::IsolationLevel(level) => {
1283 write!(f, "IsolationLevel({:?})", level)
1284 }
1285 Reason::RealTimeRecency => {
1286 write!(f, "RealTimeRecency")
1287 }
1288 Reason::QueryAsOf => {
1289 write!(f, "QueryAsOf")
1290 }
1291 }
1292 }
1293 }
1294
1295 fn write_split_ids<T: Debug>(f: &mut fmt::Formatter, label: &str, ids: &[T]) -> fmt::Result {
1298 let (ids, rest) = if ids.len() > 10 {
1299 ids.split_at(10)
1300 } else {
1301 let rest: &[T] = &[];
1302 (ids, rest)
1303 };
1304 if rest.is_empty() {
1305 write!(f, "{}({:?})", label, ids)
1306 } else {
1307 write!(f, "{}({:?}, ... {} more)", label, ids, rest.len())
1308 }
1309 }
1310
1311 pub enum Preference {
1314 FreshestAvailable,
1327 StalestValid,
1333 }
1334}