use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::rc::Rc;
use std::sync::{Arc, mpsc};
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::IntoOwned;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
    StatusResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

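/// Worker-local state kept by the compute layer, shared across all dataflows
/// running on this Timely worker.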
pub struct ComputeState {
    pub collections: BTreeMap<GlobalId, CollectionState>,
    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
    pub traces: TraceManager,
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    pub compute_logger: Option<logging::compute::Logger>,
    pub persist_clients: Arc<PersistClientCache>,
    pub txns_ctx: TxnsContext,
    pub command_history: ComputeCommandHistory<UIntGauge>,
    max_result_size: u64,
    pub linear_join_spec: LinearJoinSpec,
    pub metrics: WorkerMetrics,
    tracing_handle: Arc<TracingHandle>,
    pub context: ComputeInstanceContext,
    pub worker_config: Rc<ConfigSet>,

    pub hydration_rx: mpsc::Receiver<HydrationEvent>,
    pub hydration_tx: mpsc::Sender<HydrationEvent>,

    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    pub read_only_rx: watch::Receiver<bool>,

    pub read_only_tx: watch::Sender<bool>,

    pub server_maintenance_interval: Duration,

    pub init_system_time: EpochMillis,

    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());
        let (hydration_tx, hydration_rx) = mpsc::channel();

        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            collections: Default::default(),
            dropped_collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            hydration_rx,
            hydration_tx,
            suspended_collections: Default::default(),
            read_only_tx,
            read_only_rx,
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

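    /// Returns the [`CollectionState`] for the given ID, panicking if the
    /// collection is not tracked by this worker.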
    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

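    /// Creates a new probe for the given input and registers it with each of
    /// the listed collections, returning the probe handle.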
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

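    /// Applies the current `worker_config` to worker-level settings: the
    /// linear join implementation, lgalloc, the maintenance interval, and
    /// overflowing behavior.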
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

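    /// Sets `replica_expiration` to `init_system_time` plus the given offset,
    /// unless an expiration is already set.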
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

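    /// Returns the dataflow max-inflight-bytes limit, choosing the cc or
    /// non-cc variant depending on the active persist configuration.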
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

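/// A wrapper around [`ComputeState`] that also has access to the Timely worker
/// and the response channel, and can therefore process commands.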
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    pub timely_worker: &'a mut TimelyWorker<A>,
    pub compute_state: &'a mut ComputeState,
    pub response_tx: &'a mut ResponseSender,
}

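/// A token held to keep a sink's resources alive; dropping it releases them.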
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
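    /// Handles a single compute command, recording it in the command history
    /// and timing its execution.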
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            CreateTimely { .. } => panic!("CreateTimely must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites => {
                self.compute_state
                    .read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.compute_state.persist_clients.cfg().enable_compaction();
            }
        }

        timer.observe_duration();
    }

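    /// Initializes the instance: applies the worker configuration, sets the
    /// replica expiration if requested, and initializes logging.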
    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);
    }

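    /// Applies a configuration update to this worker and to the shared persist
    /// and dyncfg state.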
    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

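    /// Builds the described dataflow: registers collection state for each of
    /// its exports, then renders the dataflow in the Timely worker.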
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = self.timely_worker.next_dataflow_index();
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        }

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(is_subscribe_or_copy, as_of.clone(), metrics);

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

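    /// Schedules a suspended dataflow by dropping its suspension token, which
    /// fires the corresponding start signal.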
    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

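    /// Allows compaction of the identified collection to the given frontier.
    /// An empty frontier signals that the collection should be dropped.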
    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

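    /// Installs a new pending peek against either an index trace or a persist
    /// shard, and immediately attempts to fulfill it.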
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

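    /// Cancels a peek, if it is still pending, responding with
    /// [`PeekResponse::Canceled`].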
    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

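    /// Removes all state tracked for the given collection and stages it for
    /// final frontier reporting via `dropped_collections`.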
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        let dropped = DroppedCollection {
            reported_frontiers: collection.reported_frontiers,
            is_subscribe_or_copy: collection.is_subscribe_or_copy,
        };
        self.compute_state.dropped_collections.push((id, dropped));
    }

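    /// Initializes the logging dataflows and registers the resulting log
    /// collections. Panics if logging was already initialized.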
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(is_subscribe_or_copy, as_of, metrics);

            let logging =
                CollectionLogging::new(id, logger.clone(), dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

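    /// Reports advanced write, input, and output frontiers for all tracked
    /// collections whose frontiers changed since the last report.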
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

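    /// Reports the final (empty) frontiers for collections that were dropped
    /// since the last call.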
    pub fn report_dropped_collections(&mut self) {
        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);

        for (id, collection) in dropped_collections {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

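    /// Forwards operator hydration events received from dataflows as status
    /// responses, skipping collections that are no longer tracked or have
    /// already advanced to the empty frontier.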
    pub fn report_operator_hydration(&self) {
        let worker_id = self.timely_worker.index();
        for event in self.compute_state.hydration_rx.try_iter() {
            let collection = self.compute_state.collections.get(&event.export_id);
            if collection.map_or(true, |c| c.reported_frontiers().all_empty()) {
                continue;
            }

            let status = OperatorHydrationStatus {
                collection_id: event.export_id,
                lir_id: event.lir_id,
                worker_id,
                hydrated: event.hydrated,
            };
            let response = ComputeResponse::Status(StatusResponse::OperatorHydration(status));
            self.send_compute_response(response);
        }
    }

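    /// Updates the metric tracking the time remaining until replica
    /// expiration.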
    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

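    /// Attempts to fulfill a peek; if it cannot be fulfilled yet, it is
    /// (re)inserted into `pending_peeks`.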
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                peek.seek_fulfillment(upper, self.compute_state.max_result_size)
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

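    /// Attempts to fulfill all pending peeks.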
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

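    /// Sends a response for the given peek and logs its retirement.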
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

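    /// Drains the subscribe response buffer, updating reported frontiers and
    /// forwarding the responses.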
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // The collection is no longer tracked, presumably because it
                // was already dropped. There are no frontiers to update.
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

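    /// Drains the copy-to response buffer and forwards the responses.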
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

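    /// Sends a response over the response channel; send errors are ignored,
    /// as they can only mean the receiving side has hung up.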
    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

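    /// Checks whether the replica has reached its expiration time and, if so,
    /// logs the details and panics.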
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

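    /// Determines the expiration frontier for a dataflow: the replica
    /// expiration mapped through the dataflow's time dependence and stepped
    /// forward, disregarded where it lies at or beyond the dataflow's `until`.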
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

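/// An in-progress peek, together with the state needed to eventually fulfill
/// it.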
pub enum PendingPeek {
    Index(IndexPeek),
    Persist(PersistPeek),
}

impl PendingPeek {
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (id, logging::compute::PeekType::Persist),
        };
        ComputeEvent::Peek(PeekEvent {
            peek: logging::compute::Peek::new(*id, peek.timestamp, peek.uuid),
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

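    /// Creates a pending peek against a persist shard. The peek is executed in
    /// a spawned async task on a single worker, chosen by hashing the peek's
    /// UUID; all other workers resolve to an empty result.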
    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
        }
    }
}

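/// An in-progress peek against a persist shard. The result is produced by an
/// async task that is aborted when the peek is dropped.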
pub struct PersistPeek {
    pub(crate) peek: Peek,
    _abort_handle: AbortOnDropHandle<()>,
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    span: tracing::Span,
}

impl PersistPeek {
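    /// Reads rows satisfying the peek from the persist shard at `as_of`,
    /// applying the literal constraint and the MFP, and enforcing the result
    /// size and row count limits.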
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    format!(
                        "Invalid data in source, saw retractions ({}) for row that does not exist: {:?}",
                        -d,
                        row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

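/// An in-progress peek against an arranged index.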
pub struct IndexPeek {
    peek: Peek,
    trace_bundle: TraceBundle,
    span: tracing::Span,
}

impl IndexPeek {
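    /// Attempts to fulfill the peek, returning a response if the traces have
    /// advanced far enough to read at the peek timestamp and `None` otherwise.
    /// `upper` is used as scratch space for frontier reads.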
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
    ) -> Option<PeekResponse> {
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return None;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return None;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return Some(PeekResponse::Error(error));
        }

        let response = match self.collect_finished_data(max_result_size) {
            Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &self.peek.finishing.order_by)),
            Err(text) => PeekResponse::Error(text),
        };
        Some(response)
    }

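    /// Collects the finished data for the peek, first scanning the errors
    /// trace and failing on any error recorded at or before the peek
    /// timestamp.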
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                return Err(format!(
                    "Invalid data in source errors, saw retractions ({}) for row that does not exist: {}",
                    -copies,
                    cursor.key(&storage),
                ));
            }
            if copies.is_positive() {
                return Err(cursor.key(&storage).to_string());
            }
            cursor.step_key(&storage);
        }

        Self::collect_ok_finished_data(&mut self.peek, self.trace_bundle.oks_mut(), max_result_size)
    }

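    /// Collects rows from the `oks` trace: seeks to each literal constraint
    /// (if any), applies the MFP to every value, accumulates multiplicities at
    /// the peek timestamp, and enforces the result size and `LIMIT` bounds.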
    fn collect_ok_finished_data<Tr>(
        peek: &mut Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String>
    where
        for<'a> Tr: TraceReader<DiffGat<'a> = &'a Diff>,
        for<'a> Tr::Key<'a>: ToDatumIter + IntoOwned<'a, Owned = Row> + Eq,
        for<'a> Tr::Val<'a>: ToDatumIter,
        for<'a> Tr::TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = std::mem::size_of::<NonZeroUsize>();

        let (mut cursor, storage) = oks_handle.cursor();
        let mut results = Vec::new();
        let mut total_size: usize = 0;

        let max_results = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)) + peek.finishing.offset);

        use mz_ore::result::ResultExt;

        let mut row_builder = Row::default();
        let mut datum_vec = DatumVec::new();
        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        peek.literal_constraints
            .iter_mut()
            .for_each(|vec| vec.sort());
        let has_literal_constraints = peek.literal_constraints.is_some();
        let mut literals = peek.literal_constraints.iter().flatten();
        let mut current_literal = None;

        while cursor.key_valid(&storage) {
            if has_literal_constraints {
                loop {
                    current_literal = literals.next();
                    match current_literal {
                        None => return Ok(results),
                        Some(current_literal) => {
                            cursor.seek_key(&storage, IntoOwned::borrow_as(current_literal));
                            if !cursor.key_valid(&storage) {
                                return Ok(results);
                            }
                            if cursor.get_key(&storage).unwrap()
                                == IntoOwned::borrow_as(current_literal)
                            {
                                break;
                            }
                        }
                    }
                }
            }

            while cursor.val_valid(&storage) {
                let arena = RowArena::new();

                let key_item = cursor.key(&storage);
                let key = key_item.to_datum_iter();
                let row_item = cursor.val(&storage);
                let row = row_item.to_datum_iter();

                let mut borrow = datum_vec.borrow();
                borrow.extend(key);
                borrow.extend(row);

                if has_literal_constraints {
                    let datum_vec = borrow.deref_mut();
                    datum_vec.extend(current_literal.unwrap().iter());
                }
                if let Some(result) = peek
                    .map_filter_project
                    .evaluate_into(&mut borrow, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err_to_string_with_causes()?
                {
                    let mut copies = Diff::ZERO;
                    cursor.map_times(&storage, |time, diff| {
                        if time.less_equal(&peek.timestamp) {
                            copies += diff;
                        }
                    });
                    let copies: usize = if copies.is_negative() {
                        return Err(format!(
                            "Invalid data in source, saw retractions ({}) for row that does not exist: {:?}",
                            -copies, &*borrow,
                        ));
                    } else {
                        copies.into_inner().try_into().unwrap()
                    };
                    if let Some(copies) = NonZeroUsize::new(copies) {
                        total_size = total_size
                            .saturating_add(result.byte_len())
                            .saturating_add(count_byte_size);
                        if total_size > max_result_size {
                            return Err(format!(
                                "result exceeds max size of {}",
                                ByteSize::b(u64::cast_from(max_result_size))
                            ));
                        }
                        results.push((result, copies));
                    }

                    if let Some(max_results) = max_results {
                        if results.len() >= 2 * max_results {
                            if peek.finishing.order_by.is_empty() {
                                results.truncate(max_results);
                                return Ok(results);
                            } else {
                                results.sort_by(|left, right| {
                                    let left_datums = l_datum_vec.borrow_with(&left.0);
                                    let right_datums = r_datum_vec.borrow_with(&right.0);
                                    mz_expr::compare_columns(
                                        &peek.finishing.order_by,
                                        &left_datums,
                                        &right_datums,
                                        || left.0.cmp(&right.0),
                                    )
                                });
                                let dropped = results.drain(max_results..);
                                let dropped_size =
                                    dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                        acc.saturating_add(
                                            row.byte_len().saturating_add(count_byte_size),
                                        )
                                    });
                                total_size = total_size.saturating_sub(dropped_size);
                            }
                        }
                    }
                }
                cursor.step_val(&storage);
            }
            if !has_literal_constraints {
                cursor.step_key(&storage);
            }
        }

        Ok(results)
    }
}

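/// The set of frontiers last reported for a collection.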
#[derive(Debug)]
struct ReportedFrontiers {
    write_frontier: ReportedFrontier,
    input_frontier: ReportedFrontier,
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }

    fn all_empty(&self) -> bool {
        self.write_frontier.is_empty()
            && self.input_frontier.is_empty()
            && self.output_frontier.is_empty()
    }
}

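/// A frontier that has either been reported, or has not been reported yet and
/// carries a lower bound on what may be reported in the future.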
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    Reported(Antichain<Timestamp>),
    NotReported {
        lower: Antichain<Timestamp>,
    },
}

impl ReportedFrontier {
    pub fn new() -> Self {
        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
        Self::NotReported { lower }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            Self::Reported(frontier) => frontier.is_empty(),
            Self::NotReported { .. } => false,
        }
    }

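    /// Whether the given frontier may be reported: a reported frontier must
    /// strictly advance, while an unreported one must not regress below its
    /// lower bound.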
    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

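/// State maintained for a single compute collection.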
pub struct CollectionState {
    reported_frontiers: ReportedFrontiers,
    pub is_subscribe_or_copy: bool,
    as_of: Antichain<Timestamp>,

    pub sink_token: Option<SinkToken>,
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    logging: Option<CollectionLogging>,
    metrics: CollectionMetrics,
}

impl CollectionState {
    fn new(
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        Self {
            reported_frontiers: ReportedFrontiers::new(),
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
        }
    }

    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }
}

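/// State retained for a dropped collection, so its final frontiers can still
/// be reported.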
pub struct DroppedCollection {
    reported_frontiers: ReportedFrontiers,
    is_subscribe_or_copy: bool,
}

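/// An event reporting a change in the hydration status of an LIR operator in
/// an exported dataflow.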
pub struct HydrationEvent {
    pub export_id: GlobalId,
    pub lir_id: LirId,
    pub hydrated: bool,
}