1use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::{Cursor, TraceReader};
21use mz_compute_client::logging::LoggingConfig;
22use mz_compute_client::protocol::command::{
23 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
24};
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::{
27 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
28};
29use mz_compute_types::dataflows::DataflowDescription;
30use mz_compute_types::dyncfgs::{
31 ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
32 PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
33};
34use mz_compute_types::plan::render_plan::RenderPlan;
35use mz_dyncfg::ConfigSet;
36use mz_expr::row::RowCollection;
37use mz_expr::{RowComparator, SafeMfpPlan};
38use mz_ore::cast::CastFrom;
39use mz_ore::collections::CollectionExt;
40use mz_ore::metrics::{MetricsRegistry, UIntGauge};
41use mz_ore::now::EpochMillis;
42use mz_ore::soft_panic_or_log;
43use mz_ore::task::AbortOnDropHandle;
44use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
45use mz_persist_client::Diagnostics;
46use mz_persist_client::cache::PersistClientCache;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
48use mz_persist_client::read::ReadHandle;
49use mz_persist_types::PersistLocation;
50use mz_persist_types::codec_impls::UnitSchema;
51use mz_repr::fixed_length::ToDatumIter;
52use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
53use mz_storage_operators::stats::StatsCursor;
54use mz_storage_types::StorageDiff;
55use mz_storage_types::controller::CollectionMetadata;
56use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
57use mz_storage_types::sources::SourceData;
58use mz_storage_types::time_dependence::TimeDependence;
59use mz_txn_wal::operator::TxnsContext;
60use mz_txn_wal::txn_cache::TxnsCache;
61use timely::communication::Allocate;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::scheduling::Scheduler;
66use timely::worker::Worker as TimelyWorker;
67use tokio::sync::{oneshot, watch};
68use tracing::{Level, debug, error, info, span, trace, warn};
69use uuid::Uuid;
70
71use crate::arrangement::manager::{TraceBundle, TraceManager};
72use crate::logging;
73use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
74use crate::logging::initialize::LoggingTraces;
75use crate::metrics::{CollectionMetrics, WorkerMetrics};
76use crate::render::{LinearJoinSpec, StartSignal};
77use crate::server::{ComputeInstanceContext, ResponseSender};
78
79mod peek_result_iterator;
80mod peek_stash;
81
/// Worker-local state that is maintained across dataflows.
///
/// This is wrapped in an [`ActiveComputeState`] (together with the timely worker and a
/// response channel) while commands are being processed.
pub struct ComputeState {
    /// State kept for each installed compute collection, keyed by its `GlobalId`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer into which subscribe dataflows deposit their responses;
    /// drained by `process_subscribes`.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer into which copy-to dataflows deposit their responses;
    /// drained by `process_copy_tos`.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peeks that have been installed but not yet answered, keyed by peek UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Persist location used for stashing large peek responses.
    /// `None` until `CreateInstance` has been handled.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The logger, from Timely's logging framework, if logging is enabled.
    /// `None` until `initialize_logging` has run.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A cache of persist clients, shared with the peek and stash tasks.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands, used on reconciliation.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Upper bound on peek result sizes, in bytes; starts unlimited and is set via
    /// `UpdateConfiguration`.
    max_result_size: u64,
    /// Implementation configuration for rendered linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics scoped to this worker.
    pub metrics: WorkerMetrics,
    /// Handle for adjusting tracing/log filters at runtime.
    tracing_handle: Arc<TracingHandle>,
    /// Per-instance context passed in at worker creation.
    pub context: ComputeInstanceContext,
    /// Dynamic configuration for this worker; updated via `UpdateConfiguration` and
    /// applied in `apply_worker_config`.
    pub worker_config: Rc<ConfigSet>,

    /// Metrics registry, used (at least) to register logging-dataflow metrics.
    pub metrics_registry: MetricsRegistry,

    /// The number of timely workers running in this process.
    pub workers_per_process: usize,

    /// Tokens that keep not-yet-scheduled dataflows suspended; a collection's token is
    /// inserted on `CreateDataflow` and dropped on `Schedule` (or collection drop).
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval at which the server performs maintenance work; configured via dyncfg.
    pub server_maintenance_interval: Duration,

    /// System time (millis) captured when this state was created; the base for
    /// computing `replica_expiration`.
    pub init_system_time: EpochMillis,

    /// Frontier beyond which this replica must not run; empty means "never expires".
    /// Set once by `apply_expiration_offset`, enforced by `check_expiration`.
    pub replica_expiration: Antichain<Timestamp>,
}
173
174impl ComputeState {
175 pub fn new(
177 persist_clients: Arc<PersistClientCache>,
178 txns_ctx: TxnsContext,
179 metrics: WorkerMetrics,
180 tracing_handle: Arc<TracingHandle>,
181 context: ComputeInstanceContext,
182 metrics_registry: MetricsRegistry,
183 workers_per_process: usize,
184 ) -> Self {
185 let traces = TraceManager::new(metrics.clone());
186 let command_history = ComputeCommandHistory::new(metrics.for_history());
187
188 Self {
189 collections: Default::default(),
190 traces,
191 subscribe_response_buffer: Default::default(),
192 copy_to_response_buffer: Default::default(),
193 pending_peeks: Default::default(),
194 peek_stash_persist_location: None,
195 compute_logger: None,
196 persist_clients,
197 txns_ctx,
198 command_history,
199 max_result_size: u64::MAX,
200 linear_join_spec: Default::default(),
201 metrics,
202 tracing_handle,
203 context,
204 worker_config: mz_dyncfgs::all_dyncfgs().into(),
205 metrics_registry,
206 workers_per_process,
207 suspended_collections: Default::default(),
208 server_maintenance_interval: Duration::ZERO,
209 init_system_time: mz_ore::now::SYSTEM_TIME(),
210 replica_expiration: Antichain::default(),
211 }
212 }
213
214 pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
218 self.collections
219 .get_mut(&id)
220 .expect("collection must exist")
221 }
222
223 pub fn input_probe_for(
229 &mut self,
230 input_id: GlobalId,
231 collection_ids: impl Iterator<Item = GlobalId>,
232 ) -> probe::Handle<Timestamp> {
233 let probe = probe::Handle::default();
234 for id in collection_ids {
235 if let Some(collection) = self.collections.get_mut(&id) {
236 collection.input_probes.insert(input_id, probe.clone());
237 }
238 }
239 probe
240 }
241
242 fn apply_worker_config(&mut self) {
244 use mz_compute_types::dyncfgs::*;
245
246 let config = &self.worker_config;
247
248 self.linear_join_spec = LinearJoinSpec::from_config(config);
249
250 if ENABLE_LGALLOC.get(config) {
251 if let Some(path) = &self.context.scratch_directory {
252 let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
253 let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
254 let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
255 let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
256 let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
257 info!(
258 ?path,
259 backgrund_interval=?interval,
260 clear_bytes,
261 eager_return,
262 file_growth_dampener,
263 local_buffer_bytes,
264 "enabling lgalloc"
265 );
266 let background_worker_config = lgalloc::BackgroundWorkerConfig {
267 interval,
268 clear_bytes,
269 };
270 lgalloc::lgalloc_set_config(
271 lgalloc::LgAlloc::new()
272 .enable()
273 .with_path(path.clone())
274 .with_background_config(background_worker_config)
275 .eager_return(eager_return)
276 .file_growth_dampener(file_growth_dampener)
277 .local_buffer_bytes(local_buffer_bytes),
278 );
279 } else {
280 debug!("not enabling lgalloc, scratch directory not specified");
281 }
282 } else {
283 info!("disabling lgalloc");
284 lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
285 }
286
287 crate::memory_limiter::apply_limiter_config(config);
288
289 mz_ore::region::ENABLE_LGALLOC_REGION.store(
290 ENABLE_COLUMNATION_LGALLOC.get(config),
291 std::sync::atomic::Ordering::Relaxed,
292 );
293
294 let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
295 mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
296
297 self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
300
301 let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
302 match overflowing_behavior.parse() {
303 Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
304 Err(err) => {
305 error!(
306 err,
307 overflowing_behavior, "Invalid value for ore_overflowing_behavior"
308 );
309 }
310 }
311 }
312
313 pub fn apply_expiration_offset(&mut self, offset: Duration) {
319 if self.replica_expiration.is_empty() {
320 let offset: EpochMillis = offset
321 .as_millis()
322 .try_into()
323 .expect("duration must fit within u64");
324 let replica_expiration_millis = self.init_system_time + offset;
325 let replica_expiration = Timestamp::from(replica_expiration_millis);
326
327 info!(
328 offset = %offset,
329 replica_expiration_millis = %replica_expiration_millis,
330 replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
331 "setting replica expiration",
332 );
333 self.replica_expiration = Antichain::from_elem(replica_expiration);
334
335 self.metrics
337 .replica_expiration_timestamp_seconds
338 .set(replica_expiration.into());
339 }
340 }
341
342 pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
345 use mz_compute_types::dyncfgs::{
346 DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
347 };
348
349 if self.persist_clients.cfg.is_cc_active {
350 DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
351 } else {
352 DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
353 }
354 }
355}
356
/// A [`ComputeState`] paired with the resources needed to act on it: the timely worker
/// and a channel for responses back to the controller.
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The compute state being operated on.
    pub compute_state: &'a mut ComputeState,
    /// Channel over which responses are sent to the controller.
    pub response_tx: &'a mut ResponseSender,
}
366
367pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);
369
370impl SinkToken {
371 pub fn new(t: Box<dyn Any>) -> Self {
373 Self(t)
374 }
375}
376
impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
    /// Entrypoint for applying a single compute command to this worker.
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        // Record the command for reconciliation before acting on it.
        self.compute_state.command_history.push(cmd.clone());

        // Measure handling duration, labeled per command type.
        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            // `Hello` is expected to be intercepted by an earlier layer.
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                // Continue the tracing span propagated from the controller.
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }

    /// Handle `CreateInstance`: apply config, optionally install the replica
    /// expiration, set up logging dataflows, and record the peek stash location.
    fn handle_create_instance(&mut self, config: InstanceConfig) {
        // Apply the configuration before initializing logging, so the logging
        // dataflows observe the configured values.
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    /// Handle `UpdateConfiguration`: apply each parameter to the relevant subsystem
    /// and re-derive the worker-level settings.
    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        debug!("Applying configuration update: {params:?}");

        // Destructure exhaustively so adding a field forces handling it here.
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        // Propagate dyncfg updates to the worker config and the persist config.
        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        // Re-derive settings that depend on the (possibly changed) worker config.
        self.compute_state.apply_worker_config();
    }

    /// Handle `CreateDataflow`: register collection state for every export, then
    /// render the dataflow in a suspended state (see `handle_schedule`).
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        // Shared so that dropping the last collection of the dataflow can reclaim
        // the dataflow index (see `drop_collection`).
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        // Derive an expiration for this dataflow from the replica expiration, if it
        // has a known time dependence.
        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        // The effective until is the earlier of the plan's until and the expiration.
        let until = dataflow.until.meet(&dataflow_expiration);

        // Transient dataflows (e.g. peek/subscribe-serving) log at debug, others at info.
        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        // Exports that are subscribes or copy-tos report frontiers differently.
        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        // Initialize compute and logging state for each object exported by the dataflow.
        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                // Not expected; log loudly rather than panic so the worker survives.
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        // The dataflow is rendered suspended: it only starts doing work once the
        // suspension tokens are dropped by `handle_schedule`.
        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

    /// Handle `Schedule`: dropping the suspension token lets the dataflow that
    /// exports `id` begin computing.
    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    /// Handle `AllowCompaction`: an empty frontier means "drop the collection",
    /// otherwise allow trace compaction up to the frontier.
    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    /// Handle `Peek`: construct the appropriate pending peek (index- or
    /// persist-backed), log its installation, and attempt to fulfill it immediately.
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                // The controller is responsible for only issuing peeks against
                // indexes that exist on this replica; hence the unwrap.
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        // Log the install event (`true`); the uninstall event is logged by
        // `send_peek_response`.
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        // Try to fulfill right away; otherwise this parks it in `pending_peeks`.
        self.process_peek(&mut Antichain::new(), pending);
    }

    /// Handle `CancelPeek`: if still pending, respond with `Canceled`.
    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    /// Handle `AllowWrites`: leave read-only mode for the given collection and
    /// re-enable persist compaction.
    fn handle_allow_writes(&mut self, id: GlobalId) {
        // NOTE(review): this re-enables compaction globally on every AllowWrites,
        // not just for `id` — presumably idempotent; confirm intent.
        self.compute_state.persist_clients.cfg().enable_compaction();

        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
            collection.allow_writes();
        } else {
            soft_panic_or_log!("allow writes for unknown collection {id}");
        }
    }

    /// Remove all worker state for a collection and, when possible, drop its
    /// dataflow; report final (empty) frontiers for non-subscribe/copy collections.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        // The dataflow index is shared among the dataflow's exports; only when this
        // was the last collection referencing it can the dataflow itself be dropped.
        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        // Subscribes/copy-tos report their shutdown through their own response
        // channels; everything else gets a final empty-frontier report here, but
        // only for frontiers not already reported as empty.
        if !collection.is_subscribe_or_copy {
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

    /// Initialize logging dataflows and register a collection for each requested
    /// logging index.
    ///
    /// # Panics
    ///
    /// Panics if logging was already initialized, or if `logging::initialize`
    /// produces a log not requested in `config.index_logs`.
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(
            self.timely_worker,
            &config,
            self.compute_state.metrics_registry.clone(),
            Rc::clone(&self.compute_state.worker_config),
            self.compute_state.workers_per_process,
        );

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            // Every produced trace must correspond to a requested logging index.
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            // Logging collections are ordinary indexed collections, valid from the
            // minimum timestamp.
            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        // All requested logs must have been produced.
        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

    /// Compute current frontiers for all (non-subscribe/copy) collections and send
    /// `Frontiers` responses for any that advanced since the last report.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Maintain a single allocation, reused for each frontier computed below.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // Subscribes/copy-tos report frontiers via their response streams instead.
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Write frontier: from the trace for indexed collections, otherwise
            // from the sink's shared write frontier.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Output frontier: starts from the write frontier computed above and is
            // combined with the compute probe's frontier, if one exists. In
            // read-only mode the write frontier is discarded first — presumably
            // because sink write frontiers don't advance in read-only mode; confirm.
            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Input frontier: combined from all installed input probes.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Record what we are about to report, so future calls only report advances.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

    /// Report per-worker metrics; currently the seconds remaining until replica
    /// expiration (negative once expired).
    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

    /// Attempt to fulfill a pending peek; on success send the response, otherwise
    /// (re-)install it in `pending_peeks`.
    ///
    /// `upper` is scratch space reused across calls by `process_peeks`.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // Only peeks whose finishing can be applied in a streaming fashion
                // may be answered through the peek stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                // Fine-grained timing histograms for the phases of index peeks.
                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        // The result is too large for a direct response: start an
                        // upload to the peek stash and convert this peek into a
                        // `Stash` pending peek.
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // Persist peeks complete on a background task; check for its result.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Feed more rows into the stash upload, then check for completion.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not ready yet: (re-)install the peek for a later attempt.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

    /// Attempt to fulfill all pending peeks.
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        // Take the map so `process_peek` can re-insert unfulfilled peeks.
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

    /// Send a response for a completed (or canceled) peek and log its
    /// uninstallation (`installed: false`).
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

    /// Drain the subscribe response buffer, update reported frontiers, and forward
    /// the (possibly size-limited) responses to the controller.
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    // A dropped subscribe advances to the empty frontier.
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                // Subscribe frontiers must never regress.
                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // NOTE(review): collection is gone (e.g. concurrently dropped);
                // the response is still forwarded below — presumably intentional.
            }

            // Replace oversized responses with an error before sending.
            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

    /// Drain the copy-to response buffer and forward the responses.
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

    /// Send a response to the controller; send errors (disconnected receiver) are
    /// deliberately ignored.
    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

    /// Panic if the replica has passed its expiration frontier.
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            // This assert is guaranteed to fail here (the condition was just
            // observed true); the error! above ensures the event is logged in a
            // structured form before the panic.
            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

    /// Translate the replica expiration through a dataflow's time dependence to get
    /// that dataflow's expiration frontier.
    ///
    /// Expirations at or beyond `until` are discarded (the dataflow ends there
    /// anyway); the step-forward makes the expiration timestamp itself invalid.
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}
1107
/// An installed peek that has not yet produced a response.
pub enum PendingPeek {
    /// A peek against an arranged index (answered from its trace).
    Index(IndexPeek),
    /// A peek against a persist-backed collection (answered by a background task).
    Persist(PersistPeek),
    /// A peek whose large result is being uploaded to the peek stash in persist.
    Stash(peek_stash::StashingPeek),
}
1121
impl PendingPeek {
    /// Produce the `ComputeEvent` describing this peek's installation
    /// (`installed: true`) or removal (`installed: false`).
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    /// Construct an index peek, pinning the trace bundle's compaction so the peek
    /// timestamp stays readable for as long as the peek is pending.
    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        // Hold logical compaction at the peek timestamp...
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        // ...while releasing any physical compaction hold (empty frontier).
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    /// Construct a persist peek, spawning a background task that reads the result.
    ///
    /// Only one worker (chosen by hashing the peek UUID) actually reads; the others
    /// spawn a task that immediately yields an empty result.
    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            // Choose the worker deterministically from the peek UUID.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        // Used to wake the timely worker when the task finishes.
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        // Rows needed to satisfy the finishing: limit + offset (unbounded if no limit).
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        // An index peek may carry multiple literal constraints; a persist peek
        // carries at most one — `into_element` extracts it.
        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                // Receiver dropped, e.g. because the peek was canceled.
                Err((_result, elapsed)) => {
                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            // Aborts the task if this peek is dropped (e.g. canceled).
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    /// The tracing span associated with this peek.
    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    /// The peek description, regardless of variant.
    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}
1252
/// A pending peek fulfilled by a background task reading directly from a
/// persist shard, rather than from an index arrangement.
pub struct PersistPeek {
    /// The peek being fulfilled.
    pub(crate) peek: Peek,
    /// Handle to the background read task; dropping it aborts the task,
    /// so cancelled peeks stop reading from persist.
    _abort_handle: AbortOnDropHandle<()>,
    /// Receives the peek response and the elapsed read duration from the task.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The tracing span current when the peek was installed.
    span: tracing::Span,
}
1267
impl PersistPeek {
    /// Reads the rows satisfying `mfp_plan` from the persist shard described
    /// by `metadata`, as of the given timestamp.
    ///
    /// Stops once `limit_remaining` rows have been produced. Errors if the
    /// shard's since has advanced past `as_of`, if the source contains
    /// negative multiplicities, or if the accumulated result exceeds
    /// `max_result_size` bytes.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If the shard participates in the txn system, open a txns cache so
        // the read at `as_of` resolves correctly against it.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        // A cursor that applies `mfp_plan` while reading, using shard stats
        // to skip data where possible.
        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        // Running byte-size estimate of `result`, checked against `max_result_size`.
        let mut total_size = 0usize;

        // Number of leading columns pinned by the literal constraint (0 if none).
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                // Skip rows whose key prefix doesn't match the literal.
                // NOTE(review): the `Greater => break` early exit assumes the
                // cursor yields rows in key order — confirm against StatsCursor.
                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative accumulated diff means the shard holds retractions
                // for rows that don't exist: corrupted data.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                        saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                // Rows that fully cancel out are simply absent.
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                // Apply the map/filter/project plan; `None` means filtered out.
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1389
/// A pending peek fulfilled by reading an index's arrangement traces
/// directly on the worker.
pub struct IndexPeek {
    /// The peek being fulfilled.
    peek: Peek,
    /// Handles to the ok/err traces of the arrangement being peeked.
    trace_bundle: TraceBundle,
    /// The tracing span current when the peek was installed.
    span: tracing::Span,
}
1398
/// Histograms (in seconds) tracking how long the individual phases of index
/// peek fulfillment take.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time spent in a `seek_fulfillment` call.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking trace frontiers for readiness.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time spent scanning the error trace.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Time spent setting up the ok-trace result iterator.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Wall-clock time of the row iteration loop (includes any intermediate sorts).
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Accumulated time spent in intermediate result sorts.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Time spent building the final `RowCollection`.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1412
impl IndexPeek {
    /// Attempts to fulfill the peek against the index traces.
    ///
    /// Returns `PeekStatus::NotReady` if the traces have not yet advanced
    /// strictly past the peek timestamp, an error response if compaction has
    /// overtaken the peek timestamp, and otherwise the outcome of collecting
    /// the finished data. `upper` is scratch space for frontier reads.
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        // Both the ok and err traces must be complete through the peek
        // timestamp before we can answer.
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        // If compaction has advanced past the peek timestamp, the data needed
        // to answer correctly is gone; report an error instead of wrong rows.
        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }

    /// Collects data for a known-complete peek timestamp: first scans the
    /// error trace, then (if error-free) reads the ok trace.
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        // Scan the error trace: any error present at the peek timestamp
        // forces an error response.
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            // Accumulate the multiplicity of this error up to the peek time.
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                // Negative error multiplicities indicate corrupted data.
                let error = cursor.key(&storage);
                error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                    saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }

    /// Reads the ok trace and produces the peek's row results.
    ///
    /// Bounds memory by periodically truncating the working set to the
    /// number of rows the finishing actually needs, and switches to the peek
    /// stash (`PeekStatus::UsePeekStash`) when eligible and the estimated
    /// result size exceeds `peek_stash_threshold_bytes`.
    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyOwn = Row,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        // Per-row bookkeeping overhead included in the size estimate.
        let count_byte_size = size_of::<NonZeroUsize>();

        let cursor_setup_start = Instant::now();

        // Iterator over rows at the peek timestamp with the MFP applied,
        // restricted to the literal key constraints if any.
        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        let mut results = Vec::new();
        // Running byte-size estimate of `results`.
        let mut total_size: usize = 0;

        // `Some(n)` when the finishing needs at most `n` rows (limit + offset).
        let max_results = peek.finishing.num_rows_needed();

        let comparator = RowComparator::new(peek.finishing.order_by.as_slice());

        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            // The iterator only yields rows with positive multiplicities.
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            // Large results are better handled through the peek stash than
            // an inline response, when the caller allows it.
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            // Once we hold twice the rows we need, shrink the working set to
            // bound memory usage.
            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        // Without an ORDER BY any subset of the right size is
                        // acceptable, so we can truncate and return early.
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        // With an ORDER BY, sort and keep only the top
                        // `max_results` rows, subtracting the dropped rows
                        // from the size estimate.
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
                        });
                        sort_time_accum += sort_start.elapsed();
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
}
1664
/// The outcome of a single fulfillment attempt for an index peek.
enum PeekStatus {
    /// The traces have not yet advanced past the peek timestamp; retry later.
    NotReady,
    /// The estimated result exceeded the stash threshold; the caller should
    /// fulfill this peek through the peek result stash instead of an inline
    /// response.
    UsePeekStash,
    /// The peek is finished, with the given response (rows or error).
    Ready(PeekResponse),
}
1677
/// The frontiers of a collection that have been reported to the controller.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1688
1689impl ReportedFrontiers {
1690 fn new() -> Self {
1692 Self {
1693 write_frontier: ReportedFrontier::new(),
1694 input_frontier: ReportedFrontier::new(),
1695 output_frontier: ReportedFrontier::new(),
1696 }
1697 }
1698}
1699
/// A frontier that has been reported to the controller, or the least
/// frontier that may be reported in the future.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1711
1712impl ReportedFrontier {
1713 pub fn new() -> Self {
1715 let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1716 Self::NotReported { lower }
1717 }
1718
1719 pub fn is_empty(&self) -> bool {
1721 match self {
1722 Self::Reported(frontier) => frontier.is_empty(),
1723 Self::NotReported { .. } => false,
1724 }
1725 }
1726
1727 fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1733 match self {
1734 Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1735 Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1736 }
1737 }
1738}
1739
/// Worker-local state maintained for a single exported collection.
pub struct CollectionState {
    /// Frontiers that have been reported to the controller.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow maintaining this collection; shared
    /// (`Rc`) — presumably with other collections of the same dataflow.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to export.
    pub is_subscribe_or_copy: bool,
    /// The collection's as-of frontier; a collection counts as hydrated once
    /// its output frontier is strictly beyond this.
    as_of: Antichain<Timestamp>,

    /// A token for the collection's sink, if it has one.
    pub sink_token: Option<SinkToken>,
    /// The sink's write frontier, shared with the sink operator, if any.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes on the collection's inputs, keyed by input id; used
    /// to report logged import frontiers.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A frontier probe on the collection's output, if installed.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state for this collection, if logging is enabled.
    logging: Option<CollectionLogging>,
    /// Metrics recorded for this collection.
    metrics: CollectionMetrics,
    /// Sender for the read-only flag; flipped to `false` by `allow_writes`.
    read_only_tx: watch::Sender<bool>,
    /// Receiver side of the read-only flag, for dataflow operators to watch.
    pub read_only_rx: watch::Receiver<bool>,
}
1796
impl CollectionState {
    /// Creates fresh state for a collection exported by the dataflow at
    /// `dataflow_index`. Collections start in read-only mode.
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        // Read-only until `allow_writes` is called.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
            read_only_tx,
            read_only_rx,
        }
    }

    /// Returns the frontiers that have been reported to the controller.
    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    /// Resets all three reported frontiers to the given value.
    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    /// Records the given write frontier as reported, and mirrors it into the
    /// collection's logging (an unreported frontier logs as the minimum time).
    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    /// Records the given input frontier as reported.
    ///
    /// Note that the logged per-import frontiers are read from the input
    /// probes, not from the `frontier` argument.
    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    /// Records the given output frontier as reported, and fires hydration
    /// logging/metrics when this transitions the collection to hydrated.
    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        // Capture hydration status before the update so we can detect the
        // not-hydrated -> hydrated transition.
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    /// Whether this collection is hydrated: its reported output frontier has
    /// advanced strictly beyond the as-of.
    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }

    /// Switches the collection out of read-only mode, notifying watchers of
    /// the read-only flag.
    fn allow_writes(&self) {
        info!(
            dataflow_index = *self.dataflow_index,
            export = ?self.logging.as_ref().map(|l| l.export_id()),
            "allowing writes for dataflow",
        );
        // Send fails only if there are no receivers, which is fine.
        let _ = self.read_only_tx.send(false);
    }
}
1893}