use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::PersistLocation;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;
mod peek_stash;

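/// Worker-local state that is maintained across dataflows.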
pub struct ComputeState {
    pub collections: BTreeMap<GlobalId, CollectionState>,
    pub traces: TraceManager,
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    pub peek_stash_persist_location: Option<PersistLocation>,
    pub compute_logger: Option<logging::compute::Logger>,
    pub persist_clients: Arc<PersistClientCache>,
    pub txns_ctx: TxnsContext,
    pub command_history: ComputeCommandHistory<UIntGauge>,
    max_result_size: u64,
    pub linear_join_spec: LinearJoinSpec,
    pub metrics: WorkerMetrics,
    tracing_handle: Arc<TracingHandle>,
    pub context: ComputeInstanceContext,
    pub worker_config: Rc<ConfigSet>,

    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    pub server_maintenance_interval: Duration,

    pub init_system_time: EpochMillis,

    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
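    /// Construct a new `ComputeState`.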
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());

        Self {
            collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            peek_stash_persist_location: None,
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            suspended_collections: Default::default(),
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

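    /// Returns the [`CollectionState`] of the identified collection.
    ///
    /// Panics if the collection is not tracked by this worker.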
    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

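    /// Construct a new frontier probe for the given input and add it to the state of the given
    /// collections, returning the probe handle.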
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

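    /// Apply the current `worker_config` to the worker state: the linear join spec, lgalloc and
    /// memory-limiter configuration, maintenance interval, and integer overflow behavior.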
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        crate::memory_limiter::apply_limiter_config(config);

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

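    /// Set the replica expiration to the worker's initialization time plus the given offset,
    /// unless an expiration is already set.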
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

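    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as appropriate for the
    /// active persist configuration.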
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

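/// A [`ComputeState`] paired with the Timely worker and response channel required to act on it.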
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    pub timely_worker: &'a mut TimelyWorker<A>,
    pub compute_state: &'a mut ComputeState,
    pub response_tx: &'a mut ResponseSender,
}

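/// A token holding a sink's resources; dropping it tears the sink down.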
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
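    /// Entry point for applying a compute command to this worker's state.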
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();

        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

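    /// Handles a `CreateDataflow` command: registers per-collection state for each export, then
    /// renders the dataflow, suspended behind a start signal until it is scheduled.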
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        }

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

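    /// Handles a `Peek` command by installing a pending peek against the targeted index or
    /// persist shard and immediately attempting to fulfill it.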
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    fn handle_allow_writes(&mut self, id: GlobalId) {
        self.compute_state.persist_clients.cfg().enable_compaction();

        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
            collection.allow_writes();
        } else {
            soft_panic_or_log!("allow writes for unknown collection {id}");
        }
    }

    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        if !collection.is_subscribe_or_copy {
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

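    /// Initialize the logging dataflow and register the logging collections.
    ///
    /// Panics if logging was already initialized.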
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

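    /// Report any write, input, and output frontiers that have advanced since they were last
    /// reported to the controller.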
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

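    /// Report per-worker metrics, currently the remaining seconds until replica expiration.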
    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

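    /// Attempt to fulfill a single pending peek, either sending a response or re-enqueueing the
    /// peek (e.g. while its result is still being stashed in persist).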
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        tracing::error!(
                            "missing peek_stash_persist_location but peek stash is enabled"
                        );
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

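    /// Scan pending peeks and attempt to fulfill each of them.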
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

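    /// Send a response for the given peek and log its retirement.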
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

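    /// Drain the subscribe response buffer, update reported frontiers, and forward the responses
    /// to the controller.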
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

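    /// Drain the copy-to response buffer and forward the responses to the controller.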
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

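    /// Send a response to the controller, ignoring send errors, which can occur when the
    /// receiving end has hung up.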
    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

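    /// Check whether the replica has passed its expiration time, and panic (after logging
    /// details) if it has.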
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

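    /// Determine a dataflow's expiration frontier: the replica expiration translated through the
    /// dataflow's time dependence and stepped forward by one tick, or empty if the dataflow's
    /// `until` already imposes an equal or stronger bound.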
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

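/// An in-progress peek: served from an arrangement, from persist, or stashing its (large)
/// response in persist.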
pub enum PendingPeek {
    Index(IndexPeek),
    Persist(PersistPeek),
    Stash(peek_stash::StashingPeek),
}

impl PendingPeek {
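    /// Produces the corresponding log event for this peek.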
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}

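/// A peek against a persist collection, served by an async task that is aborted if the peek is
/// dropped (e.g. on cancellation).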
pub struct PersistPeek {
    pub(crate) peek: Peek,
    _abort_handle: AbortOnDropHandle<()>,
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    span: tracing::Span,
}

impl PersistPeek {
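    /// Read the peek's rows from persist at the given `as_of`, applying the literal constraint
    /// and MFP and enforcing the result-size and limit bounds.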
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    tracing::error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

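/// A peek against an index (arrangement) maintained by this worker.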
pub struct IndexPeek {
    peek: Peek,
    trace_bundle: TraceBundle,
    span: tracing::Span,
}

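/// Borrowed histograms used to instrument the stages of an index peek.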
pub(crate) struct IndexPeekMetrics<'a> {
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    pub frontier_check_seconds: &'a prometheus::Histogram,
    pub error_scan_seconds: &'a prometheus::Histogram,
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    pub row_iteration_seconds: &'a prometheus::Histogram,
    pub result_sort_seconds: &'a prometheus::Histogram,
    pub row_collection_seconds: &'a prometheus::Histogram,
}

impl IndexPeek {
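    /// Attempt to fulfill the peek against the arrangement: not ready while the peek timestamp
    /// is not yet complete, an error if the arrangement is already compacted past it, and
    /// otherwise the collected finished data (or a hand-off to the peek stash).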
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }

    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                tracing::error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                     saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }

    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
                Key<'a>: ToDatumIter + Eq,
                KeyOwn = Row,
                Val<'a>: ToDatumIter,
                TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
                DiffGat<'a> = &'a Diff,
            >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let cursor_setup_start = Instant::now();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        let max_results = peek.finishing.num_rows_needed();

        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(collection));
                    } else {
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        sort_time_accum += sort_start.elapsed();
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(collection))
    }
}

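/// The result of an attempt to fulfill an index peek.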
enum PeekStatus {
    NotReady,
    UsePeekStash,
    Ready(PeekResponse),
}

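/// A collection's frontiers as most recently reported to the controller.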
#[derive(Debug)]
struct ReportedFrontiers {
    write_frontier: ReportedFrontier,
    input_frontier: ReportedFrontier,
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }
}

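/// A frontier that has either been reported to the controller, or not yet reported, in which
/// case `lower` bounds the frontiers that may eventually be reported.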
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    Reported(Antichain<Timestamp>),
    NotReported {
        lower: Antichain<Timestamp>,
    },
}

impl ReportedFrontier {
    pub fn new() -> Self {
        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
        Self::NotReported { lower }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            Self::Reported(frontier) => frontier.is_empty(),
            Self::NotReported { .. } => false,
        }
    }

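    /// Whether the given frontier may be reported from this state: a previously reported
    /// frontier must strictly advance, while an unreported one must only reach its lower bound.
    ///
    /// Illustrative sketch of the semantics (hypothetical values, not a doctest):
    ///
    /// ```ignore
    /// let reported = ReportedFrontier::Reported(Antichain::from_elem(Timestamp::from(5)));
    /// assert!(!reported.allows_reporting(&Antichain::from_elem(Timestamp::from(5))));
    /// assert!(reported.allows_reporting(&Antichain::from_elem(Timestamp::from(6))));
    /// ```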
    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

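/// Worker-local state tracked for an exported collection: reported frontiers, probes, sink
/// state, logging, and read-only status.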
pub struct CollectionState {
    reported_frontiers: ReportedFrontiers,
    dataflow_index: Rc<usize>,
    pub is_subscribe_or_copy: bool,
    as_of: Antichain<Timestamp>,

    pub sink_token: Option<SinkToken>,
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    logging: Option<CollectionLogging>,
    metrics: CollectionMetrics,
    read_only_tx: watch::Sender<bool>,
    pub read_only_rx: watch::Receiver<bool>,
}

impl CollectionState {
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
            read_only_tx,
            read_only_rx,
        }
    }

    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }

    fn allow_writes(&self) {
        info!(
            dataflow_index = *self.dataflow_index,
            export = ?self.logging.as_ref().map(|l| l.export_id()),
            "allowing writes for dataflow",
        );
        let _ = self.read_only_tx.send(false);
    }
}