use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;

use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::PersistLocation;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;
mod peek_stash;

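/// Worker-local state that is maintained across dataflows: the installed
/// collections and their traces, pending peeks, response buffers, and the
/// dynamic configuration currently in effect.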
pub struct ComputeState {
    pub collections: BTreeMap<GlobalId, CollectionState>,
    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
    pub traces: TraceManager,
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    pub peek_stash_persist_location: Option<PersistLocation>,
    pub compute_logger: Option<logging::compute::Logger>,
    pub persist_clients: Arc<PersistClientCache>,
    pub txns_ctx: TxnsContext,
    pub command_history: ComputeCommandHistory<UIntGauge>,
    max_result_size: u64,
    pub linear_join_spec: LinearJoinSpec,
    pub metrics: WorkerMetrics,
    tracing_handle: Arc<TracingHandle>,
    pub context: ComputeInstanceContext,
    pub worker_config: Rc<ConfigSet>,

    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    pub read_only_rx: watch::Receiver<bool>,

    pub read_only_tx: watch::Sender<bool>,

    pub server_maintenance_interval: Duration,

    pub init_system_time: EpochMillis,

    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());

        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            collections: Default::default(),
            dropped_collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            peek_stash_persist_location: None,
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            suspended_collections: Default::default(),
            read_only_tx,
            read_only_rx,
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

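    /// Returns a probe handle and registers it, under the given input ID, as
    /// an input probe on each of the given collections. Collections that are
    /// not (or no longer) tracked are skipped.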
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

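    /// Applies the current dynamic configuration to worker-local state: the
    /// linear join implementation, lgalloc, the memory limiter, columnar
    /// allocation, the maintenance interval, and integer overflow behavior.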
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        crate::memory_limiter::apply_limiter_config(config);

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

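    /// Sets `replica_expiration` to `init_system_time + offset`, unless an
    /// expiration is already set. For example (hypothetical values), an offset
    /// of 3 days (259_200_000 ms) on top of an `init_system_time` of
    /// 1_700_000_000_000 yields a `replica_expiration` of 1_700_259_200_000.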
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

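    /// Returns the maximum number of in-flight bytes configured for dataflows,
    /// choosing the cc-specific limit when the persist `is_cc_active` flag is
    /// set.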
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    pub timely_worker: &'a mut TimelyWorker<A>,
    pub compute_state: &'a mut ComputeState,
    pub response_tx: &'a mut ResponseSender,
}

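/// A token whose presence keeps a rendered sink alive; dropping it tears the
/// sink down.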
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites => {
                self.compute_state
                    .read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.compute_state.persist_clients.cfg().enable_compaction();
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

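    /// Handles a `Schedule` command by dropping the corresponding suspension
    /// token, which fires the dataflow's `StartSignal` and lets it start
    /// computing.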
    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        let dropped = DroppedCollection {
            reported_frontiers: collection.reported_frontiers,
            is_subscribe_or_copy: collection.is_subscribe_or_copy,
        };
        self.compute_state.dropped_collections.push((id, dropped));
    }

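    /// Initializes the logging dataflows, installs the resulting traces as
    /// indexed collections, and stores the compute logger. Panics if logging
    /// was already initialized.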
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

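    /// Collects the current write, input, and output frontiers of all tracked
    /// collections and sends a `Frontiers` response for every collection whose
    /// frontiers advanced since the last report.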
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

    pub fn report_dropped_collections(&mut self) {
        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);

        for (id, collection) in dropped_collections {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

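    /// Attempts to make progress on a pending peek. Index peeks are fulfilled
    /// directly from the arrangement, unless their result grows past the stash
    /// threshold, in which case they are converted into stashed peeks that
    /// upload their result to persist. Persist and stash peeks are polled for
    /// completion. Peeks that cannot be fulfilled yet are re-registered in
    /// `pending_peeks`.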
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        tracing::error!(
                            "missing peek_stash_persist_location but peek stash is enabled"
                        );
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                match peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                ) {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
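                // The collection has already been dropped; there is no
                // frontier state left to update.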
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

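    /// Computes the expiration frontier for a dataflow: the replica expiration
    /// mapped through the dataflow's `TimeDependence` and stepped forward by
    /// one tick, and dropped entirely when the dataflow's `until` already ends
    /// at or before that time.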
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

pub enum PendingPeek {
    Index(IndexPeek),
    Persist(PersistPeek),
    Stash(peek_stash::StashingPeek),
}

impl PendingPeek {
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}

pub struct PersistPeek {
    pub(crate) peek: Peek,
    _abort_handle: AbortOnDropHandle<()>,
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    span: tracing::Span,
}

impl PersistPeek {
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

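        // The cursor yields rows in key order, so once a row compares greater
        // than the literal constraint we can stop scanning entirely.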
        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    tracing::error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

pub struct IndexPeek {
    peek: Peek,
    trace_bundle: TraceBundle,
    span: tracing::Span,
}

impl IndexPeek {
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                tracing::error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                     saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyOwn = Row,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        let max_results = peek.finishing.num_rows_needed();

        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

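            // Truncating as soon as the buffer reaches `max_results` would
            // require re-sorting after every push; letting it grow to twice
            // that size amortizes the sort while keeping memory bounded.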
            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        results.truncate(max_results);
                        return PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
                            results,
                            &peek.finishing.order_by,
                        )));
                    } else {
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
            results,
            &peek.finishing.order_by,
        )))
    }
}

enum PeekStatus {
    NotReady,
    UsePeekStash,
    Ready(PeekResponse),
}

#[derive(Debug)]
struct ReportedFrontiers {
    write_frontier: ReportedFrontier,
    input_frontier: ReportedFrontier,
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }
}

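/// A frontier that has been reported to the controller, or the least frontier
/// that is allowed to be reported.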
1628#[derive(Clone, Debug)]
1630pub enum ReportedFrontier {
1631 Reported(Antichain<Timestamp>),
1633 NotReported {
1635 lower: Antichain<Timestamp>,
1637 },
1638}
1639
1640impl ReportedFrontier {
1641 pub fn new() -> Self {
1643 let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1644 Self::NotReported { lower }
1645 }
1646
1647 pub fn is_empty(&self) -> bool {
1649 match self {
1650 Self::Reported(frontier) => frontier.is_empty(),
1651 Self::NotReported { .. } => false,
1652 }
1653 }
1654
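    /// Whether this frontier permits reporting the given new frontier.
    /// Reported frontiers must strictly advance, so a `Reported` frontier of
    /// `[5]` allows reporting `[6]` but not `[5]` again, while a
    /// `NotReported { lower: [5] }` allows reporting `[5]` itself.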
    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

pub struct CollectionState {
    reported_frontiers: ReportedFrontiers,
    dataflow_index: Rc<usize>,
    pub is_subscribe_or_copy: bool,
    as_of: Antichain<Timestamp>,

    pub sink_token: Option<SinkToken>,
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    logging: Option<CollectionLogging>,
    metrics: CollectionMetrics,
}

impl CollectionState {
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
        }
    }

    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

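    /// Whether this collection is hydrated: its reported output frontier has
    /// advanced strictly beyond its `as_of`, i.e. it has produced output for
    /// its initial time.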
    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }
}

pub struct DroppedCollection {
    reported_frontiers: ReportedFrontiers,
    is_subscribe_or_copy: bool,
}