1use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::implementations::BatchContainer;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
29};
30use mz_compute_types::dataflows::DataflowDescription;
31use mz_compute_types::dyncfgs::{
32 ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33 PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
34};
35use mz_compute_types::plan::render_plan::RenderPlan;
36use mz_dyncfg::ConfigSet;
37use mz_expr::row::RowCollection;
38use mz_expr::{RowComparator, SafeMfpPlan};
39use mz_ore::cast::{CastFrom, CastLossy};
40use mz_ore::collections::CollectionExt;
41use mz_ore::metrics::{MetricsRegistry, UIntGauge};
42use mz_ore::now::EpochMillis;
43use mz_ore::soft_panic_or_log;
44use mz_ore::task::AbortOnDropHandle;
45use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
46use mz_persist_client::Diagnostics;
47use mz_persist_client::cache::PersistClientCache;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_types::PersistLocation;
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::fixed_length::ToDatumIter;
53use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
54use mz_storage_operators::stats::StatsCursor;
55use mz_storage_types::StorageDiff;
56use mz_storage_types::controller::CollectionMetadata;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::sources::SourceData;
59use mz_storage_types::time_dependence::TimeDependence;
60use mz_txn_wal::operator::TxnsContext;
61use mz_txn_wal::txn_cache::TxnsCache;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::worker::Worker as TimelyWorker;
66use tokio::sync::{oneshot, watch};
67use tracing::{Level, debug, error, info, span, trace, warn};
68use uuid::Uuid;
69
70use crate::arrangement::manager::{TraceBundle, TraceManager};
71use crate::logging;
72use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
73use crate::logging::initialize::LoggingTraces;
74use crate::metrics::{CollectionMetrics, WorkerMetrics};
75use crate::render::{LinearJoinSpec, StartSignal};
76use crate::server::{ComputeInstanceContext, ResponseSender};
77
78mod peek_result_iterator;
79mod peek_stash;
80
/// Worker-local state of a compute instance.
///
/// Holds the installed collections and their traces, buffered sink responses, pending peeks,
/// and the configuration and metrics this worker uses while processing compute commands.
pub struct ComputeState {
    /// State for every installed collection, keyed by collection ID.
    ///
    /// Entries are added when dataflows or logging indexes are created and removed by
    /// `drop_collection`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Traces of arranged collections, used to answer index peeks and report write frontiers.
    pub traces: TraceManager,
    /// Buffered subscribe responses, drained by `process_subscribes`.
    ///
    /// NOTE(review): presumably filled by subscribe sinks; shared via `Rc<RefCell<_>>`.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Buffered copy-to responses, drained by `process_copy_tos`.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peeks that have not been answered yet, keyed by peek UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Persist location used for stashing large peek responses; set on `CreateInstance`.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// Logger for compute events; `None` until logging has been initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// Cache of persist clients, shared with persist peeks and peek stashing.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context for txn-wal operations.
    pub txns_ctx: TxnsContext,
    /// History of every compute command received by this worker.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Maximum result size for peeks and subscribe responses; `u64::MAX` until configured.
    max_result_size: u64,
    /// Join implementation settings, derived from `worker_config`.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// Handle used to apply tracing parameters from configuration updates.
    tracing_handle: Arc<TracingHandle>,
    /// Instance-wide context, e.g. the optional scratch directory.
    pub context: ComputeInstanceContext,
    /// Dynamic configuration for this worker; updated by `UpdateConfiguration`.
    pub worker_config: Rc<ConfigSet>,

    /// Metrics registry passed to logging initialization.
    pub metrics_registry: MetricsRegistry,

    /// Number of Timely workers per process.
    pub workers_per_process: usize,

    /// Suspension tokens of created-but-not-yet-scheduled collections.
    ///
    /// Entries are removed on `Schedule` or when the collection is dropped.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval between server maintenance tasks, from `COMPUTE_SERVER_MAINTENANCE_INTERVAL`.
    pub server_maintenance_interval: Duration,

    /// System time at construction; the base for computing the replica expiration.
    pub init_system_time: EpochMillis,

    /// Time at which the replica expires; empty when no expiration is set.
    pub replica_expiration: Antichain<Timestamp>,

    /// Reader for storage Timely logs, taken once during `CreateInstance`.
    pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}
175
176impl ComputeState {
177 pub fn new(
179 persist_clients: Arc<PersistClientCache>,
180 txns_ctx: TxnsContext,
181 metrics: WorkerMetrics,
182 tracing_handle: Arc<TracingHandle>,
183 context: ComputeInstanceContext,
184 metrics_registry: MetricsRegistry,
185 workers_per_process: usize,
186 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
187 ) -> Self {
188 let traces = TraceManager::new(metrics.clone());
189 let command_history = ComputeCommandHistory::new(metrics.for_history());
190
191 Self {
192 collections: Default::default(),
193 traces,
194 subscribe_response_buffer: Default::default(),
195 copy_to_response_buffer: Default::default(),
196 pending_peeks: Default::default(),
197 peek_stash_persist_location: None,
198 compute_logger: None,
199 persist_clients,
200 txns_ctx,
201 command_history,
202 max_result_size: u64::MAX,
203 linear_join_spec: Default::default(),
204 metrics,
205 tracing_handle,
206 context,
207 worker_config: mz_dyncfgs::all_dyncfgs().into(),
208 metrics_registry,
209 workers_per_process,
210 suspended_collections: Default::default(),
211 server_maintenance_interval: Duration::ZERO,
212 init_system_time: mz_ore::now::SYSTEM_TIME(),
213 replica_expiration: Antichain::default(),
214 storage_log_reader,
215 }
216 }
217
218 pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
222 self.collections
223 .get_mut(&id)
224 .expect("collection must exist")
225 }
226
227 pub fn input_probe_for(
233 &mut self,
234 input_id: GlobalId,
235 collection_ids: impl Iterator<Item = GlobalId>,
236 ) -> probe::Handle<Timestamp> {
237 let probe = probe::Handle::default();
238 for id in collection_ids {
239 if let Some(collection) = self.collections.get_mut(&id) {
240 collection.input_probes.insert(input_id, probe.clone());
241 }
242 }
243 probe
244 }
245
246 fn apply_worker_config(&mut self) {
248 use mz_compute_types::dyncfgs::*;
249
250 let config = &self.worker_config;
251
252 self.linear_join_spec = LinearJoinSpec::from_config(config);
253
254 if ENABLE_LGALLOC.get(config) {
255 if let Some(path) = &self.context.scratch_directory {
256 let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
257 let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
258 let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
259 let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
260 let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
261 info!(
262 ?path,
263 backgrund_interval=?interval,
264 clear_bytes,
265 eager_return,
266 file_growth_dampener,
267 local_buffer_bytes,
268 "enabling lgalloc"
269 );
270 let background_worker_config = lgalloc::BackgroundWorkerConfig {
271 interval,
272 clear_bytes,
273 };
274 lgalloc::lgalloc_set_config(
275 lgalloc::LgAlloc::new()
276 .enable()
277 .with_path(path.clone())
278 .with_background_config(background_worker_config)
279 .eager_return(eager_return)
280 .file_growth_dampener(file_growth_dampener)
281 .local_buffer_bytes(local_buffer_bytes),
282 );
283 } else {
284 debug!("not enabling lgalloc, scratch directory not specified");
285 }
286 } else {
287 info!("disabling lgalloc");
288 lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
289 }
290
291 #[cfg(unix)]
297 if let Some(path) = &self.context.scratch_directory {
298 mz_ore::pager::set_scratch_dir(path.clone());
299 mz_ore::pager::set_backend(mz_ore::pager::Backend::File);
300 } else {
301 mz_ore::pager::set_backend(mz_ore::pager::Backend::Swap);
302 }
303
304 crate::memory_limiter::apply_limiter_config(config);
305
306 mz_ore::region::ENABLE_LGALLOC_REGION.store(
307 ENABLE_COLUMNATION_LGALLOC.get(config),
308 std::sync::atomic::Ordering::Relaxed,
309 );
310
311 {
321 use mz_ore::pager::Backend;
322 use mz_timely_util::column_pager::apply_tiered_config;
323
324 let enabled = ENABLE_COLUMN_PAGED_BATCHER_SPILL.get(config);
325
326 const MIB: usize = 1024 * 1024;
331 const DEFAULT_MEM_LIMIT: usize = 4 * 1024 * MIB;
332 let mem_limit = crate::memory_limiter::get_memory_limit().unwrap_or(DEFAULT_MEM_LIMIT);
333 let fraction = COLUMN_PAGED_BATCHER_BUDGET_FRACTION.get(config).max(0.0);
334 let total = usize::cast_lossy(f64::cast_lossy(mem_limit) * fraction).max(128 * MIB);
335
336 let backend = if self.context.scratch_directory.is_some() {
337 Backend::File
338 } else {
339 Backend::Swap
340 };
341
342 debug!(
343 enabled,
344 ?backend,
345 fraction,
346 mem_limit,
347 budget_bytes = total,
348 "column-paged batcher: applying tiered config",
349 );
350 apply_tiered_config(enabled, total, backend, None);
351 }
352
353 self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
356
357 let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
358 match overflowing_behavior.parse() {
359 Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
360 Err(err) => {
361 error!(
362 err,
363 overflowing_behavior, "Invalid value for ore_overflowing_behavior"
364 );
365 }
366 }
367 }
368
369 pub fn apply_expiration_offset(&mut self, offset: Duration) {
375 if self.replica_expiration.is_empty() {
376 let offset: EpochMillis = offset
377 .as_millis()
378 .try_into()
379 .expect("duration must fit within u64");
380 let replica_expiration_millis = self.init_system_time + offset;
381 let replica_expiration = Timestamp::from(replica_expiration_millis);
382
383 info!(
384 offset = %offset,
385 replica_expiration_millis = %replica_expiration_millis,
386 replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
387 "setting replica expiration",
388 );
389 self.replica_expiration = Antichain::from_elem(replica_expiration);
390
391 self.metrics
393 .replica_expiration_timestamp_seconds
394 .set(replica_expiration.into());
395 }
396 }
397
398 pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
401 use mz_compute_types::dyncfgs::{
402 DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
403 };
404
405 if self.persist_clients.cfg.is_cc_active {
406 DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
407 } else {
408 DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
409 }
410 }
411}
412
/// A [`ComputeState`] bundled with the resources needed to act on it: the Timely worker that
/// hosts the dataflows and the channel on which compute responses are sent.
pub(crate) struct ActiveComputeState<'a> {
    /// The Timely worker on which dataflows are built and dropped.
    pub timely_worker: &'a mut TimelyWorker,
    /// The worker-local compute state.
    pub compute_state: &'a mut ComputeState,
    /// Channel over which compute responses are sent.
    pub response_tx: &'a mut ResponseSender,
}
422
/// An opaque token that keeps a value alive for as long as the token itself exists.
///
/// The wrapped value is never read. Owning it, and dropping it together with the token, is
/// the token's only purpose, hence the `dead_code` allowance on the field.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Wraps `t` so that it is dropped exactly when the returned token is dropped.
    pub fn new(t: Box<dyn Any>) -> Self {
        SinkToken(t)
    }
}
432
433impl<'a> ActiveComputeState<'a> {
434 #[mz_ore::instrument(level = "debug")]
436 pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
437 use ComputeCommand::*;
438
439 self.compute_state.command_history.push(cmd.clone());
440
441 let timer = self
443 .compute_state
444 .metrics
445 .handle_command_duration_seconds
446 .for_command(&cmd)
447 .start_timer();
448
449 match cmd {
450 Hello { .. } => panic!("Hello must be captured before"),
451 CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
452 InitializationComplete => (),
453 UpdateConfiguration(params) => self.handle_update_configuration(*params),
454 CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
455 Schedule(id) => self.handle_schedule(id),
456 AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
457 Peek(peek) => {
458 peek.otel_ctx.attach_as_parent();
459 self.handle_peek(*peek)
460 }
461 CancelPeek { uuid } => self.handle_cancel_peek(uuid),
462 AllowWrites(id) => {
463 self.handle_allow_writes(id);
464 }
465 }
466
467 timer.observe_duration();
468 }
469
470 fn handle_create_instance(&mut self, config: InstanceConfig) {
471 self.compute_state.apply_worker_config();
473 if let Some(offset) = config.expiration_offset {
474 self.compute_state.apply_expiration_offset(offset);
475 }
476
477 let storage_log_reader = self.compute_state.storage_log_reader.take();
478 self.initialize_logging(config.logging, storage_log_reader);
479
480 self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
481 }
482
483 fn handle_update_configuration(&mut self, params: ComputeParameters) {
484 debug!("Applying configuration update: {params:?}");
485
486 let ComputeParameters {
487 workload_class,
488 max_result_size,
489 tracing,
490 grpc_client: _grpc_client,
491 dyncfg_updates,
492 } = params;
493
494 if let Some(v) = workload_class {
495 self.compute_state.metrics.set_workload_class(v);
496 }
497 if let Some(v) = max_result_size {
498 self.compute_state.max_result_size = v;
499 }
500
501 tracing.apply(self.compute_state.tracing_handle.as_ref());
502
503 dyncfg_updates.apply(&self.compute_state.worker_config);
504 self.compute_state
505 .persist_clients
506 .cfg()
507 .apply_from(&dyncfg_updates);
508
509 mz_metrics::update_dyncfg(&dyncfg_updates);
513
514 self.compute_state.apply_worker_config();
515 }
516
517 fn handle_create_dataflow(
518 &mut self,
519 dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
520 ) {
521 let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
522 let as_of = dataflow.as_of.clone().unwrap();
523
524 let dataflow_expiration = dataflow
525 .time_dependence
526 .as_ref()
527 .map(|time_dependence| {
528 self.determine_dataflow_expiration(time_dependence, &dataflow.until)
529 })
530 .unwrap_or_default();
531
532 let until = dataflow.until.meet(&dataflow_expiration);
534
535 if dataflow.is_transient() {
536 debug!(
537 name = %dataflow.debug_name,
538 import_ids = %dataflow.display_import_ids(),
539 export_ids = %dataflow.display_export_ids(),
540 as_of = ?as_of.elements(),
541 time_dependence = ?dataflow.time_dependence,
542 expiration = ?dataflow_expiration.elements(),
543 expiration_datetime = ?dataflow_expiration
544 .as_option()
545 .map(|t| mz_ore::now::to_datetime(t.into())),
546 plan_until = ?dataflow.until.elements(),
547 until = ?until.elements(),
548 "creating dataflow",
549 );
550 } else {
551 info!(
552 name = %dataflow.debug_name,
553 import_ids = %dataflow.display_import_ids(),
554 export_ids = %dataflow.display_export_ids(),
555 as_of = ?as_of.elements(),
556 time_dependence = ?dataflow.time_dependence,
557 expiration = ?dataflow_expiration.elements(),
558 expiration_datetime = ?dataflow_expiration
559 .as_option()
560 .map(|t| mz_ore::now::to_datetime(t.into())),
561 plan_until = ?dataflow.until.elements(),
562 until = ?until.elements(),
563 "creating dataflow",
564 );
565 };
566
567 let subscribe_copy_ids: BTreeSet<_> = dataflow
568 .subscribe_ids()
569 .chain(dataflow.copy_to_ids())
570 .collect();
571
572 for object_id in dataflow.export_ids() {
574 let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
575 let metrics = self.compute_state.metrics.for_collection(object_id);
576 let mut collection = CollectionState::new(
577 Rc::clone(&dataflow_index),
578 is_subscribe_or_copy,
579 as_of.clone(),
580 metrics,
581 );
582
583 if let Some(logger) = self.compute_state.compute_logger.clone() {
584 let logging = CollectionLogging::new(
585 object_id,
586 logger,
587 *dataflow_index,
588 dataflow.import_ids(),
589 );
590 collection.logging = Some(logging);
591 }
592
593 collection.reset_reported_frontiers(ReportedFrontier::NotReported {
594 lower: as_of.clone(),
595 });
596
597 let existing = self.compute_state.collections.insert(object_id, collection);
598 if existing.is_some() {
599 error!(
600 id = ?object_id,
601 "existing collection for newly created dataflow",
602 );
603 }
604 }
605
606 let (start_signal, suspension_token) = StartSignal::new();
607 for id in dataflow.export_ids() {
608 self.compute_state
609 .suspended_collections
610 .insert(id, Rc::clone(&suspension_token));
611 }
612
613 crate::render::build_compute_dataflow(
614 self.timely_worker,
615 self.compute_state,
616 dataflow,
617 start_signal,
618 until,
619 dataflow_expiration,
620 );
621 }
622
623 fn handle_schedule(&mut self, id: GlobalId) {
624 let suspension_token = self.compute_state.suspended_collections.remove(&id);
630 drop(suspension_token);
631 }
632
633 fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
634 if frontier.is_empty() {
635 self.drop_collection(id);
637 } else {
638 self.compute_state
639 .traces
640 .allow_compaction(id, frontier.borrow());
641 }
642 }
643
644 #[mz_ore::instrument(level = "debug")]
645 fn handle_peek(&mut self, peek: Peek) {
646 let pending = match &peek.target {
647 PeekTarget::Index { id } => {
648 let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
650 PendingPeek::index(peek, trace_bundle)
651 }
652 PeekTarget::Persist { metadata, .. } => {
653 let metadata = metadata.clone();
654 PendingPeek::persist(
655 peek,
656 Arc::clone(&self.compute_state.persist_clients),
657 metadata,
658 usize::cast_from(self.compute_state.max_result_size),
659 self.timely_worker,
660 )
661 }
662 };
663
664 if let Some(logger) = self.compute_state.compute_logger.as_mut() {
666 logger.log(&pending.as_log_event(true));
667 }
668
669 self.process_peek(&mut Antichain::new(), pending);
670 }
671
672 fn handle_cancel_peek(&mut self, uuid: Uuid) {
673 if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
674 self.send_peek_response(peek, PeekResponse::Canceled);
675 }
676 }
677
678 fn handle_allow_writes(&mut self, id: GlobalId) {
679 self.compute_state.persist_clients.cfg().enable_compaction();
683
684 if let Some(collection) = self.compute_state.collections.get_mut(&id) {
685 collection.allow_writes();
686 } else {
687 soft_panic_or_log!("allow writes for unknown collection {id}");
688 }
689 }
690
691 fn drop_collection(&mut self, id: GlobalId) {
693 let collection = self
694 .compute_state
695 .collections
696 .remove(&id)
697 .expect("dropped untracked collection");
698
699 self.compute_state.traces.remove(&id);
701 self.compute_state.suspended_collections.remove(&id);
703
704 if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
706 self.timely_worker.drop_dataflow(index);
707 }
708
709 if !collection.is_subscribe_or_copy {
714 let reported = collection.reported_frontiers;
715 let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
716 let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
717 let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);
718
719 let frontiers = FrontiersResponse {
720 write_frontier,
721 input_frontier,
722 output_frontier,
723 };
724 if frontiers.has_updates() {
725 self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
726 }
727 }
728 }
729
730 pub fn initialize_logging(
732 &mut self,
733 config: LoggingConfig,
734 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
735 ) {
736 if self.compute_state.compute_logger.is_some() {
737 panic!("dataflow server has already initialized logging");
738 }
739
740 let LoggingTraces {
741 traces,
742 dataflow_index,
743 compute_logger: logger,
744 } = logging::initialize(
745 self.timely_worker,
746 &config,
747 self.compute_state.metrics_registry.clone(),
748 Rc::clone(&self.compute_state.worker_config),
749 self.compute_state.workers_per_process,
750 storage_log_reader,
751 );
752
753 let dataflow_index = Rc::new(dataflow_index);
754 let mut log_index_ids = config.index_logs;
755 for (log, trace) in traces {
756 let id = log_index_ids
758 .remove(&log)
759 .expect("`logging::initialize` does not invent logs");
760 self.compute_state.traces.set(id, trace);
761
762 let is_subscribe_or_copy = false;
764 let as_of = Antichain::from_elem(Timestamp::MIN);
765 let metrics = self.compute_state.metrics.for_collection(id);
766 let mut collection = CollectionState::new(
767 Rc::clone(&dataflow_index),
768 is_subscribe_or_copy,
769 as_of,
770 metrics,
771 );
772
773 let logging =
774 CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
775 collection.logging = Some(logging);
776
777 let existing = self.compute_state.collections.insert(id, collection);
778 if existing.is_some() {
779 error!(
780 id = ?id,
781 "existing collection for newly initialized logging export",
782 );
783 }
784 }
785
786 assert!(
788 log_index_ids.is_empty(),
789 "failed to create requested logging indexes: {log_index_ids:?}",
790 );
791
792 self.compute_state.compute_logger = Some(logger);
793 }
794
    /// Reports changes to the write, input and output frontiers of installed collections.
    ///
    /// For every collection that is not a subscribe or copy-to, the current frontiers are
    /// computed and passed to the previously reported frontiers' `allows_reporting`. Those it
    /// accepts are recorded as the new reported frontiers. At most one
    /// `ComputeResponse::Frontiers` is sent per collection. Responses are buffered and sent
    /// after iterating over all collections.
    /// NOTE(review): `allows_reporting` presumably rejects unchanged or regressing frontiers —
    /// confirm against its definition.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Scratch antichain, reused for every frontier kind of every collection.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // Subscribes and copy-tos are not handled here.
            // NOTE(review): subscribe frontiers are reported by `process_subscribes`; where
            // copy-to frontiers are reported is not visible in this function.
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Write frontier: the upper of the `oks` trace for arranged collections,
            // otherwise the sink's write frontier. Having both is an invariant violation.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Output frontier: starts from the write frontier computed above. If the
            // collection has a compute probe, the probe's frontier is merged in (`extend`
            // keeps the minimal elements of both). While `read_only_rx` holds `true`, the
            // write frontier is discarded first, so only the probe's frontier counts.
            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Input frontier: minimal elements across all input probes' frontiers; empty if
            // the collection has no input probes.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Remember what is being reported, so later calls only report further changes.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        // Sent after the loop because `collections` is mutably borrowed while iterating.
        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
890
891 pub(crate) fn report_metrics(&self) {
893 if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
894 let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
895 let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
896 let remaining = expiration - now;
897 self.compute_state
898 .metrics
899 .replica_expiration_remaining_seconds
900 .set(remaining)
901 }
902 }
903
    /// Makes progress on one pending peek, then either answers it or keeps it pending.
    ///
    /// How progress is made depends on the kind of peek:
    /// - Index peeks call `seek_fulfillment` on their trace. If that requests the peek
    ///   stash, the peek is replaced by a `StashingPeek` in `pending_peeks` and this returns
    ///   immediately.
    /// - Persist peeks check whether their background task has delivered a result.
    /// - Stashing peeks pump rows, then check whether the final response is available.
    ///
    /// If a response is available, it is sent. Otherwise the peek is re-inserted into
    /// `pending_peeks`. `upper` is passed through to `seek_fulfillment`.
    /// NOTE(review): `upper` appears to be a scratch antichain that `process_peeks` reuses
    /// across peeks — confirm.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // Whether this peek's finishing permits answering via the stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                // Stashing needs both the dyncfg flag and a configured persist location.
                // NOTE(review): the error below is logged on every attempt while misconfigured.
                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                // Per-phase timing histograms for `seek_fulfillment`.
                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        // NOTE(review): `expect` relies on `seek_fulfillment` only asking for
                        // the stash when stashing was allowed above, which implies the
                        // location is set.
                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        // The stashing peek replaces this index peek. Return early so the
                        // index peek is not re-inserted below.
                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // A persist peek is done once its background task has sent a result.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Pump rows, bounded by the configured batch count and size, then check for
                // the final response.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not answerable yet; keep it pending for the next `process_peeks` round.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
1028
1029 pub fn process_peeks(&mut self) {
1031 let mut upper = Antichain::new();
1032 let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
1033 for (_uuid, peek) in pending_peeks {
1034 self.process_peek(&mut upper, peek);
1035 }
1036 }
1037
1038 #[mz_ore::instrument(level = "debug")]
1043 fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
1044 let log_event = peek.as_log_event(false);
1045 self.send_compute_response(ComputeResponse::PeekResponse(
1047 peek.peek().uuid,
1048 response,
1049 OpenTelemetryContext::obtain(),
1050 ));
1051
1052 if let Some(logger) = self.compute_state.compute_logger.as_mut() {
1054 logger.log(&log_event);
1055 }
1056 }
1057
1058 pub fn process_subscribes(&mut self) {
1060 let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
1061 for (sink_id, mut response) in subscribe_responses.drain(..) {
1062 if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
1064 let new_frontier = match &response {
1065 SubscribeResponse::Batch(b) => b.upper.clone(),
1066 SubscribeResponse::DroppedAt(_) => Antichain::new(),
1067 };
1068
1069 let reported = collection.reported_frontiers();
1070 assert!(
1071 reported.write_frontier.allows_reporting(&new_frontier),
1072 "subscribe write frontier regression: {:?} -> {:?}",
1073 reported.write_frontier,
1074 new_frontier,
1075 );
1076 assert!(
1077 reported.input_frontier.allows_reporting(&new_frontier),
1078 "subscribe input frontier regression: {:?} -> {:?}",
1079 reported.input_frontier,
1080 new_frontier,
1081 );
1082
1083 collection
1084 .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1085 collection
1086 .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1087 collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1088 } else {
1089 }
1092
1093 response
1094 .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1095 self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1096 }
1097 }
1098
1099 pub fn process_copy_tos(&self) {
1101 let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1102 for (sink_id, response) in responses.drain(..) {
1103 self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1104 }
1105 }
1106
    /// Sends `response` over the response channel.
    ///
    /// The result of the send is deliberately ignored.
    /// NOTE(review): presumably a failed send means the receiving side has hung up, in which
    /// case dropping the response is intended — confirm.
    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }
1113
1114 pub(crate) fn check_expiration(&self) {
1116 let now = mz_ore::now::SYSTEM_TIME();
1117 if self.compute_state.replica_expiration.less_than(&now.into()) {
1118 let now_datetime = mz_ore::now::to_datetime(now);
1119 let expiration_datetime = self
1120 .compute_state
1121 .replica_expiration
1122 .as_option()
1123 .map(Into::into)
1124 .map(mz_ore::now::to_datetime);
1125
1126 error!(
1129 now,
1130 now_datetime = ?now_datetime,
1131 expiration = ?self.compute_state.replica_expiration.elements(),
1132 expiration_datetime = ?expiration_datetime,
1133 "replica expired"
1134 );
1135
1136 assert!(
1138 !self.compute_state.replica_expiration.less_than(&now.into()),
1139 "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1140 self.compute_state.replica_expiration.elements(),
1141 );
1142 }
1143 }
1144
1145 pub fn determine_dataflow_expiration(
1151 &self,
1152 time_dependence: &TimeDependence,
1153 until: &Antichain<Timestamp>,
1154 ) -> Antichain<Timestamp> {
1155 let iter = self
1160 .compute_state
1161 .replica_expiration
1162 .iter()
1163 .filter_map(|t| time_dependence.apply(*t))
1164 .filter_map(|t| Timestamp::try_step_forward(&t))
1165 .filter(|expiration| !until.less_equal(expiration));
1166 Antichain::from_iter(iter)
1167 }
1168}
1169
/// A peek that has been received but not yet answered.
pub enum PendingPeek {
    /// A peek against an index trace maintained by this worker.
    Index(IndexPeek),
    /// A peek against a persist shard, answered by a spawned background task.
    Persist(PersistPeek),
    /// A peek whose result is being uploaded to persist (the peek response stash).
    Stash(peek_stash::StashingPeek),
}
1183
1184impl PendingPeek {
1185 pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
1187 let peek = self.peek();
1188 let (id, peek_type) = match &peek.target {
1189 PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
1190 PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
1191 };
1192 let uuid = peek.uuid.into_bytes();
1193 ComputeEvent::Peek(PeekEvent {
1194 id,
1195 time: peek.timestamp,
1196 uuid,
1197 peek_type,
1198 installed,
1199 })
1200 }
1201
1202 fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
1203 let empty_frontier = Antichain::new();
1204 let timestamp_frontier = Antichain::from_elem(peek.timestamp);
1205 trace_bundle
1206 .oks_mut()
1207 .set_logical_compaction(timestamp_frontier.borrow());
1208 trace_bundle
1209 .errs_mut()
1210 .set_logical_compaction(timestamp_frontier.borrow());
1211 trace_bundle
1212 .oks_mut()
1213 .set_physical_compaction(empty_frontier.borrow());
1214 trace_bundle
1215 .errs_mut()
1216 .set_physical_compaction(empty_frontier.borrow());
1217
1218 PendingPeek::Index(IndexPeek {
1219 peek,
1220 trace_bundle,
1221 span: tracing::Span::current(),
1222 })
1223 }
1224
1225 fn persist(
1226 peek: Peek,
1227 persist_clients: Arc<PersistClientCache>,
1228 metadata: CollectionMetadata,
1229 max_result_size: usize,
1230 timely_worker: &TimelyWorker,
1231 ) -> Self {
1232 let active_worker = {
1233 let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
1235 chosen_index == timely_worker.index()
1236 };
1237 let activator = timely_worker.sync_activator_for([].into());
1238 let peek_uuid = peek.uuid;
1239
1240 let (result_tx, result_rx) = oneshot::channel();
1241 let timestamp = peek.timestamp;
1242 let mfp_plan = peek.map_filter_project.clone();
1243 let max_results_needed = peek
1244 .finishing
1245 .limit
1246 .map(|l| usize::cast_from(u64::from(l)))
1247 .unwrap_or(usize::MAX)
1248 + peek.finishing.offset;
1249 let order_by = peek.finishing.order_by.clone();
1250
1251 let literal_constraint = peek
1253 .literal_constraints
1254 .clone()
1255 .map(|rows| rows.into_element());
1256
1257 let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
1258 let start = Instant::now();
1259 let result = if active_worker {
1260 PersistPeek::do_peek(
1261 &persist_clients,
1262 metadata,
1263 timestamp,
1264 literal_constraint,
1265 mfp_plan,
1266 max_result_size,
1267 max_results_needed,
1268 )
1269 .await
1270 } else {
1271 Ok(vec![])
1272 };
1273 let result = match result {
1274 Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
1275 Err(e) => PeekResponse::Error(e.to_string()),
1276 };
1277 match result_tx.send((result, start.elapsed())) {
1278 Ok(()) => {}
1279 Err((_result, elapsed)) => {
1280 debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
1281 }
1282 }
1283 match activator.activate() {
1284 Ok(()) => {}
1285 Err(_) => {
1286 debug!("unable to wake timely after completed peek {peek_uuid}");
1287 }
1288 }
1289 });
1290 PendingPeek::Persist(PersistPeek {
1291 peek,
1292 _abort_handle: task_handle.abort_on_drop(),
1293 result: result_rx,
1294 span: tracing::Span::current(),
1295 })
1296 }
1297
1298 fn span(&self) -> &tracing::Span {
1299 match self {
1300 PendingPeek::Index(p) => &p.span,
1301 PendingPeek::Persist(p) => &p.span,
1302 PendingPeek::Stash(p) => &p.span,
1303 }
1304 }
1305
1306 pub(crate) fn peek(&self) -> &Peek {
1307 match self {
1308 PendingPeek::Index(p) => &p.peek,
1309 PendingPeek::Persist(p) => &p.peek,
1310 PendingPeek::Stash(p) => &p.peek,
1311 }
1312 }
1313}
1314
/// An in-progress peek that reads its data directly from a persist shard.
///
/// The read happens in a task spawned by `PendingPeek::persist`. This struct holds the
/// handle to that task and the receiving end of the channel the task reports on.
pub struct PersistPeek {
    /// The peek request being served.
    pub(crate) peek: Peek,
    /// Handle to the spawned read task, obtained via `abort_on_drop()`. Dropping this
    /// struct therefore aborts the task. Note that fields drop in declaration order.
    _abort_handle: AbortOnDropHandle<()>,
    /// Receives the peek response together with the time the read task took.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The tracing span that was current when the peek was created.
    span: tracing::Span,
}
1329
impl PersistPeek {
    /// Reads the contents of a persist shard at `as_of` and applies `mfp_plan` to every row.
    ///
    /// Literal-constraint handling:
    /// - Rows whose leading columns compare below `literal_constraint` are skipped.
    /// - The scan stops at the first row whose leading columns compare above it.
    ///
    /// NOTE(review): the early stop assumes the cursor yields rows in ascending order of
    /// those leading columns. Confirm that `StatsCursor` guarantees this.
    ///
    /// Collection ends once `limit_remaining` output rows, counted with multiplicity, have
    /// been gathered. The last row added may overshoot that count.
    ///
    /// # Errors
    ///
    /// Returns a stringified error in any of these cases:
    /// - the persist client or the leased reader cannot be opened;
    /// - the shard's since has advanced past `as_of`;
    /// - a row fails to decode;
    /// - a row has a negative multiplicity;
    /// - evaluating `mfp_plan` fails;
    /// - the accumulated result exceeds `max_result_size` bytes.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // Shards that participate in a txns shard are read through a `TxnsCache` for it.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        // The cursor is positioned at `as_of`. It fails, returning the current since, if
        // the shard can no longer be read at that time.
        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        // Scratch buffers reused across rows to avoid per-row allocation.
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        // Number of leading columns compared against the literal constraint.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative diff cannot be represented as a row count and indicates
                // invalid source data.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                // Rows with a zero count contribute nothing to the result.
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                // `None` means the row was filtered out by the MFP.
                if let Some(row) = eval_result {
                    // Each result entry costs its row bytes plus the stored count.
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1451
/// An in-progress peek that is answered from an arrangement.
pub struct IndexPeek {
    /// The peek request being served.
    peek: Peek,
    /// Handles to the `oks` and `errs` traces of the peeked arrangement. `PendingPeek::index`
    /// sets their logical compaction to the peek timestamp.
    trace_bundle: TraceBundle,
    /// The tracing span that was current when the peek was created.
    span: tracing::Span,
}
1460
/// Borrowed histograms for timing the phases of an index peek in `IndexPeek`.
///
/// All values are recorded in seconds.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time of `seek_fulfillment`. Recorded only when it proceeds to data collection.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking the trace uppers and the compaction frontier.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time spent scanning the error trace. Recorded only when no error was found.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Time spent constructing the result iterator over the `oks` trace.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Time spent iterating result rows, including any intermediate sorting.
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Accumulated time spent sorting intermediate results.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Time spent building the final `RowCollection`.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1474
impl IndexPeek {
    /// Attempts to answer the peek from the arrangement.
    ///
    /// Returns:
    /// - `PeekStatus::NotReady` while either trace's upper frontier is still less than or
    ///   equal to the peek timestamp;
    /// - an error response if the arrangement has been compacted beyond the peek timestamp;
    /// - otherwise, the outcome of collecting the data.
    ///
    /// `upper` is a caller-provided buffer. `read_upper` writes each trace's upper into it,
    /// so it is clobbered by this call.
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        // Data at the peek timestamp is complete only once both uppers are beyond it.
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        // If the traces were compacted past the peek timestamp, the data at that time
        // can no longer be read accurately.
        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }

    /// Collects the peek result, assuming the traces are complete at the peek timestamp.
    ///
    /// The error trace is consulted first. The first error key whose accumulated count at
    /// the peek timestamp is positive becomes the response. A negative count is reported
    /// as invalid data. Only when no errors are present are the `oks` rows collected.
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            // Accumulate the multiplicity of this error as of the peek timestamp.
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                    saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }

    /// Collects the rows of the `oks` trace that answer `peek`.
    ///
    /// Rows are produced by `PeekResultIterator`, which is built from the peek's MFP, its
    /// timestamp and its literal constraints. Each result entry costs its row bytes plus
    /// the size of its count. Size limits are checked in this order:
    /// 1. When the peek is stash-eligible and the running size exceeds
    ///    `peek_stash_threshold_bytes`, returns `PeekStatus::UsePeekStash`.
    /// 2. When the running size exceeds `max_result_size`, returns an error response.
    ///
    /// When the finishing bounds the number of rows needed, the intermediate result is
    /// periodically shrunk so that it keeps only what the finishing can use.
    fn collect_ok_finished_data<Tr>(
        peek: &Peek,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
                Key<'a>: ToDatumIter + Eq,
                KeyContainer: BatchContainer<Owned = Row>,
                Val<'a>: ToDatumIter,
                TimeGat<'a>: PartialOrder<Timestamp>,
                DiffGat<'a> = &'a Diff,
            >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let cursor_setup_start = Instant::now();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        let mut results = Vec::new();
        // Running estimate of the result's byte size.
        let mut total_size: usize = 0;

        // `None` when the finishing does not bound the number of rows (no LIMIT).
        let max_results = peek.finishing.num_rows_needed();

        let comparator = RowComparator::new(peek.finishing.order_by.as_slice());

        let row_iteration_start = Instant::now();
        // Sorting happens inside the iteration loop, so its time is tracked separately.
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row: (Row, _) = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            if let Some(max_results) = max_results {
                // Every entry has a count of at least one, so `max_results` entries always
                // cover at least `max_results` rows. Shrinking only once twice that many
                // have accumulated amortizes the cost of sorting.
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        // Without an ORDER BY any `max_results` entries will do, so the
                        // peek can be answered right away.
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        // With an ORDER BY, keep only the first `max_results` entries in
                        // sort order. Ties under the ORDER BY fall back to full row order.
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
                        });
                        sort_time_accum += sort_start.elapsed();
                        let dropped = results.drain(max_results..);
                        // Discount the dropped entries from the running size estimate.
                        let dropped_size =
                            dropped
                                .into_iter()
                                .fold(0, |acc: usize, (row, _count): (Row, _)| {
                                    acc.saturating_add(
                                        row.byte_len().saturating_add(count_byte_size),
                                    )
                                });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
}
1730
/// The outcome of an attempt to answer an index peek.
enum PeekStatus {
    /// The peek cannot be answered yet, because a trace's upper frontier has not advanced
    /// beyond the peek timestamp.
    NotReady,
    /// The peek is stash-eligible and its result grew beyond the stash threshold, so it
    /// should be served through the peek stash instead.
    UsePeekStash,
    /// The peek is answered by the contained response, which may be rows or an error.
    Ready(PeekResponse),
}
1743
/// The last reported frontiers of a collection.
///
/// The write, input and output frontiers are tracked separately.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The last reported write frontier.
    write_frontier: ReportedFrontier,
    /// The last reported input frontier.
    input_frontier: ReportedFrontier,
    /// The last reported output frontier. The collection's hydration status is derived
    /// from it.
    output_frontier: ReportedFrontier,
}
1754
1755impl ReportedFrontiers {
1756 fn new() -> Self {
1758 Self {
1759 write_frontier: ReportedFrontier::new(),
1760 input_frontier: ReportedFrontier::new(),
1761 output_frontier: ReportedFrontier::new(),
1762 }
1763 }
1764}
1765
/// The reporting state of a single frontier.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been reported. The contained value is the frontier that was last
    /// reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// Lower bound for the first report: `allows_reporting` accepts frontiers that are
        /// greater than or equal to it.
        lower: Antichain<Timestamp>,
    },
}
1777
1778impl ReportedFrontier {
1779 pub fn new() -> Self {
1781 let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1782 Self::NotReported { lower }
1783 }
1784
1785 pub fn is_empty(&self) -> bool {
1787 match self {
1788 Self::Reported(frontier) => frontier.is_empty(),
1789 Self::NotReported { .. } => false,
1790 }
1791 }
1792
1793 fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1799 match self {
1800 Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1801 Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1802 }
1803 }
1804}
1805
/// Per-collection state maintained by a compute worker.
///
/// Field order is also drop order; keep it stable.
pub struct CollectionState {
    /// The frontiers last reported for this collection.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection; it is logged by `allow_writes`.
    /// NOTE(review): the `Rc` presumably lets collections exported by the same dataflow
    /// share it. Confirm.
    dataflow_index: Rc<usize>,
    /// Whether the collection is a subscribe or copy-to sink.
    pub is_subscribe_or_copy: bool,
    /// The as-of frontier of the collection. The collection is hydrated once its reported
    /// output frontier is strictly beyond it.
    as_of: Antichain<Timestamp>,

    /// The sink token, if the collection is exported by a sink.
    pub sink_token: Option<SinkToken>,
    /// The sink's write frontier, if the collection is exported by a sink.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Probes on the collection's inputs, keyed by import id. Their frontiers are logged
    /// when the input frontier is reported.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe on the collection's compute frontier, if installed.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state for the collection, if logging is enabled.
    logging: Option<CollectionLogging>,
    /// Metrics for the collection; hydration is recorded here.
    metrics: CollectionMetrics,
    /// Sender for the read-only flag. It starts as `true`, and `allow_writes` sends `false`.
    read_only_tx: watch::Sender<bool>,
    /// Receiver for the read-only flag. It is also held here, which keeps the channel open
    /// for `read_only_tx`.
    pub read_only_rx: watch::Receiver<bool>,
}
1862
1863impl CollectionState {
1864 fn new(
1865 dataflow_index: Rc<usize>,
1866 is_subscribe_or_copy: bool,
1867 as_of: Antichain<Timestamp>,
1868 metrics: CollectionMetrics,
1869 ) -> Self {
1870 let (read_only_tx, read_only_rx) = watch::channel(true);
1873
1874 Self {
1875 reported_frontiers: ReportedFrontiers::new(),
1876 dataflow_index,
1877 is_subscribe_or_copy,
1878 as_of,
1879 sink_token: None,
1880 sink_write_frontier: None,
1881 input_probes: Default::default(),
1882 compute_probe: None,
1883 logging: None,
1884 metrics,
1885 read_only_tx,
1886 read_only_rx,
1887 }
1888 }
1889
1890 fn reported_frontiers(&self) -> &ReportedFrontiers {
1892 &self.reported_frontiers
1893 }
1894
1895 pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1897 self.reported_frontiers.write_frontier = frontier.clone();
1898 self.reported_frontiers.input_frontier = frontier.clone();
1899 self.reported_frontiers.output_frontier = frontier;
1900 }
1901
1902 fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1904 if let Some(logging) = &mut self.logging {
1905 let time = match &frontier {
1906 ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1907 ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1908 };
1909 logging.set_frontier(time);
1910 }
1911
1912 self.reported_frontiers.write_frontier = frontier;
1913 }
1914
1915 fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1917 if let Some(logging) = &mut self.logging {
1919 for (id, probe) in &self.input_probes {
1920 let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1921 logging.set_import_frontier(*id, new_time);
1922 }
1923 }
1924
1925 self.reported_frontiers.input_frontier = frontier;
1926 }
1927
1928 fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1930 let already_hydrated = self.hydrated();
1931
1932 self.reported_frontiers.output_frontier = frontier;
1933
1934 if !already_hydrated && self.hydrated() {
1935 if let Some(logging) = &mut self.logging {
1936 logging.set_hydrated();
1937 }
1938 self.metrics.record_collection_hydrated();
1939 }
1940 }
1941
1942 fn hydrated(&self) -> bool {
1944 match &self.reported_frontiers.output_frontier {
1945 ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1946 ReportedFrontier::NotReported { .. } => false,
1947 }
1948 }
1949
1950 fn allow_writes(&self) {
1952 info!(
1953 dataflow_index = *self.dataflow_index,
1954 export = ?self.logging.as_ref().map(|l| l.export_id()),
1955 "allowing writes for dataflow",
1956 );
1957 let _ = self.read_only_tx.send(false);
1958 }
1959}