use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_ACTIVE_DATAFLOW_CANCELATION, ENABLE_PEEK_RESPONSE_STASH,
    PEEK_RESPONSE_STASH_BATCH_MAX_RUNS, PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE,
    PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::PersistLocation;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;
mod peek_stash;

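/// Worker-local state that is maintained across dataflows.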
pub struct ComputeState {
    /// State kept for each installed compute collection.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Collections that were recently dropped and whose final state is still to be reported.
    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer with COPY TO operator instances by which they can respond.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peek commands that are awaiting fulfillment.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location where peek responses are stashed, if available.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The compute logger, if logging is enabled.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A cache of commonly used persist clients.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// The history of commands received by this worker.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// The maximum allowed size in bytes of peek and subscribe results.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics tracked by this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle for dynamically reconfiguring tracing.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for this compute instance.
    pub context: ComputeInstanceContext,
    /// Per-worker dynamic configuration.
    pub worker_config: Rc<ConfigSet>,

    /// Tokens that suspend created dataflows until their `Schedule` command arrives.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// The receiving end of the read-only flag: while `true`, this replica must not affect
    /// external systems.
    pub read_only_rx: watch::Receiver<bool>,

    /// The sending end of the read-only flag.
    pub read_only_tx: watch::Sender<bool>,

    /// Interval at which the server performs maintenance tasks.
    pub server_maintenance_interval: Duration,

    /// The system time at which this state was initialized.
    pub init_system_time: EpochMillis,

    /// The frontier at which this replica expires.
    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
    /// Construct a new `ComputeState`.
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());

        // Start in read-only mode; the controller lifts this via `AllowWrites`.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            collections: Default::default(),
            dropped_collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            peek_stash_persist_location: None,
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            suspended_collections: Default::default(),
            read_only_tx,
            read_only_rx,
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

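    /// Returns a mutable reference to the identified collection.
    ///
    /// Panics if the collection doesn't exist.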
    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

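    /// Construct a new frontier probe for the given input and add it to the state of the
    /// given collections.
    ///
    /// The caller is responsible for attaching the returned probe to the respective dataflow
    /// input stream.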
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

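    /// Apply the current `worker_config` to the worker state.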
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        crate::memory_limiter::apply_limiter_config(config);

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

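    /// Apply the given replica expiration offset, if an expiration is not already set.
    ///
    /// The expiration time is the worker's initialization system time plus the offset.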
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

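    /// Returns the maximum number of in-flight bytes for dataflows, choosing between the cc
    /// and non-cc configuration depending on the active persist configuration.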
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

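/// A wrapper around [`ComputeState`] that bundles a live Timely worker and a channel for
/// responses, enabling the methods that require all three.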
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// A channel for sending responses to the compute controller.
    pub response_tx: &'a mut ResponseSender,
}

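/// A token that keeps a sink alive; dropping it shuts the sink down.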
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Create a new `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
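    /// Entrypoint for applying a compute command.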
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites => {
                self.compute_state
                    .read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.compute_state.persist_clients.cfg().enable_compaction();
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

    fn handle_schedule(&mut self, id: GlobalId) {
        // Dropping the suspension token activates the dataflow's start signal, allowing it to
        // begin computing.
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            // An empty frontier signals that the collection should be dropped.
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                // Acquire a copy of the trace suitable for fulfilling the peek.
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        // Log the receipt of the peek.
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    /// Drop the identified collection, releasing its worker-local state.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        if ENABLE_ACTIVE_DATAFLOW_CANCELATION.get(&self.compute_state.worker_config) {
            // Drop the dataflow eagerly if this was its last remaining collection.
            if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
                self.timely_worker.drop_dataflow(index);
            }
        }

        // Remember the collection as dropped, so we can report its final state later.
        let dropped = DroppedCollection {
            reported_frontiers: collection.reported_frontiers,
            is_subscribe_or_copy: collection.is_subscribe_or_copy,
        };
        self.compute_state.dropped_collections.push((id, dropped));
    }

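    /// Initializes the logging dataflows, installs their traces, and registers the compute
    /// logger.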
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            // Install the trace as a maintained index.
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            // Initialize collection and logging state for the logging index.
            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

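    /// Send progress information to the controller.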
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Reuse a single allocation for frontier inspection across iterations.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // Subscribes and copies report their progress through their own responses.
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Collect the write frontier, either from the trace or from the sink.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // For the output frontier, additionally consider the compute probe's frontier, if
            // one is installed.
            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the input frontier from the probes installed on the dataflow inputs.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

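    /// Report dropped collections to the controller.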
    pub fn report_dropped_collections(&mut self) {
        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);

        for (id, collection) in dropped_collections {
            // Subscribes and copies report their progress through their own responses.
            if collection.is_subscribe_or_copy {
                continue;
            }

            // Signal the drop by reporting empty frontiers, but only for frontiers we haven't
            // already reported as empty.
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

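    /// Report per-worker metrics.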
    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

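    /// Either complete the peek (and send the response) or return it to the pending set.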
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        tracing::error!(
                            "missing peek_stash_persist_location but peek stash is enabled"
                        );
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                match peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                ) {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

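    /// Scan pending peeks and attempt to retire each.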
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

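    /// Send a response for the given peek and log its retirement.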
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        // Log the retirement of the peek.
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

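    /// Scan the shared subscribe response buffer and forward results along.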
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            // Update the reported frontiers, if the collection still exists. It is expected
            // that the collection may have already been dropped, in which case there is no
            // frontier state left to update.
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

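    /// Scan the shared copy-to response buffer and forward results along.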
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

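    /// Send a response to the controller.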
    fn send_compute_response(&self, response: ComputeResponse) {
        // Ignore send errors: they can only occur if the receiving end has shut down, in
        // which case the response is no longer needed.
        let _ = self.response_tx.send(response);
    }

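    /// Checks for replica expiration and panics if the replica has expired.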
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            // Log before panicking, so the details make it into the logs even if the panic
            // message is lost.
            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            // This assert is guaranteed to fail here; it carries the details into the panic
            // message.
            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

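    /// Determine a dataflow's expiration frontier, by translating the replica expiration
    /// through the dataflow's time dependence and discarding times the `until` already covers.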
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        // Translate the replica expiration through the time dependence, step the result
        // forward so the expiration time itself remains valid, and drop entries that are not
        // before the `until`.
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

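/// An in-progress peek, in one of three states: served directly from an index, served from
/// persist, or stashing an oversized response in persist.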
pub enum PendingPeek {
    /// A peek against a local arrangement (index).
    Index(IndexPeek),
    /// A peek against a persist-backed collection.
    Persist(PersistPeek),
    /// A peek whose response is being uploaded to the peek stash in persist.
    Stash(peek_stash::StashingPeek),
}

impl PendingPeek {
    /// Produces a corresponding log event.
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            // Choose the worker that does the actual peek, deterministically from the UUID.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}

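/// An in-progress peek against persist, fulfilled by an asynchronous task that reads
/// directly from a persist shard.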
pub struct PersistPeek {
    pub(crate) peek: Peek,
    /// A background task responsible for producing the peek results; aborted when dropped.
    _abort_handle: AbortOnDropHandle<()>,
    /// The eventual result of the background task, along with its duration.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation.
    span: tracing::Span,
}

impl PersistPeek {
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If the collection is managed by txn-wal, open a txns cache so the read observes the
        // correct data at the as-of.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        // Accumulate the results, enforcing the size limit as we go.
        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    tracing::error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

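/// An in-progress peek against an index, served directly from the worker's arrangements.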
pub struct IndexPeek {
    peek: Peek,
    /// The data from the arrangement the peek is against.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation.
    span: tracing::Span,
}

impl IndexPeek {
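    /// Attempts to fulfill the peek and returns its status.
    ///
    /// The peek is only ready once the trace's upper frontier has advanced beyond the peek
    /// timestamp; until then this returns `PeekStatus::NotReady`. The `upper` argument is
    /// reused as scratch space for frontier inspection.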
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

    /// Collects data for a known-complete peek, checking the errs stream before consulting
    /// the ok stream.
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        // Check if there exist any errors and, if so, return whatever one we pick.
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                tracing::error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                     saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

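    /// Collects data for a known-complete peek from the ok stream.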
    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyOwn = Row,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        // An optional upper bound on the number of rows to keep, derived from the query's
        // limit and offset.
        let max_results = peek.finishing.num_rows_needed();

        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            if let Some(max_results) = max_results {
                // Only compact the results once we have twice as many as needed, so we don't
                // re-sort and truncate on every additional row.
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        // Without an ORDER BY, any `max_results` rows will do.
                        results.truncate(max_results);
                        return PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
                            results,
                            &peek.finishing.order_by,
                        )));
                    } else {
                        // Sort by the ORDER BY key and keep only the rows we may still need.
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
            results,
            &peek.finishing.order_by,
        )))
    }
}

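/// The result of attempting to fulfill a peek.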
enum PeekStatus {
    /// The peek is not yet ready; the trace frontier has not advanced far enough.
    NotReady,
    /// The response is too large to return directly and must go through the peek stash.
    UsePeekStash,
    /// The peek is fulfilled, with the given response.
    Ready(PeekResponse),
}

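/// The frontiers we have reported to the controller for a collection.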
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    /// Creates a new `ReportedFrontiers` instance.
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }
}

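/// A frontier we have reported to the controller, or the least frontier we are allowed to
/// report.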
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}

impl ReportedFrontier {
    /// Create a new `ReportedFrontier` suitable for initializing a new collection.
    pub fn new() -> Self {
        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
        Self::NotReported { lower }
    }

    /// Whether the reported frontier is the empty frontier.
    pub fn is_empty(&self) -> bool {
        match self {
            Self::Reported(frontier) => frontier.is_empty(),
            Self::NotReported { .. } => false,
        }
    }

    /// Whether this `ReportedFrontier` allows reporting the given frontier.
    ///
    /// A previously reported frontier must strictly advance, whereas an unreported one only
    /// needs to be at or beyond the recorded lower bound.
    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

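/// State maintained for a compute collection installed on a worker.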
pub struct CollectionState {
    /// The frontiers we have reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow maintaining this collection, shared between all collections
    /// exported by that dataflow.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to export.
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    as_of: Antichain<Timestamp>,

    /// A token that, when dropped, releases the resources of the associated sink, if any.
    pub sink_token: Option<SinkToken>,
    /// The frontier of sink writes, if this collection is exported to a sink.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Probe handles reporting the frontiers of this collection's dataflow inputs.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of this collection's dataflow output, if installed.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
}

impl CollectionState {
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
        }
    }

    /// Returns the frontiers that have been reported to the controller.
    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    /// Resets all reported frontiers to the given value.
    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    /// Sets the write frontier that has been reported to the controller.
    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    /// Sets the input frontier that has been reported to the controller.
    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        // Update the import frontier logging from the attached input probes.
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    /// Sets the output frontier that has been reported to the controller.
    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    /// Whether this collection is hydrated, i.e. its output frontier has advanced beyond its
    /// initial as-of.
    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }
}

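/// State remembered about a collection between its drop and the reporting of its final
/// frontiers to the controller.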
pub struct DroppedCollection {
    reported_frontiers: ReportedFrontiers,
    is_subscribe_or_copy: bool,
}