1use std::fmt;
13
14use async_trait::async_trait;
15use chrono::{DateTime, Utc};
16use constraints::Constraints;
17use differential_dataflow::lattice::Lattice;
18use itertools::Itertools;
19use mz_adapter_types::dyncfgs::CONSTRAINT_BASED_TIMESTAMP_SELECTION;
20use mz_adapter_types::timestamp_selection::ConstraintBasedTimestampSelection;
21use mz_compute_types::ComputeInstanceId;
22use mz_expr::MirScalarExpr;
23use mz_ore::cast::CastLossy;
24use mz_ore::soft_assert_eq_or_log;
25use mz_repr::explain::ExprHumanizer;
26use mz_repr::{GlobalId, RowArena, SqlScalarType, Timestamp, TimestampManipulation};
27use mz_sql::plan::QueryWhen;
28use mz_sql::session::metadata::SessionMetadata;
29use mz_sql::session::vars::IsolationLevel;
30use mz_storage_types::sources::Timeline;
31use serde::{Deserialize, Serialize};
32use timely::progress::{Antichain, Timestamp as TimelyTimestamp};
33use tracing::{Level, event};
34
35use crate::AdapterError;
36use crate::catalog::CatalogState;
37use crate::coord::Coordinator;
38use crate::coord::id_bundle::CollectionIdBundle;
39use crate::coord::read_policy::ReadHolds;
40use crate::coord::timeline::TimelineContext;
41use crate::optimize::dataflows::{ExprPrepStyle, prep_scalar_expr};
42use crate::session::Session;
43
/// The timeline/timestamp pairing that a read was assigned, if any.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub enum TimestampContext<T> {
    /// The read has a timestamp, interpreted within `timeline`.
    TimelineTimestamp {
        /// Timeline in which `chosen_ts` is interpreted.
        timeline: Timeline,
        /// The timestamp selected for the read; this is what `timestamp()` returns.
        chosen_ts: T,
        /// The oracle timestamp handed to `from_timeline_context`, if there was one.
        oracle_ts: Option<T>,
    },
    /// The read has no timestamp; produced for `TimelineContext::TimestampIndependent`.
    NoTimestamp,
}
65
66impl<T: TimestampManipulation> TimestampContext<T> {
67 pub fn from_timeline_context(
69 chosen_ts: T,
70 oracle_ts: Option<T>,
71 transaction_timeline: Option<Timeline>,
72 timeline_context: &TimelineContext,
73 ) -> TimestampContext<T> {
74 match timeline_context {
75 TimelineContext::TimelineDependent(timeline) => {
76 if let Some(transaction_timeline) = transaction_timeline {
77 assert_eq!(timeline, &transaction_timeline);
78 }
79 Self::TimelineTimestamp {
80 timeline: timeline.clone(),
81 chosen_ts,
82 oracle_ts,
83 }
84 }
85 TimelineContext::TimestampDependent => {
86 Self::TimelineTimestamp {
88 timeline: transaction_timeline.unwrap_or(Timeline::EpochMilliseconds),
89 chosen_ts,
90 oracle_ts,
91 }
92 }
93 TimelineContext::TimestampIndependent => Self::NoTimestamp,
94 }
95 }
96
97 pub fn timeline(&self) -> Option<&Timeline> {
99 self.timeline_timestamp().map(|tt| tt.0)
100 }
101
102 pub fn timestamp(&self) -> Option<&T> {
104 self.timeline_timestamp().map(|tt| tt.1)
105 }
106
107 pub fn timeline_timestamp(&self) -> Option<(&Timeline, &T)> {
109 match self {
110 Self::TimelineTimestamp {
111 timeline,
112 chosen_ts,
113 ..
114 } => Some((timeline, chosen_ts)),
115 Self::NoTimestamp => None,
116 }
117 }
118
119 pub fn timestamp_or_default(&self) -> T {
121 match self {
122 Self::TimelineTimestamp { chosen_ts, .. } => chosen_ts.clone(),
123 Self::NoTimestamp => T::maximum(),
127 }
128 }
129
130 pub fn contains_timestamp(&self) -> bool {
132 self.timestamp().is_some()
133 }
134
135 pub fn antichain(&self) -> Antichain<T> {
137 Antichain::from_elem(self.timestamp_or_default())
138 }
139}
140
141#[async_trait(?Send)]
142impl TimestampProvider for Coordinator {
143 fn compute_read_frontier(
145 &self,
146 instance: ComputeInstanceId,
147 id: GlobalId,
148 ) -> Antichain<Timestamp> {
149 self.controller
150 .compute
151 .collection_frontiers(id, Some(instance))
152 .expect("id does not exist")
153 .read_frontier
154 }
155
156 fn compute_write_frontier(
158 &self,
159 instance: ComputeInstanceId,
160 id: GlobalId,
161 ) -> Antichain<Timestamp> {
162 self.controller
163 .compute
164 .collection_frontiers(id, Some(instance))
165 .expect("id does not exist")
166 .write_frontier
167 }
168
169 fn storage_frontiers(
170 &self,
171 ids: Vec<GlobalId>,
172 ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)> {
173 self.controller
174 .storage
175 .collections_frontiers(ids)
176 .expect("missing collections")
177 }
178
179 fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<Timestamp> {
180 self.acquire_read_holds(id_bundle)
181 }
182
183 fn catalog_state(&self) -> &CatalogState {
184 self.catalog().state()
185 }
186}
187
/// The bare result of one timestamp-selection strategy, before it is wrapped into a
/// `TimestampDetermination` by `determine_timestamp_for`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RawTimestampDetermination<T> {
    /// The selected timestamp.
    pub timestamp: T,
    /// The constraints that produced `timestamp`; `None` when the classical strategy ran.
    pub constraints: Option<Constraints>,
    /// The session-local oracle read timestamp, set only when one was consulted under
    /// `StrongSessionSerializable`.
    pub session_oracle_read_ts: Option<T>,
}
196
197#[async_trait(?Send)]
198pub trait TimestampProvider {
    /// Returns the read frontier of compute collection `id` on `instance`.
    fn compute_read_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
    /// Returns the write frontier of compute collection `id` on `instance`.
    fn compute_write_frontier(
        &self,
        instance: ComputeInstanceId,
        id: GlobalId,
    ) -> Antichain<Timestamp>;
209
    /// Returns one `(id, frontier, frontier)` triple per requested storage collection.
    ///
    /// `least_valid_write` treats the second element as the since and the third as the
    /// upper (write frontier).
    fn storage_frontiers(
        &self,
        ids: Vec<GlobalId>,
    ) -> Vec<(GlobalId, Antichain<Timestamp>, Antichain<Timestamp>)>;
216
    /// Returns the catalog state used to evaluate `AS OF` / `UP TO` expressions and to
    /// read system configuration.
    fn catalog_state(&self) -> &CatalogState;
218
219 fn get_timeline(timeline_context: &TimelineContext) -> Option<Timeline> {
220 let timeline = match timeline_context {
221 TimelineContext::TimelineDependent(timeline) => Some(timeline.clone()),
222 TimelineContext::TimestampDependent => Some(Timeline::EpochMilliseconds),
224 TimelineContext::TimestampIndependent => None,
225 };
226
227 timeline
228 }
229
230 fn needs_linearized_read_ts(isolation_level: &IsolationLevel, when: &QueryWhen) -> bool {
237 when.must_advance_to_timeline_ts()
243 || (when.can_advance_to_timeline_ts()
244 && matches!(
245 isolation_level,
246 IsolationLevel::StrictSerializable | IsolationLevel::StrongSessionSerializable
247 ))
248 }
249
    /// Determines a read timestamp by starting at `Timestamp::minimum()` and repeatedly
    /// joining in every lower bound that applies to the query.
    ///
    /// Returns the resulting candidate if `since` is less-or-equal to it; otherwise
    /// returns an error listing the inputs for which the candidate is not valid. The
    /// returned `constraints` field is always `None`.
    ///
    /// # Errors
    ///
    /// * Evaluating the `AS OF` / `UP TO` expression fails.
    /// * `real_time_recency_ts` is supplied although real-time recency is disabled or
    ///   the isolation level is not `StrictSerializable`.
    /// * The candidate is not beyond `since`.
    ///
    /// # Panics
    ///
    /// Panics if a linearized read is required in a timeline but `oracle_read_ts` is
    /// `None`.
    fn determine_timestamp_classical(
        &self,
        session: &Session,
        read_holds: &ReadHolds<Timestamp>,
        id_bundle: &CollectionIdBundle,
        when: &QueryWhen,
        oracle_read_ts: Option<Timestamp>,
        compute_instance: ComputeInstanceId,
        real_time_recency_ts: Option<Timestamp>,
        isolation_level: &IsolationLevel,
        timeline: &Option<Timeline>,
        largest_not_in_advance_of_upper: Timestamp,
        since: &Antichain<Timestamp>,
    ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
        let mut session_oracle_read_ts = None;
        {
            // Invariant check: callers must have fetched an oracle timestamp whenever the
            // read needs to be linearized within a timeline.
            if timeline.is_some() && Self::needs_linearized_read_ts(isolation_level, when) {
                assert!(
                    oracle_read_ts.is_some(),
                    "should get a timestamp from the oracle for linearized timeline {:?} but didn't",
                    timeline
                );
            }
        }

        // Start from the minimum and only ever move the candidate forward.
        let mut candidate = Timestamp::minimum();

        // An explicit timestamp requested by the query.
        if let Some(timestamp) = when.advance_to_timestamp() {
            let catalog_state = self.catalog_state();
            let ts = Coordinator::evaluate_when(catalog_state, timestamp, session)?;
            candidate.join_assign(&ts);
        }

        // Advance to the since frontier when the query asks for it.
        if when.advance_to_since() {
            candidate.advance_by(since.borrow());
        }

        // The global oracle timestamp is joined in unless the isolation level is
        // `StrongSessionSerializable` and the query is not forced to the timeline
        // timestamp; that isolation level is handled further below.
        if let Some(timestamp) = &oracle_read_ts {
            if isolation_level != &IsolationLevel::StrongSessionSerializable
                || when.must_advance_to_timeline_ts()
            {
                candidate.join_assign(timestamp);
            }
        }

        // Advance to the freshest available data only under `Serializable` or when there
        // is no timeline at all.
        if when.can_advance_to_upper()
            && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
        {
            candidate.join_assign(&largest_not_in_advance_of_upper);
        }

        if let Some(real_time_recency_ts) = real_time_recency_ts {
            // Return an error rather than panic when a real-time-recency timestamp is
            // supplied without the matching session setting and isolation level.
            if !(session.vars().real_time_recency()
                && isolation_level == &IsolationLevel::StrictSerializable)
            {
                coord_bail!(
                    "real time recency timestamp should only be supplied when real time recency \
                     is enabled and the isolation level is strict serializable"
                );
            }
            candidate.join_assign(&real_time_recency_ts);
        }

        if isolation_level == &IsolationLevel::StrongSessionSerializable {
            // Join in the session-local oracle's read timestamp, and report it back to
            // the caller.
            if let Some(timeline) = &timeline {
                if let Some(oracle) = session.get_timestamp_oracle(timeline) {
                    let session_ts = oracle.read_ts();
                    candidate.join_assign(&session_ts);
                    session_oracle_read_ts = Some(session_ts);
                }
            }

            // Additionally advance to the smaller of the freshest available timestamp
            // and the global oracle timestamp (when there is one).
            if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
                let mut advance_to = largest_not_in_advance_of_upper;
                if let Some(oracle_read_ts) = oracle_read_ts {
                    advance_to = std::cmp::min(advance_to, oracle_read_ts);
                }
                candidate.join_assign(&advance_to);
            }
        }

        // The candidate is only usable if it is not before the since frontier.
        let timestamp = if since.less_equal(&candidate) {
            event!(
                Level::DEBUG,
                conn_id = format!("{}", session.conn_id()),
                since = format!("{since:?}"),
                largest_not_in_advance_of_upper = format!("{largest_not_in_advance_of_upper}"),
                timestamp = format!("{candidate}")
            );
            candidate
        } else {
            coord_bail!(generate_timestamp_not_valid_error_msg(
                id_bundle,
                compute_instance,
                read_holds,
                candidate
            ));
        };
        Ok(RawTimestampDetermination {
            timestamp,
            constraints: None,
            session_oracle_read_ts,
        })
    }
407
408 fn determine_timestamp_via_constraints(
412 &self,
413 session: &Session,
414 read_holds: &ReadHolds<Timestamp>,
415 id_bundle: &CollectionIdBundle,
416 when: &QueryWhen,
417 oracle_read_ts: Option<Timestamp>,
418 compute_instance: ComputeInstanceId,
419 real_time_recency_ts: Option<Timestamp>,
420 isolation_level: &IsolationLevel,
421 timeline: &Option<Timeline>,
422 largest_not_in_advance_of_upper: Timestamp,
423 ) -> Result<RawTimestampDetermination<Timestamp>, AdapterError> {
424 use constraints::{Constraints, Preference, Reason};
425
426 let mut session_oracle_read_ts = None;
427 let constraints = {
431 let mut constraints = Constraints::default();
433
434 let since = read_holds.least_valid_read();
440 let storage = id_bundle
441 .storage_ids
442 .iter()
443 .cloned()
444 .collect::<Vec<GlobalId>>();
445 if !storage.is_empty() {
446 constraints
447 .lower
448 .push((since.clone(), Reason::StorageInput(storage)));
449 }
450 let compute = id_bundle
451 .compute_ids
452 .iter()
453 .flat_map(|(key, ids)| ids.iter().map(|id| (*key, *id)))
454 .collect::<Vec<(ComputeInstanceId, GlobalId)>>();
455 if !compute.is_empty() {
456 constraints
457 .lower
458 .push((since.clone(), Reason::ComputeInput(compute)));
459 }
460
461 if let Some(timestamp) = when.advance_to_timestamp() {
463 let catalog_state = self.catalog_state();
464 let ts = Coordinator::evaluate_when(catalog_state, timestamp, session)?;
465 constraints
466 .lower
467 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
468 if when.constrains_upper() {
470 constraints
471 .upper
472 .push((Antichain::from_elem(ts), Reason::QueryAsOf));
473 }
474 }
475
476 if let Some(timestamp) = &oracle_read_ts {
481 if isolation_level != &IsolationLevel::StrongSessionSerializable
482 || when.must_advance_to_timeline_ts()
483 {
484 constraints.lower.push((
487 Antichain::from_elem(*timestamp),
488 Reason::IsolationLevel(*isolation_level),
489 ));
490 }
491 }
492
493 if let Some(real_time_recency_ts) = real_time_recency_ts {
495 assert!(
496 session.vars().real_time_recency()
497 && isolation_level == &IsolationLevel::StrictSerializable,
498 "real time recency timestamp should only be supplied when real time recency \
499 is enabled and the isolation level is strict serializable"
500 );
501 constraints.lower.push((
502 Antichain::from_elem(real_time_recency_ts),
503 Reason::RealTimeRecency,
504 ));
505 }
506
507 if isolation_level == &IsolationLevel::StrongSessionSerializable {
509 if let Some(timeline) = &timeline {
510 if let Some(oracle) = session.get_timestamp_oracle(timeline) {
511 let session_ts = oracle.read_ts();
512 constraints.lower.push((
513 Antichain::from_elem(session_ts),
514 Reason::IsolationLevel(*isolation_level),
515 ));
516 session_oracle_read_ts = Some(session_ts);
517 }
518
519 if when.can_advance_to_upper() && when.can_advance_to_timeline_ts() {
529 let mut advance_to = largest_not_in_advance_of_upper;
530 if let Some(oracle_read_ts) = oracle_read_ts {
531 advance_to = std::cmp::min(advance_to, oracle_read_ts);
532 }
533 constraints.lower.push((
534 Antichain::from_elem(advance_to),
535 Reason::IsolationLevel(*isolation_level),
536 ));
537 }
538 }
539 }
540
541 constraints.minimize();
542 constraints
543 };
544
545 let preferences = {
550 if when.can_advance_to_upper()
555 && (isolation_level == &IsolationLevel::Serializable || timeline.is_none())
556 {
557 Preference::FreshestAvailable
558 } else {
559 Preference::StalestValid
560 }
561
562 };
566
567 let constraint_candidate = {
569 let mut candidate = Timestamp::minimum();
570 candidate.advance_by(constraints.lower_bound().borrow());
571 if let Preference::FreshestAvailable = preferences {
574 let mut upper_bound = constraints.upper_bound();
575 upper_bound.insert(largest_not_in_advance_of_upper);
576 candidate.advance_by(upper_bound.borrow());
577 }
578 if constraints.upper_bound().less_than(&candidate) {
580 coord_bail!(generate_timestamp_not_valid_error_msg(
581 id_bundle,
582 compute_instance,
583 read_holds,
584 candidate
585 ));
586 } else {
587 candidate
588 }
589 };
590
591 Ok(RawTimestampDetermination {
592 timestamp: constraint_candidate,
593 constraints: Some(constraints),
594 session_oracle_read_ts,
595 })
596 }
597
598 fn determine_timestamp_for(
606 &self,
607 session: &Session,
608 id_bundle: &CollectionIdBundle,
609 when: &QueryWhen,
610 compute_instance: ComputeInstanceId,
611 timeline_context: &TimelineContext,
612 oracle_read_ts: Option<Timestamp>,
613 real_time_recency_ts: Option<mz_repr::Timestamp>,
614 isolation_level: &IsolationLevel,
615 constraint_based: &ConstraintBasedTimestampSelection,
616 ) -> Result<
617 (
618 TimestampDetermination<mz_repr::Timestamp>,
619 ReadHolds<mz_repr::Timestamp>,
620 ),
621 AdapterError,
622 > {
623 let read_holds = self.acquire_read_holds(id_bundle);
626 let timeline = Self::get_timeline(timeline_context);
627
628 let since = read_holds.least_valid_read();
629 let upper = self.least_valid_write(id_bundle);
630 let largest_not_in_advance_of_upper = Coordinator::largest_not_in_advance_of_upper(&upper);
631
632 let raw_determination = match constraint_based {
633 ConstraintBasedTimestampSelection::Disabled => self.determine_timestamp_classical(
634 session,
635 &read_holds,
636 id_bundle,
637 when,
638 oracle_read_ts,
639 compute_instance,
640 real_time_recency_ts,
641 isolation_level,
642 &timeline,
643 largest_not_in_advance_of_upper,
644 &since,
645 )?,
646 ConstraintBasedTimestampSelection::Enabled => self
647 .determine_timestamp_via_constraints(
648 session,
649 &read_holds,
650 id_bundle,
651 when,
652 oracle_read_ts,
653 compute_instance,
654 real_time_recency_ts,
655 isolation_level,
656 &timeline,
657 largest_not_in_advance_of_upper,
658 )?,
659 ConstraintBasedTimestampSelection::Verify => {
660 let classical_determination = self.determine_timestamp_classical(
661 session,
662 &read_holds,
663 id_bundle,
664 when,
665 oracle_read_ts,
666 compute_instance,
667 real_time_recency_ts,
668 isolation_level,
669 &timeline,
670 largest_not_in_advance_of_upper,
671 &since,
672 )?;
673
674 match self.determine_timestamp_via_constraints(
675 session,
676 &read_holds,
677 id_bundle,
678 when,
679 oracle_read_ts,
680 compute_instance,
681 real_time_recency_ts,
682 isolation_level,
683 &timeline,
684 largest_not_in_advance_of_upper,
685 ) {
686 Ok(constraint_determination) => {
687 soft_assert_eq_or_log!(
688 classical_determination.timestamp,
689 constraint_determination.timestamp,
690 "timestamp determination mismatch"
691 );
692 if classical_determination.timestamp != constraint_determination.timestamp {
693 tracing::info!(
694 "timestamp constrains: {:?}",
695 constraint_determination.constraints
696 );
697 }
698 RawTimestampDetermination {
699 timestamp: classical_determination.timestamp,
700 constraints: constraint_determination.constraints,
701 session_oracle_read_ts: classical_determination.session_oracle_read_ts,
702 }
703 }
704 Err(e) => {
705 event!(Level::ERROR, error = ?e, "constraint-based timestamp determination failed");
706 RawTimestampDetermination {
707 timestamp: classical_determination.timestamp,
708 constraints: classical_determination.constraints,
709 session_oracle_read_ts: classical_determination.session_oracle_read_ts,
710 }
711 }
712 }
713 }
714 };
715
716 let timestamp_context = TimestampContext::from_timeline_context(
717 raw_determination.timestamp,
718 oracle_read_ts,
719 timeline,
720 timeline_context,
721 );
722
723 let determination = TimestampDetermination {
724 timestamp_context,
725 since,
726 upper,
727 largest_not_in_advance_of_upper,
728 oracle_read_ts,
729 session_oracle_read_ts: raw_determination.session_oracle_read_ts,
730 real_time_recency_ts,
731 constraints: raw_determination.constraints,
732 };
733
734 Ok((determination, read_holds))
735 }
736
    /// Acquires read holds on the collections in `id_bundle`.
    ///
    /// `determine_timestamp_for` derives the since frontier from the returned holds and
    /// hands the holds back to its caller.
    fn acquire_read_holds(&self, id_bundle: &CollectionIdBundle) -> ReadHolds<mz_repr::Timestamp>;
740
741 fn least_valid_write(&self, id_bundle: &CollectionIdBundle) -> Antichain<mz_repr::Timestamp> {
746 let mut upper = Antichain::new();
747 {
748 for (_id, _since, collection_upper) in
749 self.storage_frontiers(id_bundle.storage_ids.iter().cloned().collect_vec())
750 {
751 upper.extend(collection_upper);
752 }
753 }
754 {
755 for (instance, compute_ids) in &id_bundle.compute_ids {
756 for id in compute_ids.iter() {
757 upper.extend(self.compute_write_frontier(*instance, *id).into_iter());
758 }
759 }
760 }
761 upper
762 }
763
764 fn greatest_available_read(&self, id_bundle: &CollectionIdBundle) -> Antichain<Timestamp> {
767 let mut frontier = Antichain::new();
768 for t in self.least_valid_write(id_bundle) {
769 frontier.insert(t.step_back().unwrap_or(t));
770 }
771 frontier
772 }
773}
774
775fn generate_timestamp_not_valid_error_msg(
776 id_bundle: &CollectionIdBundle,
777 compute_instance: ComputeInstanceId,
778 read_holds: &ReadHolds<mz_repr::Timestamp>,
779 candidate: mz_repr::Timestamp,
780) -> String {
781 let mut invalid = Vec::new();
782
783 if let Some(compute_ids) = id_bundle.compute_ids.get(&compute_instance) {
784 for id in compute_ids {
785 let since = read_holds.since(id);
786 if !since.less_equal(&candidate) {
787 invalid.push((*id, since));
788 }
789 }
790 }
791
792 for id in id_bundle.storage_ids.iter() {
793 let since = read_holds.since(id);
794 if !since.less_equal(&candidate) {
795 invalid.push((*id, since));
796 }
797 }
798
799 format!(
800 "Timestamp ({}) is not valid for all inputs: {:?}",
801 candidate, invalid,
802 )
803}
804
805impl Coordinator {
806 pub(crate) async fn oracle_read_ts(
807 &self,
808 session: &Session,
809 timeline_ctx: &TimelineContext,
810 when: &QueryWhen,
811 ) -> Option<Timestamp> {
812 let isolation_level = session.vars().transaction_isolation().clone();
813 let timeline = Coordinator::get_timeline(timeline_ctx);
814 let needs_linearized_read_ts =
815 Coordinator::needs_linearized_read_ts(&isolation_level, when);
816
817 let oracle_read_ts = match timeline {
818 Some(timeline) if needs_linearized_read_ts => {
819 let timestamp_oracle = self.get_timestamp_oracle(&timeline);
820 Some(timestamp_oracle.read_ts().await)
821 }
822 Some(_) | None => None,
823 };
824
825 oracle_read_ts
826 }
827
828 #[mz_ore::instrument(level = "debug")]
832 pub(crate) fn determine_timestamp(
833 &self,
834 session: &Session,
835 id_bundle: &CollectionIdBundle,
836 when: &QueryWhen,
837 compute_instance: ComputeInstanceId,
838 timeline_context: &TimelineContext,
839 oracle_read_ts: Option<Timestamp>,
840 real_time_recency_ts: Option<mz_repr::Timestamp>,
841 ) -> Result<
842 (
843 TimestampDetermination<mz_repr::Timestamp>,
844 ReadHolds<mz_repr::Timestamp>,
845 ),
846 AdapterError,
847 > {
848 let constraint_based = ConstraintBasedTimestampSelection::from_str(
849 &CONSTRAINT_BASED_TIMESTAMP_SELECTION
850 .get(self.catalog_state().system_config().dyncfgs()),
851 );
852
853 let isolation_level = session.vars().transaction_isolation();
854 let (det, read_holds) = self.determine_timestamp_for(
855 session,
856 id_bundle,
857 when,
858 compute_instance,
859 timeline_context,
860 oracle_read_ts,
861 real_time_recency_ts,
862 isolation_level,
863 &constraint_based,
864 )?;
865 self.metrics
866 .determine_timestamp
867 .with_label_values(&[
868 match det.respond_immediately() {
869 true => "true",
870 false => "false",
871 },
872 isolation_level.as_str(),
873 &compute_instance.to_string(),
874 constraint_based.as_str(),
875 ])
876 .inc();
877 if !det.respond_immediately()
878 && isolation_level == &IsolationLevel::StrictSerializable
879 && real_time_recency_ts.is_none()
880 {
881 if let Some(strict) = det.timestamp_context.timestamp() {
882 let (serializable_det, _tmp_read_holds) = self.determine_timestamp_for(
883 session,
884 id_bundle,
885 when,
886 compute_instance,
887 timeline_context,
888 oracle_read_ts,
889 real_time_recency_ts,
890 &IsolationLevel::Serializable,
891 &constraint_based,
892 )?;
893
894 if let Some(serializable) = serializable_det.timestamp_context.timestamp() {
895 self.metrics
896 .timestamp_difference_for_strict_serializable_ms
897 .with_label_values(&[
898 &compute_instance.to_string(),
899 constraint_based.as_str(),
900 ])
901 .observe(f64::cast_lossy(u64::from(
902 strict.saturating_sub(*serializable),
903 )));
904 }
905 }
906 }
907 Ok((det, read_holds))
908 }
909
910 pub(crate) fn largest_not_in_advance_of_upper(
915 upper: &Antichain<mz_repr::Timestamp>,
916 ) -> mz_repr::Timestamp {
917 if let Some(upper) = upper.as_option() {
922 upper.step_back().unwrap_or_else(Timestamp::minimum)
923 } else {
924 Timestamp::MAX
929 }
930 }
931
932 pub(crate) fn evaluate_when(
933 catalog: &CatalogState,
934 mut timestamp: MirScalarExpr,
935 session: &Session,
936 ) -> Result<mz_repr::Timestamp, AdapterError> {
937 let temp_storage = RowArena::new();
938 prep_scalar_expr(&mut timestamp, ExprPrepStyle::AsOfUpTo)?;
939 let evaled = timestamp.eval(&[], &temp_storage)?;
940 if evaled.is_null() {
941 coord_bail!("can't use {} as a mz_timestamp for AS OF or UP TO", evaled);
942 }
943 let ty = timestamp.typ(&[]);
944 Ok(match ty.scalar_type {
945 SqlScalarType::MzTimestamp => evaled.unwrap_mz_timestamp(),
946 SqlScalarType::Numeric { .. } => {
947 let n = evaled.unwrap_numeric().0;
948 n.try_into()?
949 }
950 SqlScalarType::Int16 => i64::from(evaled.unwrap_int16()).try_into()?,
951 SqlScalarType::Int32 => i64::from(evaled.unwrap_int32()).try_into()?,
952 SqlScalarType::Int64 => evaled.unwrap_int64().try_into()?,
953 SqlScalarType::UInt16 => u64::from(evaled.unwrap_uint16()).into(),
954 SqlScalarType::UInt32 => u64::from(evaled.unwrap_uint32()).into(),
955 SqlScalarType::UInt64 => evaled.unwrap_uint64().into(),
956 SqlScalarType::TimestampTz { .. } => {
957 evaled.unwrap_timestamptz().timestamp_millis().try_into()?
958 }
959 SqlScalarType::Timestamp { .. } => evaled
960 .unwrap_timestamp()
961 .and_utc()
962 .timestamp_millis()
963 .try_into()?,
964 _ => coord_bail!(
965 "can't use {} as a mz_timestamp for AS OF or UP TO",
966 catalog
967 .for_session(session)
968 .humanize_column_type(&ty, false)
969 ),
970 })
971 }
972}
973
/// Everything that went into, and came out of, determining a timestamp for a read.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct TimestampDetermination<T> {
    /// The chosen timestamp together with its timeline, if any.
    pub timestamp_context: TimestampContext<T>,
    /// The since frontier of the inputs (the least valid read of the read holds).
    pub since: Antichain<T>,
    /// The combined write frontier of the inputs.
    pub upper: Antichain<T>,
    /// The timestamp one step before `upper`.
    pub largest_not_in_advance_of_upper: T,
    /// The oracle read timestamp that was supplied, if any.
    pub oracle_read_ts: Option<T>,
    /// The session-local oracle read timestamp that was consulted, if any.
    pub session_oracle_read_ts: Option<T>,
    /// The real-time-recency timestamp that was supplied, if any.
    pub real_time_recency_ts: Option<T>,
    /// The binding constraints, when the constraint-based strategy produced them.
    pub constraints: Option<Constraints>,
}
995
996impl<T: TimestampManipulation> TimestampDetermination<T> {
997 pub fn respond_immediately(&self) -> bool {
998 match &self.timestamp_context {
999 TimestampContext::TimelineTimestamp { chosen_ts, .. } => {
1000 !self.upper.less_equal(chosen_ts)
1001 }
1002 TimestampContext::NoTimestamp => true,
1003 }
1004 }
1005}
1006
/// A timestamp determination plus the extra context rendered by its `Display` impl.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampExplanation<T> {
    /// The determination being explained.
    pub determination: TimestampDetermination<T>,
    /// Per-source frontier details, rendered one section per source.
    pub sources: Vec<TimestampSource<T>>,
    /// Wall-clock time rendered as the "session wall time" line.
    pub session_wall_time: DateTime<Utc>,
    /// Rendered as the "can respond immediately" line.
    pub respond_immediately: bool,
}
1019
/// Frontier information about a single source, as shown in a `TimestampExplanation`.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct TimestampSource<T> {
    /// Name rendered in the "source ...:" heading.
    pub name: String,
    /// Elements of the source's read frontier.
    pub read_frontier: Vec<T>,
    /// Elements of the source's write frontier.
    pub write_frontier: Vec<T>,
}
1026
/// Values whose textual form may depend on the timeline they belong to.
pub trait DisplayableInTimeline {
    /// Formats `self` into `f`, optionally using `timeline` to pick the representation.
    fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result;
    /// Wraps `self` and `timeline` in a `DisplayInTimeline` adapter, which implements
    /// `fmt::Display` and `fmt::Debug` by calling `fmt` above.
    fn display<'a>(&'a self, timeline: Option<&'a Timeline>) -> DisplayInTimeline<'a, Self> {
        DisplayInTimeline { t: self, timeline }
    }
}
1033
1034impl DisplayableInTimeline for mz_repr::Timestamp {
1035 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1036 if let Some(Timeline::EpochMilliseconds) = timeline {
1037 let ts_ms: u64 = self.into();
1038 if let Ok(ts_ms) = i64::try_from(ts_ms) {
1039 if let Some(ndt) = DateTime::from_timestamp_millis(ts_ms) {
1040 return write!(f, "{:13} ({})", self, ndt.format("%Y-%m-%d %H:%M:%S%.3f"));
1041 }
1042 }
1043 }
1044 write!(f, "{:13}", self)
1045 }
1046}
1047
/// Adapter pairing a value with an optional timeline so that it can be used with the
/// standard formatting traits; created by `DisplayableInTimeline::display`.
pub struct DisplayInTimeline<'a, T: ?Sized> {
    /// The value to format.
    t: &'a T,
    /// The timeline passed along to the value's `DisplayableInTimeline::fmt`.
    timeline: Option<&'a Timeline>,
}
1052impl<'a, T> fmt::Display for DisplayInTimeline<'a, T>
1053where
1054 T: DisplayableInTimeline,
1055{
1056 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1057 self.t.fmt(self.timeline, f)
1058 }
1059}
1060
1061impl<'a, T> fmt::Debug for DisplayInTimeline<'a, T>
1062where
1063 T: DisplayableInTimeline,
1064{
1065 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1066 fmt::Display::fmt(&self, f)
1067 }
1068}
1069
1070impl<T: fmt::Display + fmt::Debug + DisplayableInTimeline + TimestampManipulation> fmt::Display
1071 for TimestampExplanation<T>
1072{
1073 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1074 let timeline = self.determination.timestamp_context.timeline();
1075 writeln!(
1076 f,
1077 " query timestamp: {}",
1078 self.determination
1079 .timestamp_context
1080 .timestamp_or_default()
1081 .display(timeline)
1082 )?;
1083 if let Some(oracle_read_ts) = &self.determination.oracle_read_ts {
1084 writeln!(
1085 f,
1086 " oracle read timestamp: {}",
1087 oracle_read_ts.display(timeline)
1088 )?;
1089 }
1090 if let Some(session_oracle_read_ts) = &self.determination.session_oracle_read_ts {
1091 writeln!(
1092 f,
1093 " session oracle read timestamp: {}",
1094 session_oracle_read_ts.display(timeline)
1095 )?;
1096 }
1097 if let Some(real_time_recency_ts) = &self.determination.real_time_recency_ts {
1098 writeln!(
1099 f,
1100 " real time recency timestamp: {}",
1101 real_time_recency_ts.display(timeline)
1102 )?;
1103 }
1104 writeln!(
1105 f,
1106 "largest not in advance of upper: {}",
1107 self.determination
1108 .largest_not_in_advance_of_upper
1109 .display(timeline),
1110 )?;
1111 writeln!(
1112 f,
1113 " upper:{:?}",
1114 self.determination
1115 .upper
1116 .iter()
1117 .map(|t| t.display(timeline))
1118 .collect::<Vec<_>>()
1119 )?;
1120 writeln!(
1121 f,
1122 " since:{:?}",
1123 self.determination
1124 .since
1125 .iter()
1126 .map(|t| t.display(timeline))
1127 .collect::<Vec<_>>()
1128 )?;
1129 writeln!(
1130 f,
1131 " can respond immediately: {}",
1132 self.respond_immediately
1133 )?;
1134 writeln!(f, " timeline: {:?}", &timeline)?;
1135 writeln!(
1136 f,
1137 " session wall time: {:13} ({})",
1138 self.session_wall_time.timestamp_millis(),
1139 self.session_wall_time.format("%Y-%m-%d %H:%M:%S%.3f"),
1140 )?;
1141
1142 for source in &self.sources {
1143 writeln!(f, "")?;
1144 writeln!(f, "source {}:", source.name)?;
1145 writeln!(
1146 f,
1147 " read frontier:{:?}",
1148 source
1149 .read_frontier
1150 .iter()
1151 .map(|t| t.display(timeline))
1152 .collect::<Vec<_>>()
1153 )?;
1154 writeln!(
1155 f,
1156 " write frontier:{:?}",
1157 source
1158 .write_frontier
1159 .iter()
1160 .map(|t| t.display(timeline))
1161 .collect::<Vec<_>>()
1162 )?;
1163 }
1164
1165 if let Some(constraints) = &self.determination.constraints {
1166 writeln!(f, "")?;
1167 writeln!(f, "binding constraints:")?;
1168 write!(f, "{}", constraints.display(timeline))?;
1169 }
1170
1171 Ok(())
1172 }
1173}
1174
1175mod constraints {
1177
1178 use core::fmt;
1179 use std::fmt::Debug;
1180
1181 use differential_dataflow::lattice::Lattice;
1182 use mz_storage_types::sources::Timeline;
1183 use serde::{Deserialize, Serialize};
1184 use timely::progress::{Antichain, Timestamp};
1185
1186 use mz_compute_types::ComputeInstanceId;
1187 use mz_repr::GlobalId;
1188 use mz_sql::session::vars::IsolationLevel;
1189
1190 use super::DisplayableInTimeline;
1191
    /// Lower and upper bounds on a timestamp, each annotated with the reason it exists.
    #[derive(Default, Serialize, Deserialize, Clone)]
    pub struct Constraints {
        /// Frontiers the timestamp must not be before, each paired with its reason.
        pub lower: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
        /// Frontiers the timestamp must not be beyond, each paired with its reason.
        pub upper: Vec<(Antichain<mz_repr::Timestamp>, Reason)>,
    }
1211
1212 impl DisplayableInTimeline for Constraints {
1213 fn fmt(&self, timeline: Option<&Timeline>, f: &mut fmt::Formatter) -> fmt::Result {
1214 if !self.lower.is_empty() {
1215 writeln!(f, "lower:")?;
1216 for (ts, reason) in &self.lower {
1217 let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1218 writeln!(f, " ({:?}): {:?}", reason, ts)?;
1219 }
1220 }
1221 if !self.upper.is_empty() {
1222 writeln!(f, "upper:")?;
1223 for (ts, reason) in &self.upper {
1224 let ts = ts.iter().map(|t| t.display(timeline)).collect::<Vec<_>>();
1225 writeln!(f, " ({:?}): {:?}", reason, ts)?;
1226 }
1227 }
1228 Ok(())
1229 }
1230 }
1231
1232 impl Debug for Constraints {
1233 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1234 self.display(None).fmt(f)?;
1235 Ok(())
1236 }
1237 }
1238
1239 impl Constraints {
1240 pub fn minimize(&mut self) {
1250 let lower_frontier = self.lower_bound();
1252 self.lower.retain(|(anti, _)| {
1254 anti.iter()
1255 .any(|time| lower_frontier.elements().contains(time))
1256 });
1257
1258 let upper_frontier = self.upper_bound();
1260 self.upper.retain(|(anti, _)| {
1262 anti.iter()
1263 .any(|time| upper_frontier.elements().contains(time))
1264 });
1265 }
1266
1267 pub fn lower_bound(&self) -> Antichain<mz_repr::Timestamp> {
1269 let mut lower = Antichain::from_elem(mz_repr::Timestamp::minimum());
1270 for (anti, _) in self.lower.iter() {
1271 lower = lower.join(anti);
1272 }
1273 lower
1274 }
1275 pub fn upper_bound(&self) -> Antichain<mz_repr::Timestamp> {
1277 self.upper
1278 .iter()
1279 .flat_map(|(anti, _)| anti.iter())
1280 .cloned()
1281 .collect()
1282 }
1283 }
1284
    /// Why a constraint was imposed on the timestamp.
    #[derive(Serialize, Deserialize, Clone)]
    pub enum Reason {
        /// The read holds of the listed compute collections, identified by instance and id.
        ComputeInput(Vec<(ComputeInstanceId, GlobalId)>),
        /// The read holds of the listed storage collections.
        StorageInput(Vec<GlobalId>),
        /// A requirement of the given isolation level.
        IsolationLevel(IsolationLevel),
        /// A real-time-recency timestamp was supplied.
        RealTimeRecency,
        /// The query requested an explicit timestamp.
        QueryAsOf,
    }
1301
1302 impl Debug for Reason {
1303 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1304 match self {
1305 Reason::ComputeInput(ids) => write_split_ids(f, "ComputeInput", ids),
1306 Reason::StorageInput(ids) => write_split_ids(f, "StorageInput", ids),
1307 Reason::IsolationLevel(level) => {
1308 write!(f, "IsolationLevel({:?})", level)
1309 }
1310 Reason::RealTimeRecency => {
1311 write!(f, "RealTimeRecency")
1312 }
1313 Reason::QueryAsOf => {
1314 write!(f, "QueryAsOf")
1315 }
1316 }
1317 }
1318 }
1319
1320 fn write_split_ids<T: Debug>(f: &mut fmt::Formatter, label: &str, ids: &[T]) -> fmt::Result {
1323 let (ids, rest) = if ids.len() > 10 {
1324 ids.split_at(10)
1325 } else {
1326 let rest: &[T] = &[];
1327 (ids, rest)
1328 };
1329 if rest.is_empty() {
1330 write!(f, "{}({:?})", label, ids)
1331 } else {
1332 write!(f, "{}({:?}, ... {} more)", label, ids, rest.len())
1333 }
1334 }
1335
    /// Which timestamp to prefer among those that satisfy the constraints.
    pub enum Preference {
        /// Advance towards the freshest available data (`largest_not_in_advance_of_upper`),
        /// capped by any upper bound constraint.
        FreshestAvailable,
        /// Stay at the lower bound, i.e. the stalest timestamp that is still valid.
        StalestValid,
    }
1359}