use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::PersistLocation;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;
mod peek_stash;

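/// Worker-local state that is maintained across dataflows.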
pub struct ComputeState {
    /// State kept for each installed compute collection, by ID.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer into which subscribe dataflows write their responses.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer into which copy-to dataflows write their responses.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peeks awaiting fulfillment, by UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location to use for stashing large peek responses, if configured.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The logger for compute events, if logging has been initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A process-global cache of persist clients.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received by this worker.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Maximum size, in bytes, of peek and subscribe results.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics tracked for this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle through which dynamic tracing configuration is applied.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for this compute instance.
    pub context: ComputeInstanceContext,
    /// Dynamic configuration (dyncfgs) for this worker.
    pub worker_config: Rc<ConfigSet>,

    /// Suspension tokens for dataflows that have been created but not yet
    /// scheduled. Dropping a token releases the dataflow's `StartSignal`.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval at which the server performs maintenance work.
    pub server_maintenance_interval: Duration,

    /// The system time, in milliseconds since the Unix epoch, at which this
    /// worker was initialized.
    pub init_system_time: EpochMillis,

    /// The frontier at which this replica expires; empty if replica
    /// expiration is disabled.
    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
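    /// Construct a new `ComputeState`.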
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());

        Self {
            collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            peek_stash_persist_location: None,
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            suspended_collections: Default::default(),
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

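    /// Return a mutable reference to the identified collection.
    ///
    /// # Panics
    ///
    /// Panics if the collection doesn't exist.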
    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

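    /// Construct a new frontier probe for the given input, register it with
    /// each of the identified collections, and return it.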
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

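    /// Apply the current `worker_config` to the worker state and to the
    /// process-global configuration it controls (lgalloc, columnar
    /// allocations, overflowing behavior).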
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        crate::memory_limiter::apply_limiter_config(config);

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

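    /// Apply the given expiration offset, deriving the replica expiration
    /// frontier from the time the worker was initialized. A no-op if an
    /// expiration is already set.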
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

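    /// Returns the dataflow max inflight bytes limit, choosing between the
    /// cc and non-cc variants based on the persist `is_cc_active` flag.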
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

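/// A `ComputeState` paired with the resources needed to act on it: the
/// Timely worker and a channel for responses to the controller.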
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// The channel over which responses are sent to the controller.
    pub response_tx: &'a mut ResponseSender,
}

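/// A token that keeps a sink alive; dropping it tears the sink down.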
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Create a new `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
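    /// Entrypoint for applying a compute command.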
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();

        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

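    /// Apply a configuration update to the worker and the process-global
    /// state it can reach.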
    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

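    /// Install a new dataflow, initializing tracking state for each of its
    /// exports. The dataflow's `until` is tightened to its expiration time,
    /// if one applies.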
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        // Dataflows are created in a suspended state and only start computing
        // once they are scheduled, by dropping their suspension tokens.
        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

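    /// Schedule the identified dataflow for execution.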
    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        // Dropping the suspension token releases the dataflow's `StartSignal`,
        // allowing it to start computing.
        drop(suspension_token);
    }

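    /// Allow compaction of the identified collection to the given frontier.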
    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            // An empty frontier signals that the collection can be dropped.
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

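    /// Handle a new peek, installing it as pending and immediately attempting
    /// to fulfill it.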
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

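    /// Cancel a peek, responding with `PeekResponse::Canceled` if it is still
    /// pending.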
    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

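    /// Take the identified collection out of read-only mode and re-enable
    /// persist compaction.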
    fn handle_allow_writes(&mut self, id: GlobalId) {
        self.compute_state.persist_clients.cfg().enable_compaction();

        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
            collection.allow_writes();
        } else {
            soft_panic_or_log!("allow writes for unknown collection {id}");
        }
    }

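    /// Drop the identified collection, removing all worker state associated
    /// with it.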
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            // This was the last collection of its dataflow, so drop the
            // dataflow itself.
            self.timely_worker.drop_dataflow(index);
        }

        if !collection.is_subscribe_or_copy {
            // Report final (i.e., empty) frontiers, for any frontier that was
            // not already reported as empty.
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

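    /// Initialize the logging dataflows, installing a collection for each
    /// requested logging index.
    ///
    /// # Panics
    ///
    /// Panics if logging was already initialized.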
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

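    /// Report updated frontier information to the controller, for each
    /// collection whose reportable frontiers have advanced.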
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Reused below to compute the candidate frontiers for each collection.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // Subscribe and copy-to collections report their frontiers
            // through their own response types.
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // The write frontier comes from either the trace or the sink.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // The output frontier is the write frontier combined with the
            // frontier of the compute probe, if one is installed.
            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // The input frontier is the combination of all input probe frontiers.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

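    /// Report per-worker metrics; currently the remaining seconds until
    /// replica expiration.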
    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

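    /// Attempt to fulfill a single pending peek: respond if it is ready,
    /// divert it to the peek stash if its result is too large, or keep it
    /// pending otherwise.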
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled =
                        ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        tracing::error!(
                            "missing peek_stash_persist_location but peek stash is enabled"
                        );
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                match peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                ) {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

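    /// Scan pending peeks and attempt to fulfill each.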
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

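    /// Send a response for the given peek to the controller and log the
    /// peek's retirement.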
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

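    /// Drain the subscribe response buffer, updating reported frontiers and
    /// forwarding the responses to the controller.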
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // The collection has already been dropped; there is no
                // frontier state left to update.
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

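    /// Drain the copy-to response buffer, forwarding the responses to the
    /// controller.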
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

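    /// Send a response to the controller, ignoring a disconnected receiver.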
    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

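    /// Check for replica expiration, logging an error and panicking if the
    /// replica has expired.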
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            // Repeat the (failing) condition as an assert, so the process
            // panics only after the error above has been logged.
            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

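    /// Determine the expiration frontier for a dataflow, based on the replica
    /// expiration, the dataflow's `TimeDependence`, and its `until`. An empty
    /// frontier means the dataflow does not expire.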
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        // Translate the replica expiration through the dataflow's time
        // dependence, advance the result by one tick, and discard it if the
        // dataflow's `until` ends no later anyway.
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

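/// An in-progress peek, along with the state needed to serve it.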
pub enum PendingPeek {
    /// A peek against an index (an arrangement held by this worker).
    Index(IndexPeek),
    /// A peek against a persist-backed collection, served by an async task.
    Persist(PersistPeek),
    /// A peek whose results are being uploaded to the peek stash.
    Stash(peek_stash::StashingPeek),
}

impl PendingPeek {
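    /// Produces a corresponding log event.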
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        // Only one worker performs the actual read; the others report an
        // empty result immediately.
        let active_worker = {
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}

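/// An in-progress peek against a persist-backed collection, served by an
/// async task rather than by a worker's arrangements.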
pub struct PersistPeek {
    pub(crate) peek: Peek,
    /// A handle to the spawned task, which aborts the peek when dropped.
    _abort_handle: AbortOnDropHandle<()>,
    /// The channel on which the task reports its result and elapsed time.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation.
    span: tracing::Span,
}

impl PersistPeek {
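    /// Read the peek result directly from the persist shard: snapshot at the
    /// peek timestamp, apply the MFP and any literal constraint, and collect
    /// rows until the limit is reached or the result exceeds
    /// `max_result_size`.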
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If the shard is managed by txn-wal, open a cache of the txns shard
        // as well, which the cursor needs to read the data shard correctly.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    tracing::error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

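/// An in-progress peek against an index maintained by this worker.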
pub struct IndexPeek {
    peek: Peek,
    /// The arrangement traces (oks and errs) from which to read.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation.
    span: tracing::Span,
}

impl IndexPeek {
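    /// Attempt to fulfill the peek.
    ///
    /// The peek can be answered only once the frontiers of both the oks and
    /// errs traces are beyond `peek.timestamp`, so the result at that time is
    /// final; until then this returns `PeekStatus::NotReady`. It fails if the
    /// compaction frontier has already advanced past the peek timestamp.
    /// `upper` is used as scratch space for the frontier checks.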
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

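    /// Collect the peek results, scanning the error trace first: any error
    /// present at the peek timestamp fails the peek as a whole.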
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus {
        // Check the error trace first: if any error exists at the peek
        // timestamp, the peek returns that error rather than rows.
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                tracing::error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                     saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
        )
    }

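    /// Collect matching rows from the oks trace, enforcing the result-size
    /// limit and, when eligible, diverting to the peek stash once the stash
    /// threshold is exceeded.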
    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyOwn = Row,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        // The number of rows the peek needs, if a limit applies.
        let max_results = peek.finishing.num_rows_needed();

        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            // Once we hold many more rows than the limit needs, prune the
            // accumulation: without an ORDER BY we can return immediately,
            // otherwise sort and keep only the first `max_results` rows.
            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        results.truncate(max_results);
                        return PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
                            results,
                            &peek.finishing.order_by,
                        )));
                    } else {
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
            results,
            &peek.finishing.order_by,
        )))
    }
}

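/// The result of a single attempt at fulfilling a peek.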
enum PeekStatus {
    /// The peek is not yet ready to return results.
    NotReady,
    /// The result is too large to return inline and must go through the peek stash.
    UsePeekStash,
    /// The peek is fulfilled, with the given response.
    Ready(PeekResponse),
}

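/// The frontiers of a collection that have been reported to the controller.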
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    /// Creates a new `ReportedFrontiers` instance.
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }
}

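/// A frontier of a collection, as it has (or has not yet) been reported to
/// the controller.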
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}

impl ReportedFrontier {
    /// Create a new `ReportedFrontier` suitable for reporting any frontier.
    pub fn new() -> Self {
        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
        Self::NotReported { lower }
    }

    /// Whether the reported frontier is the empty frontier.
    pub fn is_empty(&self) -> bool {
        match self {
            Self::Reported(frontier) => frontier.is_empty(),
            Self::NotReported { .. } => false,
        }
    }

    /// Whether this `ReportedFrontier` allows reporting the given frontier.
    ///
    /// A previously reported frontier may only be followed by strictly
    /// greater frontiers; an unreported one may be followed by any frontier
    /// at or beyond its lower bound.
    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

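/// State maintained for a compute collection.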
pub struct CollectionState {
    /// The frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection.
    ///
    /// Shared between all collections exported by the same dataflow; once the
    /// last of them is dropped, the dataflow itself can be dropped.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to.
    ///
    /// The frontiers of these collections are reported through their own
    /// response types, rather than `FrontiersResponse`s.
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    ///
    /// Used to determine hydration status: the collection is hydrated once
    /// its output frontier has advanced beyond its as-of.
    as_of: Antichain<Timestamp>,

    /// A token held while the collection's sink is alive, if it has one.
    pub sink_token: Option<SinkToken>,
    /// The frontier of sink writes, if this collection is exported to a sink.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes installed on the collection's inputs, by input ID.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of the collection's output, if any.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
    /// Sender for the channel communicating the collection's read-only status.
    read_only_tx: watch::Sender<bool>,
    /// Receiver for the collection's read-only status; dataflows subscribe to
    /// this to learn when they may start writing.
    pub read_only_rx: watch::Receiver<bool>,
}

impl CollectionState {
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        // Collections start out in read-only mode.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
            read_only_tx,
            read_only_rx,
        }
    }

    /// Return the frontiers that have been reported to the controller.
    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    /// Reset all reported frontiers to the given value.
    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    /// Set the write frontier that has been reported to the controller.
    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    /// Set the input frontier that has been reported to the controller.
    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        // Use the probe frontiers to update the logged import frontiers.
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    /// Set the output frontier that has been reported to the controller.
    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    /// Whether this collection is hydrated, i.e., its output frontier has
    /// advanced beyond its as-of.
    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }

    /// Transition this collection out of read-only mode, allowing its
    /// dataflow to write to external systems.
    fn allow_writes(&self) {
        info!(
            dataflow_index = *self.dataflow_index,
            export = ?self.logging.as_ref().map(|l| l.export_id()),
            "allowing writes for dataflow",
        );
        let _ = self.read_only_tx.send(false);
    }
}