use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::implementations::BatchContainer;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::dyncfgs::{
    ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
    PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
};
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::row::RowCollection;
use mz_expr::{RowComparator, SafeMfpPlan};
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::{MetricsRegistry, UIntGauge};
use mz_ore::now::EpochMillis;
use mz_ore::soft_panic_or_log;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::PersistLocation;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, trace, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

mod peek_result_iterator;
mod peek_stash;

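/// Worker-local state maintained across compute dataflows: the installed collections and
/// their arrangements, buffered responses, pending peeks, logging, and configuration.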
pub struct ComputeState {
    pub collections: BTreeMap<GlobalId, CollectionState>,
    pub traces: TraceManager,
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    pub peek_stash_persist_location: Option<PersistLocation>,
    pub compute_logger: Option<logging::compute::Logger>,
    pub persist_clients: Arc<PersistClientCache>,
    pub txns_ctx: TxnsContext,
    pub command_history: ComputeCommandHistory<UIntGauge>,
    max_result_size: u64,
    pub linear_join_spec: LinearJoinSpec,
    pub metrics: WorkerMetrics,
    tracing_handle: Arc<TracingHandle>,
    pub context: ComputeInstanceContext,
    pub worker_config: Rc<ConfigSet>,

    pub metrics_registry: MetricsRegistry,

    pub workers_per_process: usize,

    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    pub server_maintenance_interval: Duration,

    pub init_system_time: EpochMillis,

    pub replica_expiration: Antichain<Timestamp>,

    pub storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
}

impl ComputeState {
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
        metrics_registry: MetricsRegistry,
        workers_per_process: usize,
        storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());

        Self {
            collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            peek_stash_persist_location: None,
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            metrics_registry,
            workers_per_process,
            suspended_collections: Default::default(),
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
            storage_log_reader,
        }
    }

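    /// Return a mutable reference to the identified collection's state.
    ///
    /// Panics if the collection does not exist.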
    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

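    /// Construct a new frontier probe for the given input and install it on the identified
    /// collections, returning the probe handle.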
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

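    /// Apply the current values in `worker_config` to the worker's runtime state, including
    /// the linear join strategy, lgalloc, the memory limiter, and overflow behavior.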
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        crate::memory_limiter::apply_limiter_config(config);

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

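    /// Set the replica expiration frontier to the given offset from the replica's
    /// initialization time. Does nothing if an expiration is already set.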
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

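    /// Returns the cc or non-cc flavor of the "dataflow max inflight bytes" configuration,
    /// depending on whether the persist `is_cc_active` flag is set.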
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

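/// A borrowed [`ComputeState`] paired with the Timely worker and response channel it is
/// driven from, allowing commands to be handled and responses to be sent.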
pub(crate) struct ActiveComputeState<'a> {
    pub timely_worker: &'a mut TimelyWorker,
    pub compute_state: &'a mut ComputeState,
    pub response_tx: &'a mut ResponseSender,
}

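/// A token that keeps a sink alive.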
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a> ActiveComputeState<'a> {
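    /// Entrypoint for applying a compute command: records it in the command history and
    /// dispatches to the specific handler.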
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        let storage_log_reader = self.compute_state.storage_log_reader.take();
        self.initialize_logging(config.logging, storage_log_reader);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        debug!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

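    /// Build the dataflow described by `dataflow`: registers state for each exported
    /// collection, wires up logging and the start signal, and hands the description to the
    /// renderer.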
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    fn handle_allow_writes(&mut self, id: GlobalId) {
        self.compute_state.persist_clients.cfg().enable_compaction();

        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
            collection.allow_writes();
        } else {
            soft_panic_or_log!("allow writes for unknown collection {id}");
        }
    }

    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        if !collection.is_subscribe_or_copy {
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

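    /// Initializes the logging dataflows for this worker and installs the resulting logging
    /// collections and traces.
    ///
    /// Panics if logging was already initialized.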
    pub fn initialize_logging(
        &mut self,
        config: LoggingConfig,
        storage_log_reader: Option<crate::server::StorageTimelyLogReader>,
    ) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(
            self.timely_worker,
            &config,
            self.compute_state.metrics_registry.clone(),
            Rc::clone(&self.compute_state.worker_config),
            self.compute_state.workers_per_process,
            storage_log_reader,
        );

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

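    /// Report any new frontier advancements for the tracked collections to the controller.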
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(probe) = &collection.compute_probe {
                if *collection.read_only_rx.borrow() {
                    new_frontier.clear();
                }
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

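    /// Attempt to fulfill a pending peek, sending a response if one is ready and otherwise
    /// re-registering the peek as pending.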
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

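    /// Scan the pending peeks and attempt to fulfill each of them.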
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

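    /// Send a response for the given peek and log its completion.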
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

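    /// Forward buffered subscribe responses to the controller, updating the reported
    /// frontiers of the corresponding collections along the way.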
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // Tracking state for this subscribe was presumably already dropped, so there
                // are no reported frontiers left to update. The response is still forwarded.
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

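    /// Forward buffered copy-to responses to the controller.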
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

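    /// Check whether the replica has expired and, if so, panic to prevent serving results
    /// past the expiration time.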
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

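    /// Determine the expiration frontier for a dataflow from the replica expiration, the
    /// dataflow's time dependence, and its `until` frontier.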
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<Timestamp>,
    ) -> Antichain<Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

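/// A peek that is awaiting fulfillment on this worker.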
pub enum PendingPeek {
    Index(IndexPeek),
    Persist(PersistPeek),
    Stash(peek_stash::StashingPeek),
}

impl PendingPeek {
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker,
    ) -> Self {
        let active_worker = {
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}

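/// A peek against a persist-backed collection, fulfilled by an async task.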
pub struct PersistPeek {
    pub(crate) peek: Peek,
    _abort_handle: AbortOnDropHandle<()>,
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    span: tracing::Span,
}

impl PersistPeek {
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}

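/// A peek against an index (arrangement) maintained by this worker.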
pub struct IndexPeek {
    peek: Peek,
    trace_bundle: TraceBundle,
    span: tracing::Span,
}

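/// Histograms recording the durations of the individual phases of an index peek.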
pub(crate) struct IndexPeekMetrics<'a> {
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    pub frontier_check_seconds: &'a prometheus::Histogram,
    pub error_scan_seconds: &'a prometheus::Histogram,
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    pub row_iteration_seconds: &'a prometheus::Histogram,
    pub result_sort_seconds: &'a prometheus::Histogram,
    pub row_collection_seconds: &'a prometheus::Histogram,
}

impl IndexPeek {
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }

    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                let error = cursor.key(&storage);
                error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                     saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }

    fn collect_ok_finished_data<Tr>(
        peek: &Peek,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyContainer: BatchContainer<Owned = Row>,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        let count_byte_size = size_of::<NonZeroUsize>();

        let cursor_setup_start = Instant::now();
        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        let max_results = peek.finishing.num_rows_needed();

        let comparator = RowComparator::new(peek.finishing.order_by.as_slice());

        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row: (Row, _) = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            comparator.compare_rows(&left.0, &right.0, || left.0.cmp(&right.0))
                        });
                        sort_time_accum += sort_start.elapsed();
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped
                                .into_iter()
                                .fold(0, |acc: usize, (row, _count): (Row, _)| {
                                    acc.saturating_add(
                                        row.byte_len().saturating_add(count_byte_size),
                                    )
                                });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
}

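/// The outcome of an attempt to fulfill an index peek.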
enum PeekStatus {
    NotReady,
    UsePeekStash,
    Ready(PeekResponse),
}

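/// The frontiers of a collection, as they have been reported to the controller.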
#[derive(Debug)]
struct ReportedFrontiers {
    write_frontier: ReportedFrontier,
    input_frontier: ReportedFrontier,
    output_frontier: ReportedFrontier,
}

impl ReportedFrontiers {
    fn new() -> Self {
        Self {
            write_frontier: ReportedFrontier::new(),
            input_frontier: ReportedFrontier::new(),
            output_frontier: ReportedFrontier::new(),
        }
    }
}

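/// A frontier that is either already reported to the controller, or not yet reported but
/// known to be at least `lower`.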
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    Reported(Antichain<Timestamp>),
    NotReported {
        lower: Antichain<Timestamp>,
    },
}

impl ReportedFrontier {
    pub fn new() -> Self {
        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
        Self::NotReported { lower }
    }

    pub fn is_empty(&self) -> bool {
        match self {
            Self::Reported(frontier) => frontier.is_empty(),
            Self::NotReported { .. } => false,
        }
    }

    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
        match self {
            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
        }
    }
}

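/// State maintained for a single compute collection exported by this worker.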
pub struct CollectionState {
    reported_frontiers: ReportedFrontiers,
    dataflow_index: Rc<usize>,
    pub is_subscribe_or_copy: bool,
    as_of: Antichain<Timestamp>,

    pub sink_token: Option<SinkToken>,
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    logging: Option<CollectionLogging>,
    metrics: CollectionMetrics,
    read_only_tx: watch::Sender<bool>,
    pub read_only_rx: watch::Receiver<bool>,
}

impl CollectionState {
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
            read_only_tx,
            read_only_rx,
        }
    }

    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }

    fn allow_writes(&self) {
        info!(
            dataflow_index = *self.dataflow_index,
            export = ?self.logging.as_ref().map(|l| l.export_id()),
            "allowing writes for dataflow",
        );
        let _ = self.read_only_tx.send(false);
    }
}