1use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
20use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
21use mz_compute_types::ComputeInstanceId;
22use mz_ore::cast::CastLossy;
23use mz_ore::soft_assert_eq_or_log;
24use mz_repr::{GlobalId, Timestamp, TimestampManipulation};
25use mz_sql::plan::QueryWhen;
26use mz_sql::session::metadata::SessionMetadata;
27use mz_sql::session::vars::IsolationLevel;
28use mz_storage_types::sources::Timeline;
29use serde::{Deserialize, Serialize};
30use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
31use tracing::{Level, event};
32
33use crate::AdapterError;
34use crate::catalog::CatalogState;
35use crate::coord::Coordinator;
36use crate::coord::id_bundle::CollectionIdBundle;
37use crate::coord::read_policy::ReadHolds;
38use crate::coord::timeline::TimelineContext;
39use crate::session::Session;
40
/// The timestamp context in which a query reads: either a concrete timestamp
/// within a timeline, or no timestamp at all.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext<T> {
    /// The read happens at `chosen_ts` in `timeline`.
    TimelineTimestamp {
        /// The timeline that `chosen_ts` belongs to.
        timeline: Timeline,
        /// The timestamp selected for the read.
        chosen_ts: T,
        /// The oracle read timestamp that was supplied while determining
        /// `chosen_ts`, if any.
        oracle_ts: Option<T>,
    },
    /// The read is not tied to any timeline or timestamp.
    NoTimestamp,
}
62
63impl<T: TimestampManipulation> TimestampContext<T> {
64 pub fn from_timeline_context(
66 chosen_ts: T,
67 oracle_ts: Option<T>,
68 transaction_timeline: Option<Timeline>,
69 timeline_context: &TimelineContext,
70 ) -> TimestampContext<T> {
71 match timeline_context {
72 TimelineContext::TimelineDependent(timeline) => {
73 if let Some(transaction_timeline) = transaction_timeline {
74 assert_eq!(timeline, &transaction_timeline);
75 }
76 Self::TimelineTimestamp {
77 timeline: timeline.clone(),
78 chosen_ts,
79 oracle_ts,
80 }
81 }
82 TimelineContext::TimestampDependent => {
83 Self::TimelineTimestamp {
85 timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
86 chosen_ts,
87 oracle_ts,
88 }
89 }
90 TimelineContext::TimestampIndependent => Self::NoTimestamp,
91 }
92 }
93
94 pub fn timeline(&self) -> Option<&Timeline> {
96 self.timeline_timestamp().map(|tt| tt.0)
97 }
98
99 pub fn timestamp(&self) -> Option<&T> {
101 self.timeline_timestamp().map(|tt| tt.1)
102 }
103
104 pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
106 match self {
107 Self::TimelineTimestamp {
108 timeline,
109 chosen_ts,
110 ..
111 } => Some((timeline, chosen_ts)),
112 Self::NoTimestamp => None,
113 }
114 }
115
116 pub fn timestamp_or_default(&self) -> T {
118 match self {
119 Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
120 Self::NoTimestamp => T::maximum(),
124 }
125 }
126
127 pub fn contains_timestamp(&self) -> bool {
129 self.timestamp().is_some()
130 }
131
132 pub fn antichain(&self) -> Antichain<T> {
134 Antichain::from_elem(self.timestamp_or_default())
135 }
136}
137
138#[async_trait(?Send)]
139impl TimestampProvider for Coordinator {
140 fn compute_read_frontier(
142 &self,
143 instance: ComputeInstanceId,
144 id: GlobalId,
145 ) -> Antichain<Timestamp> {
146 self.controller
147 .compute
148 .collection_frontiers(id, Some(instance))
149 .expect("id does not exist")
150 .read_frontier
151 }
152
153 fn compute_write_frontier(
155 &self,
156 instance: ComputeInstanceId,
157 id: GlobalId,
158 ) -> Antichain<Timestamp> {
159 self.controller
160 .compute
161 .collection_frontiers(id, Some(instance))
162 .expect("id does not exist")
163 .write_frontier
164 }
165
166 fn storage_frontiers(
167 &self,
168 ids: Vec<GlobalId>,
169 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
170 self.controller
171 .storage
172 .collections_frontiers(ids)
173 .expect("missing collections")
174 }
175
176 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
177 self.acquire_read_holds(id_bundle)
178 }
179
180 fn catalog_state(&self) -> &CatalogState {
181 self.catalog().state()
182 }
183}
184
/// The raw outcome of a single timestamp selection strategy, before it is
/// combined with frontier information into a `TimestampDetermination`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination<T> {
    /// The selected read timestamp.
    pub timestamp: T,
    /// The binding constraints; only populated by constraint-based selection.
    pub constraints: Option<Constraints>,
    /// The read timestamp taken from the session's own timestamp oracle, if it
    /// was consulted (strong session serializable only).
    pub session_oracle_read_ts: Option<T>,
}
193
194#[async_trait(?Send)]
195pub trait TimestampProvider {
196 fn compute_read_frontier(
197 &self,
198 instance: ComputeInstanceId,
199 id: GlobalId,
200 ) -> Antichain<Timestamp>;
201 fn compute_write_frontier(
202 &self,
203 instance: ComputeInstanceId,
204 id: GlobalId,
205 ) -> Antichain<Timestamp>;
206
207 fn storage_frontiers(
210 &self,
211 ids: Vec<GlobalId>,
212 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
213
214 fn catalog_state(&self) -> &CatalogState;
215
216 fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
217 let timeline = match timeline_context {
218 TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
219 TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
221 TimelineContext::TimestampIndependent => None,
222 };
223
224 timeline
225 }
226
227 fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
234 when.must_advance_to_timeline_ts()
240 || (when.can_advance_to_timeline_ts()
241 && matches!(
242 isolation_level,
243 IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
244 ))
245 }
246
247 fn determine_timestamp_classical(
249 session: &Session,
250 read_holds: &ReadHolds<Timestamp>,
251 id_bundle: &CollectionIdBundle,
252 when: &QueryWhen,
253 oracle_read_ts: Option<Timestamp>,
254 compute_instance: ComputeInstanceId,
255 real_time_recency_ts: Option<Timestamp>,
256 isolation_level: &IsolationLevel,
257 timeline: &Option<Timeline>,
258 largest_not_in_advance_of_upper: Timestamp,
259 since: &Antichain<Timestamp>,
260 ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
261 let mut session_oracle_read_ts = None;
262 {
275 if timeline.is_some() && Self::needs_linearized_read_ts(isolation_level, when) {
286 assert!(
287 oracle_read_ts.is_some(),
288 "should get a timestamp from the oracle for linearized timeline {:?} but didn't",
289 timeline
290 );
291 }
292 }
293
294 let mut candidate = Timestamp::minimum();
296
297 if let Some(ts) = when.advance_to_timestamp() {
298 candidate.join_assign(&ts);
299 }
300
301 if when.advance_to_since() {
302 candidate.advance_by(since.borrow());
304 }
305
306 if let Some(timestamp) = &oracle_read_ts {
311 if isolation_level != &IsolationLevel::StrongSessionSerializable
312 || when.must_advance_to_timeline_ts()
313 {
314 candidate.join_assign(timestamp);
315 }
316 }
317
318 if when.can_advance_to_upper()
325 && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
326 {
327 candidate.join_assign(&largest_not_in_advance_of_upper);
328 }
329
330 if let Some(real_time_recency_ts) = real_time_recency_ts {
331 if !(session.vars().real_time_recency()
332 && isolation_level == &IsolationLevel::StrictSerializable)
333 {
334 coord_bail!(
338 "real time recency timestamp should only be supplied when real time recency \
339 is enabled and the isolation level is strict serializable"
340 );
341 }
342 candidate.join_assign(&real_time_recency_ts);
343 }
344
345 if isolation_level == &IsolationLevel::StrongSessionSerializable {
346 if let Some(timeline) = &timeline {
347 if let Some(oracle) = session.get_timestamp_oracle(timeline) {
348 let session_ts = oracle.read_ts();
349 candidate.join_assign(&session_ts);
350 session_oracle_read_ts = Some(session_ts);
351 }
352 }
353
354 if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
364 let mut advance_to = largest_not_in_advance_of_upper;
365 if let Some(oracle_read_ts) = oracle_read_ts {
366 advance_to = std::cmp::min(advance_to, oracle_read_ts);
367 }
368 candidate.join_assign(&advance_to);
369 }
370 }
371
372 let timestamp = if since.less_equal(&candidate) {
380 event!(
381 Level::DEBUG,
382 conn_id = format!("{}", session.conn_id()),
383 since = format!("{since:?}"),
384 largest_not_in_advance_of_upper = format!("{largest_not_in_advance_of_upper}"),
385 timestamp = format!("{candidate}")
386 );
387 candidate
388 } else {
389 coord_bail!(generate_timestamp_not_valid_error_msg(
392 id_bundle,
393 compute_instance,
394 read_holds,
395 candidate
396 ));
397 };
398 Ok(RawTimestampDetermination {
399 timestamp,
400 constraints: None,
401 session_oracle_read_ts,
402 })
403 }
404
405 fn determine_timestamp_via_constraints(
409 session: &Session,
410 read_holds: &ReadHolds<Timestamp>,
411 id_bundle: &CollectionIdBundle,
412 when: &QueryWhen,
413 oracle_read_ts: Option<Timestamp>,
414 compute_instance: ComputeInstanceId,
415 real_time_recency_ts: Option<Timestamp>,
416 isolation_level: &IsolationLevel,
417 timeline: &Option<Timeline>,
418 largest_not_in_advance_of_upper: Timestamp,
419 ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
420 use constraints::{Constraints, Preference, Reason};
421
422 let mut session_oracle_read_ts = None;
423 let constraints = {
427 let mut constraints = Constraints::default();
429
430 let since = read_holds.least_valid_read();
436 let storage = id_bundle
437 .storage_ids
438 .iter()
439 .cloned()
440 .collect::<Vec<GlobalId>>();
441 if !storage.is_empty() {
442 constraints
443 .lower
444 .push((since.clone(), Reason::StorageInput(storage)));
445 }
446 let compute = id_bundle
447 .compute_ids
448 .iter()
449 .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
450 .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
451 if !compute.is_empty() {
452 constraints
453 .lower
454 .push((since.clone(), Reason::ComputeInput(compute)));
455 }
456
457 if let Some(ts) = when.advance_to_timestamp() {
459 constraints
460 .lower
461 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
462 if when.constrains_upper() {
464 constraints
465 .upper
466 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
467 }
468 }
469
470 if let Some(timestamp) = &oracle_read_ts {
475 if isolation_level != &IsolationLevel::StrongSessionSerializable
476 || when.must_advance_to_timeline_ts()
477 {
478 constraints.lower.push((
481 Antichain::from_elem(*timestamp),
482 Reason::IsolationLevel(*isolation_level),
483 ));
484 }
485 }
486
487 if let Some(real_time_recency_ts) = real_time_recency_ts {
489 assert!(
490 session.vars().real_time_recency()
491 && isolation_level == &IsolationLevel::StrictSerializable,
492 "real time recency timestamp should only be supplied when real time recency \
493 is enabled and the isolation level is strict serializable"
494 );
495 constraints.lower.push((
496 Antichain::from_elem(real_time_recency_ts),
497 Reason::RealTimeRecency,
498 ));
499 }
500
501 if isolation_level == &IsolationLevel::StrongSessionSerializable {
503 if let Some(timeline) = &timeline {
504 if let Some(oracle) = session.get_timestamp_oracle(timeline) {
505 let session_ts = oracle.read_ts();
506 constraints.lower.push((
507 Antichain::from_elem(session_ts),
508 Reason::IsolationLevel(*isolation_level),
509 ));
510 session_oracle_read_ts = Some(session_ts);
511 }
512
513 if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
523 let mut advance_to = largest_not_in_advance_of_upper;
524 if let Some(oracle_read_ts) = oracle_read_ts {
525 advance_to = std::cmp::min(advance_to, oracle_read_ts);
526 }
527 constraints.lower.push((
528 Antichain::from_elem(advance_to),
529 Reason::IsolationLevel(*isolation_level),
530 ));
531 }
532 }
533 }
534
535 constraints.minimize();
536 constraints
537 };
538
539 let preferences = {
544 if when.can_advance_to_upper()
549 && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
550 {
551 Preference::FreshestAvailable
552 } else {
553 Preference::StalestValid
554 }
555
556 };
560
561 let constraint_candidate = {
563 let mut candidate = Timestamp::minimum();
564 candidate.advance_by(constraints.lower_bound().borrow());
566 if let Preference::FreshestAvailable = preferences {
569 let mut upper_bound = constraints.upper_bound();
570 upper_bound.insert(largest_not_in_advance_of_upper);
571 candidate.advance_by(upper_bound.borrow());
572 }
573 if !constraints.lower_bound().less_equal(&candidate)
577 || constraints.upper_bound().less_than(&candidate)
578 {
579 coord_bail!(generate_timestamp_not_valid_error_msg(
581 id_bundle,
582 compute_instance,
583 read_holds,
584 candidate
585 ));
586 } else {
587 candidate
588 }
589 };
590
591 Ok(RawTimestampDetermination {
592 timestamp: constraint_candidate,
593 constraints: Some(constraints),
594 session_oracle_read_ts,
595 })
596 }
597
598 fn determine_timestamp_for(
606 &self,
607 session: &Session,
608 id_bundle: &CollectionIdBundle,
609 when: &QueryWhen,
610 compute_instance: ComputeInstanceId,
611 timeline_context: &TimelineContext,
612 oracle_read_ts: Option<Timestamp>,
613 real_time_recency_ts: Option<Timestamp>,
614 isolation_level: &IsolationLevel,
615 constraint_based: &ConstraintBasedTimestampSelection,
616 ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
617 let read_holds = self.acquire_read_holds(id_bundle);
620
621 let upper = self.least_valid_write(id_bundle);
622
623 Self::determine_timestamp_for_inner(
624 session,
625 id_bundle,
626 when,
627 compute_instance,
628 timeline_context,
629 oracle_read_ts,
630 real_time_recency_ts,
631 isolation_level,
632 constraint_based,
633 read_holds,
634 upper,
635 )
636 }
637
638 fn determine_timestamp_for_inner(
640 session: &Session,
641 id_bundle: &CollectionIdBundle,
642 when: &QueryWhen,
643 compute_instance: ComputeInstanceId,
644 timeline_context: &TimelineContext,
645 oracle_read_ts: Option<Timestamp>,
646 real_time_recency_ts: Option<Timestamp>,
647 isolation_level: &IsolationLevel,
648 constraint_based: &ConstraintBasedTimestampSelection,
649 read_holds: ReadHolds<Timestamp>,
650 upper: Antichain<Timestamp>,
651 ) -> Result<(TimestampDetermination<Timestamp>, ReadHolds<Timestamp>), AdapterError> {
652 let timeline = Self::get_timeline(timeline_context);
653 let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
654 let since = read_holds.least_valid_read();
655
656 if since.is_empty() {
660 let mut unreadable_collections = Vec::new();
662 for (coll_id, hold) in read_holds.storage_holds {
663 if hold.since().is_empty() {
664 unreadable_collections.push(coll_id);
665 }
666 }
667 for ((_instance_id, coll_id), hold) in read_holds.compute_holds {
668 if hold.since().is_empty() {
669 unreadable_collections.push(coll_id);
670 }
671 }
672 return Err(AdapterError::CollectionUnreadable {
673 id: unreadable_collections.into_iter().join(", "),
674 });
675 }
676
677 let raw_determination = match constraint_based {
678 ConstraintBasedTimestampSelection::Disabled => Self::determine_timestamp_classical(
679 session,
680 &read_holds,
681 id_bundle,
682 when,
683 oracle_read_ts,
684 compute_instance,
685 real_time_recency_ts,
686 isolation_level,
687 &timeline,
688 largest_not_in_advance_of_upper,
689 &since,
690 )?,
691 ConstraintBasedTimestampSelection::Enabled => {
692 Self::determine_timestamp_via_constraints(
693 session,
694 &read_holds,
695 id_bundle,
696 when,
697 oracle_read_ts,
698 compute_instance,
699 real_time_recency_ts,
700 isolation_level,
701 &timeline,
702 largest_not_in_advance_of_upper,
703 )?
704 }
705 ConstraintBasedTimestampSelection::Verify => {
706 let classical_determination = Self::determine_timestamp_classical(
707 session,
708 &read_holds,
709 id_bundle,
710 when,
711 oracle_read_ts,
712 compute_instance,
713 real_time_recency_ts,
714 isolation_level,
715 &timeline,
716 largest_not_in_advance_of_upper,
717 &since,
718 );
719
720 let constraint_determination = Self::determine_timestamp_via_constraints(
721 session,
722 &read_holds,
723 id_bundle,
724 when,
725 oracle_read_ts,
726 compute_instance,
727 real_time_recency_ts,
728 isolation_level,
729 &timeline,
730 largest_not_in_advance_of_upper,
731 );
732
733 match (classical_determination, constraint_determination) {
734 (Ok(classical_determination), Ok(constraint_determination)) => {
735 soft_assert_eq_or_log!(
736 classical_determination.timestamp,
737 constraint_determination.timestamp,
738 "timestamp determination mismatch"
739 );
740 if classical_determination.timestamp != constraint_determination.timestamp {
741 tracing::info!(
742 "timestamp constrains: {:?}",
743 constraint_determination.constraints
744 );
745 }
746 RawTimestampDetermination {
747 timestamp: classical_determination.timestamp,
748 constraints: constraint_determination.constraints,
749 session_oracle_read_ts: classical_determination.session_oracle_read_ts,
750 }
751 }
752 (Err(classical_determination_err), Err(_constraint_determination_err)) => {
753 return Err(classical_determination_err);
755 }
756 (Ok(classical_determination), Err(constraint_determination_err)) => {
757 event!(
758 Level::ERROR,
759 classical = ?classical_determination,
760 constraint_based = ?constraint_determination_err,
761 "classical timestamp determination succeeded, but constraint-based failed"
762 );
763 RawTimestampDetermination {
764 timestamp: classical_determination.timestamp,
765 constraints: classical_determination.constraints,
766 session_oracle_read_ts: classical_determination.session_oracle_read_ts,
767 }
768 }
769 (Err(classical_determination_err), Ok(constraint_determination)) => {
770 event!(
771 Level::ERROR,
772 classical = ?classical_determination_err,
773 constraint_based = ?constraint_determination,
774 "classical timestamp determination failed, but constraint-based succeeded"
775 );
776 return Err(classical_determination_err);
777 }
778 }
779 }
780 };
781
782 let timestamp_context = TimestampContext::from_timeline_context(
783 raw_determination.timestamp,
784 oracle_read_ts,
785 timeline,
786 timeline_context,
787 );
788
789 let determination = TimestampDetermination {
790 timestamp_context,
791 since,
792 upper,
793 largest_not_in_advance_of_upper,
794 oracle_read_ts,
795 session_oracle_read_ts: raw_determination.session_oracle_read_ts,
796 real_time_recency_ts,
797 constraints: raw_determination.constraints,
798 };
799
800 Ok((determination, read_holds))
801 }
802
803 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
806
807 fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
812 let mut upper = Antichain::new();
813 {
814 for (_id, _since, collection_upper) in
815 self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
816 {
817 upper.extend(collection_upper);
818 }
819 }
820 {
821 for (instance, compute_ids) in &id_bundle.compute_ids {
822 for id in compute_ids.iter() {
823 upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
824 }
825 }
826 }
827 upper
828 }
829
830 fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
833 let mut frontier = Antichain::new();
834 for t in self.least_valid_write(id_bundle) {
835 frontier.insert(t.step_back().unwrap_or(t));
836 }
837 frontier
838 }
839}
840
841fn generate_timestamp_not_valid_error_msg(
842 id_bundle: &CollectionIdBundle,
843 compute_instance: ComputeInstanceId,
844 read_holds: &ReadHolds<mz_repr::Timestamp>,
845 candidate: mz_repr::Timestamp,
846) -> String {
847 let mut invalid = Vec::new();
848
849 if let Some(compute_ids) = id_bundle.compute_ids.get(&compute_instance) {
850 for id in compute_ids {
851 let since = read_holds.since(id);
852 if !since.less_equal(&candidate) {
853 invalid.push((*id, since));
854 }
855 }
856 }
857
858 for id in id_bundle.storage_ids.iter() {
859 let since = read_holds.since(id);
860 if !since.less_equal(&candidate) {
861 invalid.push((*id, since));
862 }
863 }
864
865 format!(
866 "Timestamp ({}) is not valid for all inputs: {:?}",
867 candidate, invalid,
868 )
869}
870
871impl Coordinator {
872 pub(crate) async fn oracle_read_ts(
873 &self,
874 session: &Session,
875 timeline_ctx: &TimelineContext,
876 when: &QueryWhen,
877 ) -> Option<Timestamp> {
878 let isolation_level = session.vars().transaction_isolation().clone();
879 let timeline = Coordinator::get_timeline(timeline_ctx);
880 let needs_linearized_read_ts =
881 Coordinator::needs_linearized_read_ts(&isolation_level, when);
882
883 let oracle_read_ts = match timeline {
884 Some(timeline) if needs_linearized_read_ts => {
885 let timestamp_oracle = self.get_timestamp_oracle(&timeline);
886 Some(timestamp_oracle.read_ts().await)
887 }
888 Some(_) | None => None,
889 };
890
891 oracle_read_ts
892 }
893
894 #[mz_ore::instrument(level = "debug")]
898 pub(crate) fn determine_timestamp(
899 &self,
900 session: &Session,
901 id_bundle: &CollectionIdBundle,
902 when: &QueryWhen,
903 compute_instance: ComputeInstanceId,
904 timeline_context: &TimelineContext,
905 oracle_read_ts: Option<Timestamp>,
906 real_time_recency_ts: Option<mz_repr::Timestamp>,
907 ) -> Result<
908 (
909 TimestampDetermination<mz_repr::Timestamp>,
910 ReadHolds<mz_repr::Timestamp>,
911 ),
912 AdapterError,
913 > {
914 let constraint_based = ConstraintBasedTimestampSelection::from_str(
915 &CONSTRAINT_BASED_TIMESTAMP_SELECTION
916 .get(self.catalog_state().system_config().dyncfgs()),
917 );
918
919 let isolation_level = session.vars().transaction_isolation();
920 let (det, read_holds) = self.determine_timestamp_for(
921 session,
922 id_bundle,
923 when,
924 compute_instance,
925 timeline_context,
926 oracle_read_ts,
927 real_time_recency_ts,
928 isolation_level,
929 &constraint_based,
930 )?;
931 self.metrics
932 .determine_timestamp
933 .with_label_values(&[
934 match det.respond_immediately() {
935 true => "true",
936 false => "false",
937 },
938 isolation_level.as_str(),
939 &compute_instance.to_string(),
940 constraint_based.as_str(),
941 ])
942 .inc();
943 if !det.respond_immediately()
944 && isolation_level == &IsolationLevel::StrictSerializable
945 && real_time_recency_ts.is_none()
946 {
947 if let Some(strict) = det.timestamp_context.timestamp() {
949 let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
950 session,
951 id_bundle,
952 when,
953 compute_instance,
954 timeline_context,
955 oracle_read_ts,
956 real_time_recency_ts,
957 &IsolationLevel::Serializable,
958 &constraint_based,
959 )?;
960
961 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
962 self.metrics
963 .timestamp_difference_for_strict_serializable_ms
964 .with_label_values(&[
965 compute_instance.to_string().as_ref(),
966 constraint_based.as_str(),
967 ])
968 .observe(f64::cast_lossy(u64::from(
969 strict.saturating_sub(*serializable),
970 )));
971 }
972 }
973 }
974 Ok((det, read_holds))
975 }
976
977 pub(crate) fn largest_not_in_advance_of_upper(
982 upper: &Antichain<mz_repr::Timestamp>,
983 ) -> mz_repr::Timestamp {
984 if let Some(upper) = upper.as_option() {
989 upper.step_back().unwrap_or_else(Timestamp::minimum)
990 } else {
991 Timestamp::MAX
996 }
997 }
998}
999
/// The outcome of timestamp determination for a query, together with the
/// inputs that went into it.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp context (timeline and timestamp, or none).
    pub timestamp_context: TimestampContext<T>,
    /// The least valid read frontier of the read holds on the query's inputs.
    pub since: Antichain<T>,
    /// The least valid write frontier (upper) of the query's inputs.
    pub upper: Antichain<T>,
    /// The largest timestamp not in advance of `upper`.
    pub largest_not_in_advance_of_upper: T,
    /// The oracle read timestamp supplied to the determination, if any.
    pub oracle_read_ts: Option<T>,
    /// The session oracle's read timestamp, if it was consulted.
    pub session_oracle_read_ts: Option<T>,
    /// The real-time recency timestamp supplied to the determination, if any.
    pub real_time_recency_ts: Option<T>,
    /// The binding constraints; only present when constraint-based selection ran.
    pub constraints: Option<Constraints>,
}
1021
1022impl<T: TimestampManipulation> TimestampDetermination<T> {
1023 pub fn respond_immediately(&self) -> bool {
1024 match &self.timestamp_context {
1025 TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
1026 !self.upper.less_equal(chosen_ts)
1027 }
1028 TimestampContext::NoTimestamp => true,
1029 }
1030 }
1031}
1032
/// Everything shown when explaining how a query's timestamp was chosen.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
    /// The timestamp determination being explained.
    pub determination: TimestampDetermination<T>,
    /// The read and write frontiers of each input.
    pub sources: Vec<TimestampSource<T>>,
    /// The session's wall-clock time.
    pub session_wall_time: DateTime<Utc>,
    /// Whether the query can respond immediately.
    pub respond_immediately: bool,
}
1045
/// The frontiers of a single input, as shown in a timestamp explanation.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
    /// Display name of the input.
    pub name: String,
    /// The input's read frontier elements.
    pub read_frontier: Vec<T>,
    /// The input's write frontier elements.
    pub write_frontier: Vec<T>,
}
1052
1053pub trait DisplayableInTimeline {
1054 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
1055 fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
1056 DisplayInTimeline { t: self, timeline }
1057 }
1058}
1059
1060impl DisplayableInTimeline for mz_repr::Timestamp {
1061 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1062 if let Some(Timeline::EpochMilliseconds) = timeline {
1063 let ts_ms: u64 = self.into();
1064 if let Ok(ts_ms) = i64::try_from(ts_ms) {
1065 if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
1066 return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
1067 }
1068 }
1069 }
1070 write!(f, "{:13}", self)
1071 }
1072}
1073
/// A borrowed value paired with a timeline, formattable via `Display`/`Debug`.
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to format.
    t: &'a T,
    /// The timeline to format it in, if any.
    timeline: Option<&'a Timeline>,
}
1078impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
1079where
1080 T: DisplayableInTimeline,
1081{
1082 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1083 self.t.fmt(self.timeline, f)
1084 }
1085}
1086
1087impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
1088where
1089 T: DisplayableInTimeline,
1090{
1091 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1092 fmt::Display::fmt(&self, f)
1093 }
1094}
1095
1096impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
1097 for TimestampExplanation<T>
1098{
1099 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1100 let timeline = self.determination.timestamp_context.timeline();
1101 writeln!(
1102 f,
1103 " query timestamp: {}",
1104 self.determination
1105 .timestamp_context
1106 .timestamp_or_default()
1107 .display(timeline)
1108 )?;
1109 if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
1110 writeln!(
1111 f,
1112 " oracle read timestamp: {}",
1113 oracle_read_ts.display(timeline)
1114 )?;
1115 }
1116 if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
1117 writeln!(
1118 f,
1119 " session oracle read timestamp: {}",
1120 session_oracle_read_ts.display(timeline)
1121 )?;
1122 }
1123 if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
1124 writeln!(
1125 f,
1126 " real time recency timestamp: {}",
1127 real_time_recency_ts.display(timeline)
1128 )?;
1129 }
1130 writeln!(
1131 f,
1132 "largest not in advance of upper: {}",
1133 self.determination
1134 .largest_not_in_advance_of_upper
1135 .display(timeline),
1136 )?;
1137 writeln!(
1138 f,
1139 " upper:{:?}",
1140 self.determination
1141 .upper
1142 .iter()
1143 .map(|t| t.display(timeline))
1144 .collect::<Vec<_>>()
1145 )?;
1146 writeln!(
1147 f,
1148 " since:{:?}",
1149 self.determination
1150 .since
1151 .iter()
1152 .map(|t| t.display(timeline))
1153 .collect::<Vec<_>>()
1154 )?;
1155 writeln!(
1156 f,
1157 " can respond immediately: {}",
1158 self.respond_immediately
1159 )?;
1160 writeln!(f, " timeline: {:?}", &timeline)?;
1161 writeln!(
1162 f,
1163 " session wall time: {:13} ({})",
1164 self.session_wall_time.timestamp_millis(),
1165 self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
1166 )?;
1167
1168 for source in &self.sources {
1169 writeln!(f, "")?;
1170 writeln!(f, "source {}:", source.name)?;
1171 writeln!(
1172 f,
1173 " read frontier:{:?}",
1174 source
1175 .read_frontier
1176 .iter()
1177 .map(|t| t.display(timeline))
1178 .collect::<Vec<_>>()
1179 )?;
1180 writeln!(
1181 f,
1182 " write frontier:{:?}",
1183 source
1184 .write_frontier
1185 .iter()
1186 .map(|t| t.display(timeline))
1187 .collect::<Vec<_>>()
1188 )?;
1189 }
1190
1191 if let Some(constraints) = &self.determination.constraints {
1192 writeln!(f, "")?;
1193 writeln!(f, "binding constraints:")?;
1194 write!(f, "{}", constraints.display(timeline))?;
1195 }
1196
1197 Ok(())
1198 }
1199}
1200
1201mod constraints {
1203
1204 use core::fmt;
1205 use std::fmt::Debug;
1206
1207 use differential_dataflow::lattice::Lattice;
1208 use mz_storage_types::sources::Timeline;
1209 use serde::{Deserialize, Serialize};
1210 use timely::progress::{Antichain, Timestamp};
1211
1212 use mz_compute_types::ComputeInstanceId;
1213 use mz_repr::GlobalId;
1214 use mz_sql::session::vars::IsolationLevel;
1215
1216 use super::DisplayableInTimeline;
1217
1218 #[derive(Default, Serialize, Deserialize, Clone)]
1231 pub struct Constraints {
1232 pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1234 pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
1236 }
1237
1238 impl DisplayableInTimeline for Constraints {
1239 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1240 if !self.lower.is_empty() {
1241 writeln!(f, "lower:")?;
1242 for (ts, reason) in &self.lower {
1243 let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1244 writeln!(f, " ({:?}): {:?}", reason, ts)?;
1245 }
1246 }
1247 if !self.upper.is_empty() {
1248 writeln!(f, "upper:")?;
1249 for (ts, reason) in &self.upper {
1250 let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1251 writeln!(f, " ({:?}): {:?}", reason, ts)?;
1252 }
1253 }
1254 Ok(())
1255 }
1256 }
1257
1258 impl Debug for Constraints {
1259 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1260 self.display(None).fmt(f)?;
1261 Ok(())
1262 }
1263 }
1264
1265 impl Constraints {
1266 pub fn minimize(&mut self) {
1276 let lower_frontier = self.lower_bound();
1278 self.lower.retain(|(anti, _)| {
1280 anti.iter()
1281 .any(|time| lower_frontier.elements().contains(time))
1282 });
1283
1284 let upper_frontier = self.upper_bound();
1286 self.upper.retain(|(anti, _)| {
1288 anti.iter()
1289 .any(|time| upper_frontier.elements().contains(time))
1290 });
1291 }
1292
1293 pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1295 let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1296 for (anti, _) in self.lower.iter() {
1297 lower = lower.join(anti);
1298 }
1299 lower
1300 }
1301 pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1303 self.upper
1304 .iter()
1305 .flat_map(|(anti, _)| anti.iter())
1306 .cloned()
1307 .collect()
1308 }
1309 }
1310
1311 #[derive(Serialize, Deserialize, Clone)]
1313 pub enum Reason {
1314 ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
1318 StorageInput(Vec<GlobalId>),
1320 IsolationLevel(IsolationLevel),
1322 RealTimeRecency,
1324 QueryAsOf,
1326 }
1327
1328 impl Debug for Reason {
1329 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1330 match self {
1331 Reason::ComputeInput(ids) => write_split_ids(f, "ComputeInput", ids),
1332 Reason::StorageInput(ids) => write_split_ids(f, "StorageInput", ids),
1333 Reason::IsolationLevel(level) => {
1334 write!(f, "IsolationLevel({:?})", level)
1335 }
1336 Reason::RealTimeRecency => {
1337 write!(f, "RealTimeRecency")
1338 }
1339 Reason::QueryAsOf => {
1340 write!(f, "QueryAsOf")
1341 }
1342 }
1343 }
1344 }
1345
1346 fn write_split_ids<T: Debug>(f: &mut fmt::Formatter, label: &str, ids: &[T]) -> fmt::Result {
1349 let (ids, rest) = if ids.len() > 10 {
1350 ids.split_at(10)
1351 } else {
1352 let rest: &[T] = &[];
1353 (ids, rest)
1354 };
1355 if rest.is_empty() {
1356 write!(f, "{}({:?})", label, ids)
1357 } else {
1358 write!(f, "{}({:?}, ... {} more)", label, ids, rest.len())
1359 }
1360 }
1361
1362 pub enum Preference {
1365 FreshestAvailable,
1378 StalestValid,
1384 }
1385}