use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;

use std::rc::Rc;
use std::sync::{Arc, mpsc};
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
    StatusResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_ACTIVE_DATAFLOW_CANCELATION, ENABLE_PEEK_RESPONSE_STASH,
    PEEK_RESPONSE_STASH_BATCH_MAX_RUNS, PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE,
    PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::PersistLocation;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;
mod peek_stash;

/// Worker-local state that is maintained across dataflows.
pub struct ComputeState {
    /// State kept for each installed compute collection, by ID.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Collections that were recently dropped and whose removal still needs to be reported.
    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer through which SUBSCRIBE operator instances report their responses.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer through which COPY TO operator instances report their responses.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peeks that are waiting to be fulfilled.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location where large peek results can be stashed, if provided.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The compute logger, if logging has been initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A cache of persist clients, shared across dataflows.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received and not yet compacted away.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Maximum size, in bytes, of a result that may be returned inline to the controller.
    max_result_size: u64,
    /// Specification for how linear joins are to be rendered.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics tracked by this worker.
    pub metrics: WorkerMetrics,
    /// Handle through which tracing configuration updates are applied.
    tracing_handle: Arc<TracingHandle>,
    /// Per-instance configuration context.
    pub context: ComputeInstanceContext,
    /// The worker's dynamic configuration.
    pub worker_config: Rc<ConfigSet>,

    /// Receiver of operator hydration events.
    pub hydration_rx: mpsc::Receiver<HydrationEvent>,
    /// Transmitter of operator hydration events, cloned into rendered dataflows.
    pub hydration_tx: mpsc::Sender<HydrationEvent>,

    /// Suspension tokens for dataflows that have been created but not yet scheduled.
    ///
    /// Dropping a collection's token unblocks the corresponding dataflow's start signal.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Receiver for the replica's current read-only status.
    pub read_only_rx: watch::Receiver<bool>,

    /// Sender for the read-only status; flipped to `false` upon an `AllowWrites` command.
    pub read_only_tx: watch::Sender<bool>,

    /// Interval at which the server performs maintenance tasks.
    pub server_maintenance_interval: Duration,

    /// The system time at which this worker was initialized.
    pub init_system_time: EpochMillis,

    /// The frontier beyond which this replica must not serve data, if expiration is configured.
    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
    /// Construct a new `ComputeState`.
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());
        let (hydration_tx, hydration_rx) = mpsc::channel();

        // Replicas start in read-only mode and switch to read-write only when an `AllowWrites`
        // command arrives.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            collections: Default::default(),
            dropped_collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            peek_stash_persist_location: None,
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            hydration_rx,
            hydration_tx,
            suspended_collections: Default::default(),
            read_only_tx,
            read_only_rx,
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

    /// Return a mutable reference to the identified collection's state.
    ///
    /// Panics if the collection does not exist.
    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    /// Construct a new frontier probe for the given input and install it in the state of each of
    /// the given collections.
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

    /// Apply the current values of the worker configuration.
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
                crate::lgalloc::apply_limiter_config(config);
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        crate::memory_limiter::apply_limiter_config(config);

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

    /// Apply the given expiration offset, computing the replica expiration relative to the
    /// worker's initialization time. Has no effect if a replica expiration is already set.
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

    /// Returns the dataflow max-inflight-bytes limit, choosing the cc or non-cc configuration
    /// value depending on whether the cc persist configuration is active.
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

/// A [`ComputeState`] paired with the resources needed to act on it.
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// The channel over which compute responses are sent.
    pub response_tx: &'a mut ResponseSender,
}

/// A token whose ownership keeps a sink alive.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Create a new `SinkToken` wrapping the given token.
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites => {
                self.compute_state
                    .read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.compute_state.persist_clients.cfg().enable_compaction();
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();

        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

    fn handle_schedule(&mut self, id: GlobalId) {
        // Dropping the suspension token unblocks the dataflow's start signal, allowing it to
        // begin computing.
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            // An empty frontier signals that no further reads are expected: drop the collection.
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                // Acquire a copy of the trace suitable for fulfilling the peek.
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        // Log the receipt of the peek.
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        // Remove the collection's trace (if any) and its suspension token (if it was never
        // scheduled).
        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        if ENABLE_ACTIVE_DATAFLOW_CANCELATION.get(&self.compute_state.worker_config) {
            // Drop the dataflow eagerly if we hold the last reference to its index.
            if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
                self.timely_worker.drop_dataflow(index);
            }
        }

        // Remember the collection as dropped, so final frontier responses can still be emitted.
        let dropped = DroppedCollection {
            reported_frontiers: collection.reported_frontiers,
            is_subscribe_or_copy: collection.is_subscribe_or_copy,
        };
        self.compute_state.dropped_collections.push((id, dropped));
    }

    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

    pub fn report_dropped_collections(&mut self) {
        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);

        for (id, collection) in dropped_collections {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

    pub fn report_operator_hydration(&self) {
        let worker_id = self.timely_worker.index();
        for event in self.compute_state.hydration_rx.try_iter() {
            let collection = self.compute_state.collections.get(&event.export_id);
            if collection.map_or(true, |c| c.reported_frontiers().all_empty()) {
                continue;
            }

            let status = OperatorHydrationStatus {
                collection_id: event.export_id,
                lir_id: event.lir_id,
                worker_id,
                hydrated: event.hydrated,
            };
            let response = ComputeResponse::Status(StatusResponse::OperatorHydration(status));
            self.send_compute_response(response);
        }
    }

    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        tracing::error!(
                            "missing peek_stash_persist_location but peek stash is enabled"
                        );
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                match peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                ) {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            }
            // If the collection was already dropped there is no tracking state left to update;
            // the response is still forwarded below.

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

/// A peek that may not yet be fulfilled.
pub enum PendingPeek {
    /// A peek against an index (an arrangement maintained by this worker).
    Index(IndexPeek),
    /// A peek against a persist-backed collection.
    Persist(PersistPeek),
    /// A peek whose results are being stashed in persist instead of returned inline.
    Stash(peek_stash::StashingPeek),
}

impl PendingPeek {
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (id, logging::compute::PeekType::Persist),
        };
        ComputeEvent::Peek(PeekEvent {
            peek: logging::compute::Peek::new(*id, peek.timestamp, peek.uuid),
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}

/// A peek against a persist-backed collection, fulfilled by a background task.
pub struct PersistPeek {
    pub(crate) peek: Peek,
    /// Handle to the background task producing the peek results; aborting it when this struct is
    /// dropped cancels the peek.
    _abort_handle: AbortOnDropHandle<()>,
    /// The eventual result of the background task, along with its runtime.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation.
    span: tracing::Span,
}

impl PersistPeek {
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    tracing::error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

pub struct IndexPeek {
    peek: Peek,
    trace_bundle: TraceBundle,
    span: tracing::Span,
}

impl IndexPeek {
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                tracing::error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                     saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyOwn = Row,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        let max_results = peek.finishing.num_rows_needed();

        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        results.truncate(max_results);
                        return PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
                            results,
                            &peek.finishing.order_by,
                        )));
                    } else {
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
            results,
            &peek.finishing.order_by,
        )))
    }
}

/// The outcome of a single attempt at fulfilling a pending index peek.
enum PeekStatus {
    /// The trace frontiers have not yet advanced far enough to serve the peek.
    NotReady,
    /// The result exceeds the stash threshold and should be uploaded to the peek stash instead
    /// of being returned inline.
    UsePeekStash,
    /// The peek can be answered with the given response.
    Ready(PeekResponse),
}

/// The frontiers reported to the controller for a collection.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    /// Creates a new `ReportedFrontiers` instance.
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }

    /// Returns whether all reported frontiers are empty.
    fn all_empty(&self) -> bool {
        self.write_frontier.is_empty()
            && self.input_frontier.is_empty()
            && self.output_frontier.is_empty()
    }
}

/// A frontier that has been reported to the controller, or the lower bound of frontiers that may
/// be reported in the future.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}

impl ReportedFrontier {
    /// Create a new `ReportedFrontier`, with the minimum timestamp as the lower bound.
    pub fn new() -> Self {
        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
        Self::NotReported { lower }
    }

    /// Whether the reported frontier is the empty frontier.
    pub fn is_empty(&self) -> bool {
        match self {
            Self::Reported(frontier) => frontier.is_empty(),
            Self::NotReported { .. } => false,
        }
    }

    /// Whether this `ReportedFrontier` allows reporting the given frontier.
    ///
    /// A previously reported frontier may only be followed by a strictly greater one; an
    /// unreported frontier may be followed by anything at or beyond its lower bound.
    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

/// State maintained for a single compute collection.
pub struct CollectionState {
    /// The frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the Timely dataflow computing this collection, shared between all collections
    /// exported by the same dataflow.
    dataflow_index: Rc<usize>,
    /// Whether the collection is the target of a SUBSCRIBE or COPY TO. Frontiers of such
    /// collections are reported through their sink responses rather than `report_frontiers`.
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    as_of: Antichain<Timestamp>,

    /// A token for the collection's sink, if any.
    pub sink_token: Option<SinkToken>,
    /// Frontier of sink writes, if the collection is exported to a sink that reports one.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for the inputs of the collection's dataflow.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the output frontier of the collection's dataflow.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
}

impl CollectionState {
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
        }
    }

    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }
}

/// State retained for a collection after it has been dropped, so that final frontier responses
/// can still be reported for it.
pub struct DroppedCollection {
    reported_frontiers: ReportedFrontiers,
    is_subscribe_or_copy: bool,
}

/// An event reporting the hydration status of an LIR node in a dataflow.
pub struct HydrationEvent {
    /// The ID of the export this dataflow maintains.
    pub export_id: GlobalId,
    /// The ID of the LIR node.
    pub lir_id: LirId,
    /// Whether the node is hydrated.
    pub hydrated: bool,
}