1use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::implementations::BatchContainer;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
29};
30use mz_compute_types::dataflows::DataflowDescription;
31use mz_compute_types::dyncfgs::{
32 ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33 PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
34};
35use mz_compute_types::plan::render_plan::RenderPlan;
36use mz_dyncfg::ConfigSet;
37use mz_expr::row::RowCollection;
38use mz_expr::{RowComparator, SafeMfpPlan};
39use mz_ore::cast::{CastFrom, CastLossy};
40use mz_ore::collections::CollectionExt;
41use mz_ore::metrics::{MetricsRegistry, UIntGauge};
42use mz_ore::now::EpochMillis;
43use mz_ore::soft_panic_or_log;
44use mz_ore::task::AbortOnDropHandle;
45use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
46use mz_persist_client::Diagnostics;
47use mz_persist_client::cache::PersistClientCache;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_types::PersistLocation;
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::fixed_length::ExtendDatums;
53use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
54use mz_storage_operators::stats::StatsCursor;
55use mz_storage_types::StorageDiff;
56use mz_storage_types::controller::CollectionMetadata;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::sources::SourceData;
59use mz_storage_types::time_dependence::TimeDependence;
60use mz_txn_wal::operator::TxnsContext;
61use mz_txn_wal::txn_cache::TxnsCache;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::worker::Worker as TimelyWorker;
66use tokio::sync::{oneshot, watch};
67use tracing::{Level, debug, error, info, span, trace, warn};
68use uuid::Uuid;
69
70use crate::arrangement::manager::{TraceBundle, TraceManager};
71use crate::logging;
72use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
73use crate::logging::initialize::LoggingTraces;
74use crate::metrics::{CollectionMetrics, WorkerMetrics};
75use crate::render::{LinearJoinSpec, StartSignal};
76use crate::server::{ComputeInstanceContext, ResponseSender};
77
78mod peek_result_iterator;
79mod peek_stash;
80
/// Worker-local state of the compute server.
///
/// Command handlers on `ActiveComputeState` read and mutate this state; there
/// it is paired with the timely worker and the response channel.
pub struct ComputeState {
    /// State of every installed collection (dataflow exports and logging
    /// indexes), keyed by collection ID.
    ///
    /// Entries are added when dataflows are created or logging is initialized,
    /// and removed when compaction to the empty frontier is allowed.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Arrangement traces of installed indexes, used to answer index peeks and
    /// to read write frontiers.
    pub traces: TraceManager,
    /// Buffer of subscribe responses, drained by `process_subscribes`.
    ///
    /// Shared through `Rc<RefCell<_>>`; presumably the subscribe sink operators
    /// hold the other handles (not visible here).
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Buffer of copy-to responses, drained by `process_copy_tos`.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peeks that have been received but not yet answered, keyed by peek UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Persist location used to stash large peek responses.
    ///
    /// `None` until the `CreateInstance` command has been handled.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// Logger for compute events; `None` until logging has been initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// Cache of persist clients, used by persist peeks and peek stashing.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context for txn-wal reads. NOTE(review): usage is not visible here.
    pub txns_ctx: TxnsContext,
    /// History of the compute commands received by this worker.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Maximum result size for peeks and subscribes (presumably in bytes).
    ///
    /// Starts at `u64::MAX` and is updated by `UpdateConfiguration` commands.
    max_result_size: u64,
    /// Linear join configuration, derived from `worker_config`.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics of this worker.
    pub metrics: WorkerMetrics,
    /// Handle through which tracing parameters from `UpdateConfiguration` are applied.
    tracing_handle: Arc<TracingHandle>,
    /// Context of the compute instance (e.g. the optional scratch directory).
    pub context: ComputeInstanceContext,
    /// Dynamic configuration of this worker; updated by `UpdateConfiguration`.
    pub worker_config: Rc<ConfigSet>,

    /// Registry passed to logging initialization.
    pub metrics_registry: MetricsRegistry,

    /// Number of timely workers per process, passed to logging initialization.
    pub workers_per_process: usize,

    /// Suspension tokens of collections whose dataflows have not been
    /// scheduled yet; removed (and thereby dropped) on `Schedule`.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval configured by `COMPUTE_SERVER_MAINTENANCE_INTERVAL`.
    pub server_maintenance_interval: Duration,

    /// System time captured when this state was created; the base for the
    /// replica expiration offset.
    pub init_system_time: EpochMillis,

    /// Replica expiration; the empty antichain means no expiration is set.
    pub replica_expiration: Antichain<Timestamp>,

    /// Reader for storage timely logs, handed to logging on `CreateInstance`.
    pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}
175
176impl ComputeState {
177 pub fn new(
179 persist_clients: Arc<PersistClientCache>,
180 txns_ctx: TxnsContext,
181 metrics: WorkerMetrics,
182 tracing_handle: Arc<TracingHandle>,
183 context: ComputeInstanceContext,
184 metrics_registry: MetricsRegistry,
185 workers_per_process: usize,
186 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
187 ) -> Self {
188 let traces = TraceManager::new(metrics.clone());
189 let command_history = ComputeCommandHistory::new(metrics.for_history());
190
191 Self {
192 collections: Default::default(),
193 traces,
194 subscribe_response_buffer: Default::default(),
195 copy_to_response_buffer: Default::default(),
196 pending_peeks: Default::default(),
197 peek_stash_persist_location: None,
198 compute_logger: None,
199 persist_clients,
200 txns_ctx,
201 command_history,
202 max_result_size: u64::MAX,
203 linear_join_spec: Default::default(),
204 metrics,
205 tracing_handle,
206 context,
207 worker_config: mz_dyncfgs::all_dyncfgs().into(),
208 metrics_registry,
209 workers_per_process,
210 suspended_collections: Default::default(),
211 server_maintenance_interval: Duration::ZERO,
212 init_system_time: mz_ore::now::SYSTEM_TIME(),
213 replica_expiration: Antichain::default(),
214 storage_log_reader,
215 }
216 }
217
218 pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
222 self.collections
223 .get_mut(&id)
224 .expect("collection must exist")
225 }
226
227 pub fn input_probe_for(
233 &mut self,
234 input_id: GlobalId,
235 collection_ids: impl Iterator<Item = GlobalId>,
236 ) -> probe::Handle<Timestamp> {
237 let probe = probe::Handle::default();
238 for id in collection_ids {
239 if let Some(collection) = self.collections.get_mut(&id) {
240 collection.input_probes.insert(input_id, probe.clone());
241 }
242 }
243 probe
244 }
245
246 fn apply_worker_config(&mut self) {
248 use mz_compute_types::dyncfgs::*;
249
250 let config = &self.worker_config;
251
252 self.linear_join_spec = LinearJoinSpec::from_config(config);
253
254 if ENABLE_LGALLOC.get(config) {
255 if let Some(path) = &self.context.scratch_directory {
256 let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
257 let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
258 let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
259 let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
260 let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
261 info!(
262 ?path,
263 backgrund_interval=?interval,
264 clear_bytes,
265 eager_return,
266 file_growth_dampener,
267 local_buffer_bytes,
268 "enabling lgalloc"
269 );
270 let background_worker_config = lgalloc::BackgroundWorkerConfig {
271 interval,
272 clear_bytes,
273 };
274 lgalloc::lgalloc_set_config(
275 lgalloc::LgAlloc::new()
276 .enable()
277 .with_path(path.clone())
278 .with_background_config(background_worker_config)
279 .eager_return(eager_return)
280 .file_growth_dampener(file_growth_dampener)
281 .local_buffer_bytes(local_buffer_bytes),
282 );
283 } else {
284 debug!("not enabling lgalloc, scratch directory not specified");
285 }
286 } else {
287 info!("disabling lgalloc");
288 lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
289 }
290
291 #[cfg(unix)]
297 if let Some(path) = &self.context.scratch_directory {
298 mz_ore::pager::set_scratch_dir(path.clone());
299 mz_ore::pager::set_backend(mz_ore::pager::Backend::File);
300 } else {
301 mz_ore::pager::set_backend(mz_ore::pager::Backend::Swap);
302 }
303
304 crate::memory_limiter::apply_limiter_config(config);
305
306 mz_ore::region::ENABLE_LGALLOC_REGION.store(
307 ENABLE_COLUMNATION_LGALLOC.get(config),
308 std::sync::atomic::Ordering::Relaxed,
309 );
310
311 {
326 use mz_ore::pager::Backend;
327 use mz_timely_util::column_pager::{Codec, apply_tiered_config};
328
329 let enabled = ENABLE_COLUMN_PAGED_BATCHER_SPILL.get(config);
330 let codec = COLUMN_PAGED_BATCHER_LZ4.get(config).then_some(Codec::Lz4);
331 let swap_pageout = COLUMN_PAGED_BATCHER_SWAP_PAGEOUT.get(config);
332
333 const MIB: usize = 1024 * 1024;
338 const DEFAULT_MEM_LIMIT: usize = 4 * 1024 * MIB;
339 let mem_limit = crate::memory_limiter::get_memory_limit().unwrap_or(DEFAULT_MEM_LIMIT);
340 let fraction = COLUMN_PAGED_BATCHER_BUDGET_FRACTION.get(config).max(0.0);
341 let total = usize::cast_lossy(f64::cast_lossy(mem_limit) * fraction).max(128 * MIB);
342
343 let backend = if self.context.scratch_directory.is_some() {
344 Backend::File
345 } else {
346 Backend::Swap
347 };
348
349 debug!(
350 enabled,
351 ?backend,
352 ?codec,
353 swap_pageout,
354 fraction,
355 mem_limit,
356 budget_bytes = total,
357 "column-paged batcher: applying tiered config",
358 );
359 apply_tiered_config(enabled, total, backend, codec, swap_pageout);
360 }
361
362 self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
365
366 let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
367 match overflowing_behavior.parse() {
368 Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
369 Err(err) => {
370 error!(
371 err,
372 overflowing_behavior, "Invalid value for ore_overflowing_behavior"
373 );
374 }
375 }
376 }
377
378 pub fn apply_expiration_offset(&mut self, offset: Duration) {
384 if self.replica_expiration.is_empty() {
385 let offset: EpochMillis = offset
386 .as_millis()
387 .try_into()
388 .expect("duration must fit within u64");
389 let replica_expiration_millis = self.init_system_time + offset;
390 let replica_expiration = Timestamp::from(replica_expiration_millis);
391
392 info!(
393 offset = %offset,
394 replica_expiration_millis = %replica_expiration_millis,
395 replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
396 "setting replica expiration",
397 );
398 self.replica_expiration = Antichain::from_elem(replica_expiration);
399
400 self.metrics
402 .replica_expiration_timestamp_seconds
403 .set(replica_expiration.into());
404 }
405 }
406
407 pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
410 use mz_compute_types::dyncfgs::{
411 DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
412 };
413
414 if self.persist_clients.cfg.is_cc_active {
415 DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
416 } else {
417 DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
418 }
419 }
420}
421
/// A [`ComputeState`] borrowed together with the timely worker and response
/// channel it needs in order to handle commands and report results.
pub(crate) struct ActiveComputeState<'a> {
    /// The timely worker in which dataflows are built and dropped.
    pub timely_worker: &'a mut TimelyWorker,
    /// The worker-local compute state.
    pub compute_state: &'a mut ComputeState,
    /// Channel on which compute responses are sent.
    pub response_tx: &'a mut ResponseSender,
}
431
/// An opaque token that owns an arbitrary value until the token is dropped.
///
/// The wrapped value is never read (hence the `dead_code` allowance); it is
/// held only so that it is dropped together with the token. Presumably used
/// by sinks, as the name suggests.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Wraps `t` in a new token.
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}
441
442impl<'a> ActiveComputeState<'a> {
443 #[mz_ore::instrument(level = "debug")]
445 pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
446 use ComputeCommand::*;
447
448 self.compute_state.command_history.push(cmd.clone());
449
450 let timer = self
452 .compute_state
453 .metrics
454 .handle_command_duration_seconds
455 .for_command(&cmd)
456 .start_timer();
457
458 match cmd {
459 Hello { .. } => panic!("Hello must be captured before"),
460 CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
461 InitializationComplete => (),
462 UpdateConfiguration(params) => self.handle_update_configuration(*params),
463 CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
464 Schedule(id) => self.handle_schedule(id),
465 AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
466 Peek(peek) => {
467 peek.otel_ctx.attach_as_parent();
468 self.handle_peek(*peek)
469 }
470 CancelPeek { uuid } => self.handle_cancel_peek(uuid),
471 AllowWrites(id) => {
472 self.handle_allow_writes(id);
473 }
474 }
475
476 timer.observe_duration();
477 }
478
479 fn handle_create_instance(&mut self, config: InstanceConfig) {
480 self.compute_state.apply_worker_config();
482
483 mz_row_spine::DICTIONARY_COMPRESSION.store(
489 config.arrangement_dictionary_compression,
490 std::sync::atomic::Ordering::Relaxed,
491 );
492
493 if let Some(offset) = config.expiration_offset {
494 self.compute_state.apply_expiration_offset(offset);
495 }
496
497 let storage_log_reader = self.compute_state.storage_log_reader.take();
498 self.initialize_logging(config.logging, storage_log_reader);
499
500 self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
501 }
502
503 fn handle_update_configuration(&mut self, params: ComputeParameters) {
504 debug!("Applying configuration update: {params:?}");
505
506 let ComputeParameters {
507 workload_class,
508 max_result_size,
509 tracing,
510 grpc_client: _grpc_client,
511 dyncfg_updates,
512 } = params;
513
514 if let Some(v) = workload_class {
515 self.compute_state.metrics.set_workload_class(v);
516 }
517 if let Some(v) = max_result_size {
518 self.compute_state.max_result_size = v;
519 }
520
521 tracing.apply(self.compute_state.tracing_handle.as_ref());
522
523 dyncfg_updates.apply(&self.compute_state.worker_config);
524 self.compute_state
525 .persist_clients
526 .cfg()
527 .apply_from(&dyncfg_updates);
528
529 mz_metrics::update_dyncfg(&dyncfg_updates);
533
534 self.compute_state.apply_worker_config();
535 }
536
537 fn handle_create_dataflow(
538 &mut self,
539 dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
540 ) {
541 let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
542 let as_of = dataflow.as_of.clone().unwrap();
543
544 let dataflow_expiration = dataflow
545 .time_dependence
546 .as_ref()
547 .map(|time_dependence| {
548 self.determine_dataflow_expiration(time_dependence, &dataflow.until)
549 })
550 .unwrap_or_default();
551
552 let until = dataflow.until.meet(&dataflow_expiration);
554
555 if dataflow.is_transient() {
556 debug!(
557 name = %dataflow.debug_name,
558 import_ids = %dataflow.display_import_ids(),
559 export_ids = %dataflow.display_export_ids(),
560 as_of = ?as_of.elements(),
561 time_dependence = ?dataflow.time_dependence,
562 expiration = ?dataflow_expiration.elements(),
563 expiration_datetime = ?dataflow_expiration
564 .as_option()
565 .map(|t| mz_ore::now::to_datetime(t.into())),
566 plan_until = ?dataflow.until.elements(),
567 until = ?until.elements(),
568 "creating dataflow",
569 );
570 } else {
571 info!(
572 name = %dataflow.debug_name,
573 import_ids = %dataflow.display_import_ids(),
574 export_ids = %dataflow.display_export_ids(),
575 as_of = ?as_of.elements(),
576 time_dependence = ?dataflow.time_dependence,
577 expiration = ?dataflow_expiration.elements(),
578 expiration_datetime = ?dataflow_expiration
579 .as_option()
580 .map(|t| mz_ore::now::to_datetime(t.into())),
581 plan_until = ?dataflow.until.elements(),
582 until = ?until.elements(),
583 "creating dataflow",
584 );
585 };
586
587 let subscribe_copy_ids: BTreeSet<_> = dataflow
588 .subscribe_ids()
589 .chain(dataflow.copy_to_ids())
590 .collect();
591
592 for object_id in dataflow.export_ids() {
594 let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
595 let metrics = self.compute_state.metrics.for_collection(object_id);
596 let mut collection = CollectionState::new(
597 Rc::clone(&dataflow_index),
598 is_subscribe_or_copy,
599 as_of.clone(),
600 metrics,
601 );
602
603 if let Some(logger) = self.compute_state.compute_logger.clone() {
604 let logging = CollectionLogging::new(
605 object_id,
606 logger,
607 *dataflow_index,
608 dataflow.import_ids(),
609 );
610 collection.logging = Some(logging);
611 }
612
613 collection.reset_reported_frontiers(ReportedFrontier::NotReported {
614 lower: as_of.clone(),
615 });
616
617 let existing = self.compute_state.collections.insert(object_id, collection);
618 if existing.is_some() {
619 error!(
620 id = ?object_id,
621 "existing collection for newly created dataflow",
622 );
623 }
624 }
625
626 let (start_signal, suspension_token) = StartSignal::new();
627 for id in dataflow.export_ids() {
628 self.compute_state
629 .suspended_collections
630 .insert(id, Rc::clone(&suspension_token));
631 }
632
633 crate::render::build_compute_dataflow(
634 self.timely_worker,
635 self.compute_state,
636 dataflow,
637 start_signal,
638 until,
639 dataflow_expiration,
640 );
641 }
642
643 fn handle_schedule(&mut self, id: GlobalId) {
644 let suspension_token = self.compute_state.suspended_collections.remove(&id);
650 drop(suspension_token);
651 }
652
653 fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
654 if frontier.is_empty() {
655 self.drop_collection(id);
657 } else {
658 self.compute_state
659 .traces
660 .allow_compaction(id, frontier.borrow());
661 }
662 }
663
664 #[mz_ore::instrument(level = "debug")]
665 fn handle_peek(&mut self, peek: Peek) {
666 let pending = match &peek.target {
667 PeekTarget::Index { id } => {
668 let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
670 PendingPeek::index(peek, trace_bundle)
671 }
672 PeekTarget::Persist { metadata, .. } => {
673 let metadata = metadata.clone();
674 PendingPeek::persist(
675 peek,
676 Arc::clone(&self.compute_state.persist_clients),
677 metadata,
678 usize::cast_from(self.compute_state.max_result_size),
679 self.timely_worker,
680 )
681 }
682 };
683
684 if let Some(logger) = self.compute_state.compute_logger.as_mut() {
686 logger.log(&pending.as_log_event(true));
687 }
688
689 self.process_peek(&mut Antichain::new(), pending);
690 }
691
692 fn handle_cancel_peek(&mut self, uuid: Uuid) {
693 if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
694 self.send_peek_response(peek, PeekResponse::Canceled);
695 }
696 }
697
698 fn handle_allow_writes(&mut self, id: GlobalId) {
699 self.compute_state.persist_clients.cfg().enable_compaction();
703
704 if let Some(collection) = self.compute_state.collections.get_mut(&id) {
705 collection.allow_writes();
706 } else {
707 soft_panic_or_log!("allow writes for unknown collection {id}");
708 }
709 }
710
711 fn drop_collection(&mut self, id: GlobalId) {
713 let collection = self
714 .compute_state
715 .collections
716 .remove(&id)
717 .expect("dropped untracked collection");
718
719 self.compute_state.traces.remove(&id);
721 self.compute_state.suspended_collections.remove(&id);
723
724 if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
726 self.timely_worker.drop_dataflow(index);
727 }
728
729 if !collection.is_subscribe_or_copy {
734 let reported = collection.reported_frontiers;
735 let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
736 let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
737 let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);
738
739 let frontiers = FrontiersResponse {
740 write_frontier,
741 input_frontier,
742 output_frontier,
743 };
744 if frontiers.has_updates() {
745 self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
746 }
747 }
748 }
749
750 pub fn initialize_logging(
752 &mut self,
753 config: LoggingConfig,
754 storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
755 ) {
756 if self.compute_state.compute_logger.is_some() {
757 panic!("dataflow server has already initialized logging");
758 }
759
760 let LoggingTraces {
761 traces,
762 dataflow_index,
763 compute_logger: logger,
764 } = logging::initialize(
765 self.timely_worker,
766 &config,
767 self.compute_state.metrics_registry.clone(),
768 Rc::clone(&self.compute_state.worker_config),
769 self.compute_state.workers_per_process,
770 storage_log_reader,
771 );
772
773 let dataflow_index = Rc::new(dataflow_index);
774 let mut log_index_ids = config.index_logs;
775 for (log, trace) in traces {
776 let id = log_index_ids
778 .remove(&log)
779 .expect("`logging::initialize` does not invent logs");
780 self.compute_state.traces.set(id, trace);
781
782 let is_subscribe_or_copy = false;
784 let as_of = Antichain::from_elem(Timestamp::MIN);
785 let metrics = self.compute_state.metrics.for_collection(id);
786 let mut collection = CollectionState::new(
787 Rc::clone(&dataflow_index),
788 is_subscribe_or_copy,
789 as_of,
790 metrics,
791 );
792
793 let logging =
794 CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
795 collection.logging = Some(logging);
796
797 let existing = self.compute_state.collections.insert(id, collection);
798 if existing.is_some() {
799 error!(
800 id = ?id,
801 "existing collection for newly initialized logging export",
802 );
803 }
804 }
805
806 assert!(
808 log_index_ids.is_empty(),
809 "failed to create requested logging indexes: {log_index_ids:?}",
810 );
811
812 self.compute_state.compute_logger = Some(logger);
813 }
814
    /// Reports changed write, input and output frontiers of all collections
    /// that are not subscribes or copy-tos.
    ///
    /// For each frontier kind, the new frontier is included only if the
    /// previously reported one `allows_reporting` it. Included frontiers are
    /// recorded as reported, and one `Frontiers` response is sent per
    /// collection that has updates.
    pub fn report_frontiers(&mut self) {
        // Responses are sent after the loop, because the loop holds a mutable
        // borrow of `collections` while sending needs `&self`.
        let mut responses = Vec::new();

        // Scratch antichain, reused across collections and frontier kinds.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // Subscribe frontiers are reported from `process_subscribes`;
            // copy-to reporting is not visible here.
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Write frontier: the upper of the index's `oks` trace, or else the
            // sink's write frontier. A collection must not have both.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Output frontier: the write frontier combined with the compute
            // probe's frontier. While the collection is read-only, only the
            // probe's frontier is used. Without a probe it equals the write frontier.
            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Input frontier: combined frontier of all input probes; empty when
            // the collection has no input probes.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
910
911 pub(crate) fn report_metrics(&self) {
913 if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
914 let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
915 let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
916 let remaining = expiration - now;
917 self.compute_state
918 .metrics
919 .replica_expiration_remaining_seconds
920 .set(remaining)
921 }
922 }
923
    /// Attempts to make progress on a single pending peek.
    ///
    /// If a response is available, it is sent and the peek is finished.
    /// Otherwise the peek is (re-)inserted into `pending_peeks`. An index peek
    /// whose status is `UsePeekStash` is instead replaced by a
    /// `PendingPeek::Stash` that uploads its results to persist.
    ///
    /// `upper` is a scratch antichain handed to `seek_fulfillment`; callers
    /// may reuse it across peeks.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // Only peeks whose finishing is streamable may use the stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                // Stashing needs both the dyncfg flag and a configured persist
                // location. A missing location while the flag is on is logged.
                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                // Borrowed metric handles for per-phase timing inside `seek_fulfillment`.
                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        // The `expect` relies on `peek_stash_enabled` having
                        // required the location. NOTE(review): presumably
                        // `UsePeekStash` is only returned when stashing was
                        // allowed; confirm in `seek_fulfillment`.
                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        // The stashing peek replaces the index peek. Return early
                        // so the index peek is not re-inserted below.
                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // Non-blocking check for a result from the spawned persist peek task.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Feed rows to the upload, bounded by the configured batch
                // count and size, then check whether it has finished.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not ready yet: keep the peek for the next `process_peeks` round.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
1048
1049 pub fn process_peeks(&mut self) {
1051 let mut upper = Antichain::new();
1052 let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
1053 for (_uuid, peek) in pending_peeks {
1054 self.process_peek(&mut upper, peek);
1055 }
1056 }
1057
1058 #[mz_ore::instrument(level = "debug")]
1063 fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
1064 let log_event = peek.as_log_event(false);
1065 self.send_compute_response(ComputeResponse::PeekResponse(
1067 peek.peek().uuid,
1068 response,
1069 OpenTelemetryContext::obtain(),
1070 ));
1071
1072 if let Some(logger) = self.compute_state.compute_logger.as_mut() {
1074 logger.log(&log_event);
1075 }
1076 }
1077
1078 pub fn process_subscribes(&mut self) {
1080 let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
1081 for (sink_id, mut response) in subscribe_responses.drain(..) {
1082 if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
1084 let new_frontier = match &response {
1085 SubscribeResponse::Batch(b) => b.upper.clone(),
1086 SubscribeResponse::DroppedAt(_) => Antichain::new(),
1087 };
1088
1089 let reported = collection.reported_frontiers();
1090 assert!(
1091 reported.write_frontier.allows_reporting(&new_frontier),
1092 "subscribe write frontier regression: {:?} -> {:?}",
1093 reported.write_frontier,
1094 new_frontier,
1095 );
1096 assert!(
1097 reported.input_frontier.allows_reporting(&new_frontier),
1098 "subscribe input frontier regression: {:?} -> {:?}",
1099 reported.input_frontier,
1100 new_frontier,
1101 );
1102
1103 collection
1104 .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1105 collection
1106 .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1107 collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1108 } else {
1109 }
1112
1113 response
1114 .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1115 self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1116 }
1117 }
1118
1119 pub fn process_copy_tos(&self) {
1121 let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1122 for (sink_id, response) in responses.drain(..) {
1123 self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1124 }
1125 }
1126
    /// Sends a compute response on `response_tx`.
    fn send_compute_response(&self, response: ComputeResponse) {
        // The send result is deliberately discarded. NOTE(review): presumably a
        // failed send means the receiving side is gone (e.g. during shutdown);
        // confirm against `ResponseSender`.
        let _ = self.response_tx.send(response);
    }
1133
1134 pub(crate) fn check_expiration(&self) {
1136 let now = mz_ore::now::SYSTEM_TIME();
1137 if self.compute_state.replica_expiration.less_than(&now.into()) {
1138 let now_datetime = mz_ore::now::to_datetime(now);
1139 let expiration_datetime = self
1140 .compute_state
1141 .replica_expiration
1142 .as_option()
1143 .map(Into::into)
1144 .map(mz_ore::now::to_datetime);
1145
1146 error!(
1149 now,
1150 now_datetime = ?now_datetime,
1151 expiration = ?self.compute_state.replica_expiration.elements(),
1152 expiration_datetime = ?expiration_datetime,
1153 "replica expired"
1154 );
1155
1156 assert!(
1158 !self.compute_state.replica_expiration.less_than(&now.into()),
1159 "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1160 self.compute_state.replica_expiration.elements(),
1161 );
1162 }
1163 }
1164
1165 pub fn determine_dataflow_expiration(
1171 &self,
1172 time_dependence: &TimeDependence,
1173 until: &Antichain<Timestamp>,
1174 ) -> Antichain<Timestamp> {
1175 let iter = self
1180 .compute_state
1181 .replica_expiration
1182 .iter()
1183 .filter_map(|t| time_dependence.apply(*t))
1184 .filter_map(|t| Timestamp::try_step_forward(&t))
1185 .filter(|expiration| !until.less_equal(expiration));
1186 Antichain::from_iter(iter)
1187 }
1188}
1189
/// A peek that has been received but not yet answered.
pub enum PendingPeek {
    /// A peek against an index, answered from its trace bundle.
    Index(IndexPeek),
    /// A peek against a persist shard, answered by a spawned task.
    Persist(PersistPeek),
    /// An index peek whose results are being uploaded to the persist peek stash.
    Stash(peek_stash::StashingPeek),
}
1203
1204impl PendingPeek {
1205 pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
1207 let peek = self.peek();
1208 let (id, peek_type) = match &peek.target {
1209 PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
1210 PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
1211 };
1212 let uuid = peek.uuid.into_bytes();
1213 ComputeEvent::Peek(PeekEvent {
1214 id,
1215 time: peek.timestamp,
1216 uuid,
1217 peek_type,
1218 installed,
1219 })
1220 }
1221
1222 fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
1223 let empty_frontier = Antichain::new();
1224 let timestamp_frontier = Antichain::from_elem(peek.timestamp);
1225 trace_bundle
1226 .oks_mut()
1227 .set_logical_compaction(timestamp_frontier.borrow());
1228 trace_bundle
1229 .errs_mut()
1230 .set_logical_compaction(timestamp_frontier.borrow());
1231 trace_bundle
1232 .oks_mut()
1233 .set_physical_compaction(empty_frontier.borrow());
1234 trace_bundle
1235 .errs_mut()
1236 .set_physical_compaction(empty_frontier.borrow());
1237
1238 PendingPeek::Index(IndexPeek {
1239 peek,
1240 trace_bundle,
1241 span: tracing::Span::current(),
1242 })
1243 }
1244
1245 fn persist(
1246 peek: Peek,
1247 persist_clients: Arc<PersistClientCache>,
1248 metadata: CollectionMetadata,
1249 max_result_size: usize,
1250 timely_worker: &TimelyWorker,
1251 ) -> Self {
1252 let active_worker = {
1253 let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
1255 chosen_index == timely_worker.index()
1256 };
1257 let activator = timely_worker.sync_activator_for([].into());
1258 let peek_uuid = peek.uuid;
1259
1260 let (result_tx, result_rx) = oneshot::channel();
1261 let timestamp = peek.timestamp;
1262 let mfp_plan = peek.map_filter_project.clone();
1263 let max_results_needed = peek
1264 .finishing
1265 .limit
1266 .map(|l| usize::cast_from(u64::from(l)))
1267 .unwrap_or(usize::MAX)
1268 + peek.finishing.offset;
1269 let order_by = peek.finishing.order_by.clone();
1270
1271 let literal_constraint = peek
1273 .literal_constraints
1274 .clone()
1275 .map(|rows| rows.into_element());
1276
1277 let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
1278 let start = Instant::now();
1279 let result = if active_worker {
1280 PersistPeek::do_peek(
1281 &persist_clients,
1282 metadata,
1283 timestamp,
1284 literal_constraint,
1285 mfp_plan,
1286 max_result_size,
1287 max_results_needed,
1288 )
1289 .await
1290 } else {
1291 Ok(vec![])
1292 };
1293 let result = match result {
1294 Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
1295 Err(e) => PeekResponse::Error(e.to_string()),
1296 };
1297 match result_tx.send((result, start.elapsed())) {
1298 Ok(()) => {}
1299 Err((_result, elapsed)) => {
1300 debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
1301 }
1302 }
1303 match activator.activate() {
1304 Ok(()) => {}
1305 Err(_) => {
1306 debug!("unable to wake timely after completed peek {peek_uuid}");
1307 }
1308 }
1309 });
1310 PendingPeek::Persist(PersistPeek {
1311 peek,
1312 _abort_handle: task_handle.abort_on_drop(),
1313 result: result_rx,
1314 span: tracing::Span::current(),
1315 })
1316 }
1317
1318 fn span(&self) -> &tracing::Span {
1319 match self {
1320 PendingPeek::Index(p) => &p.span,
1321 PendingPeek::Persist(p) => &p.span,
1322 PendingPeek::Stash(p) => &p.span,
1323 }
1324 }
1325
1326 pub(crate) fn peek(&self) -> &Peek {
1327 match self {
1328 PendingPeek::Index(p) => &p.peek,
1329 PendingPeek::Persist(p) => &p.peek,
1330 PendingPeek::Stash(p) => &p.peek,
1331 }
1332 }
1333}
1334
/// A peek against a persist shard whose read runs on a spawned async task.
pub struct PersistPeek {
    /// The peek request being served.
    pub(crate) peek: Peek,
    /// Handle to the spawned read task; the task is aborted when this handle is dropped.
    _abort_handle: AbortOnDropHandle<()>,
    /// Receives the peek response, and how long the task took to produce it, from the task.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The tracing span that was current when the peek was created.
    span: tracing::Span,
}
1349
impl PersistPeek {
    /// Reads a persist shard at `as_of` and evaluates `mfp_plan` over its contents.
    ///
    /// Opens a persist client and a leased reader for `metadata.data_shard`. If the
    /// collection has a txns shard, it also opens a `TxnsCache`. It then scans the data
    /// with a `StatsCursor` at `as_of`.
    ///
    /// - If `literal_constraint` is set, only rows whose leading datums equal it are kept.
    ///   The scan stops at the first row whose prefix compares greater.
    ///   NOTE(review): this early exit assumes the cursor yields rows in sorted order;
    ///   confirm.
    /// - Updates with a zero diff are skipped.
    /// - A negative diff produces an "Invalid data in source" error.
    /// - Collection stops once `limit_remaining` rows (counting multiplicities) have been
    ///   produced.
    ///
    /// Returns the evaluated rows with their multiplicities. Returns `Err` with a message
    /// in these cases:
    /// - opening the client or the reader fails;
    /// - the since has advanced past `as_of`;
    /// - decoding a row or evaluating the MFP fails;
    /// - the accumulated result exceeds `max_result_size` bytes.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // Collections backed by a txns shard need the txns cache to read the data shard.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        // Scratch state reused across rows while evaluating the MFP.
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        // Running byte size of `result`, checked against `max_result_size`.
        let mut total_size = 0usize;

        // Number of leading datums of each row compared against the literal constraint.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        // Before the constrained range: skip.
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        // Past the constrained range: no later row can match.
                        Ordering::Greater => break 'collect,
                    }
                }

                // A diff that does not fit in `usize` (i.e. a negative one) is a retraction
                // of a row that does not exist.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                // `None` means the MFP filtered the row out.
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    // Leaves the batch loop; the outer `while` condition then ends the scan.
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1471
/// A peek served from an index arrangement.
pub struct IndexPeek {
    /// The peek request being served.
    peek: Peek,
    /// Handles to the index's `oks` and `errs` traces. Their logical compaction is set to
    /// the peek timestamp when the peek is created.
    trace_bundle: TraceBundle,
    /// The tracing span that was current when the peek was created.
    span: tracing::Span,
}
1480
/// Borrowed histograms recording, in seconds, how long the phases of an index peek take.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Time for a `seek_fulfillment` call that reaches data collection, measured end to end.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time from the start of `seek_fulfillment` until the frontier checks have passed.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time spent scanning the `errs` trace, when no error was found.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Time spent constructing the result iterator over the `oks` trace.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Time spent iterating over result rows (including any intermediate sorting).
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Accumulated time spent sorting intermediate results.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Time spent building the final `RowCollection`.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1494
1495impl IndexPeek {
1496 fn seek_fulfillment(
1509 &mut self,
1510 upper: &mut Antichain<Timestamp>,
1511 max_result_size: u64,
1512 peek_stash_eligible: bool,
1513 peek_stash_threshold_bytes: usize,
1514 metrics: &IndexPeekMetrics<'_>,
1515 ) -> PeekStatus {
1516 let method_start = Instant::now();
1517
1518 self.trace_bundle.oks_mut().read_upper(upper);
1519 if upper.less_equal(&self.peek.timestamp) {
1520 return PeekStatus::NotReady;
1521 }
1522 self.trace_bundle.errs_mut().read_upper(upper);
1523 if upper.less_equal(&self.peek.timestamp) {
1524 return PeekStatus::NotReady;
1525 }
1526
1527 let read_frontier = self.trace_bundle.compaction_frontier();
1528 if !read_frontier.less_equal(&self.peek.timestamp) {
1529 let error = format!(
1530 "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
1531 read_frontier.elements(),
1532 self.peek.timestamp,
1533 );
1534 return PeekStatus::Ready(PeekResponse::Error(error));
1535 }
1536
1537 metrics
1538 .frontier_check_seconds
1539 .observe(method_start.elapsed().as_secs_f64());
1540
1541 let result = self.collect_finished_data(
1542 max_result_size,
1543 peek_stash_eligible,
1544 peek_stash_threshold_bytes,
1545 metrics,
1546 );
1547
1548 metrics
1549 .seek_fulfillment_seconds
1550 .observe(method_start.elapsed().as_secs_f64());
1551
1552 result
1553 }
1554
1555 fn collect_finished_data(
1557 &mut self,
1558 max_result_size: u64,
1559 peek_stash_eligible: bool,
1560 peek_stash_threshold_bytes: usize,
1561 metrics: &IndexPeekMetrics<'_>,
1562 ) -> PeekStatus {
1563 let error_scan_start = Instant::now();
1564
1565 let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
1568 while cursor.key_valid(&storage) {
1569 let mut copies = Diff::ZERO;
1570 cursor.map_times(&storage, |time, diff| {
1571 if time.less_equal(&self.peek.timestamp) {
1572 copies += diff;
1573 }
1574 });
1575 if copies.is_negative() {
1576 let error = cursor.key(&storage);
1577 error!(
1578 target = %self.peek.target.id(), diff = %copies, %error,
1579 "index peek encountered negative multiplicities in error trace",
1580 );
1581 return PeekStatus::Ready(PeekResponse::Error(format!(
1582 "Invalid data in source errors, \
1583 saw retractions ({}) for row that does not exist: {}",
1584 -copies, error,
1585 )));
1586 }
1587 if copies.is_positive() {
1588 return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
1589 }
1590 cursor.step_key(&storage);
1591 }
1592
1593 metrics
1594 .error_scan_seconds
1595 .observe(error_scan_start.elapsed().as_secs_f64());
1596
1597 Self::collect_ok_finished_data(
1598 &self.peek,
1599 self.trace_bundle.oks_mut(),
1600 max_result_size,
1601 peek_stash_eligible,
1602 peek_stash_threshold_bytes,
1603 metrics,
1604 )
1605 }
1606
1607 fn collect_ok_finished_data<Tr>(
1609 peek: &Peek,
1610 oks_handle: &mut Tr,
1611 max_result_size: u64,
1612 peek_stash_eligible: bool,
1613 peek_stash_threshold_bytes: usize,
1614 metrics: &IndexPeekMetrics<'_>,
1615 ) -> PeekStatus
1616 where
1617 for<'a> Tr: TraceReader<
1618 Key<'a>: ExtendDatums + Eq,
1619 KeyContainer: BatchContainer<Owned = Row>,
1620 Val<'a>: ExtendDatums,
1621 TimeGat<'a>: PartialOrder<Timestamp>,
1622 DiffGat<'a> = &'a Diff,
1623 >,
1624 {
1625 let max_result_size = usize::cast_from(max_result_size);
1626 let count_byte_size = size_of::<NonZeroUsize>();
1627
1628 let cursor_setup_start = Instant::now();
1630
1631 let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
1634 peek.target.id().clone(),
1635 peek.map_filter_project.clone(),
1636 peek.timestamp,
1637 peek.literal_constraints.clone().as_deref_mut(),
1638 oks_handle,
1639 );
1640
1641 metrics
1642 .cursor_setup_seconds
1643 .observe(cursor_setup_start.elapsed().as_secs_f64());
1644
1645 let mut results = Vec::new();
1647 let mut total_size: usize = 0;
1648
1649 let max_results = peek.finishing.num_rows_needed();
1655
1656 let comparator = RowComparator::new(peek.finishing.order_by.as_slice());
1657
1658 let row_iteration_start = Instant::now();
1660 let mut sort_time_accum = Duration::ZERO;
1661
1662 while let Some(row) = peek_iterator.next() {
1663 let row: (Row, _) = match row {
1664 Ok(row) => row,
1665 Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
1666 };
1667 let (row, copies) = row;
1668 let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");
1669
1670 total_size = total_size
1671 .saturating_add(row.byte_len())
1672 .saturating_add(count_byte_size);
1673 if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
1674 return PeekStatus::UsePeekStash;
1675 }
1676 if total_size > max_result_size {
1677 return PeekStatus::Ready(PeekResponse::Error(format!(
1678 "result exceeds max size of {}",
1679 ByteSize::b(u64::cast_from(max_result_size))
1680 )));
1681 }
1682
1683 results.push((row, copies));
1684
1685 if let Some(max_results) = max_results {
1688 if results.len() >= 2 * max_results {
1692 if peek.finishing.order_by.is_empty() {
1693 results.truncate(max_results);
1694 metrics
1695 .row_iteration_seconds
1696 .observe(row_iteration_start.elapsed().as_secs_f64());
1697 metrics
1698 .result_sort_seconds
1699 .observe(sort_time_accum.as_secs_f64());
1700 let row_collection_start = Instant::now();
1701 let collection = RowCollection::new(results, &peek.finishing.order_by);
1702 metrics
1703 .row_collection_seconds
1704 .observe(row_collection_start.elapsed().as_secs_f64());
1705 return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
1706 } else {
1707 let sort_start = Instant::now();
1716 results.sort_by(|left, right| {
1717 comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
1718 });
1719 sort_time_accum += sort_start.elapsed();
1720 let dropped = results.drain(max_results..);
1721 let dropped_size =
1722 dropped
1723 .into_iter()
1724 .fold(0, |acc: usize, (row, _count): (Row, _)| {
1725 acc.saturating_add(
1726 row.byte_len().saturating_add(count_byte_size),
1727 )
1728 });
1729 total_size = total_size.saturating_sub(dropped_size);
1730 }
1731 }
1732 }
1733 }
1734
1735 metrics
1736 .row_iteration_seconds
1737 .observe(row_iteration_start.elapsed().as_secs_f64());
1738 metrics
1739 .result_sort_seconds
1740 .observe(sort_time_accum.as_secs_f64());
1741
1742 let row_collection_start = Instant::now();
1743 let collection = RowCollection::new(results, &peek.finishing.order_by);
1744 metrics
1745 .row_collection_seconds
1746 .observe(row_collection_start.elapsed().as_secs_f64());
1747 PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
1748 }
1749}
1750
/// The outcome of an attempt to fulfill an index peek.
enum PeekStatus {
    /// The traces' uppers have not yet advanced beyond the peek timestamp.
    NotReady,
    /// The result grew beyond the peek-stash threshold and should be served via the peek
    /// stash instead.
    UsePeekStash,
    /// The peek is complete with the given response.
    Ready(PeekResponse),
}
1763
#[derive(Debug)]
/// The most recently reported frontiers of a collection, one per frontier kind.
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1774
1775impl ReportedFrontiers {
1776 fn new() -> Self {
1778 Self {
1779 write_frontier: ReportedFrontier::new(),
1780 input_frontier: ReportedFrontier::new(),
1781 output_frontier: ReportedFrontier::new(),
1782 }
1783 }
1784}
1785
#[derive(Clone, Debug)]
/// A frontier that either has been reported, or has not been reported yet.
pub enum ReportedFrontier {
    /// The frontier that was most recently reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// The lower bound for a first report: `allows_reporting` accepts any frontier
        /// that is `>=` this one.
        lower: Antichain<Timestamp>,
    },
}
1797
1798impl ReportedFrontier {
1799 pub fn new() -> Self {
1801 let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1802 Self::NotReported { lower }
1803 }
1804
1805 pub fn is_empty(&self) -> bool {
1807 match self {
1808 Self::Reported(frontier) => frontier.is_empty(),
1809 Self::NotReported { .. } => false,
1810 }
1811 }
1812
1813 fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1819 match self {
1820 Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1821 Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1822 }
1823 }
1824}
1825
/// Per-collection state tracked by the compute worker.
pub struct CollectionState {
    /// The frontiers most recently reported for this collection.
    reported_frontiers: ReportedFrontiers,
    /// Index of the dataflow this collection belongs to (logged by `allow_writes`).
    dataflow_index: Rc<usize>,
    /// Presumably whether the collection is exported by a subscribe or copy-to sink —
    /// TODO confirm against where it is set.
    pub is_subscribe_or_copy: bool,
    /// The collection's as-of frontier; `hydrated` compares the reported output frontier
    /// against it.
    as_of: Antichain<Timestamp>,

    /// Token for the collection's sink, if it has one (presumably dropping it tears the
    /// sink down — TODO confirm).
    pub sink_token: Option<SinkToken>,
    /// Shared write frontier of the collection's sink, if it has one.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Probes on the collection's inputs, keyed by input id; read when logging import
    /// frontiers.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// Probe on the collection's compute output, if one is installed.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state for the collection, if logging is enabled.
    logging: Option<CollectionLogging>,
    /// Metrics for the collection, e.g. recording when it becomes hydrated.
    metrics: CollectionMetrics,
    /// Sender half of the read-only flag: starts as `true`, and `allow_writes` sends
    /// `false`.
    read_only_tx: watch::Sender<bool>,
    /// Receiver half of the read-only flag.
    pub read_only_rx: watch::Receiver<bool>,
}
1882
1883impl CollectionState {
1884 fn new(
1885 dataflow_index: Rc<usize>,
1886 is_subscribe_or_copy: bool,
1887 as_of: Antichain<Timestamp>,
1888 metrics: CollectionMetrics,
1889 ) -> Self {
1890 let (read_only_tx, read_only_rx) = watch::channel(true);
1893
1894 Self {
1895 reported_frontiers: ReportedFrontiers::new(),
1896 dataflow_index,
1897 is_subscribe_or_copy,
1898 as_of,
1899 sink_token: None,
1900 sink_write_frontier: None,
1901 input_probes: Default::default(),
1902 compute_probe: None,
1903 logging: None,
1904 metrics,
1905 read_only_tx,
1906 read_only_rx,
1907 }
1908 }
1909
1910 fn reported_frontiers(&self) -> &ReportedFrontiers {
1912 &self.reported_frontiers
1913 }
1914
1915 pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1917 self.reported_frontiers.write_frontier = frontier.clone();
1918 self.reported_frontiers.input_frontier = frontier.clone();
1919 self.reported_frontiers.output_frontier = frontier;
1920 }
1921
1922 fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1924 if let Some(logging) = &mut self.logging {
1925 let time = match &frontier {
1926 ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1927 ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1928 };
1929 logging.set_frontier(time);
1930 }
1931
1932 self.reported_frontiers.write_frontier = frontier;
1933 }
1934
1935 fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1937 if let Some(logging) = &mut self.logging {
1939 for (id, probe) in &self.input_probes {
1940 let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1941 logging.set_import_frontier(*id, new_time);
1942 }
1943 }
1944
1945 self.reported_frontiers.input_frontier = frontier;
1946 }
1947
1948 fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1950 let already_hydrated = self.hydrated();
1951
1952 self.reported_frontiers.output_frontier = frontier;
1953
1954 if !already_hydrated && self.hydrated() {
1955 if let Some(logging) = &mut self.logging {
1956 logging.set_hydrated();
1957 }
1958 self.metrics.record_collection_hydrated();
1959 }
1960 }
1961
1962 fn hydrated(&self) -> bool {
1964 match &self.reported_frontiers.output_frontier {
1965 ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1966 ReportedFrontier::NotReported { .. } => false,
1967 }
1968 }
1969
1970 fn allow_writes(&self) {
1972 info!(
1973 dataflow_index = *self.dataflow_index,
1974 export = ?self.logging.as_ref().map(|l| l.export_id()),
1975 "allowing writes for dataflow",
1976 );
1977 let _ = self.read_only_tx.send(false);
1978 }
1979}