1use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::implementations::BatchContainer;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
29};
30use mz_compute_types::dataflows::DataflowDescription;
31use mz_compute_types::dyncfgs::{
32 ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
33 PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
34};
35use mz_compute_types::plan::render_plan::RenderPlan;
36use mz_dyncfg::ConfigSet;
37use mz_expr::row::RowCollection;
38use mz_expr::{RowComparator, SafeMfpPlan};
39use mz_ore::cast::CastFrom;
40use mz_ore::collections::CollectionExt;
41use mz_ore::metrics::{MetricsRegistry, UIntGauge};
42use mz_ore::now::EpochMillis;
43use mz_ore::soft_panic_or_log;
44use mz_ore::task::AbortOnDropHandle;
45use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
46use mz_persist_client::Diagnostics;
47use mz_persist_client::cache::PersistClientCache;
48use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
49use mz_persist_client::read::ReadHandle;
50use mz_persist_types::PersistLocation;
51use mz_persist_types::codec_impls::UnitSchema;
52use mz_repr::fixed_length::ToDatumIter;
53use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
54use mz_storage_operators::stats::StatsCursor;
55use mz_storage_types::StorageDiff;
56use mz_storage_types::controller::CollectionMetadata;
57use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
58use mz_storage_types::sources::SourceData;
59use mz_storage_types::time_dependence::TimeDependence;
60use mz_txn_wal::operator::TxnsContext;
61use mz_txn_wal::txn_cache::TxnsCache;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::worker::Worker as TimelyWorker;
66use tokio::sync::{oneshot, watch};
67use tracing::{Level, debug, error, info, span, trace, warn};
68use uuid::Uuid;
69
70use crate::arrangement::manager::{TraceBundle, TraceManager};
71use crate::logging;
72use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
73use crate::logging::initialize::LoggingTraces;
74use crate::metrics::{CollectionMetrics, WorkerMetrics};
75use crate::render::{LinearJoinSpec, StartSignal};
76use crate::server::{ComputeInstanceContext, ResponseSender};
77
78mod peek_result_iterator;
79mod peek_stash;
80
/// Worker-local state for a compute instance, shared across command handling,
/// dataflow rendering, and response reporting.
pub struct ComputeState {
    /// State for each installed compute collection (dataflow export), by ID.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Arrangement traces backing indexes; used for peeks and compaction.
    pub traces: TraceManager,
    /// Buffer into which subscribe dataflows deposit responses; drained by
    /// `process_subscribes`.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Buffer into which COPY TO dataflows deposit responses; drained by
    /// `process_copy_tos`.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peeks that could not be fulfilled immediately, keyed by peek UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Persist location for stashing large peek responses; set on
    /// `CreateInstance`.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// Logger for compute introspection events, once logging is initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// Cache of persist clients, shared with peek tasks and dataflows.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context for txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of compute commands received by this worker.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Maximum result size for peeks/subscribes, in bytes; `u64::MAX` until a
    /// configuration update lowers it.
    max_result_size: u64,
    /// How linear joins are to be rendered, derived from the worker config.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// Handle for dynamically adjusting tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Per-instance context (e.g. the scratch directory) from the server.
    pub context: ComputeInstanceContext,
    /// Dynamic (dyncfg) configuration applied to this worker; see
    /// `apply_worker_config`.
    pub worker_config: Rc<ConfigSet>,

    /// Registry used when initializing logging dataflows.
    pub metrics_registry: MetricsRegistry,

    /// Number of timely workers in this process.
    pub workers_per_process: usize,

    /// Suspension tokens for dataflows awaiting a `Schedule` command;
    /// removing a collection's token allows its dataflow to start (see
    /// `StartSignal`).
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval at which the server performs maintenance work; set from the
    /// worker config.
    pub server_maintenance_interval: Duration,

    /// System time (milliseconds since the Unix epoch) captured when this
    /// state was created; the base for computing replica expiration.
    pub init_system_time: EpochMillis,

    /// Frontier at which this replica is considered expired; the empty
    /// antichain means the replica never expires.
    pub replica_expiration: Antichain<Timestamp>,
}
172
173impl ComputeState {
174 pub fn new(
176 persist_clients: Arc<PersistClientCache>,
177 txns_ctx: TxnsContext,
178 metrics: WorkerMetrics,
179 tracing_handle: Arc<TracingHandle>,
180 context: ComputeInstanceContext,
181 metrics_registry: MetricsRegistry,
182 workers_per_process: usize,
183 ) -> Self {
184 let traces = TraceManager::new(metrics.clone());
185 let command_history = ComputeCommandHistory::new(metrics.for_history());
186
187 Self {
188 collections: Default::default(),
189 traces,
190 subscribe_response_buffer: Default::default(),
191 copy_to_response_buffer: Default::default(),
192 pending_peeks: Default::default(),
193 peek_stash_persist_location: None,
194 compute_logger: None,
195 persist_clients,
196 txns_ctx,
197 command_history,
198 max_result_size: u64::MAX,
199 linear_join_spec: Default::default(),
200 metrics,
201 tracing_handle,
202 context,
203 worker_config: mz_dyncfgs::all_dyncfgs().into(),
204 metrics_registry,
205 workers_per_process,
206 suspended_collections: Default::default(),
207 server_maintenance_interval: Duration::ZERO,
208 init_system_time: mz_ore::now::SYSTEM_TIME(),
209 replica_expiration: Antichain::default(),
210 }
211 }
212
213 pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
217 self.collections
218 .get_mut(&id)
219 .expect("collection must exist")
220 }
221
222 pub fn input_probe_for(
228 &mut self,
229 input_id: GlobalId,
230 collection_ids: impl Iterator<Item = GlobalId>,
231 ) -> probe::Handle<Timestamp> {
232 let probe = probe::Handle::default();
233 for id in collection_ids {
234 if let Some(collection) = self.collections.get_mut(&id) {
235 collection.input_probes.insert(input_id, probe.clone());
236 }
237 }
238 probe
239 }
240
241 fn apply_worker_config(&mut self) {
243 use mz_compute_types::dyncfgs::*;
244
245 let config = &self.worker_config;
246
247 self.linear_join_spec = LinearJoinSpec::from_config(config);
248
249 if ENABLE_LGALLOC.get(config) {
250 if let Some(path) = &self.context.scratch_directory {
251 let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
252 let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
253 let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
254 let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
255 let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
256 info!(
257 ?path,
258 backgrund_interval=?interval,
259 clear_bytes,
260 eager_return,
261 file_growth_dampener,
262 local_buffer_bytes,
263 "enabling lgalloc"
264 );
265 let background_worker_config = lgalloc::BackgroundWorkerConfig {
266 interval,
267 clear_bytes,
268 };
269 lgalloc::lgalloc_set_config(
270 lgalloc::LgAlloc::new()
271 .enable()
272 .with_path(path.clone())
273 .with_background_config(background_worker_config)
274 .eager_return(eager_return)
275 .file_growth_dampener(file_growth_dampener)
276 .local_buffer_bytes(local_buffer_bytes),
277 );
278 } else {
279 debug!("not enabling lgalloc, scratch directory not specified");
280 }
281 } else {
282 info!("disabling lgalloc");
283 lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
284 }
285
286 crate::memory_limiter::apply_limiter_config(config);
287
288 mz_ore::region::ENABLE_LGALLOC_REGION.store(
289 ENABLE_COLUMNATION_LGALLOC.get(config),
290 std::sync::atomic::Ordering::Relaxed,
291 );
292
293 let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
294 mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
295
296 self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
299
300 let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
301 match overflowing_behavior.parse() {
302 Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
303 Err(err) => {
304 error!(
305 err,
306 overflowing_behavior, "Invalid value for ore_overflowing_behavior"
307 );
308 }
309 }
310 }
311
    /// Arm replica expiration by recording `init_system_time + offset` as the
    /// expiration frontier and exposing it as a metric.
    ///
    /// A no-op if an expiration frontier was already set; the frontier starts
    /// out empty, and an empty antichain here means "not yet set".
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            // Convert the offset to milliseconds since the epoch scale; only
            // absurdly large offsets can fail the conversion.
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            // Also surface the expiration timestamp as a metric.
            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }
340
341 pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
344 use mz_compute_types::dyncfgs::{
345 DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
346 };
347
348 if self.persist_clients.cfg.is_cc_active {
349 DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
350 } else {
351 DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
352 }
353 }
354}
355
/// A [`ComputeState`] paired with a live timely worker and a response
/// channel, as needed to handle compute commands and produce responses.
pub(crate) struct ActiveComputeState<'a> {
    /// The timely worker on which dataflows run.
    pub timely_worker: &'a mut TimelyWorker,
    /// The worker-local compute state.
    pub compute_state: &'a mut ComputeState,
    /// Channel over which compute responses are sent back to the server.
    pub response_tx: &'a mut ResponseSender,
}
365
/// An opaque token associated with a sink; the wrapped value is held only so
/// that dropping the token drops it (the value is never otherwise read).
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Wrap the given value in a `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        SinkToken(t)
    }
}
375
376impl<'a> ActiveComputeState<'a> {
    /// Entry point for command handling: dispatch a single compute command to
    /// its handler, recording it in the command history and timing it.
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        // Record the command in the history before acting on it.
        self.compute_state.command_history.push(cmd.clone());

        // Time command handling, per command kind.
        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                // Continue the tracing span that accompanied the peek.
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }
412
    /// Handle a `CreateInstance` command: apply the initial worker
    /// configuration, optionally arm replica expiration, initialize logging,
    /// and record the peek-stash persist location.
    fn handle_create_instance(&mut self, config: InstanceConfig) {
        // NOTE(review): the config is applied before logging initialization,
        // presumably so logging dataflows observe current dyncfg values.
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }
424
    /// Handle an `UpdateConfiguration` command by applying the contained
    /// parameter changes to this worker, the tracing system, and the persist
    /// client configuration.
    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        debug!("Applying configuration update: {params:?}");

        // Destructure exhaustively so newly added parameters cannot be
        // silently ignored here.
        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        // Propagate dyncfg updates to every config set that consumes them.
        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        // Re-derive any worker state computed from the config.
        self.compute_state.apply_worker_config();
    }
458
    /// Handle a `CreateDataflow` command: set up `CollectionState` (and
    /// optional logging) for each export, install suspension tokens so the
    /// dataflow waits for a `Schedule` command, and render the dataflow.
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        // All exports share the dataflow index; the `Rc` lets
        // `drop_collection` detect when the last export is gone.
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        // Derive the dataflow's expiration from the replica expiration and
        // the dataflow's time dependence, if it has one.
        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        // The expiration can only tighten (meet) the dataflow's `until`.
        let until = dataflow.until.meet(&dataflow_expiration);

        // Log dataflow creation; transient dataflows only at debug level to
        // avoid flooding the logs.
        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        // Exports that are subscribes or COPY TOs are treated specially in
        // frontier reporting; remember which ones they are.
        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        // Install collection state for each of the dataflow's exports.
        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            // Frontiers start out unreported, bounded below by the as-of.
            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        // Suspend the dataflow until a `Schedule` command arrives for one of
        // its exports; dropping the tokens fires the start signal.
        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }
564
565 fn handle_schedule(&mut self, id: GlobalId) {
566 let suspension_token = self.compute_state.suspended_collections.remove(&id);
572 drop(suspension_token);
573 }
574
    /// Handle an `AllowCompaction` command: an empty frontier means the
    /// collection should be dropped entirely; otherwise its traces may
    /// compact up to the given frontier.
    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            // An empty frontier signals that the collection's contents are no
            // longer needed.
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }
585
    /// Handle a `Peek` command by constructing a `PendingPeek` against the
    /// targeted index or persist shard, logging its installation, and
    /// immediately attempting to fulfill it.
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                // Acquire a copy of the index's trace to peek against.
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        // Log the installation of the peek (`installed == true`).
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        // Try to fulfill the peek right away; if it is not yet ready it is
        // registered as pending inside `process_peek`.
        self.process_peek(&mut Antichain::new(), pending);
    }
613
614 fn handle_cancel_peek(&mut self, uuid: Uuid) {
615 if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
616 self.send_peek_response(peek, PeekResponse::Canceled);
617 }
618 }
619
620 fn handle_allow_writes(&mut self, id: GlobalId) {
621 self.compute_state.persist_clients.cfg().enable_compaction();
625
626 if let Some(collection) = self.compute_state.collections.get_mut(&id) {
627 collection.allow_writes();
628 } else {
629 soft_panic_or_log!("allow writes for unknown collection {id}");
630 }
631 }
632
    /// Remove all worker-local state for collection `id` and, if this was the
    /// last collection referencing its dataflow, drop the dataflow too.
    ///
    /// # Panics
    ///
    /// Panics if the collection is not tracked.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        // Drop the trace (for indexes) and any suspension token still held.
        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        // If no other collection shares the dataflow index, the dataflow
        // itself can be dropped.
        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        if !collection.is_subscribe_or_copy {
            // Report final (empty) frontiers, but only for frontiers that
            // were not already reported as empty.
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }
671
    /// Initialize the logging dataflows for this worker and install
    /// collection state for each requested logging index.
    ///
    /// # Panics
    ///
    /// Panics if logging was already initialized, or if `config` requests
    /// logging indexes that `logging::initialize` does not produce.
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(
            self.timely_worker,
            &config,
            self.compute_state.metrics_registry.clone(),
            Rc::clone(&self.compute_state.worker_config),
            self.compute_state.workers_per_process,
        );

        // All logging collections share the single logging dataflow.
        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            // Remove matched entries so we can detect unfulfilled requests
            // below.
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            // Logging collections are indexes with a minimal as-of.
            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            // Logging collections have no imports.
            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }
731
    /// Compute frontier changes for all tracked collections (except
    /// subscribes/COPY TOs, which report via `process_subscribes`) and send a
    /// `Frontiers` response for each collection with updates.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Scratch antichain, reused for each of the frontier computations.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Write frontier: the trace upper (for indexes) or the sink's
            // write frontier — a collection must have exactly one of these.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            // Only report frontiers that advance over what was last reported.
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Output frontier: the write frontier possibly combined with the
            // compute probe's frontier; in read-only mode only the probe is
            // considered.
            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Input frontier: combined from all registered input probes.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Record what we are about to report, so future calls only report
            // advancements.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
827
828 pub(crate) fn report_metrics(&self) {
830 if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
831 let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
832 let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
833 let remaining = expiration - now;
834 self.compute_state
835 .metrics
836 .replica_expiration_remaining_seconds
837 .set(remaining)
838 }
839 }
840
    /// Attempt to make progress on the given peek: either produce a response
    /// (and send it), or register the peek as pending for a later retry.
    ///
    /// `upper` is a scratch antichain handed to index peeks (see
    /// `IndexPeek::seek_fulfillment`), reused across calls by
    /// `process_peeks`.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // Only peeks whose finishing can be applied in a streaming
                // fashion are eligible for the peek stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                // Phase-level timing metrics for index peeks.
                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        // The result exceeds the stash threshold: convert the
                        // index peek into a stashing peek that uploads its
                        // result to persist, and register it as pending.
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // A persist peek is done once its async task has sent a result.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Drive the upload forward a bounded amount per call, then
                // check whether it has completed.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not ready yet; re-register so a later call can retry.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
965
966 pub fn process_peeks(&mut self) {
968 let mut upper = Antichain::new();
969 let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
970 for (_uuid, peek) in pending_peeks {
971 self.process_peek(&mut upper, peek);
972 }
973 }
974
    /// Send the response for a peek and log its retirement
    /// (`installed == false`).
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        // Build the log event up front, before `peek` is consumed below.
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }
994
    /// Drain the subscribe response buffer, updating reported frontiers for
    /// still-tracked subscribe collections, and forward each response.
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                // The frontier implied by the response: a batch advances to
                // its upper, a drop advances to the empty frontier.
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                // Frontiers reported for subscribes must never regress.
                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // The collection is no longer tracked (e.g. it was dropped);
                // the response is still forwarded below.
            }

            // Enforce the maximum result size before sending.
            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }
1035
1036 pub fn process_copy_tos(&self) {
1038 let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1039 for (sink_id, response) in responses.drain(..) {
1040 self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1041 }
1042 }
1043
    /// Forward `response` over the worker's response channel.
    fn send_compute_response(&self, response: ComputeResponse) {
        // Send failures (a disconnected receiver) are deliberately ignored;
        // the worker keeps running even if nobody is listening anymore.
        let _ = self.response_tx.send(response);
    }
1050
    /// Check whether this replica has passed its expiration time and, if so,
    /// log the details and panic.
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            // Log before panicking, so the details reach the logs even if
            // the panic output is lost.
            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            // This assert necessarily fails here (`now` is unchanged and the
            // condition was just established above); it carries the
            // diagnostic message into the panic payload.
            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }
1081
    /// Determine a dataflow's expiration frontier from the replica expiration
    /// translated through the dataflow's time dependence.
    ///
    /// Expiration times that `until` already covers are dropped, since the
    /// dataflow will wind down on its own by then.
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<Timestamp>,
    ) -> Antichain<Timestamp> {
        // Map each expiration time through the time dependence, step it
        // forward once (NOTE(review): presumably so times up to and including
        // the mapped expiration itself remain valid), and drop anything not
        // strictly ahead of `until`.
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
1105}
1106
/// An in-progress peek, in one of three flavors depending on how it is
/// fulfilled.
pub enum PendingPeek {
    /// A peek against an arranged index maintained by this worker.
    Index(IndexPeek),
    /// A peek served directly from a persist shard by an async task.
    Persist(PersistPeek),
    /// A peek whose (large) result is being stashed in persist rather than
    /// returned inline.
    Stash(peek_stash::StashingPeek),
}
1120
1121impl PendingPeek {
    /// Produce the introspection event describing this peek.
    ///
    /// `installed` distinguishes installation (`true`) from retirement
    /// (`false`) of the peek.
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }
1138
    /// Construct an index peek, configuring trace compaction so the trace
    /// remains readable at the peek timestamp.
    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        // Allow logical compaction up to (but not beyond) the peek time, so
        // the trace can still be read at `peek.timestamp`.
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        // Place no restriction on physical compaction.
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }
1161
    /// Construct a persist peek, spawning an async task to fulfill it.
    ///
    /// Exactly one worker (chosen by hashing the peek UUID) actually reads
    /// the shard; every other worker's task immediately produces no rows.
    fn persist(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker,
    ) -> Self {
        let active_worker = {
            // Choose the worker deterministically from the peek UUID.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        // `limit + offset` rows are sufficient to compute the finishing.
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                // Non-chosen workers report an empty result.
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                // The send fails only when the receiver was dropped, i.e.,
                // the peek was cancelled.
                Err((_result, elapsed)) => {
                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            // Wake the timely worker so it notices the finished peek.
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            // Dropping the `PersistPeek` aborts the task.
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }
1234
1235 fn span(&self) -> &tracing::Span {
1236 match self {
1237 PendingPeek::Index(p) => &p.span,
1238 PendingPeek::Persist(p) => &p.span,
1239 PendingPeek::Stash(p) => &p.span,
1240 }
1241 }
1242
1243 pub(crate) fn peek(&self) -> &Peek {
1244 match self {
1245 PendingPeek::Index(p) => &p.peek,
1246 PendingPeek::Persist(p) => &p.peek,
1247 PendingPeek::Stash(p) => &p.peek,
1248 }
1249 }
1250}
1251
/// An in-progress peek against a persist-backed collection, serviced by an
/// async task rather than by reading an arrangement.
pub struct PersistPeek {
    /// The peek described by the compute protocol.
    pub(crate) peek: Peek,
    /// Aborts the spawned read task when this `PersistPeek` is dropped.
    _abort_handle: AbortOnDropHandle<()>,
    /// Receives the peek result together with the time the task spent
    /// producing it.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The tracing span in which the peek was installed.
    span: tracing::Span,
}
1266
impl PersistPeek {
    /// Reads the peek result directly from persist.
    ///
    /// Opens a leased reader on the collection's data shard, positions a
    /// `StatsCursor` at `as_of`, and accumulates rows that pass `mfp_plan`
    /// until the data is exhausted, `limit_remaining` rows have been
    /// produced, or the accumulated size exceeds `max_result_size` (which is
    /// an error).
    ///
    /// Returns the result rows with their multiplicities, or an error message
    /// suitable for a `PeekResponse::Error`.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If the collection is managed by a txns shard, open a cache on it so
        // the cursor can resolve reads through it.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        // Running total of result bytes, for the `max_result_size` check.
        let mut total_size = 0usize;

        // Number of leading columns pinned by the literal constraint, if any.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                // Compare the row's key prefix against the literal constraint.
                // NOTE(review): the `Greater => break 'collect` early exit
                // presumes rows arrive ordered on this prefix — relies on the
                // cursor's iteration order.
                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative diff indicates corrupted data: a retraction of a
                // row that was never inserted.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                        saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                // Apply the MFP; it may filter the row out entirely.
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    // Account for the row and its multiplicity before
                    // admitting it to the result.
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1388
/// An in-progress peek against an index (arranged collection).
pub struct IndexPeek {
    /// The peek described by the compute protocol.
    peek: Peek,
    /// The ok and err traces of the index being peeked.
    trace_bundle: TraceBundle,
    /// The tracing span in which the peek was installed.
    span: tracing::Span,
}
1397
/// Histograms timing the individual phases of index peek processing.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time spent in `seek_fulfillment`.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking trace frontiers against the peek timestamp.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time spent scanning the error trace.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Time spent setting up the peek result iterator.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Time spent iterating over result rows.
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Accumulated time spent sorting intermediate results.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Time spent building the final `RowCollection`.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1411
impl IndexPeek {
    /// Attempts to fulfill the peek against the index traces.
    ///
    /// Returns `PeekStatus::NotReady` if either the ok or the err trace has
    /// not yet advanced past the peek timestamp, an error response if the
    /// compaction frontier has already moved beyond the peek timestamp (the
    /// required data is gone), and otherwise the result of
    /// `collect_finished_data`. Timing is reported through `metrics`.
    ///
    /// `upper` is scratch space, overwritten with the traces' upper frontiers.
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        // The peek is only fulfillable once both traces have advanced past
        // the peek timestamp.
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        // If compaction has passed the peek timestamp, a correct read is no
        // longer possible.
        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }

    /// Collects the peek result, consulting the error trace first.
    ///
    /// If the error trace contains any update with positive multiplicity at
    /// or before the peek timestamp, the peek resolves to an error response.
    /// Otherwise the ok trace is read via `collect_ok_finished_data`.
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        // Sum each error's multiplicities up to the peek timestamp.
        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            // A negative count indicates corrupted data: a retraction of an
            // error that was never inserted.
            if copies.is_negative() {
                let error = cursor.key(&storage);
                error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                    saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            // Any present error poisons the peek result.
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }

    /// Collects rows from the ok trace, applying the peek's MFP and row
    /// finishing.
    ///
    /// Returns `PeekStatus::UsePeekStash` if the result grows beyond
    /// `peek_stash_threshold_bytes` while `peek_stash_eligible` is set, and
    /// an error response if it exceeds `max_result_size`.
    fn collect_ok_finished_data<Tr>(
        peek: &Peek,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyContainer: BatchContainer<Owned = Row>,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        // Size accounting includes the per-row multiplicity.
        let count_byte_size = size_of::<NonZeroUsize>();

        let cursor_setup_start = Instant::now();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        let mut results = Vec::new();
        // Running total of result bytes, for the size checks below.
        let mut total_size: usize = 0;

        // The maximum number of rows the finishing needs, if bounded.
        let max_results = peek.finishing.num_rows_needed();

        let comparator = RowComparator::new(peek.finishing.order_by.as_slice());

        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row: (Row, _) = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            // Prefer spilling to the peek stash over failing with an
            // oversized result, when allowed.
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            // Once we hold twice the rows needed, shrink the working set to
            // bound memory usage.
            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    // Without an ORDER BY, any `max_results` rows will do,
                    // and we can return immediately.
                    if peek.finishing.order_by.is_empty() {
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        // With an ORDER BY, keep the best `max_results` rows
                        // per the comparator and discount the dropped rows'
                        // bytes from the running size total.
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
                        });
                        sort_time_accum += sort_start.elapsed();
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped
                                .into_iter()
                                .fold(0, |acc: usize, (row, _count): (Row, _)| {
                                    acc.saturating_add(
                                        row.byte_len().saturating_add(count_byte_size),
                                    )
                                });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
}
1667
/// The outcome of a single attempt at fulfilling a pending peek.
enum PeekStatus {
    /// The trace frontiers have not yet advanced past the peek timestamp;
    /// try again later.
    NotReady,
    /// The result exceeded the stash threshold and should be delivered via
    /// the peek stash rather than inline.
    UsePeekStash,
    /// The peek completed with the given response.
    Ready(PeekResponse),
}
1680
/// The frontiers of a collection as most recently reported.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1691
1692impl ReportedFrontiers {
1693 fn new() -> Self {
1695 Self {
1696 write_frontier: ReportedFrontier::new(),
1697 input_frontier: ReportedFrontier::new(),
1698 output_frontier: ReportedFrontier::new(),
1699 }
1700 }
1701}
1702
/// A frontier that may or may not have been reported yet.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier that has been reported.
    Reported(Antichain<Timestamp>),
    /// A frontier that has not yet been reported.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1714
1715impl ReportedFrontier {
1716 pub fn new() -> Self {
1718 let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1719 Self::NotReported { lower }
1720 }
1721
1722 pub fn is_empty(&self) -> bool {
1724 match self {
1725 Self::Reported(frontier) => frontier.is_empty(),
1726 Self::NotReported { .. } => false,
1727 }
1728 }
1729
1730 fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1736 match self {
1737 Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1738 Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1739 }
1740 }
1741}
1742
/// State maintained for a single compute collection.
pub struct CollectionState {
    /// The frontiers most recently reported for this collection.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow maintaining this collection, shared with
    /// other holders via `Rc`.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or a copy-to export.
    pub is_subscribe_or_copy: bool,
    /// The collection's as-of frontier; used to decide hydration (see
    /// `hydrated`).
    as_of: Antichain<Timestamp>,

    /// Token for an attached sink, if any.
    pub sink_token: Option<SinkToken>,
    /// The write frontier of an attached sink, shared with the sink
    /// operator, if any.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Probes on the collection's inputs, keyed by input id; read when
    /// logging import frontiers.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe on the collection's output, if installed.
    // NOTE(review): usage not visible in this chunk — presumably reflects
    // the dataflow's output frontier; confirm against callers.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Per-collection logging state, if logging is enabled.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
    /// Sender of the read-only flag; flipped to `false` by `allow_writes`.
    read_only_tx: watch::Sender<bool>,
    /// Receiver of the read-only flag.
    pub read_only_rx: watch::Receiver<bool>,
}
1799
impl CollectionState {
    /// Creates a new `CollectionState`.
    ///
    /// The collection starts out in read-only mode; `allow_writes` switches
    /// it to read-write.
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        // Initialize the read-only flag to `true`.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
            read_only_tx,
            read_only_rx,
        }
    }

    /// Returns the frontiers most recently reported for this collection.
    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    /// Resets all reported frontiers to the given frontier.
    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    /// Records the reported write frontier and, if logging is enabled,
    /// updates the logged frontier accordingly.
    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                // An unreported frontier is logged as the minimum timestamp.
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    /// Records the reported input frontier and, if logging is enabled,
    /// updates the logged import frontiers.
    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        // Logging reads the per-input probe frontiers rather than the single
        // passed-in frontier.
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    /// Records the reported output frontier and, if the collection just
    /// became hydrated, logs and records that transition once.
    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    /// Whether this collection is hydrated: its reported output frontier has
    /// strictly advanced beyond its as-of.
    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }

    /// Switches the collection's dataflow out of read-only mode.
    fn allow_writes(&self) {
        info!(
            dataflow_index = *self.dataflow_index,
            export = ?self.logging.as_ref().map(|l| l.export_id()),
            "allowing writes for dataflow",
        );
        // Receivers may already be gone; sending errors are ignored.
        let _ = self.read_only_tx.send(false);
    }
}