mz_compute/
compute_state.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5
6//! Worker-local state for compute timely instances.
7
8use std::any::Any;
9use std::cell::RefCell;
10use std::cmp::Ordering;
11use std::collections::{BTreeMap, BTreeSet};
12use std::num::NonZeroUsize;
13
14use std::rc::Rc;
15use std::sync::{Arc, mpsc};
16use std::time::{Duration, Instant};
17
18use bytesize::ByteSize;
19use differential_dataflow::Hashable;
20use differential_dataflow::lattice::Lattice;
21use differential_dataflow::trace::{Cursor, TraceReader};
22use mz_compute_client::logging::LoggingConfig;
23use mz_compute_client::protocol::command::{
24    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
25};
26use mz_compute_client::protocol::history::ComputeCommandHistory;
27use mz_compute_client::protocol::response::{
28    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
29    StatusResponse, SubscribeResponse,
30};
31use mz_compute_types::dataflows::DataflowDescription;
32use mz_compute_types::dyncfgs::{
33    ENABLE_ACTIVE_DATAFLOW_CANCELATION, ENABLE_PEEK_RESPONSE_STASH,
34    PEEK_RESPONSE_STASH_BATCH_MAX_RUNS, PEEK_RESPONSE_STASH_THRESHOLD_BYTES, PEEK_STASH_BATCH_SIZE,
35    PEEK_STASH_NUM_BATCHES,
36};
37use mz_compute_types::plan::LirId;
38use mz_compute_types::plan::render_plan::RenderPlan;
39use mz_dyncfg::ConfigSet;
40use mz_expr::SafeMfpPlan;
41use mz_expr::row::RowCollection;
42use mz_ore::cast::CastFrom;
43use mz_ore::collections::CollectionExt;
44use mz_ore::metrics::UIntGauge;
45use mz_ore::now::EpochMillis;
46use mz_ore::task::AbortOnDropHandle;
47use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
48use mz_persist_client::Diagnostics;
49use mz_persist_client::cache::PersistClientCache;
50use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
51use mz_persist_client::read::ReadHandle;
52use mz_persist_types::PersistLocation;
53use mz_persist_types::codec_impls::UnitSchema;
54use mz_repr::fixed_length::ToDatumIter;
55use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
56use mz_storage_operators::stats::StatsCursor;
57use mz_storage_types::StorageDiff;
58use mz_storage_types::controller::CollectionMetadata;
59use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
60use mz_storage_types::sources::SourceData;
61use mz_storage_types::time_dependence::TimeDependence;
62use mz_txn_wal::operator::TxnsContext;
63use mz_txn_wal::txn_cache::TxnsCache;
64use timely::communication::Allocate;
65use timely::dataflow::operators::probe;
66use timely::order::PartialOrder;
67use timely::progress::frontier::Antichain;
68use timely::scheduling::Scheduler;
69use timely::worker::Worker as TimelyWorker;
70use tokio::sync::{oneshot, watch};
71use tracing::{Level, debug, error, info, span, warn};
72use uuid::Uuid;
73
74use crate::arrangement::manager::{TraceBundle, TraceManager};
75use crate::logging;
76use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
77use crate::logging::initialize::LoggingTraces;
78use crate::metrics::{CollectionMetrics, WorkerMetrics};
79use crate::render::{LinearJoinSpec, StartSignal};
80use crate::server::{ComputeInstanceContext, ResponseSender};
81
82mod peek_result_iterator;
83mod peek_stash;
84
/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
///
/// NOTE(review): Rust drops struct fields in declaration order, and several fields here hold
/// tokens/handles (`collections`, `suspended_collections`, channels). Check before reordering.
pub struct ComputeState {
    /// State kept for each installed compute collection.
    ///
    /// Each collection has exactly one frontier.
    /// How the frontier is communicated depends on the collection type:
    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
    ///    `TraceManager`.
    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Collections that were recently dropped and whose removal still needs to be reported.
    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the subscribe instance)
    /// and the response itself.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer with S3 oneshot (copy-to) operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the S3 oneshot instance)
    /// and the response itself.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peek commands that are awaiting fulfillment, keyed by peek UUID.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The persist location where we can stash large peek results.
    ///
    /// `None` until the `CreateInstance` command has been handled.
    pub peek_stash_persist_location: Option<PersistLocation>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    ///
    /// Set once logging has been initialized.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received by this worker and all its peers.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Max size in bytes of any result.
    ///
    /// Starts at `u64::MAX` and is updated by `UpdateConfiguration` commands.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
    /// Per-worker dynamic configuration.
    ///
    /// This is separate from the process-global `ConfigSet` and contains config options that need
    /// to be applied consistently with compute command order.
    ///
    /// For example, for options that influence dataflow rendering it is important that all workers
    /// render the same dataflow with the same options. If these options were stored in a global
    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
    /// point in the stream of compute commands. Storing per-worker configuration ensures that
    /// because each worker's configuration is only updated once that worker observes the
    /// respective `UpdateConfiguration` command.
    ///
    /// Reference-counted to avoid cloning for `Context`.
    pub worker_config: Rc<ConfigSet>,

    /// Receiver of operator hydration events.
    pub hydration_rx: mpsc::Receiver<HydrationEvent>,
    /// Transmitter of operator hydration events.
    ///
    /// Copies of this sender are passed to the hydration logging operators.
    pub hydration_tx: mpsc::Sender<HydrationEvent>,

    /// Collections awaiting schedule instruction by the controller.
    ///
    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
    /// dataflow. Multiple collections can reference the same token if they are exported by the
    /// same dataflow.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// When this replica/cluster is in read-only mode it must not effect any
    /// changes to external state. This flag can only be changed by a
    /// [ComputeCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing the only part that is doing this is `persist_sink`.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state. Initialized to `true` (read-only).
    pub read_only_tx: watch::Sender<bool>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    ///
    /// Cached from the worker config whenever it is applied.
    pub server_maintenance_interval: Duration,

    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
    ///
    /// Used to compute `replica_expiration`.
    pub init_system_time: EpochMillis,

    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
    /// replica can drop diffs associated with timestamps beyond the replica expiration.
    /// The replica will panic if such dataflows are not dropped before the replica has expired.
    pub replica_expiration: Antichain<Timestamp>,
}
194
195impl ComputeState {
196    /// Construct a new `ComputeState`.
197    pub fn new(
198        persist_clients: Arc<PersistClientCache>,
199        txns_ctx: TxnsContext,
200        metrics: WorkerMetrics,
201        tracing_handle: Arc<TracingHandle>,
202        context: ComputeInstanceContext,
203    ) -> Self {
204        let traces = TraceManager::new(metrics.clone());
205        let command_history = ComputeCommandHistory::new(metrics.for_history());
206        let (hydration_tx, hydration_rx) = mpsc::channel();
207
208        // We always initialize as read_only=true. Only when we're explicitly
209        // allowed do we switch to doing writes.
210        let (read_only_tx, read_only_rx) = watch::channel(true);
211
212        Self {
213            collections: Default::default(),
214            dropped_collections: Default::default(),
215            traces,
216            subscribe_response_buffer: Default::default(),
217            copy_to_response_buffer: Default::default(),
218            pending_peeks: Default::default(),
219            peek_stash_persist_location: None,
220            compute_logger: None,
221            persist_clients,
222            txns_ctx,
223            command_history,
224            max_result_size: u64::MAX,
225            linear_join_spec: Default::default(),
226            metrics,
227            tracing_handle,
228            context,
229            worker_config: mz_dyncfgs::all_dyncfgs().into(),
230            hydration_rx,
231            hydration_tx,
232            suspended_collections: Default::default(),
233            read_only_tx,
234            read_only_rx,
235            server_maintenance_interval: Duration::ZERO,
236            init_system_time: mz_ore::now::SYSTEM_TIME(),
237            replica_expiration: Antichain::default(),
238        }
239    }
240
241    /// Return a mutable reference to the identified collection.
242    ///
243    /// Panics if the collection doesn't exist.
244    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
245        self.collections
246            .get_mut(&id)
247            .expect("collection must exist")
248    }
249
250    /// Construct a new frontier probe for the given input and add it to the state of the given
251    /// collections.
252    ///
253    /// The caller is responsible for attaching the returned probe handle to the respective
254    /// dataflow input stream.
255    pub fn input_probe_for(
256        &mut self,
257        input_id: GlobalId,
258        collection_ids: impl Iterator<Item = GlobalId>,
259    ) -> probe::Handle<Timestamp> {
260        let probe = probe::Handle::default();
261        for id in collection_ids {
262            if let Some(collection) = self.collections.get_mut(&id) {
263                collection.input_probes.insert(input_id, probe.clone());
264            }
265        }
266        probe
267    }
268
269    /// Apply the current `worker_config` to the compute state.
270    fn apply_worker_config(&mut self) {
271        use mz_compute_types::dyncfgs::*;
272
273        let config = &self.worker_config;
274
275        self.linear_join_spec = LinearJoinSpec::from_config(config);
276
277        if ENABLE_LGALLOC.get(config) {
278            if let Some(path) = &self.context.scratch_directory {
279                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
280                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
281                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
282                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
283                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
284                info!(
285                    ?path,
286                    backgrund_interval=?interval,
287                    clear_bytes,
288                    eager_return,
289                    file_growth_dampener,
290                    local_buffer_bytes,
291                    "enabling lgalloc"
292                );
293                let background_worker_config = lgalloc::BackgroundWorkerConfig {
294                    interval,
295                    clear_bytes,
296                };
297                lgalloc::lgalloc_set_config(
298                    lgalloc::LgAlloc::new()
299                        .enable()
300                        .with_path(path.clone())
301                        .with_background_config(background_worker_config)
302                        .eager_return(eager_return)
303                        .file_growth_dampener(file_growth_dampener)
304                        .local_buffer_bytes(local_buffer_bytes),
305                );
306                crate::lgalloc::apply_limiter_config(config);
307            } else {
308                debug!("not enabling lgalloc, scratch directory not specified");
309            }
310        } else {
311            info!("disabling lgalloc");
312            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
313        }
314
315        crate::memory_limiter::apply_limiter_config(config);
316
317        mz_ore::region::ENABLE_LGALLOC_REGION.store(
318            ENABLE_COLUMNATION_LGALLOC.get(config),
319            std::sync::atomic::Ordering::Relaxed,
320        );
321
322        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
323        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);
324
325        // Remember the maintenance interval locally to avoid reading it from the config set on
326        // every server iteration.
327        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);
328
329        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
330        match overflowing_behavior.parse() {
331            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
332            Err(err) => {
333                error!(
334                    err,
335                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
336                );
337            }
338        }
339    }
340
341    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
342    /// the replica's initialization system time.
343    ///
344    /// Only expected to be called once when creating the instance. Guards against calling it
345    /// multiple times by checking if the local expiration time is set.
346    pub fn apply_expiration_offset(&mut self, offset: Duration) {
347        if self.replica_expiration.is_empty() {
348            let offset: EpochMillis = offset
349                .as_millis()
350                .try_into()
351                .expect("duration must fit within u64");
352            let replica_expiration_millis = self.init_system_time + offset;
353            let replica_expiration = Timestamp::from(replica_expiration_millis);
354
355            info!(
356                offset = %offset,
357                replica_expiration_millis = %replica_expiration_millis,
358                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
359                "setting replica expiration",
360            );
361            self.replica_expiration = Antichain::from_elem(replica_expiration);
362
363            // Record the replica expiration in the metrics.
364            self.metrics
365                .replica_expiration_timestamp_seconds
366                .set(replica_expiration.into());
367        }
368    }
369
370    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
371    /// appropriate to this replica.
372    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
373        use mz_compute_types::dyncfgs::{
374            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
375        };
376
377        if self.persist_clients.cfg.is_cc_active {
378            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
379        } else {
380            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
381        }
382    }
383}
384
385/// A wrapper around [ComputeState] with a live timely worker and response channel.
386pub(crate) struct ActiveComputeState<'a, A: Allocate> {
387    /// The underlying Timely worker.
388    pub timely_worker: &'a mut TimelyWorker<A>,
389    /// The compute state itself.
390    pub compute_state: &'a mut ComputeState,
391    /// The channel over which frontier information is reported.
392    pub response_tx: &'a mut ResponseSender,
393}
394
/// A token that keeps a sink alive.
///
/// The wrapped value is never read. Holding it is the point: dropping the `SinkToken` drops
/// the boxed value along with it.
pub struct SinkToken(
    // Never read; the box exists only so its contents share the token's lifetime.
    #[allow(dead_code)] Box<dyn Any>,
);

impl SinkToken {
    /// Wrap `token` so that it lives exactly as long as the returned `SinkToken`.
    pub fn new(token: Box<dyn Any>) -> Self {
        SinkToken(token)
    }
}
404
405impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
406    /// Entrypoint for applying a compute command.
407    #[mz_ore::instrument(level = "debug")]
408    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
409        use ComputeCommand::*;
410
411        self.compute_state.command_history.push(cmd.clone());
412
413        // Record the command duration, per worker and command kind.
414        let timer = self
415            .compute_state
416            .metrics
417            .handle_command_duration_seconds
418            .for_command(&cmd)
419            .start_timer();
420
421        match cmd {
422            Hello { .. } => panic!("Hello must be captured before"),
423            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
424            InitializationComplete => (),
425            UpdateConfiguration(params) => self.handle_update_configuration(*params),
426            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
427            Schedule(id) => self.handle_schedule(id),
428            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
429            Peek(peek) => {
430                peek.otel_ctx.attach_as_parent();
431                self.handle_peek(*peek)
432            }
433            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
434            AllowWrites => {
435                self.compute_state
436                    .read_only_tx
437                    .send(false)
438                    .expect("we're holding one other end");
439                self.compute_state.persist_clients.cfg().enable_compaction();
440            }
441        }
442
443        timer.observe_duration();
444    }
445
446    fn handle_create_instance(&mut self, config: InstanceConfig) {
447        // Ensure the state is consistent with the config before we initialize anything.
448        self.compute_state.apply_worker_config();
449        if let Some(offset) = config.expiration_offset {
450            self.compute_state.apply_expiration_offset(offset);
451        }
452
453        self.initialize_logging(config.logging);
454
455        self.compute_state.peek_stash_persist_location = Some(config.peek_stash_persist_location);
456    }
457
458    fn handle_update_configuration(&mut self, params: ComputeParameters) {
459        info!("Applying configuration update: {params:?}");
460
461        let ComputeParameters {
462            workload_class,
463            max_result_size,
464            tracing,
465            grpc_client: _grpc_client,
466            dyncfg_updates,
467        } = params;
468
469        if let Some(v) = workload_class {
470            self.compute_state.metrics.set_workload_class(v);
471        }
472        if let Some(v) = max_result_size {
473            self.compute_state.max_result_size = v;
474        }
475
476        tracing.apply(self.compute_state.tracing_handle.as_ref());
477
478        dyncfg_updates.apply(&self.compute_state.worker_config);
479        self.compute_state
480            .persist_clients
481            .cfg()
482            .apply_from(&dyncfg_updates);
483
484        // Note: We're only updating mz_metrics from the compute state here, but not from the
485        // equivalent storage state. This is because they're running on the same process and
486        // share the metrics.
487        mz_metrics::update_dyncfg(&dyncfg_updates);
488
489        self.compute_state.apply_worker_config();
490    }
491
492    fn handle_create_dataflow(
493        &mut self,
494        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
495    ) {
496        let dataflow_index = Rc::new(self.timely_worker.next_dataflow_index());
497        let as_of = dataflow.as_of.clone().unwrap();
498
499        let dataflow_expiration = dataflow
500            .time_dependence
501            .as_ref()
502            .map(|time_dependence| {
503                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
504            })
505            .unwrap_or_default();
506
507        // Add the dataflow expiration to `until`.
508        let until = dataflow.until.meet(&dataflow_expiration);
509
510        if dataflow.is_transient() {
511            debug!(
512                name = %dataflow.debug_name,
513                import_ids = %dataflow.display_import_ids(),
514                export_ids = %dataflow.display_export_ids(),
515                as_of = ?as_of.elements(),
516                time_dependence = ?dataflow.time_dependence,
517                expiration = ?dataflow_expiration.elements(),
518                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
519                plan_until = ?dataflow.until.elements(),
520                until = ?until.elements(),
521                "creating dataflow",
522            );
523        } else {
524            info!(
525                name = %dataflow.debug_name,
526                import_ids = %dataflow.display_import_ids(),
527                export_ids = %dataflow.display_export_ids(),
528                as_of = ?as_of.elements(),
529                time_dependence = ?dataflow.time_dependence,
530                expiration = ?dataflow_expiration.elements(),
531                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
532                plan_until = ?dataflow.until.elements(),
533                until = ?until.elements(),
534                "creating dataflow",
535            );
536        };
537
538        let subscribe_copy_ids: BTreeSet<_> = dataflow
539            .subscribe_ids()
540            .chain(dataflow.copy_to_ids())
541            .collect();
542
543        // Initialize compute and logging state for each object.
544        for object_id in dataflow.export_ids() {
545            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
546            let metrics = self.compute_state.metrics.for_collection(object_id);
547            let mut collection = CollectionState::new(
548                Rc::clone(&dataflow_index),
549                is_subscribe_or_copy,
550                as_of.clone(),
551                metrics,
552            );
553
554            if let Some(logger) = self.compute_state.compute_logger.clone() {
555                let logging = CollectionLogging::new(
556                    object_id,
557                    logger,
558                    *dataflow_index,
559                    dataflow.import_ids(),
560                );
561                collection.logging = Some(logging);
562            }
563
564            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
565                lower: as_of.clone(),
566            });
567
568            let existing = self.compute_state.collections.insert(object_id, collection);
569            if existing.is_some() {
570                error!(
571                    id = ?object_id,
572                    "existing collection for newly created dataflow",
573                );
574            }
575        }
576
577        let (start_signal, suspension_token) = StartSignal::new();
578        for id in dataflow.export_ids() {
579            self.compute_state
580                .suspended_collections
581                .insert(id, Rc::clone(&suspension_token));
582        }
583
584        crate::render::build_compute_dataflow(
585            self.timely_worker,
586            self.compute_state,
587            dataflow,
588            start_signal,
589            until,
590            dataflow_expiration,
591        );
592    }
593
594    fn handle_schedule(&mut self, id: GlobalId) {
595        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
596        // we should unsuspend it by dropping the corresponding suspension token. Note that a
597        // dataflow can export multiple collections and they all share one suspension token, so the
598        // computation of a dataflow will only start once all its exported collections have been
599        // scheduled.
600        let suspension_token = self.compute_state.suspended_collections.remove(&id);
601        drop(suspension_token);
602    }
603
604    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
605        if frontier.is_empty() {
606            // Indicates that we may drop `id`, as there are no more valid times to read.
607            self.drop_collection(id);
608        } else {
609            self.compute_state
610                .traces
611                .allow_compaction(id, frontier.borrow());
612        }
613    }
614
615    #[mz_ore::instrument(level = "debug")]
616    fn handle_peek(&mut self, peek: Peek) {
617        let pending = match &peek.target {
618            PeekTarget::Index { id } => {
619                // Acquire a copy of the trace suitable for fulfilling the peek.
620                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
621                PendingPeek::index(peek, trace_bundle)
622            }
623            PeekTarget::Persist { metadata, .. } => {
624                let metadata = metadata.clone();
625                PendingPeek::persist(
626                    peek,
627                    Arc::clone(&self.compute_state.persist_clients),
628                    metadata,
629                    usize::cast_from(self.compute_state.max_result_size),
630                    self.timely_worker,
631                )
632            }
633        };
634
635        // Log the receipt of the peek.
636        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
637            logger.log(&pending.as_log_event(true));
638        }
639
640        self.process_peek(&mut Antichain::new(), pending);
641    }
642
643    fn handle_cancel_peek(&mut self, uuid: Uuid) {
644        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
645            self.send_peek_response(peek, PeekResponse::Canceled);
646        }
647    }
648
    /// Arrange for the given collection to be dropped.
    ///
    /// Collection dropping occurs in three phases:
    ///
    ///  1. This method removes the collection from the [`ComputeState`] and drops its
    ///     [`CollectionState`], including its held dataflow tokens. It then adds the dropped
    ///     collection to `dropped_collections`.
    ///  2. The next step of the Timely worker lets the source operators observe the token drops
    ///     and shut themselves down.
    ///  3. `report_dropped_collections` removes the entry from `dropped_collections` and emits any
    ///     outstanding final responses required by the compute protocol.
    ///
    /// These steps ensure that we don't report a collection as dropped to the controller before it
    /// has stopped reading from its inputs. Doing so would allow the controller to release its
    /// read holds on the inputs, which could lead to panics from the replica trying to read
    /// already compacted times.
    ///
    /// # Panics
    ///
    /// Panics if `id` does not identify a tracked collection.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        // If this collection is an index, remove its trace.
        self.compute_state.traces.remove(&id);
        // If the collection is unscheduled, remove it from the list of waiting collections.
        // This also releases its reference to the dataflow's shared suspension token.
        self.compute_state.suspended_collections.remove(&id);

        if ENABLE_ACTIVE_DATAFLOW_CANCELATION.get(&self.compute_state.worker_config) {
            // Drop the dataflow, if all its exports have been dropped.
            // Each collection of a dataflow holds a clone of the same `Rc` dataflow index, so
            // `try_unwrap` succeeds only once no other collection still references it.
            if let Ok(index) = Rc::try_unwrap(collection.dataflow_index) {
                self.timely_worker.drop_dataflow(index);
            }
        }

        // Remember the collection as dropped, for emission of outstanding final compute responses.
        // The remaining, unmoved fields of `collection` are dropped when this function returns.
        let dropped = DroppedCollection {
            reported_frontiers: collection.reported_frontiers,
            is_subscribe_or_copy: collection.is_subscribe_or_copy,
        };
        self.compute_state.dropped_collections.push((id, dropped));
    }
691
692    /// Initializes timely dataflow logging and publishes as a view.
693    pub fn initialize_logging(&mut self, config: LoggingConfig) {
694        if self.compute_state.compute_logger.is_some() {
695            panic!("dataflow server has already initialized logging");
696        }
697
698        let LoggingTraces {
699            traces,
700            dataflow_index,
701            compute_logger: logger,
702        } = logging::initialize(self.timely_worker, &config);
703
704        let dataflow_index = Rc::new(dataflow_index);
705        let mut log_index_ids = config.index_logs;
706        for (log, trace) in traces {
707            // Install trace as maintained index.
708            let id = log_index_ids
709                .remove(&log)
710                .expect("`logging::initialize` does not invent logs");
711            self.compute_state.traces.set(id, trace);
712
713            // Initialize compute and logging state for the logging index.
714            let is_subscribe_or_copy = false;
715            let as_of = Antichain::from_elem(Timestamp::MIN);
716            let metrics = self.compute_state.metrics.for_collection(id);
717            let mut collection = CollectionState::new(
718                Rc::clone(&dataflow_index),
719                is_subscribe_or_copy,
720                as_of,
721                metrics,
722            );
723
724            let logging =
725                CollectionLogging::new(id, logger.clone(), *dataflow_index, std::iter::empty());
726            collection.logging = Some(logging);
727
728            let existing = self.compute_state.collections.insert(id, collection);
729            if existing.is_some() {
730                error!(
731                    id = ?id,
732                    "existing collection for newly initialized logging export",
733                );
734            }
735        }
736
737        // Sanity check.
738        assert!(
739            log_index_ids.is_empty(),
740            "failed to create requested logging indexes: {log_index_ids:?}",
741        );
742
743        self.compute_state.compute_logger = Some(logger);
744    }
745
    /// Send progress information to the controller.
    ///
    /// For every collection that is not a subscribe or copy-to, this computes the current write,
    /// output and input frontiers. It records those that `allows_reporting` accepts relative to
    /// the previously reported values, and sends one `Frontiers` response per collection with any
    /// updates.
    ///
    /// Responses are buffered in `responses` and sent after the loop. The loop holds a mutable
    /// borrow of `self.compute_state.collections`, while `send_compute_response` needs `&self`.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
            // collections (database-issues#4701).
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Collect the write frontier and check for progress.
            // The write frontier comes either from the collection's maintained trace (index
            // exports) or from its sink write frontier, but never from both.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the output frontier and check for progress.
            //
            // By default, the output frontier equals the write frontier (which is still stored in
            // `new_frontier`). If the collection provides a compute frontier, we construct the
            // output frontier by taking the meet of write and compute frontier, to avoid:
            //  * reporting progress through times we have not yet written
            //  * reporting progress through times we have not yet fully processed, for
            //    collections that jump their write frontiers into the future
            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the input frontier and check for progress.
            // The input frontier is the meet of all input probe frontiers. With no input probes,
            // it stays empty after the `clear`.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Remember what is about to be reported, so later calls only report further progress.
            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }
834
835    /// Report dropped collections to the controller.
836    pub fn report_dropped_collections(&mut self) {
837        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);
838
839        for (id, collection) in dropped_collections {
840            // The compute protocol requires us to send a `Frontiers` response with empty frontiers
841            // when a collection was dropped, unless:
842            //  * The frontier was already reported as empty previously, or
843            //  * The collection is a subscribe or copy-to.
844
845            if collection.is_subscribe_or_copy {
846                continue;
847            }
848
849            let reported = collection.reported_frontiers;
850            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
851            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
852            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);
853
854            let frontiers = FrontiersResponse {
855                write_frontier,
856                input_frontier,
857                output_frontier,
858            };
859            if frontiers.has_updates() {
860                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
861            }
862        }
863    }
864
865    /// Report operator hydration events.
866    pub fn report_operator_hydration(&self) {
867        let worker_id = self.timely_worker.index();
868        for event in self.compute_state.hydration_rx.try_iter() {
869            // The compute protocol forbids reporting `Status` about collections that have advanced
870            // to the empty frontier, so we ignore updates for those.
871            let collection = self.compute_state.collections.get(&event.export_id);
872            if collection.map_or(true, |c| c.reported_frontiers().all_empty()) {
873                continue;
874            }
875
876            let status = OperatorHydrationStatus {
877                collection_id: event.export_id,
878                lir_id: event.lir_id,
879                worker_id,
880                hydrated: event.hydrated,
881            };
882            let response = ComputeResponse::Status(StatusResponse::OperatorHydration(status));
883            self.send_compute_response(response);
884        }
885    }
886
887    /// Report per-worker metrics.
888    pub(crate) fn report_metrics(&self) {
889        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
890            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
891            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
892            let remaining = expiration - now;
893            self.compute_state
894                .metrics
895                .replica_expiration_remaining_seconds
896                .set(remaining)
897        }
898    }
899
    /// Either complete the peek (and send the response) or put it in the pending set.
    ///
    /// `upper` is a scratch frontier buffer that callers reuse across peeks.
    ///
    /// Behavior per kind of pending peek:
    ///  * `Index`: try to fulfill the peek from its traces. If `seek_fulfillment` answers
    ///    `UsePeekStash`, the peek is handed off to a background stashing upload. The upload is
    ///    registered in `pending_peeks` as a `Stash` peek, and no response is sent yet.
    ///  * `Persist`: poll the background Persist read for its result without blocking.
    ///  * `Stash`: feed more rows to the stashing upload via `pump_rows`, then poll it for a
    ///    finished response without blocking.
    ///
    /// A peek that produced a response is answered through `send_peek_response`. Otherwise it is
    /// reinserted into `pending_peeks` under its UUID.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                // Only finishings that are streamable for the result arity may use the stash.
                let peek_stash_eligible = peek
                    .peek
                    .finishing
                    .is_streamable(peek.peek.result_desc.arity());

                // The stash needs both the feature flag and a configured persist location. A
                // missing location with the flag on is logged and treated as "disabled".
                let peek_stash_enabled = {
                    let enabled = ENABLE_PEEK_RESPONSE_STASH.get(&self.compute_state.worker_config);
                    let peek_persist_stash_available =
                        self.compute_state.peek_stash_persist_location.is_some();
                    if !peek_persist_stash_available && enabled {
                        tracing::error!(
                            "missing peek_stash_persist_location but peek stash is enabled"
                        );
                    }
                    enabled && peek_persist_stash_available
                };

                let peek_stash_threshold_bytes =
                    PEEK_RESPONSE_STASH_THRESHOLD_BYTES.get(&self.compute_state.worker_config);

                match peek.seek_fulfillment(
                    upper,
                    self.compute_state.max_result_size,
                    peek_stash_enabled && peek_stash_eligible,
                    peek_stash_threshold_bytes,
                ) {
                    PeekStatus::Ready(result) => Some(result),
                    PeekStatus::NotReady => None,
                    PeekStatus::UsePeekStash => {
                        let _span =
                            span!(parent: &peek.span, Level::DEBUG, "process_stash_peek").entered();

                        let peek_stash_batch_max_runs = PEEK_RESPONSE_STASH_BATCH_MAX_RUNS
                            .get(&self.compute_state.worker_config);

                        // `seek_fulfillment` presumably only answers `UsePeekStash` when it was
                        // passed `true` for eligibility. That requires `peek_stash_enabled`, which
                        // in turn requires the persist location to be set (hence the `expect`).
                        let stash_task = peek_stash::StashingPeek::start_upload(
                            Arc::clone(&self.compute_state.persist_clients),
                            self.compute_state
                                .peek_stash_persist_location
                                .as_ref()
                                .expect("verified above"),
                            peek.peek.clone(),
                            peek.trace_bundle.clone(),
                            peek_stash_batch_max_runs,
                        );

                        // The `Stash` peek replaces the `Index` peek in the pending set. The
                        // original `Index` peek is dropped when this function returns.
                        self.compute_state
                            .pending_peeks
                            .insert(peek.peek.uuid, PendingPeek::Stash(stash_task));
                        return;
                    }
                }
            }
            // Non-blocking check for the background task's result. Its duration is recorded in
            // the `persist_peek_seconds` metric.
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
            PendingPeek::Stash(stashing_peek) => {
                let num_batches = PEEK_STASH_NUM_BATCHES.get(&self.compute_state.worker_config);
                let batch_size = PEEK_STASH_BATCH_SIZE.get(&self.compute_state.worker_config);
                stashing_peek.pump_rows(num_batches, batch_size);

                if let Ok((response, duration)) = stashing_peek.result.try_recv() {
                    self.compute_state
                        .metrics
                        .stashed_peek_seconds
                        .observe(duration.as_secs_f64());
                    tracing::trace!(?stashing_peek.peek, ?duration, "finished stashing peek response in persist");

                    Some(response)
                } else {
                    None
                }
            }
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek_response").entered();
            self.send_peek_response(peek, response)
        } else {
            // Not done yet: put the peek back so the next `process_peeks` call retries it.
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }
991
992    /// Scan pending peeks and attempt to retire each.
993    pub fn process_peeks(&mut self) {
994        let mut upper = Antichain::new();
995        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
996        for (_uuid, peek) in pending_peeks {
997            self.process_peek(&mut upper, peek);
998        }
999    }
1000
1001    /// Sends a response for this peek's resolution to the coordinator.
1002    ///
1003    /// Note that this function takes ownership of the `PendingPeek`, which is
1004    /// meant to prevent multiple responses to the same peek.
1005    #[mz_ore::instrument(level = "debug")]
1006    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
1007        let log_event = peek.as_log_event(false);
1008        // Respond with the response.
1009        self.send_compute_response(ComputeResponse::PeekResponse(
1010            peek.peek().uuid,
1011            response,
1012            OpenTelemetryContext::obtain(),
1013        ));
1014
1015        // Log responding to the peek request.
1016        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
1017            logger.log(&log_event);
1018        }
1019    }
1020
1021    /// Scan the shared subscribe response buffer, and forward results along.
1022    pub fn process_subscribes(&mut self) {
1023        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
1024        for (sink_id, mut response) in subscribe_responses.drain(..) {
1025            // Update frontier logging for this subscribe.
1026            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
1027                let new_frontier = match &response {
1028                    SubscribeResponse::Batch(b) => b.upper.clone(),
1029                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
1030                };
1031
1032                let reported = collection.reported_frontiers();
1033                assert!(
1034                    reported.write_frontier.allows_reporting(&new_frontier),
1035                    "subscribe write frontier regression: {:?} -> {:?}",
1036                    reported.write_frontier,
1037                    new_frontier,
1038                );
1039                assert!(
1040                    reported.input_frontier.allows_reporting(&new_frontier),
1041                    "subscribe input frontier regression: {:?} -> {:?}",
1042                    reported.input_frontier,
1043                    new_frontier,
1044                );
1045
1046                collection
1047                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1048                collection
1049                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
1050                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
1051            } else {
1052                // Presumably tracking state for this subscribe was already dropped by
1053                // `drop_collection`. There is nothing left to do for logging.
1054            }
1055
1056            response
1057                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
1058            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
1059        }
1060    }
1061
1062    /// Scan the shared copy to response buffer, and forward results along.
1063    pub fn process_copy_tos(&self) {
1064        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
1065        for (sink_id, response) in responses.drain(..) {
1066            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
1067        }
1068    }
1069
1070    /// Send a response to the coordinator.
1071    fn send_compute_response(&self, response: ComputeResponse) {
1072        // Ignore send errors because the coordinator is free to ignore our
1073        // responses. This happens during shutdown.
1074        let _ = self.response_tx.send(response);
1075    }
1076
1077    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
1078    pub(crate) fn check_expiration(&self) {
1079        let now = mz_ore::now::SYSTEM_TIME();
1080        if self.compute_state.replica_expiration.less_than(&now.into()) {
1081            let now_datetime = mz_ore::now::to_datetime(now);
1082            let expiration_datetime = self
1083                .compute_state
1084                .replica_expiration
1085                .as_option()
1086                .map(Into::into)
1087                .map(mz_ore::now::to_datetime);
1088
1089            // We error and assert separately to produce structured logs in anything that depends
1090            // on tracing.
1091            error!(
1092                now,
1093                now_datetime = ?now_datetime,
1094                expiration = ?self.compute_state.replica_expiration.elements(),
1095                expiration_datetime = ?expiration_datetime,
1096                "replica expired"
1097            );
1098
1099            // Repeat condition for better error message.
1100            assert!(
1101                !self.compute_state.replica_expiration.less_than(&now.into()),
1102                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
1103                self.compute_state.replica_expiration.elements(),
1104            );
1105        }
1106    }
1107
1108    /// Returns the dataflow expiration, i.e, the timestamp beyond which diffs can be
1109    /// dropped.
1110    ///
1111    /// Returns an empty timestamp if `replica_expiration` is unset or matches conditions under
1112    /// which dataflow expiration should be disabled.
1113    pub fn determine_dataflow_expiration(
1114        &self,
1115        time_dependence: &TimeDependence,
1116        until: &Antichain<mz_repr::Timestamp>,
1117    ) -> Antichain<mz_repr::Timestamp> {
1118        // Evaluate time dependence with respect to the expiration time.
1119        // * Step time forward to ensure the expiration time is different to the moment a dataflow
1120        //   can legitimately jump to.
1121        // * We cannot expire dataflow with an until that is less or equal to the expiration time.
1122        let iter = self
1123            .compute_state
1124            .replica_expiration
1125            .iter()
1126            .filter_map(|t| time_dependence.apply(*t))
1127            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
1128            .filter(|expiration| !until.less_equal(expiration));
1129        Antichain::from_iter(iter)
1130    }
1131}
1132
/// A peek against either an index or a Persist collection that has not been answered yet.
///
/// Pending peeks are polled repeatedly until they yield a response. An index peek may turn into
/// a `Stash` peek when its result is handed off to the peek stash.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub enum PendingPeek {
    /// A peek against an index. (Possibly a temporary index created for the purpose.)
    Index(IndexPeek),
    /// A peek against a Persist-backed collection, served by a background task.
    Persist(PersistPeek),
    /// A peek against an index that is being stashed in the peek stash by an
    /// async background task.
    Stash(peek_stash::StashingPeek),
}
1146
1147impl PendingPeek {
1148    /// Produces a corresponding log event.
1149    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
1150        let peek = self.peek();
1151        let (id, peek_type) = match &peek.target {
1152            PeekTarget::Index { id } => (id, logging::compute::PeekType::Index),
1153            PeekTarget::Persist { id, .. } => (id, logging::compute::PeekType::Persist),
1154        };
1155        ComputeEvent::Peek(PeekEvent {
1156            peek: logging::compute::Peek::new(*id, peek.timestamp, peek.uuid),
1157            peek_type,
1158            installed,
1159        })
1160    }
1161
1162    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
1163        let empty_frontier = Antichain::new();
1164        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
1165        trace_bundle
1166            .oks_mut()
1167            .set_logical_compaction(timestamp_frontier.borrow());
1168        trace_bundle
1169            .errs_mut()
1170            .set_logical_compaction(timestamp_frontier.borrow());
1171        trace_bundle
1172            .oks_mut()
1173            .set_physical_compaction(empty_frontier.borrow());
1174        trace_bundle
1175            .errs_mut()
1176            .set_physical_compaction(empty_frontier.borrow());
1177
1178        PendingPeek::Index(IndexPeek {
1179            peek,
1180            trace_bundle,
1181            span: tracing::Span::current(),
1182        })
1183    }
1184
1185    fn persist<A: Allocate>(
1186        peek: Peek,
1187        persist_clients: Arc<PersistClientCache>,
1188        metadata: CollectionMetadata,
1189        max_result_size: usize,
1190        timely_worker: &TimelyWorker<A>,
1191    ) -> Self {
1192        let active_worker = {
1193            // Choose the worker that does the actual peek arbitrarily but consistently.
1194            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
1195            chosen_index == timely_worker.index()
1196        };
1197        let activator = timely_worker.sync_activator_for([].into());
1198        let peek_uuid = peek.uuid;
1199
1200        let (result_tx, result_rx) = oneshot::channel();
1201        let timestamp = peek.timestamp;
1202        let mfp_plan = peek.map_filter_project.clone();
1203        let max_results_needed = peek
1204            .finishing
1205            .limit
1206            .map(|l| usize::cast_from(u64::from(l)))
1207            .unwrap_or(usize::MAX)
1208            + peek.finishing.offset;
1209        let order_by = peek.finishing.order_by.clone();
1210
1211        // Persist peeks can include at most one literal constraint.
1212        let literal_constraint = peek
1213            .literal_constraints
1214            .clone()
1215            .map(|rows| rows.into_element());
1216
1217        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
1218            let start = Instant::now();
1219            let result = if active_worker {
1220                PersistPeek::do_peek(
1221                    &persist_clients,
1222                    metadata,
1223                    timestamp,
1224                    literal_constraint,
1225                    mfp_plan,
1226                    max_result_size,
1227                    max_results_needed,
1228                )
1229                .await
1230            } else {
1231                Ok(vec![])
1232            };
1233            let result = match result {
1234                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
1235                Err(e) => PeekResponse::Error(e.to_string()),
1236            };
1237            match result_tx.send((result, start.elapsed())) {
1238                Ok(()) => {}
1239                Err((_result, elapsed)) => {
1240                    debug!(duration =? elapsed, "dropping result for cancelled peek {peek_uuid}")
1241                }
1242            }
1243            match activator.activate() {
1244                Ok(()) => {}
1245                Err(_) => {
1246                    debug!("unable to wake timely after completed peek {peek_uuid}");
1247                }
1248            }
1249        });
1250        PendingPeek::Persist(PersistPeek {
1251            peek,
1252            _abort_handle: task_handle.abort_on_drop(),
1253            result: result_rx,
1254            span: tracing::Span::current(),
1255        })
1256    }
1257
1258    fn span(&self) -> &tracing::Span {
1259        match self {
1260            PendingPeek::Index(p) => &p.span,
1261            PendingPeek::Persist(p) => &p.span,
1262            PendingPeek::Stash(p) => &p.span,
1263        }
1264    }
1265
1266    pub(crate) fn peek(&self) -> &Peek {
1267        match self {
1268            PendingPeek::Index(p) => &p.peek,
1269            PendingPeek::Persist(p) => &p.peek,
1270            PendingPeek::Stash(p) => &p.peek,
1271        }
1272    }
1273}
1274
/// An in-progress Persist peek.
///
/// Note that `PersistPeek` intentionally does not implement or derive `Clone`,
/// as each `PersistPeek` is meant to be dropped after it's responded to.
pub struct PersistPeek {
    /// The peek request being served.
    pub(crate) peek: Peek,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task: it is aborted when this
    /// handle is dropped.
    _abort_handle: AbortOnDropHandle<()>,
    /// The result of the background task, eventually, paired with the time the task took.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1289
impl PersistPeek {
    /// Reads a Persist shard at `as_of` and evaluates `mfp_plan` on its contents.
    ///
    /// With a `literal_constraint`, only rows whose leading columns equal the literal are
    /// considered. Rows comparing less are skipped, and the scan stops at the first row comparing
    /// greater. NOTE(review): stopping early assumes the cursor yields rows in sorted order
    /// across batches; confirm against `StatsCursor`.
    ///
    /// Collection stops once `limit_remaining` rows, counting multiplicities, have been produced.
    ///
    /// Returns the surviving rows with their multiplicities. An error string is returned if
    /// opening the client, reader or cursor fails (including when the since has advanced past
    /// `as_of`), if a data row is an error, if a negative multiplicity is seen, if MFP evaluation
    /// fails, or if the accumulated result size exceeds `max_result_size` bytes.
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If we are using txn-wal for this collection, then the upper might
        // be advanced lazily and we have to go through txn-wal for reads.
        //
        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
        // use in here (instead of opening a new TxnsCache) to save a persist
        // reader registration and some txns shard read traffic.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        // The cursor fails with the shard's since when `as_of` is no longer readable.
        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        // Re-used state for processing and building rows.
        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        // Approximate size of `result`, checked against `max_result_size`.
        let mut total_size = 0usize;

        // Number of leading columns that the literal constraint fixes.
        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                // A negative diff cannot be converted to `usize`; such data is invalid.
                let count: usize = d.try_into().map_err(|_| {
                    tracing::error!(
                        shard = %metadata.data_shard, diff = d, ?row,
                        "persist peek encountered negative multiplicities",
                    );
                    format!(
                        "Invalid data in source, \
                         saw retractions ({}) for row that does not exist: {:?}",
                        -d, row,
                    )
                })?;
                // Rows with a zero multiplicity contribute nothing.
                let Some(count) = NonZeroUsize::new(count) else {
                    continue;
                };
                let mut datum_local = datum_vec.borrow_with(&row);
                let eval_result = mfp_plan
                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
                    .map(|row| row.cloned())
                    .map_err(|e| e.to_string())?;
                if let Some(row) = eval_result {
                    // Each distinct output row is charged its byte length plus the count's size.
                    total_size = total_size
                        .saturating_add(row.byte_len())
                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
                    if total_size > max_result_size {
                        return Err(format!(
                            "result exceeds max size of {}",
                            ByteSize::b(u64::cast_from(max_result_size))
                        ));
                    }
                    result.push((row, count));
                    limit_remaining = limit_remaining.saturating_sub(count.get());
                    // Leaving the inner loop also ends the outer loop through its condition.
                    if limit_remaining == 0 {
                        break;
                    }
                }
            }
        }

        Ok(result)
    }
}
1411
/// An in-progress index-backed peek, and data to eventually fulfill it.
pub struct IndexPeek {
    /// The peek request being served.
    peek: Peek,
    /// Handles to the index's oks and errs traces, from which the peek result is read.
    trace_bundle: TraceBundle,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}
1420
1421impl IndexPeek {
1422    /// Attempts to fulfill the peek and reports success.
1423    ///
1424    /// To produce output at `peek.timestamp`, we must be certain that
1425    /// it is no longer changing. A trace guarantees that all future
1426    /// changes will be greater than or equal to an element of `upper`.
1427    ///
1428    /// If an element of `upper` is less or equal to `peek.timestamp`,
1429    /// then there can be further updates that would change the output.
1430    /// If no element of `upper` is less or equal to `peek.timestamp`,
1431    /// then for any time `t` less or equal to `peek.timestamp` it is
1432    /// not the case that `upper` is less or equal to that timestamp,
1433    /// and so the result cannot further evolve.
1434    fn seek_fulfillment(
1435        &mut self,
1436        upper: &mut Antichain<Timestamp>,
1437        max_result_size: u64,
1438        peek_stash_eligible: bool,
1439        peek_stash_threshold_bytes: usize,
1440    ) -> PeekStatus {
1441        self.trace_bundle.oks_mut().read_upper(upper);
1442        if upper.less_equal(&self.peek.timestamp) {
1443            return PeekStatus::NotReady;
1444        }
1445        self.trace_bundle.errs_mut().read_upper(upper);
1446        if upper.less_equal(&self.peek.timestamp) {
1447            return PeekStatus::NotReady;
1448        }
1449
1450        let read_frontier = self.trace_bundle.compaction_frontier();
1451        if !read_frontier.less_equal(&self.peek.timestamp) {
1452            let error = format!(
1453                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
1454                read_frontier.elements(),
1455                self.peek.timestamp,
1456            );
1457            return PeekStatus::Ready(PeekResponse::Error(error));
1458        }
1459
1460        self.collect_finished_data(
1461            max_result_size,
1462            peek_stash_eligible,
1463            peek_stash_threshold_bytes,
1464        )
1465    }
1466
1467    /// Collects data for a known-complete peek from the ok stream.
1468    fn collect_finished_data(
1469        &mut self,
1470        max_result_size: u64,
1471        peek_stash_eligible: bool,
1472        peek_stash_threshold_bytes: usize,
1473    ) -> PeekStatus {
1474        // Check if there exist any errors and, if so, return whatever one we
1475        // find first.
1476        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
1477        while cursor.key_valid(&storage) {
1478            let mut copies = Diff::ZERO;
1479            cursor.map_times(&storage, |time, diff| {
1480                if time.less_equal(&self.peek.timestamp) {
1481                    copies += diff;
1482                }
1483            });
1484            if copies.is_negative() {
1485                let error = cursor.key(&storage);
1486                tracing::error!(
1487                    target = %self.peek.target.id(), diff = %copies, %error,
1488                    "index peek encountered negative multiplicities in error trace",
1489                );
1490                return PeekStatus::Ready(PeekResponse::Error(format!(
1491                    "Invalid data in source errors, \
1492                    saw retractions ({}) for row that does not exist: {}",
1493                    -copies, error,
1494                )));
1495            }
1496            if copies.is_positive() {
1497                return PeekStatus::Ready(PeekResponse::Error(cursor.key(&storage).to_string()));
1498            }
1499            cursor.step_key(&storage);
1500        }
1501
1502        Self::collect_ok_finished_data(
1503            &self.peek,
1504            self.trace_bundle.oks_mut(),
1505            max_result_size,
1506            peek_stash_eligible,
1507            peek_stash_threshold_bytes,
1508        )
1509    }
1510
1511    /// Collects data for a known-complete peek from the ok stream.
1512    fn collect_ok_finished_data<Tr>(
1513        peek: &Peek<Timestamp>,
1514        oks_handle: &mut Tr,
1515        max_result_size: u64,
1516        peek_stash_eligible: bool,
1517        peek_stash_threshold_bytes: usize,
1518    ) -> PeekStatus
1519    where
1520        for<'a> Tr: TraceReader<
1521                Key<'a>: ToDatumIter + Eq,
1522                KeyOwn = Row,
1523                Val<'a>: ToDatumIter,
1524                TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
1525                DiffGat<'a> = &'a Diff,
1526            >,
1527    {
1528        let max_result_size = usize::cast_from(max_result_size);
1529        let count_byte_size = size_of::<NonZeroUsize>();
1530
1531        // We clone `literal_constraints` here because we don't want to move the constraints
1532        // out of the peek struct, and don't want to modify in-place.
1533        let mut peek_iterator = peek_result_iterator::PeekResultIterator::new(
1534            peek.target.id().clone(),
1535            peek.map_filter_project.clone(),
1536            peek.timestamp,
1537            peek.literal_constraints.clone().as_deref_mut(),
1538            oks_handle,
1539        );
1540
1541        // Accumulated `Vec<(row, count)>` results that we are likely to return.
1542        let mut results = Vec::new();
1543        let mut total_size: usize = 0;
1544
1545        // When set, a bound on the number of records we need to return.
1546        // The requirements on the records are driven by the finishing's
1547        // `order_by` field. Further limiting will happen when the results
1548        // are collected, so we don't need to have exactly this many results,
1549        // just at least those results that would have been returned.
1550        let max_results = peek.finishing.num_rows_needed();
1551
1552        let mut l_datum_vec = DatumVec::new();
1553        let mut r_datum_vec = DatumVec::new();
1554
1555        while let Some(row) = peek_iterator.next() {
1556            let row = match row {
1557                Ok(row) => row,
1558                Err(err) => return PeekStatus::Ready(PeekResponse::Error(err)),
1559            };
1560            let (row, copies) = row;
1561            let copies: NonZeroUsize = NonZeroUsize::try_from(copies).expect("fits into usize");
1562
1563            total_size = total_size
1564                .saturating_add(row.byte_len())
1565                .saturating_add(count_byte_size);
1566            if peek_stash_eligible && total_size > peek_stash_threshold_bytes {
1567                return PeekStatus::UsePeekStash;
1568            }
1569            if total_size > max_result_size {
1570                return PeekStatus::Ready(PeekResponse::Error(format!(
1571                    "result exceeds max size of {}",
1572                    ByteSize::b(u64::cast_from(max_result_size))
1573                )));
1574            }
1575
1576            results.push((row, copies));
1577
1578            // If we hold many more than `max_results` records, we can thin down
1579            // `results` using `self.finishing.ordering`.
1580            if let Some(max_results) = max_results {
1581                // We use a threshold twice what we intend, to amortize the work
1582                // across all of the insertions. We could tighten this, but it
1583                // works for the moment.
1584                if results.len() >= 2 * max_results {
1585                    if peek.finishing.order_by.is_empty() {
1586                        results.truncate(max_results);
1587                        return PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
1588                            results,
1589                            &peek.finishing.order_by,
1590                        )));
1591                    } else {
1592                        // We can sort `results` and then truncate to `max_results`.
1593                        // This has an effect similar to a priority queue, without
1594                        // its interactive dequeueing properties.
1595                        // TODO: Had we left these as `Vec<Datum>` we would avoid
1596                        // the unpacking; we should consider doing that, although
1597                        // it will require a re-pivot of the code to branch on this
1598                        // inner test (as we prefer not to maintain `Vec<Datum>`
1599                        // in the other case).
1600                        results.sort_by(|left, right| {
1601                            let left_datums = l_datum_vec.borrow_with(&left.0);
1602                            let right_datums = r_datum_vec.borrow_with(&right.0);
1603                            mz_expr::compare_columns(
1604                                &peek.finishing.order_by,
1605                                &left_datums,
1606                                &right_datums,
1607                                || left.0.cmp(&right.0),
1608                            )
1609                        });
1610                        let dropped = results.drain(max_results..);
1611                        let dropped_size =
1612                            dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
1613                                acc.saturating_add(row.byte_len().saturating_add(count_byte_size))
1614                            });
1615                        total_size = total_size.saturating_sub(dropped_size);
1616                    }
1617                }
1618            }
1619        }
1620
1621        PeekStatus::Ready(PeekResponse::Rows(RowCollection::new(
1622            results,
1623            &peek.finishing.order_by,
1624        )))
1625    }
1626}
1627
/// For keeping track of the state of pending or ready peeks, and managing
/// control flow.
///
/// This is the outcome of one attempt to fulfill a peek: keep it pending, hand it off to the
/// peek result stash, or respond with a result.
enum PeekStatus {
    /// The frontiers of objects are not yet advanced enough, peek is still
    /// pending.
    NotReady,
    /// The result size is above the configured threshold and the peek is
    /// eligible for using the peek result stash.
    UsePeekStash,
    /// The peek result is ready. The response may carry either rows or an error.
    Ready(PeekResponse),
}
1640
/// The frontiers we have reported to the controller for a collection.
///
/// Each of the three frontiers is tracked independently as a [`ReportedFrontier`].
#[derive(Debug)]
struct ReportedFrontiers {
    /// The reported write frontier.
    write_frontier: ReportedFrontier,
    /// The reported input frontier.
    input_frontier: ReportedFrontier,
    /// The reported output frontier.
    output_frontier: ReportedFrontier,
}
1651
1652impl ReportedFrontiers {
1653    /// Creates a new `ReportedFrontiers` instance.
1654    fn new() -> Self {
1655        Self {
1656            write_frontier: ReportedFrontier::new(),
1657            input_frontier: ReportedFrontier::new(),
1658            output_frontier: ReportedFrontier::new(),
1659        }
1660    }
1661
1662    /// Returns whether all reported frontiers are empty.
1663    fn all_empty(&self) -> bool {
1664        self.write_frontier.is_empty()
1665            && self.input_frontier.is_empty()
1666            && self.output_frontier.is_empty()
1667    }
1668}
1669
/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
#[derive(Clone, Debug)]
pub enum ReportedFrontier {
    /// A frontier has been previously reported. Subsequent reports must be strictly greater.
    Reported(Antichain<Timestamp>),
    /// No frontier has been reported yet.
    NotReported {
        /// A lower bound for frontiers that may be reported in the future.
        lower: Antichain<Timestamp>,
    },
}
1681
1682impl ReportedFrontier {
1683    /// Create a new `ReportedFrontier` enforcing the minimum lower bound.
1684    pub fn new() -> Self {
1685        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1686        Self::NotReported { lower }
1687    }
1688
1689    /// Whether the reported frontier is the empty frontier.
1690    pub fn is_empty(&self) -> bool {
1691        match self {
1692            Self::Reported(frontier) => frontier.is_empty(),
1693            Self::NotReported { .. } => false,
1694        }
1695    }
1696
1697    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1698    ///
1699    /// A `ReportedFrontier` allows reporting of another frontier if:
1700    ///  * The other frontier is greater than the reported frontier.
1701    ///  * The other frontier is greater than or equal to the lower bound.
1702    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1703        match self {
1704            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1705            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1706        }
1707    }
1708}
1709
/// State maintained for a compute collection.
pub struct CollectionState {
    /// Tracks the frontiers that have been reported to the controller.
    ///
    /// Also drives hydration detection: the collection counts as hydrated once the reported
    /// output frontier is strictly beyond `as_of`.
    reported_frontiers: ReportedFrontiers,
    /// The index of the dataflow computing this collection.
    ///
    /// Used for dropping the dataflow when the collection is dropped.
    /// The Dataflow index is wrapped in an `Rc`s and can be shared between collections, to reflect
    /// the possibility that a single dataflow can export multiple collections.
    dataflow_index: Rc<usize>,
    /// Whether this collection is a subscribe or copy-to.
    ///
    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
    /// collections, so we need to be able to recognize them. This is something we would like to
    /// change in the future (database-issues#4701).
    pub is_subscribe_or_copy: bool,
    /// The collection's initial as-of frontier.
    ///
    /// Used to determine hydration status.
    as_of: Antichain<Timestamp>,

    /// A token that should be dropped when this collection is dropped to clean up associated
    /// sink state.
    ///
    /// Only `Some` if the collection is a sink.
    pub sink_token: Option<SinkToken>,
    /// Frontier of sink writes.
    ///
    /// Only `Some` if the collection is a sink and *not* a subscribe.
    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
    /// Frontier probes for every input to the collection.
    ///
    /// Their frontiers are copied into the import-frontier logging whenever an input frontier
    /// is reported.
    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
    /// A probe reporting the frontier of times through which all collection outputs have been
    /// computed (but not necessarily written).
    ///
    /// `None` for collections with compute frontiers equal to their write frontiers.
    pub compute_probe: Option<probe::Handle<Timestamp>>,
    /// Logging state maintained for this collection.
    ///
    /// `None` when no logging state is attached; frontier and hydration updates then skip
    /// logging.
    logging: Option<CollectionLogging>,
    /// Metrics tracked for this collection.
    metrics: CollectionMetrics,
}
1752
1753impl CollectionState {
1754    fn new(
1755        dataflow_index: Rc<usize>,
1756        is_subscribe_or_copy: bool,
1757        as_of: Antichain<Timestamp>,
1758        metrics: CollectionMetrics,
1759    ) -> Self {
1760        Self {
1761            reported_frontiers: ReportedFrontiers::new(),
1762            dataflow_index,
1763            is_subscribe_or_copy,
1764            as_of,
1765            sink_token: None,
1766            sink_write_frontier: None,
1767            input_probes: Default::default(),
1768            compute_probe: None,
1769            logging: None,
1770            metrics,
1771        }
1772    }
1773
1774    /// Return the frontiers that have been reported to the controller.
1775    fn reported_frontiers(&self) -> &ReportedFrontiers {
1776        &self.reported_frontiers
1777    }
1778
1779    /// Reset all reported frontiers to the given value.
1780    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1781        self.reported_frontiers.write_frontier = frontier.clone();
1782        self.reported_frontiers.input_frontier = frontier.clone();
1783        self.reported_frontiers.output_frontier = frontier;
1784    }
1785
1786    /// Set the write frontier that has been reported to the controller.
1787    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1788        if let Some(logging) = &mut self.logging {
1789            let time = match &frontier {
1790                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1791                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1792            };
1793            logging.set_frontier(time);
1794        }
1795
1796        self.reported_frontiers.write_frontier = frontier;
1797    }
1798
1799    /// Set the input frontier that has been reported to the controller.
1800    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1801        // Use this opportunity to update our input frontier logging.
1802        if let Some(logging) = &mut self.logging {
1803            for (id, probe) in &self.input_probes {
1804                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1805                logging.set_import_frontier(*id, new_time);
1806            }
1807        }
1808
1809        self.reported_frontiers.input_frontier = frontier;
1810    }
1811
1812    /// Set the output frontier that has been reported to the controller.
1813    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1814        let already_hydrated = self.hydrated();
1815
1816        self.reported_frontiers.output_frontier = frontier;
1817
1818        if !already_hydrated && self.hydrated() {
1819            if let Some(logging) = &mut self.logging {
1820                logging.set_hydrated();
1821            }
1822            self.metrics.record_collection_hydrated();
1823        }
1824    }
1825
1826    /// Return whether this collection is hydrated.
1827    fn hydrated(&self) -> bool {
1828        match &self.reported_frontiers.output_frontier {
1829            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1830            ReportedFrontier::NotReported { .. } => false,
1831        }
1832    }
1833}
1834
/// State remembered about a dropped compute collection.
///
/// This is the subset of the full [`CollectionState`] that survives the invocation of
/// `drop_collection`, until it is finally dropped in `report_dropped_collections`. It includes any
/// information required to report the dropping of a collection to the controller.
///
/// Note that this state must _not_ store any state (such as tokens) whose dropping releases
/// resources elsewhere in the system. A `DroppedCollection` for a collection dropped during
/// reconciliation might be alive at the same time as the [`CollectionState`] for the re-created
/// collection, and if the dropped collection hasn't released all its held resources by the time
/// the new one is created, conflicts can ensue.
pub struct DroppedCollection {
    /// The frontiers that had been reported to the controller for the collection.
    /// Presumably carried over from the dropped `CollectionState`; verify in `drop_collection`.
    reported_frontiers: ReportedFrontiers,
    /// Whether the dropped collection was a subscribe or copy-to, mirroring
    /// `CollectionState::is_subscribe_or_copy`.
    is_subscribe_or_copy: bool,
}
1850
/// An event reporting the hydration status of an LIR node in a dataflow.
pub struct HydrationEvent {
    /// The ID of the export this dataflow maintains.
    pub export_id: GlobalId,
    /// The ID of the LIR node whose hydration status is being reported.
    pub lir_id: LirId,
    /// Whether the node is hydrated.
    pub hydrated: bool,
}