use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::PersistLocation;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;
mod peek_stash;

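/// Worker-local state that is maintained across dataflows.
///
/// This tracks installed collections, arrangement traces, pending peeks,
/// response buffers, and the configuration received through compute commands.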
pub struct ComputeState {
    pub collections: BTreeMap<GlobalId, CollectionState>,
    pub traces: TraceManager,
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    pub peek_stash_persist_location: Option<PersistLocation>,
    pub compute_logger: Option<logging::compute::Logger>,
    pub persist_clients: Arc<PersistClientCache>,
    pub txns_ctx: TxnsContext,
    pub command_history: ComputeCommandHistory<UIntGauge>,
    max_result_size: u64,
    pub linear_join_spec: LinearJoinSpec,
    pub metrics: WorkerMetrics,
    tracing_handle: Arc<TracingHandle>,
    pub context: ComputeInstanceContext,
    pub worker_config: Rc<ConfigSet>,

    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    pub server_maintenance_interval: Duration,

    pub init_system_time: EpochMillis,

    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());

        Self {
            collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            peek_stash_persist_location: None,
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            suspended_collections: Default::default(),
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

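    /// Applies the current worker configuration from `worker_config`,
    /// updating the linear join spec, lgalloc, the memory limiter, and
    /// related process-global settings.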
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        crate::memory_limiter::apply_limiter_config(config);

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

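    /// Applies the given expiration offset, unless a replica expiration is
    /// already set. The expiration instant is `init_system_time + offset`;
    /// for example, a 3-day offset on a replica initialized at Unix time
    /// 1_700_000_000_000 ms expires at 1_700_259_200_000 ms.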
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

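/// A wrapper around [`ComputeState`], bundling a reference to the Timely
/// worker and the response channel, through which commands are applied.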
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    pub timely_worker: &'a mut TimelyWorker<A>,
    pub compute_state: &'a mut ComputeState,
    pub response_tx: &'a mut ResponseSender,
}
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
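    /// Entry point for applying a compute command: records the command in the
    /// history, dispatches it to the matching handler, and times the handling.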
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

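    /// Handles a `Schedule` command: dropping the collection's suspension
    /// token fires the `StartSignal` and lets the dataflow start computing.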
    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    fn handle_allow_writes(&mut self, id: GlobalId) {
        self.compute_state.persist_clients.cfg().enable_compaction();

        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
            collection.allow_writes();
        } else {
            soft_panic_or_log!("allow writes for unknown collection {id}");
        }
    }

    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        if !collection.is_subscribe_or_copy {
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

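    /// Reports frontier updates for all installed collections whose write,
    /// input, or output frontier has advanced since the last report.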
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

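    /// Attempts to fulfill a pending peek: responds immediately if the peek is
    /// ready, hands large index peeks off to the peek stash, and otherwise
    /// re-registers the peek as pending.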
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        tracing::error!(
                            "missing peek_stash_persist_location but peek stash is enabled"
                        );
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

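    /// Scans pending peeks and attempts to fulfill each of them.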
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

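    /// Drains the subscribe response buffer, updates the reported frontiers of
    /// the affected collections, and forwards each response to the controller.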
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // The collection no longer exists, presumably because it was
                // already dropped. There are no reported frontiers to update,
                // but we still forward the response below.
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

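/// An in-progress peek, in one of three flavors: served from an arranged
/// index, served from persist by an async task, or stashed into persist for
/// out-of-band transfer of large results.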
pub enum PendingPeek {
    Index(IndexPeek),
    Persist(PersistPeek),
    Stash(peek_stash::StashingPeek),
}

impl PendingPeek {
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}

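/// A peek against a persist collection, fulfilled by a spawned task rather
/// than by reading a worker-local arrangement. Dropping this aborts the task.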
pub struct PersistPeek {
    pub(crate) peek: Peek,
    _abort_handle: AbortOnDropHandle<()>,
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    span: tracing::Span,
}

impl PersistPeek {
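    /// Reads the peeked collection from persist at the peek timestamp,
    /// applying the literal constraint and the MFP, and enforcing the result
    /// size and row limit bounds.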
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    tracing::error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

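/// A peek against an index (arrangement) maintained by this worker.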
pub struct IndexPeek {
    peek: Peek,
    trace_bundle: TraceBundle,
    span: tracing::Span,
}

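/// Histograms recording the duration of the individual phases of an index
/// peek.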
pub(crate) struct IndexPeekMetrics<'a> {
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    pub frontier_check_seconds: &'a prometheus::Histogram,
    pub error_scan_seconds: &'a prometheus::Histogram,
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    pub row_iteration_seconds: &'a prometheus::Histogram,
    pub result_sort_seconds: &'a prometheus::Histogram,
    pub row_collection_seconds: &'a prometheus::Histogram,
}

impl IndexPeek {
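    /// Attempts to fulfill the peek against the arrangement, returning
    /// `NotReady` while the trace frontier has not advanced past the peek
    /// timestamp, an error if the arrangement was compacted beyond it, and
    /// otherwise the peek result.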
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }

    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                tracing::error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                     saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }

    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyOwn = Row,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let cursor_setup_start = Instant::now();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        let max_results = peek.finishing.num_rows_needed();

        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));
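
            // Bound memory usage: once we hold twice the number of rows
            // needed, either truncate (without an ORDER BY, any `max_results`
            // rows are as good as any others) or sort by the ORDER BY key and
            // keep only the top `max_results` rows before continuing.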
            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(collection));
                    } else {
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        sort_time_accum += sort_start.elapsed();
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(collection))
    }
}

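/// The result of a single attempt at fulfilling a peek.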
enum PeekStatus {
    NotReady,
    UsePeekStash,
    Ready(PeekResponse),
}

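/// The frontiers of a collection as most recently reported to the controller.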
#[derive(Debug)]
struct ReportedFrontiers {
    write_frontier: ReportedFrontier,
    input_frontier: ReportedFrontier,
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }
}

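/// A frontier that has either been reported to the controller, or is known to
/// be at some lower bound while not yet reported.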
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    Reported(Antichain<Timestamp>),
    NotReported {
        lower: Antichain<Timestamp>,
    },
}

impl ReportedFrontier {
    pub fn new() -> Self {
        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
        Self::NotReported { lower }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            Self::Reported(frontier) => frontier.is_empty(),
            Self::NotReported { .. } => false,
        }
    }

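    /// Whether the given frontier is admissible for reporting: an already
    /// reported frontier must strictly advance, while an unreported one must
    /// only have reached `lower`.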
    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

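/// Worker-local state tracked for an individual compute collection.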
pub struct CollectionState {
    reported_frontiers: ReportedFrontiers,
    dataflow_index: Rc<usize>,
    pub is_subscribe_or_copy: bool,
    as_of: Antichain<Timestamp>,

    pub sink_token: Option<SinkToken>,
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    logging: Option<CollectionLogging>,
    metrics: CollectionMetrics,
    read_only_tx: watch::Sender<bool>,
    pub read_only_rx: watch::Receiver<bool>,
}

impl CollectionState {
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
            read_only_tx,
            read_only_rx,
        }
    }

    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

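    /// Whether this collection is hydrated: its reported output frontier has
    /// advanced strictly beyond its as-of.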
    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }

    fn allow_writes(&self) {
        info!(
            dataflow_index = *self.dataflow_index,
            export = ?self.logging.as_ref().map(|l| l.export_id()),
            "allowing writes for dataflow",
        );
        let _ = self.read_only_tx.send(false);
    }
}