1use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13use std::rc::Rc;
14use std::sync::Arc;
15use std::time::{Duration, Instant};
16
17use bytesize::ByteSize;
18use differential_dataflow::Hashable;
19use differential_dataflow::lattice::Lattice;
20use differential_dataflow::trace::{Cursor, TraceReader};
21use mz_compute_client::logging::LoggingConfig;
22use mz_compute_client::protocol::command::{
23 ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
24};
25use mz_compute_client::protocol::history::ComputeCommandHistory;
26use mz_compute_client::protocol::response::{
27 ComputeResponse, CopyToResponse, FrontiersResponse, PeekResponse, SubscribeResponse,
28};
29use mz_compute_types::dataflows::DataflowDescription;
30use mz_compute_types::dyncfgs::{
31 ENABLE_PEEK_RESPONSE_STASH, PEEK_RESPONSE_STASH_BATCH_MAX_RUNS,
32 PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE, PEEK_STASH_NUM_BATCHES,
33};
34use mz_compute_types::plan::render_plan::RenderPlan;
35use mz_dyncfg::ConfigSet;
36use mz_expr::SafeMfpPlan;
37use mz_expr::row::RowCollection;
38use mz_ore::cast::CastFrom;
39use mz_ore::collections::CollectionExt;
40use mz_ore::metrics::UIntGauge;
41use mz_ore::now::EpochMillis;
42use mz_ore::soft_panic_or_log;
43use mz_ore::task::AbortOnDropHandle;
44use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
45use mz_persist_client::Diagnostics;
46use mz_persist_client::cache::PersistClientCache;
47use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
48use mz_persist_client::read::ReadHandle;
49use mz_persist_types::PersistLocation;
50use mz_persist_types::codec_impls::UnitSchema;
51use mz_repr::fixed_length::ToDatumIter;
52use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
53use mz_storage_operators::stats::StatsCursor;
54use mz_storage_types::StorageDiff;
55use mz_storage_types::controller::CollectionMetadata;
56use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
57use mz_storage_types::sources::SourceData;
58use mz_storage_types::time_dependence::TimeDependence;
59use mz_txn_wal::operator::TxnsContext;
60use mz_txn_wal::txn_cache::TxnsCache;
61use timely::communication::Allocate;
62use timely::dataflow::operators::probe;
63use timely::order::PartialOrder;
64use timely::progress::frontier::Antichain;
65use timely::scheduling::Scheduler;
66use timely::worker::Worker as TimelyWorker;
67use tokio::sync::{oneshot, watch};
68use tracing::{Level, debug, error, info, span, trace, warn};
69use uuid::Uuid;
70
71use crate::arrangement::manager::{TraceBundle, TraceManager};
72use crate::logging;
73use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
74use crate::logging::initialize::LoggingTraces;
75use crate::metrics::{CollectionMetrics, WorkerMetrics};
76use crate::render::{LinearJoinSpec, StartSignal};
77use crate::server::{ComputeInstanceContext, ResponseSender};
78
79mod peek_result_iterator;
80mod peek_stash;
81
/// Worker-local state maintained by a compute server.
///
/// NOTE(review): original field docs appear stripped; descriptions below are
/// reconstructed from how the fields are used elsewhere in this file.
pub struct ComputeState {
    /// State for each installed compute collection, keyed by collection id.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Arrangement traces for indexes maintained by this worker.
    pub traces: TraceManager,
    /// Shared buffer through which rendered subscribe dataflows deliver their responses.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer through which rendered COPY TO dataflows deliver their responses.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peeks that have been installed but not yet answered, keyed by peek uuid.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// Persist location used to stash large peek responses; set on `CreateInstance`.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// Logger for compute introspection events; `None` until logging is initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// Cache of persist clients, used e.g. to serve peeks against persist shards.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context for reading txn-wal shards.
    pub txns_ctx: TxnsContext,
    /// History of compute commands received by this worker.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    // Maximum result size (bytes) for peeks/subscribes; defaults to `u64::MAX`
    // and is updated via `UpdateConfiguration`.
    max_result_size: u64,
    /// Specification for rendering linear joins; derived from the worker config.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    // Handle for applying dynamic tracing configuration updates.
    tracing_handle: Arc<TracingHandle>,
    /// Instance-level context (e.g. the scratch directory used by lgalloc).
    pub context: ComputeInstanceContext,
    /// Dynamic worker configuration (dyncfgs); applied by `apply_worker_config`.
    pub worker_config: Rc<ConfigSet>,

    // Suspension tokens for dataflows that were created but not yet scheduled;
    // dropping a token (on `Schedule`) releases the corresponding start signal.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// Interval at which the server performs maintenance work; set from dyncfg.
    pub server_maintenance_interval: Duration,

    /// System time captured at worker creation; base for replica expiration.
    pub init_system_time: EpochMillis,

    /// Frontier at which this replica considers itself expired; empty when
    /// expiration is not configured.
    pub replica_expiration: Antichain<Timestamp>,
}
167
168impl ComputeState {
169 pub fn new(
171 persist_clients: Arc<PersistClientCache>,
172 txns_ctx: TxnsContext,
173 metrics: WorkerMetrics,
174 tracing_handle: Arc<TracingHandle>,
175 context: ComputeInstanceContext,
176 ) -> Self {
177 let traces = TraceManager::new(metrics.clone());
178 let command_history = ComputeCommandHistory::new(metrics.for_history());
179
180 Self {
181 collections: Default::default(),
182 traces,
183 subscribe_response_buffer: Default::default(),
184 copy_to_response_buffer: Default::default(),
185 pending_peeks: Default::default(),
186 peek_stash_persist_location: None,
187 compute_logger: None,
188 persist_clients,
189 txns_ctx,
190 command_history,
191 max_result_size: u64::MAX,
192 linear_join_spec: Default::default(),
193 metrics,
194 tracing_handle,
195 context,
196 worker_config: mz_dyncfgs::all_dyncfgs().into(),
197 suspended_collections: Default::default(),
198 server_maintenance_interval: Duration::ZERO,
199 init_system_time: mz_ore::now::SYSTEM_TIME(),
200 replica_expiration: Antichain::default(),
201 }
202 }
203
204 pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
208 self.collections
209 .get_mut(&id)
210 .expect("collection must exist")
211 }
212
213 pub fn input_probe_for(
219 &mut self,
220 input_id: GlobalId,
221 collection_ids: impl Iterator<Item = GlobalId>,
222 ) -> probe::Handle<Timestamp> {
223 let probe = probe::Handle::default();
224 for id in collection_ids {
225 if let Some(collection) = self.collections.get_mut(&id) {
226 collection.input_probes.insert(input_id, probe.clone());
227 }
228 }
229 probe
230 }
231
232 fn apply_worker_config(&mut self) {
234 use mz_compute_types::dyncfgs::*;
235
236 let config = &self.worker_config;
237
238 self.linear_join_spec = LinearJoinSpec::from_config(config);
239
240 if ENABLE_LGALLOC.get(config) {
241 if let Some(path) = &self.context.scratch_directory {
242 let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
243 let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
244 let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
245 let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
246 let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
247 info!(
248 ?path,
249 backgrund_interval=?interval,
250 clear_bytes,
251 eager_return,
252 file_growth_dampener,
253 local_buffer_bytes,
254 "enabling lgalloc"
255 );
256 let background_worker_config = lgalloc::BackgroundWorkerConfig {
257 interval,
258 clear_bytes,
259 };
260 lgalloc::lgalloc_set_config(
261 lgalloc::LgAlloc::new()
262 .enable()
263 .with_path(path.clone())
264 .with_background_config(background_worker_config)
265 .eager_return(eager_return)
266 .file_growth_dampener(file_growth_dampener)
267 .local_buffer_bytes(local_buffer_bytes),
268 );
269 } else {
270 debug!("not enabling lgalloc, scratch directory not specified");
271 }
272 } else {
273 info!("disabling lgalloc");
274 lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
275 }
276
277 crate::memory_limiter::apply_limiter_config(config);
278
279 mz_ore::region::ENABLE_LGALLOC_REGION.store(
280 ENABLE_COLUMNATION_LGALLOC.get(config),
281 std::sync::atomic::Ordering::Relaxed,
282 );
283
284 let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
285 mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
286
287 self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
290
291 let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
292 match overflowing_behavior.parse() {
293 Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
294 Err(err) => {
295 error!(
296 err,
297 overflowing_behavior, "Invalid value for ore_overflowing_behavior"
298 );
299 }
300 }
301 }
302
303 pub fn apply_expiration_offset(&mut self, offset: Duration) {
309 if self.replica_expiration.is_empty() {
310 let offset: EpochMillis = offset
311 .as_millis()
312 .try_into()
313 .expect("duration must fit within u64");
314 let replica_expiration_millis = self.init_system_time + offset;
315 let replica_expiration = Timestamp::from(replica_expiration_millis);
316
317 info!(
318 offset = %offset,
319 replica_expiration_millis = %replica_expiration_millis,
320 replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
321 "setting replica expiration",
322 );
323 self.replica_expiration = Antichain::from_elem(replica_expiration);
324
325 self.metrics
327 .replica_expiration_timestamp_seconds
328 .set(replica_expiration.into());
329 }
330 }
331
332 pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
335 use mz_compute_types::dyncfgs::{
336 DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
337 };
338
339 if self.persist_clients.cfg.is_cc_active {
340 DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
341 } else {
342 DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
343 }
344 }
345}
346
/// A borrowed bundle of the compute state together with the timely worker and
/// response channel needed to act on it.
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The timely worker on which dataflows are built and run.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The worker-local compute state.
    pub compute_state: &'a mut ComputeState,
    /// Channel over which responses are sent back to the controller.
    pub response_tx: &'a mut ResponseSender,
}
356
/// An opaque token whose ownership keeps a sink alive; dropping it releases
/// whatever resources the boxed value holds.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Wrap the given boxed value as a `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        SinkToken(t)
    }
}
366
impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
    /// Entry point for compute commands: records the command in the history,
    /// times its handling, and dispatches to the per-command handler.
    ///
    /// # Panics
    ///
    /// Panics on `Hello`, which must be intercepted before reaching here.
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        // Time how long command handling takes, per command type.
        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            Hello { .. } => panic!("Hello must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                // Restore the tracing context propagated from the controller.
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites(id) => {
                self.handle_allow_writes(id);
            }
        }

        timer.observe_duration();
    }

    /// Handle `CreateInstance`: apply the worker config, optionally install a
    /// replica expiration, initialize logging dataflows, and record the peek
    /// stash persist location.
    fn handle_create_instance(&mut self, config: InstanceConfig) {
        // Apply the configuration before anything else, so the remaining
        // initialization observes the configured values.
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);

        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
    }

    /// Handle `UpdateConfiguration`: apply the contained parameter updates to
    /// metrics, result-size limits, tracing, dyncfgs, and persist config.
    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        debug!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        // Propagate dyncfg updates to the worker config and persist config.
        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        mz_metrics::update_dyncfg(&dyncfg_updates);

        // Re-apply the worker config so updated dyncfgs take effect.
        self.compute_state.apply_worker_config();
    }

    /// Handle `CreateDataflow`: compute the dataflow's expiration and `until`,
    /// install `CollectionState` for each export, register suspension tokens,
    /// and render the dataflow.
    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        // Reserve the index the rendered dataflow will occupy; shared by all
        // exported collections so the dataflow can be dropped when the last
        // reference goes away.
        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
        let as_of = dataflow.as_of.clone().unwrap();

        // Derive an expiration frontier from the replica expiration, if the
        // dataflow has a known time dependence.
        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        // The effective `until` is the earlier of the plan's `until` and the
        // expiration frontier.
        let until = dataflow.until.meet(&dataflow_expiration);

        // Log dataflow creation; transient dataflows only at debug level to
        // avoid log spam.
        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration
                    .as_option()
                    .map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        // Install collection state for each of the dataflow's exports.
        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of.clone(),
                metrics,
            );

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    *dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        // Keep the dataflow suspended until a `Schedule` command arrives for
        // one of its exports; dropping the token releases the start signal.
        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

    /// Handle `Schedule`: drop the collection's suspension token, which allows
    /// the corresponding suspended dataflow to start running.
    fn handle_schedule(&mut self, id: GlobalId) {
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    /// Handle `AllowCompaction`: an empty frontier drops the collection;
    /// otherwise the trace is allowed to compact up to the frontier.
    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    /// Handle `Peek`: construct the appropriate pending peek (index- or
    /// persist-backed), log its installation, and immediately attempt to
    /// process it.
    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                // The peeked index must be installed on this worker.
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        // Log the installation of the peek (`installed: true`).
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    /// Handle `CancelPeek`: if the peek is still pending, respond `Canceled`.
    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    /// Handle `AllowWrites`: re-enable persist compaction and ungate writes
    /// for the given collection.
    fn handle_allow_writes(&mut self, id: GlobalId) {
        self.compute_state.persist_clients.cfg().enable_compaction();

        if let Some(collection) = self.compute_state.collections.get_mut(&id) {
            collection.allow_writes();
        } else {
            soft_panic_or_log!("allow writes for unknown collection {id}");
        }
    }

    /// Remove all worker state for the given collection, drop its dataflow if
    /// this was the last export, and report final (empty) frontiers.
    ///
    /// # Panics
    ///
    /// Panics if the collection was not tracked.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        self.compute_state.traces.remove(&id);
        self.compute_state.suspended_collections.remove(&id);

        // If this was the last collection of the dataflow, we can drop the
        // dataflow itself.
        if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
            self.timely_worker.drop_dataflow(index);
        }

        // Report final, empty frontiers for each frontier kind that was not
        // already reported empty. Subscribes/copies report through their own
        // response channels instead.
        if !collection.is_subscribe_or_copy {
            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

    /// Build the logging dataflows and install collection state for each
    /// requested logging index.
    ///
    /// # Panics
    ///
    /// Panics if logging was already initialized, or if `logging::initialize`
    /// does not produce a trace for every requested log index.
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let dataflow_index = Rc::new(dataflow_index);
        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            // Install trace as maintained index.
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            // Logging indexes are always plain indexes, readable from the
            // minimum timestamp.
            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(
                Rc::clone(&dataflow_index),
                is_subscribe_or_copy,
                as_of,
                metrics,
            );

            let logging =
                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        // Sanity check: every requested logging index was produced.
        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

    /// Collect and report changed write/input/output frontiers for all
    /// non-subscribe/copy collections.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Scratch antichain, reused across collections and frontier kinds.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // Subscribe/copy frontiers are reported by `process_subscribes`
            // and the copy-to machinery instead.
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Determine the write frontier, from either the maintained trace
            // or a sink-provided frontier — never both.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // The output frontier additionally accounts for the compute probe,
            // if one is installed. (`new_frontier` still holds the write
            // frontier here and is extended, not cleared.)
            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // The input frontier is the combination of all input probes.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Record what we are about to report, so subsequent calls only
            // report advancements.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

    /// Update periodically-reported metrics; currently the number of seconds
    /// remaining until replica expiration (negative once expired).
    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

    /// Attempt to complete a pending peek. If a response is available it is
    /// sent; otherwise the peek is (re)inserted into `pending_peeks`.
    ///
    /// `upper` is scratch space for reading trace frontiers, reused across
    /// calls by `process_peeks`.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                let start = Instant::now();

                // A peek is eligible for the stash only if its finishing can
                // be streamed (e.g. no ORDER BY requiring full sorting here).
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        error!("missing peek_stash_persist_location but peek stash is enabled");
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                let metrics = IndexPeekMetrics {
                    seek_fulfillment_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_seek_fulfillment_seconds,
                    frontier_check_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_frontier_check_seconds,
                    error_scan_seconds: &self.compute_state.metrics.index_peek_error_scan_seconds,
                    cursor_setup_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_cursor_setup_seconds,
                    row_iteration_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_iteration_seconds,
                    result_sort_seconds: &self.compute_state.metrics.index_peek_result_sort_seconds,
                    row_collection_seconds: &self
                        .compute_state
                        .metrics
                        .index_peek_row_collection_seconds,
                };

                let status = peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                    &metrics,
                );

                self.compute_state
                    .metrics
                    .index_peek_total_seconds
                    .observe(start.elapsed().as_secs_f64());

                match status {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        // The result is too large for a direct response; start
                        // uploading it to the persist-backed peek stash and
                        // track it as a `Stash` pending peek instead.
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // Persist peeks complete when their async task delivers a result.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                // Feed more rows into the stash upload, then check whether it
                // has finished.
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not ready yet; keep it pending.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

    /// Attempt to complete all pending peeks.
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

    /// Send the response for a completed (or cancelled) peek and log its
    /// removal (`installed: false`).
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

    /// Drain the subscribe response buffer, update the reported frontiers of
    /// the corresponding collections, and forward the responses.
    ///
    /// # Panics
    ///
    /// Panics if a response would regress a previously reported frontier.
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    // A dropped subscribe advances to the empty frontier.
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // Collection already dropped; nothing to update. The response
                // is still forwarded below.
            }

            // Enforce the maximum result size before forwarding.
            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

    /// Drain the COPY TO response buffer and forward the responses.
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

    /// Send a response to the coordinator, ignoring send errors (the receiver
    /// may already be gone, e.g. during shutdown).
    fn send_compute_response(&self, response: ComputeResponse) {
        let _ = self.response_tx.send(response);
    }

    /// Panic if the replica has passed its expiration time, after logging the
    /// details of the expiration.
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            // This assert necessarily fails when reached (its condition is the
            // negation of the `if` above); it turns expiration into a panic
            // with a descriptive message.
            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

    /// Compute a dataflow's expiration frontier: the replica expiration mapped
    /// through the dataflow's time dependence, stepped forward by one tick,
    /// and disabled (empty) when the dataflow's `until` already ends earlier.
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}
1084
/// A peek in flight on this worker, distinguished by how its response is
/// produced.
pub enum PendingPeek {
    /// A peek served from an arrangement (index) maintained by this worker.
    Index(IndexPeek),
    /// A peek served by reading directly from persist in an async task.
    Persist(PersistPeek),
    /// A peek whose (large) response is being uploaded to the persist-backed
    /// peek stash.
    Stash(peek_stash::StashingPeek),
}
1098
impl PendingPeek {
    /// Produce the introspection event describing this peek; `installed`
    /// distinguishes installation (`true`) from retirement (`false`).
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (*id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (*id, logging::compute::PeekType::Persist),
        };
        let uuid = peek.uuid.into_bytes();
        ComputeEvent::Peek(PeekEvent {
            id,
            time: peek.timestamp,
            uuid,
            peek_type,
            installed,
        })
    }

    /// Construct an index-backed pending peek, pinning the trace bundle so it
    /// stays readable at the peek timestamp: logical compaction is held at the
    /// peek's timestamp and physical compaction at the empty frontier.
    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    /// Construct a persist-backed pending peek, spawning an async task that
    /// reads from persist on exactly one worker (chosen by hashing the peek
    /// uuid); all other workers contribute empty results.
    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            // Choose the worker that does the actual read deterministically
            // from the peek uuid.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        // Activator to wake the timely worker once the task completes.
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
        // Upper bound on rows the peek can return: LIMIT (or unbounded) plus
        // the OFFSET that will be skipped.
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        // NOTE(review): takes the single element of `literal_constraints`;
        // presumably persist peeks carry at most one literal row — confirm.
        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(vec![RowCollection::new(rows, &order_by)]),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            // The receiver may be gone if the peek was cancelled; that is
            // expected and only logged.
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    /// The span in which this peek was created.
    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
            PendingPeek::Stash(p) => &p.span,
        }
    }

    /// The peek description, regardless of variant.
    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
            PendingPeek::Stash(p) => &p.peek,
        }
    }
}
1229
/// A peek against a persist-backed collection, served by an async task that
/// reads directly from persist rather than from an arrangement.
pub struct PersistPeek {
    /// The peek description this task is serving.
    pub(crate) peek: Peek,
    /// Handle for the spawned read task; the task is aborted when this
    /// `PersistPeek` is dropped.
    _abort_handle: AbortOnDropHandle<()>,
    /// Receives the peek response and the task's elapsed time once complete.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The span in which the peek was initiated, for tracing continuity.
    span: tracing::Span,
}
1244
impl PersistPeek {
    /// Reads the contents of a persist shard at `as_of` and returns up to
    /// `limit_remaining` rows satisfying `mfp_plan`.
    ///
    /// Returns an error (rendered as a string) if the persist client cannot
    /// be opened, the shard's since has advanced past `as_of`, the data
    /// contains negative multiplicities, or the accumulated result exceeds
    /// `max_result_size` bytes.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If the collection has an associated txns shard, open a txns cache so
        // reads against the data shard resolve correctly.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        // Running (approximate) byte size of `result`, checked against
        // `max_result_size` below.
        let mut total_size = 0usize;

        // Number of leading columns pinned by the literal constraint, if any.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                // Compare the row's leading columns against the literal
                // constraint: skip rows before it, and stop the whole scan
                // once we are past it (the cursor yields rows in key order —
                // NOTE(review): assumed from this early-exit; confirm).
                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative multiplicity means the shard contains retractions
                // for rows that do not exist; surface this as corrupt data.
                let count: usize = d.try_into().map_err(|_| {
                    error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                        saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                // A count of zero means the row does not exist at `as_of`.
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    // Account for the row plus its count before pushing, so we
                    // never hold more than `max_result_size` bytes.
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1366
/// A peek against an index (arrangement) maintained by this worker.
pub struct IndexPeek {
    /// The peek description this worker is serving.
    peek: Peek,
    /// The ok and err traces of the arrangement being peeked.
    trace_bundle: TraceBundle,
    /// The span in which the peek was initiated, for tracing continuity.
    span: tracing::Span,
}
1375
/// Histograms timing the individual phases of serving an index peek.
pub(crate) struct IndexPeekMetrics<'a> {
    /// Total time spent in `IndexPeek::seek_fulfillment`.
    pub seek_fulfillment_seconds: &'a prometheus::Histogram,
    /// Time spent checking trace frontiers for readiness.
    pub frontier_check_seconds: &'a prometheus::Histogram,
    /// Time spent scanning the error trace.
    pub error_scan_seconds: &'a prometheus::Histogram,
    /// Time spent setting up the peek result iterator.
    pub cursor_setup_seconds: &'a prometheus::Histogram,
    /// Time spent iterating over result rows (includes sorting time).
    pub row_iteration_seconds: &'a prometheus::Histogram,
    /// Time spent sorting intermediate results.
    pub result_sort_seconds: &'a prometheus::Histogram,
    /// Time spent building the final `RowCollection`.
    pub row_collection_seconds: &'a prometheus::Histogram,
}
1389
impl IndexPeek {
    /// Attempts to fulfill the peek against the arrangement's traces.
    ///
    /// Populates `upper` with the traces' upper frontiers (the errs upper is
    /// read last, so that is what `upper` holds on success) and returns
    /// `PeekStatus::NotReady` if either trace has not yet advanced past the
    /// peek timestamp. Reports an error response if the trace has been
    /// compacted beyond the peek timestamp; otherwise collects the finished
    /// data.
    fn seek_fulfillment(
        &mut self,
        upper: &mut Antichain<Timestamp>,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let method_start = Instant::now();

        // The peek is only serviceable once both the ok and err traces have
        // advanced strictly beyond the peek timestamp.
        self.trace_bundle.oks_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }
        self.trace_bundle.errs_mut().read_upper(upper);
        if upper.less_equal(&self.peek.timestamp) {
            return PeekStatus::NotReady;
        }

        // If compaction has advanced past the peek timestamp we can no longer
        // distinguish updates at that time; the peek cannot be answered.
        let read_frontier = self.trace_bundle.compaction_frontier();
        if !read_frontier.less_equal(&self.peek.timestamp) {
            let error = format!(
                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
                read_frontier.elements(),
                self.peek.timestamp,
            );
            return PeekStatus::Ready(PeekResponse::Error(error));
        }

        metrics
            .frontier_check_seconds
            .observe(method_start.elapsed().as_secs_f64());

        let result = self.collect_finished_data(
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        );

        metrics
            .seek_fulfillment_seconds
            .observe(method_start.elapsed().as_secs_f64());

        result
    }

    /// Collects data for a peek whose traces are known to be complete at the
    /// peek timestamp.
    ///
    /// First scans the error trace: any error accumulating to a positive
    /// count at the peek timestamp fails the peek with that error, and a
    /// negative accumulation indicates corrupt data. If the error trace is
    /// clean, delegates to [`Self::collect_ok_finished_data`].
    fn collect_finished_data(
        &mut self,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus {
        let error_scan_start = Instant::now();

        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
        while cursor.key_valid(&storage) {
            // Accumulate the multiplicity of this error up to the peek time.
            let mut copies = Diff::ZERO;
            cursor.map_times(&storage, |time, diff| {
                if time.less_equal(&self.peek.timestamp) {
                    copies += diff;
                }
            });
            if copies.is_negative() {
                // Retractions of nonexistent errors indicate corrupt data;
                // log loudly and fail the peek.
                let error = cursor.key(&storage);
                error!(
                    target = %self.peek.target.id(), diff = %copies, %error,
                    "index peek encountered negative multiplicities in error trace",
                );
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "Invalid data in source errors, \
                    saw retractions ({}) for row that does not exist: {}",
                    -copies, error,
                )));
            }
            if copies.is_positive() {
                // The collection contains an error at the peek time; return it.
                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
            }
            cursor.step_key(&storage);
        }

        metrics
            .error_scan_seconds
            .observe(error_scan_start.elapsed().as_secs_f64());

        Self::collect_ok_finished_data(
            &self.peek,
            self.trace_bundle.oks_mut(),
            max_result_size,
            peek_stash_eligible,
            peek_stash_threshold_bytes,
            metrics,
        )
    }

    /// Collects rows from the ok trace for a known-complete peek.
    ///
    /// Iterates the trace through a `PeekResultIterator`, tracking the
    /// (approximate) response size. Returns `PeekStatus::UsePeekStash` once
    /// the size passes `peek_stash_threshold_bytes` (when eligible), an error
    /// response once it passes `max_result_size`, and otherwise the collected
    /// rows.
    fn collect_ok_finished_data<Tr>(
        peek: &Peek<Timestamp>,
        oks_handle: &mut Tr,
        max_result_size: u64,
        peek_stash_eligible: bool,
        peek_stash_threshold_bytes: usize,
        metrics: &IndexPeekMetrics<'_>,
    ) -> PeekStatus
    where
        for<'a> Tr: TraceReader<
            Key<'a>: ToDatumIter + Eq,
            KeyOwn = Row,
            Val<'a>: ToDatumIter,
            TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
            DiffGat<'a> = &'a Diff,
        >,
    {
        let max_result_size = usize::cast_from(max_result_size);
        // Per-row bookkeeping overhead added to the size estimate.
        let count_byte_size = size_of::<NonZeroUsize>();

        let cursor_setup_start = Instant::now();

        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
            peek.target.id().clone(),
            peek.map_filter_project.clone(),
            peek.timestamp,
            peek.literal_constraints.clone().as_deref_mut(),
            oks_handle,
        );

        metrics
            .cursor_setup_seconds
            .observe(cursor_setup_start.elapsed().as_secs_f64());

        let mut results = Vec::new();
        let mut total_size: usize = 0;

        // The number of rows needed to compute the finished result, if the
        // peek's finishing bounds it; `None` means unbounded.
        let max_results = peek.finishing.num_rows_needed();

        // Scratch datum buffers reused across sort comparisons.
        let mut l_datum_vec = DatumVec::new();
        let mut r_datum_vec = DatumVec::new();

        let row_iteration_start = Instant::now();
        let mut sort_time_accum = Duration::ZERO;

        while let Some(row) = peek_iterator.next() {
            let row = match row {
                Ok(row) => row,
                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
            };
            let (row, copies) = row;
            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");

            // Account for the row before pushing, so we divert to the stash or
            // error out before exceeding the respective thresholds.
            total_size = total_size
                .saturating_add(row.byte_len())
                .saturating_add(count_byte_size);
            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
                return PeekStatus::UsePeekStash;
            }
            if total_size > max_result_size {
                return PeekStatus::Ready(PeekResponse::Error(format!(
                    "result exceeds max size of {}",
                    ByteSize::b(u64::cast_from(max_result_size))
                )));
            }

            results.push((row, copies));

            // Once the buffer holds twice as many rows as needed, compact it
            // so memory stays bounded by ~2 * max_results rows.
            if let Some(max_results) = max_results {
                if results.len() >= 2 * max_results {
                    if peek.finishing.order_by.is_empty() {
                        // Without an ORDER BY, any `max_results` rows suffice;
                        // finish immediately.
                        results.truncate(max_results);
                        metrics
                            .row_iteration_seconds
                            .observe(row_iteration_start.elapsed().as_secs_f64());
                        metrics
                            .result_sort_seconds
                            .observe(sort_time_accum.as_secs_f64());
                        let row_collection_start = Instant::now();
                        let collection = RowCollection::new(results, &peek.finishing.order_by);
                        metrics
                            .row_collection_seconds
                            .observe(row_collection_start.elapsed().as_secs_f64());
                        return PeekStatus::Ready(PeekResponse::Rows(vec![collection]));
                    } else {
                        // With an ORDER BY, sort and keep only the first
                        // `max_results` rows; later rows may still displace
                        // retained ones, so iteration continues.
                        let sort_start = Instant::now();
                        results.sort_by(|left, right| {
                            let left_datums = l_datum_vec.borrow_with(&left.0);
                            let right_datums = r_datum_vec.borrow_with(&right.0);
                            mz_expr::compare_columns(
                                &peek.finishing.order_by,
                                &left_datums,
                                &right_datums,
                                || left.0.cmp(&right.0),
                            )
                        });
                        sort_time_accum += sort_start.elapsed();
                        // Subtract the dropped rows from the running size so
                        // the threshold checks reflect retained rows only.
                        let dropped = results.drain(max_results..);
                        let dropped_size =
                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
                            });
                        total_size = total_size.saturating_sub(dropped_size);
                    }
                }
            }
        }

        metrics
            .row_iteration_seconds
            .observe(row_iteration_start.elapsed().as_secs_f64());
        metrics
            .result_sort_seconds
            .observe(sort_time_accum.as_secs_f64());

        let row_collection_start = Instant::now();
        let collection = RowCollection::new(results, &peek.finishing.order_by);
        metrics
            .row_collection_seconds
            .observe(row_collection_start.elapsed().as_secs_f64());
        PeekStatus::Ready(PeekResponse::Rows(vec![collection]))
    }
}
1649
/// The outcome of an attempt to fulfill a peek.
enum PeekStatus {
    /// The trace frontiers have not yet advanced past the peek timestamp.
    NotReady,
    /// The result grew past the stash threshold; it should be delivered via
    /// the peek stash instead of inline in the response.
    UsePeekStash,
    /// The peek completed with the given response.
    Ready(PeekResponse),
}
1662
/// The frontiers that have been reported for a collection.
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1673
1674impl ReportedFrontiers {
1675 fn new() -> Self {
1677 Self {
1678 write_frontier: ReportedFrontier::new(),
1679 input_frontier: ReportedFrontier::new(),
1680 output_frontier: ReportedFrontier::new(),
1681 }
1682 }
1683}
1684
/// A frontier of a collection, as reported (or not yet reported) upstream.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1696
1697impl ReportedFrontier {
1698 pub fn new() -> Self {
1700 let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1701 Self::NotReported { lower }
1702 }
1703
1704 pub fn is_empty(&self) -> bool {
1706 match self {
1707 Self::Reported(frontier) => frontier.is_empty(),
1708 Self::NotReported { .. } => false,
1709 }
1710 }
1711
1712 fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1718 match self {
1719 Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1720 Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1721 }
1722 }
1723}
1724
/// State maintained for a compute collection installed on this worker.
pub struct CollectionState {
    /// The frontiers that have been reported for this collection.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow maintaining this collection.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to export.
    pub is_subscribe_or_copy: bool,
    /// The as-of frontier of the collection's dataflow; the collection counts
    /// as hydrated once its output frontier advances strictly past this.
    as_of: Antichain<Timestamp>,

    /// A token whose drop tears down the collection's sink, if it has one
    /// — NOTE(review): inferred from the type name; confirm.
    pub sink_token: Option<SinkToken>,
    /// Shared handle on the sink's write frontier, if this collection is
    /// exported to a sink.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for the collection's inputs, keyed by input id; read
    /// when logging import frontiers.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe on the collection's output, if one is installed.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state for this collection, if logging is enabled.
    logging: Option<CollectionLogging>,
    /// Metrics recorded for this collection.
    metrics: CollectionMetrics,
    /// Sender for the read-only flag; starts at `true` and `allow_writes`
    /// flips it to `false`.
    read_only_tx: watch::Sender<bool>,
    /// Receiver for the read-only flag, for parts of the dataflow that need
    /// to observe whether writes are allowed.
    pub read_only_rx: watch::Receiver<bool>,
}
1781
impl CollectionState {
    /// Creates a new `CollectionState` with unreported frontiers and no sink,
    /// probes, or logging installed.
    fn new(
        dataflow_index: Rc<usize>,
        is_subscribe_or_copy: bool,
        as_of: Antichain<Timestamp>,
        metrics: CollectionMetrics,
    ) -> Self {
        // Collections start in read-only mode; `allow_writes` flips the flag.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            reported_frontiers: ReportedFrontiers::new(),
            dataflow_index,
            is_subscribe_or_copy,
            as_of,
            sink_token: None,
            sink_write_frontier: None,
            input_probes: Default::default(),
            compute_probe: None,
            logging: None,
            metrics,
            read_only_tx,
            read_only_rx,
        }
    }

    /// Returns the frontiers that have been reported for this collection.
    fn reported_frontiers(&self) -> &ReportedFrontiers {
        &self.reported_frontiers
    }

    /// Resets all three reported frontiers to the given value.
    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
        self.reported_frontiers.write_frontier = frontier.clone();
        self.reported_frontiers.input_frontier = frontier.clone();
        self.reported_frontiers.output_frontier = frontier;
    }

    /// Sets the reported write frontier, mirroring it into the logging state
    /// if logging is enabled.
    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            // An unreported frontier is logged as the minimum timestamp.
            let time = match &frontier {
                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
            };
            logging.set_frontier(time);
        }

        self.reported_frontiers.write_frontier = frontier;
    }

    /// Sets the reported input frontier.
    ///
    /// Note that the logged import frontiers are read from the current input
    /// probes, not from the `frontier` argument.
    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
        if let Some(logging) = &mut self.logging {
            for (id, probe) in &self.input_probes {
                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
                logging.set_import_frontier(*id, new_time);
            }
        }

        self.reported_frontiers.input_frontier = frontier;
    }

    /// Sets the reported output frontier, recording hydration the first time
    /// the frontier advances past the as-of.
    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
        // Capture the hydration state before the update so we only record the
        // transition once.
        let already_hydrated = self.hydrated();

        self.reported_frontiers.output_frontier = frontier;

        if !already_hydrated && self.hydrated() {
            if let Some(logging) = &mut self.logging {
                logging.set_hydrated();
            }
            self.metrics.record_collection_hydrated();
        }
    }

    /// Whether this collection is hydrated: its reported output frontier has
    /// advanced strictly past the dataflow's as-of.
    fn hydrated(&self) -> bool {
        match &self.reported_frontiers.output_frontier {
            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
            ReportedFrontier::NotReported { .. } => false,
        }
    }

    /// Switches the collection out of read-only mode.
    ///
    /// Send errors are ignored: no receivers means nothing is waiting on the
    /// read-only state.
    fn allow_writes(&self) {
        info!(
            dataflow_index = *self.dataflow_index,
            export = ?self.logging.as_ref().map(|l| l.export_id()),
            "allowing writes for dataflow",
        );
        let _ = self.read_only_tx.send(false);
    }
}