mz_compute/compute_state.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.

//! Worker-local state for compute timely instances.

use std::any::Any;
use std::cell::RefCell;
use std::cmp::Ordering;
use std::collections::{BTreeMap, BTreeSet};
use std::num::NonZeroUsize;
use std::ops::DerefMut;
use std::rc::Rc;
use std::sync::{Arc, mpsc};
use std::time::{Duration, Instant};

use bytesize::ByteSize;
use differential_dataflow::Hashable;
use differential_dataflow::IntoOwned;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::trace::{Cursor, TraceReader};
use mz_compute_client::logging::LoggingConfig;
use mz_compute_client::protocol::command::{
    ComputeCommand, ComputeParameters, InstanceConfig, Peek, PeekTarget,
};
use mz_compute_client::protocol::history::ComputeCommandHistory;
use mz_compute_client::protocol::response::{
    ComputeResponse, CopyToResponse, FrontiersResponse, OperatorHydrationStatus, PeekResponse,
    StatusResponse, SubscribeResponse,
};
use mz_compute_types::dataflows::DataflowDescription;
use mz_compute_types::plan::LirId;
use mz_compute_types::plan::render_plan::RenderPlan;
use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_expr::row::RowCollection;
use mz_ore::cast::CastFrom;
use mz_ore::collections::CollectionExt;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::Diagnostics;
use mz_persist_client::cache::PersistClientCache;
use mz_persist_client::cfg::USE_CRITICAL_SINCE_SNAPSHOT;
use mz_persist_client::read::ReadHandle;
use mz_persist_types::codec_impls::UnitSchema;
use mz_repr::fixed_length::ToDatumIter;
use mz_repr::{DatumVec, Diff, GlobalId, Row, RowArena, Timestamp};
use mz_storage_operators::stats::StatsCursor;
use mz_storage_types::StorageDiff;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::dyncfgs::ORE_OVERFLOWING_BEHAVIOR;
use mz_storage_types::sources::SourceData;
use mz_storage_types::time_dependence::TimeDependence;
use mz_txn_wal::operator::TxnsContext;
use mz_txn_wal::txn_cache::TxnsCache;
use timely::communication::Allocate;
use timely::dataflow::operators::probe;
use timely::order::PartialOrder;
use timely::progress::frontier::Antichain;
use timely::scheduling::Scheduler;
use timely::worker::Worker as TimelyWorker;
use tokio::sync::{oneshot, watch};
use tracing::{Level, debug, error, info, span, warn};
use uuid::Uuid;

use crate::arrangement::manager::{TraceBundle, TraceManager};
use crate::logging;
use crate::logging::compute::{CollectionLogging, ComputeEvent, PeekEvent};
use crate::logging::initialize::LoggingTraces;
use crate::metrics::{CollectionMetrics, WorkerMetrics};
use crate::render::{LinearJoinSpec, StartSignal};
use crate::server::{ComputeInstanceContext, ResponseSender};

/// Worker-local state that is maintained across dataflows.
///
/// This state is restricted to the COMPUTE state, the deterministic, idempotent work
/// done between data ingress and egress.
pub struct ComputeState {
    /// State kept for each installed compute collection.
    ///
    /// Each collection has exactly one frontier.
    /// How the frontier is communicated depends on the collection type:
    ///  * Frontiers of indexes are equal to the frontier of their corresponding traces in the
    ///    `TraceManager`.
    ///  * Persist sinks store their current frontier in `CollectionState::sink_write_frontier`.
    ///  * Subscribes report their frontiers through the `subscribe_response_buffer`.
    pub collections: BTreeMap<GlobalId, CollectionState>,
    /// Collections that were recently dropped and whose removal needs to be reported.
    pub dropped_collections: Vec<(GlobalId, DroppedCollection)>,
    /// The traces available for sharing across dataflows.
    pub traces: TraceManager,
    /// Shared buffer with SUBSCRIBE operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the subscribe instance)
    /// and the response itself.
    pub subscribe_response_buffer: Rc<RefCell<Vec<(GlobalId, SubscribeResponse)>>>,
    /// Shared buffer with S3 oneshot operator instances by which they can respond.
    ///
    /// The entries are pairs of sink identifier (to identify the s3 oneshot instance)
    /// and the response itself.
    pub copy_to_response_buffer: Rc<RefCell<Vec<(GlobalId, CopyToResponse)>>>,
    /// Peek commands that are awaiting fulfillment.
    pub pending_peeks: BTreeMap<Uuid, PendingPeek>,
    /// The logger, from Timely's logging framework, if logs are enabled.
    pub compute_logger: Option<logging::compute::Logger>,
    /// A process-global cache of (blob_uri, consensus_uri) -> PersistClient.
    /// This is intentionally shared between workers.
    pub persist_clients: Arc<PersistClientCache>,
    /// Context necessary for rendering txn-wal operators.
    pub txns_ctx: TxnsContext,
    /// History of commands received by this worker and all its peers.
    pub command_history: ComputeCommandHistory<UIntGauge>,
    /// Max size in bytes of any result.
    max_result_size: u64,
    /// Specification for rendering linear joins.
    pub linear_join_spec: LinearJoinSpec,
    /// Metrics for this worker.
    pub metrics: WorkerMetrics,
    /// A process-global handle to tracing configuration.
    tracing_handle: Arc<TracingHandle>,
    /// Other configuration for compute.
    pub context: ComputeInstanceContext,
    /// Per-worker dynamic configuration.
    ///
    /// This is separate from the process-global `ConfigSet` and contains config options that need
    /// to be applied consistently with compute command order.
    ///
    /// For example, for options that influence dataflow rendering it is important that all workers
    /// render the same dataflow with the same options. If these options were stored in a global
    /// `ConfigSet`, we couldn't guarantee that all workers observe changes to them at the same
    /// point in the stream of compute commands. Storing per-worker configuration ensures that
    /// because each worker's configuration is only updated once that worker observes the
    /// respective `UpdateConfiguration` command.
    ///
    /// Reference-counted to avoid cloning for `Context`.
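    ///
    /// Options are read through their `mz_dyncfg` accessors, e.g. (illustrative)
    /// `ENABLE_LGALLOC.get(&self.worker_config)`, as done in `apply_worker_config`.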
    pub worker_config: Rc<ConfigSet>,

    /// Receiver of operator hydration events.
    pub hydration_rx: mpsc::Receiver<HydrationEvent>,
    /// Transmitter of operator hydration events.
    ///
    /// Copies of this sender are passed to the hydration logging operators.
    pub hydration_tx: mpsc::Sender<HydrationEvent>,

    /// Collections awaiting schedule instruction by the controller.
    ///
    /// Each entry stores a reference to a token that can be dropped to unsuspend the collection's
    /// dataflow. Multiple collections can reference the same token if they are exported by the
    /// same dataflow.
    suspended_collections: BTreeMap<GlobalId, Rc<dyn Any>>,

    /// When this replica/cluster is in read-only mode it must not make any
    /// changes to external state. This flag can only be changed by a
    /// [ComputeCommand::AllowWrites].
    ///
    /// Everything running on this replica/cluster must obey this flag. At the
    /// time of writing, the only part that does this is `persist_sink`.
    ///
    /// NOTE: In the future, we might want a more complicated flag, for example
    /// something that tells us after which timestamp we are allowed to write.
    /// In this first version we are keeping things as simple as possible!
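    ///
    /// A minimal sketch of consulting the flag (illustrative only):
    ///
    /// ```ignore
    /// if *compute_state.read_only_rx.borrow() {
    ///     // Still in read-only mode: skip writes to external state.
    /// }
    /// ```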
    pub read_only_rx: watch::Receiver<bool>,

    /// Send-side for read-only state.
    pub read_only_tx: watch::Sender<bool>,

    /// Interval at which to perform server maintenance tasks. Set to a zero interval to
    /// perform maintenance with every `step_or_park` invocation.
    pub server_maintenance_interval: Duration,

    /// The [`mz_ore::now::SYSTEM_TIME`] at which the replica was started.
    ///
    /// Used to compute `replica_expiration`.
    pub init_system_time: EpochMillis,

    /// The maximum time for which the replica is expected to live. If not empty, dataflows in the
    /// replica can drop diffs associated with timestamps beyond the replica expiration.
    /// The replica will panic if such dataflows are not dropped before the replica has expired.
    pub replica_expiration: Antichain<Timestamp>,
}

impl ComputeState {
    /// Construct a new `ComputeState`.
    pub fn new(
        persist_clients: Arc<PersistClientCache>,
        txns_ctx: TxnsContext,
        metrics: WorkerMetrics,
        tracing_handle: Arc<TracingHandle>,
        context: ComputeInstanceContext,
    ) -> Self {
        let traces = TraceManager::new(metrics.clone());
        let command_history = ComputeCommandHistory::new(metrics.for_history());
        let (hydration_tx, hydration_rx) = mpsc::channel();

        // We always initialize as read_only=true. Only when we're explicitly
        // allowed do we switch to doing writes.
        let (read_only_tx, read_only_rx) = watch::channel(true);

        Self {
            collections: Default::default(),
            dropped_collections: Default::default(),
            traces,
            subscribe_response_buffer: Default::default(),
            copy_to_response_buffer: Default::default(),
            pending_peeks: Default::default(),
            compute_logger: None,
            persist_clients,
            txns_ctx,
            command_history,
            max_result_size: u64::MAX,
            linear_join_spec: Default::default(),
            metrics,
            tracing_handle,
            context,
            worker_config: mz_dyncfgs::all_dyncfgs().into(),
            hydration_rx,
            hydration_tx,
            suspended_collections: Default::default(),
            read_only_tx,
            read_only_rx,
            server_maintenance_interval: Duration::ZERO,
            init_system_time: mz_ore::now::SYSTEM_TIME(),
            replica_expiration: Antichain::default(),
        }
    }

    /// Return a mutable reference to the identified collection.
    ///
    /// Panics if the collection doesn't exist.
    pub fn expect_collection_mut(&mut self, id: GlobalId) -> &mut CollectionState {
        self.collections
            .get_mut(&id)
            .expect("collection must exist")
    }

    /// Construct a new frontier probe for the given input and add it to the state of the given
    /// collections.
    ///
    /// The caller is responsible for attaching the returned probe handle to the respective
    /// dataflow input stream.
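    ///
    /// A minimal sketch of the intended use (illustrative, not compiled; `export_ids` and
    /// `input_stream` are hypothetical):
    ///
    /// ```ignore
    /// let probe = compute_state.input_probe_for(input_id, export_ids.iter().copied());
    /// // Attach the probe so the exporting collections can observe this input's frontier.
    /// input_stream.probe_with(&probe);
    /// ```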
    pub fn input_probe_for(
        &mut self,
        input_id: GlobalId,
        collection_ids: impl Iterator<Item = GlobalId>,
    ) -> probe::Handle<Timestamp> {
        let probe = probe::Handle::default();
        for id in collection_ids {
            if let Some(collection) = self.collections.get_mut(&id) {
                collection.input_probes.insert(input_id, probe.clone());
            }
        }
        probe
    }

    /// Apply the current `worker_config` to the compute state.
    fn apply_worker_config(&mut self) {
        use mz_compute_types::dyncfgs::*;

        let config = &self.worker_config;

        self.linear_join_spec = LinearJoinSpec::from_config(config);

        if ENABLE_LGALLOC.get(config) {
            if let Some(path) = &self.context.scratch_directory {
                let clear_bytes = LGALLOC_SLOW_CLEAR_BYTES.get(config);
                let eager_return = ENABLE_LGALLOC_EAGER_RECLAMATION.get(config);
                let file_growth_dampener = LGALLOC_FILE_GROWTH_DAMPENER.get(config);
                let interval = LGALLOC_BACKGROUND_INTERVAL.get(config);
                let local_buffer_bytes = LGALLOC_LOCAL_BUFFER_BYTES.get(config);
                info!(
                    ?path,
                    background_interval = ?interval,
                    clear_bytes,
                    eager_return,
                    file_growth_dampener,
                    local_buffer_bytes,
                    "enabling lgalloc"
                );
                let background_worker_config = lgalloc::BackgroundWorkerConfig {
                    interval,
                    clear_bytes,
                };
                lgalloc::lgalloc_set_config(
                    lgalloc::LgAlloc::new()
                        .enable()
                        .with_path(path.clone())
                        .with_background_config(background_worker_config)
                        .eager_return(eager_return)
                        .file_growth_dampener(file_growth_dampener)
                        .local_buffer_bytes(local_buffer_bytes),
                );
            } else {
                debug!("not enabling lgalloc, scratch directory not specified");
            }
        } else {
            info!("disabling lgalloc");
            lgalloc::lgalloc_set_config(lgalloc::LgAlloc::new().disable());
        }

        mz_ore::region::ENABLE_LGALLOC_REGION.store(
            ENABLE_COLUMNATION_LGALLOC.get(config),
            std::sync::atomic::Ordering::Relaxed,
        );

        let enable_columnar_lgalloc = ENABLE_COLUMNAR_LGALLOC.get(config);
        mz_timely_util::containers::set_enable_columnar_lgalloc(enable_columnar_lgalloc);

        // Remember the maintenance interval locally to avoid reading it from the config set on
        // every server iteration.
        self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

        let overflowing_behavior = ORE_OVERFLOWING_BEHAVIOR.get(config);
        match overflowing_behavior.parse() {
            Ok(behavior) => mz_ore::overflowing::set_behavior(behavior),
            Err(err) => {
                error!(
                    err,
                    overflowing_behavior, "Invalid value for ore_overflowing_behavior"
                );
            }
        }
    }

    /// Apply the provided replica expiration `offset` by converting it to a frontier relative to
    /// the replica's initialization system time.
    ///
    /// Only expected to be called once when creating the instance. Guards against calling it
    /// multiple times by checking if the local expiration time is set.
    pub fn apply_expiration_offset(&mut self, offset: Duration) {
        if self.replica_expiration.is_empty() {
            let offset: EpochMillis = offset
                .as_millis()
                .try_into()
                .expect("duration must fit within u64");
            let replica_expiration_millis = self.init_system_time + offset;
            let replica_expiration = Timestamp::from(replica_expiration_millis);
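            // For example (illustrative numbers): with `init_system_time` = 1_700_000_000_000 ms
            // and an offset of 3 days (259_200_000 ms), the expiration frontier becomes
            // {1_700_259_200_000}, i.e., `init_system_time + offset`.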

            info!(
                offset = %offset,
                replica_expiration_millis = %replica_expiration_millis,
                replica_expiration_utc = %mz_ore::now::to_datetime(replica_expiration_millis),
                "setting replica expiration",
            );
            self.replica_expiration = Antichain::from_elem(replica_expiration);

            // Record the replica expiration in the metrics.
            self.metrics
                .replica_expiration_timestamp_seconds
                .set(replica_expiration.into());
        }
    }

    /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
    /// appropriate to this replica.
    pub fn dataflow_max_inflight_bytes(&self) -> Option<usize> {
        use mz_compute_types::dyncfgs::{
            DATAFLOW_MAX_INFLIGHT_BYTES, DATAFLOW_MAX_INFLIGHT_BYTES_CC,
        };

        if self.persist_clients.cfg.is_cc_active {
            DATAFLOW_MAX_INFLIGHT_BYTES_CC.get(&self.worker_config)
        } else {
            DATAFLOW_MAX_INFLIGHT_BYTES.get(&self.worker_config)
        }
    }
}

/// A wrapper around [ComputeState] with a live timely worker and response channel.
pub(crate) struct ActiveComputeState<'a, A: Allocate> {
    /// The underlying Timely worker.
    pub timely_worker: &'a mut TimelyWorker<A>,
    /// The compute state itself.
    pub compute_state: &'a mut ComputeState,
    /// The channel over which frontier information is reported.
    pub response_tx: &'a mut ResponseSender,
}

/// A token that keeps a sink alive.
pub struct SinkToken(#[allow(dead_code)] Box<dyn Any>);

impl SinkToken {
    /// Create a new `SinkToken`.
    pub fn new(t: Box<dyn Any>) -> Self {
        Self(t)
    }
}

impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
    /// Entrypoint for applying a compute command.
    #[mz_ore::instrument(level = "debug")]
    pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
        use ComputeCommand::*;

        self.compute_state.command_history.push(cmd.clone());

        // Record the command duration, per worker and command kind.
        let timer = self
            .compute_state
            .metrics
            .handle_command_duration_seconds
            .for_command(&cmd)
            .start_timer();

        match cmd {
            CreateTimely { .. } => panic!("CreateTimely must be captured before"),
            CreateInstance(instance_config) => self.handle_create_instance(*instance_config),
            InitializationComplete => (),
            UpdateConfiguration(params) => self.handle_update_configuration(*params),
            CreateDataflow(dataflow) => self.handle_create_dataflow(*dataflow),
            Schedule(id) => self.handle_schedule(id),
            AllowCompaction { id, frontier } => self.handle_allow_compaction(id, frontier),
            Peek(peek) => {
                peek.otel_ctx.attach_as_parent();
                self.handle_peek(*peek)
            }
            CancelPeek { uuid } => self.handle_cancel_peek(uuid),
            AllowWrites => {
                self.compute_state
                    .read_only_tx
                    .send(false)
                    .expect("we're holding one other end");
                self.compute_state.persist_clients.cfg().enable_compaction();
            }
        }

        timer.observe_duration();
    }

    fn handle_create_instance(&mut self, config: InstanceConfig) {
        // Ensure the state is consistent with the config before we initialize anything.
        self.compute_state.apply_worker_config();
        if let Some(offset) = config.expiration_offset {
            self.compute_state.apply_expiration_offset(offset);
        }

        self.initialize_logging(config.logging);
    }

    fn handle_update_configuration(&mut self, params: ComputeParameters) {
        info!("Applying configuration update: {params:?}");

        let ComputeParameters {
            workload_class,
            max_result_size,
            tracing,
            grpc_client: _grpc_client,
            dyncfg_updates,
        } = params;

        if let Some(v) = workload_class {
            self.compute_state.metrics.set_workload_class(v);
        }
        if let Some(v) = max_result_size {
            self.compute_state.max_result_size = v;
        }

        tracing.apply(self.compute_state.tracing_handle.as_ref());

        dyncfg_updates.apply(&self.compute_state.worker_config);
        self.compute_state
            .persist_clients
            .cfg()
            .apply_from(&dyncfg_updates);

        // Note: We're only updating mz_metrics from the compute state here, but not from the
        // equivalent storage state. This is because they run in the same process and share
        // the metrics.
        mz_metrics::update_dyncfg(&dyncfg_updates);

        self.compute_state.apply_worker_config();
    }

    fn handle_create_dataflow(
        &mut self,
        dataflow: DataflowDescription<RenderPlan, CollectionMetadata>,
    ) {
        // Collect the exported object identifiers, paired with their associated "collection" identifier.
        // The latter is used to extract dependency information, which is in terms of collection ids.
        let dataflow_index = self.timely_worker.next_dataflow_index();
        let as_of = dataflow.as_of.clone().unwrap();

        let dataflow_expiration = dataflow
            .time_dependence
            .as_ref()
            .map(|time_dependence| {
                self.determine_dataflow_expiration(time_dependence, &dataflow.until)
            })
            .unwrap_or_default();

        // Incorporate the dataflow expiration into `until` by taking the meet of the two frontiers.
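        // For example (illustrative): an `until` of {10} and an expiration of {7} meet at {7},
        // so the dataflow may drop updates at times >= 7.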
        let until = dataflow.until.meet(&dataflow_expiration);

        if dataflow.is_transient() {
            debug!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        } else {
            info!(
                name = %dataflow.debug_name,
                import_ids = %dataflow.display_import_ids(),
                export_ids = %dataflow.display_export_ids(),
                as_of = ?as_of.elements(),
                time_dependence = ?dataflow.time_dependence,
                expiration = ?dataflow_expiration.elements(),
                expiration_datetime = ?dataflow_expiration.as_option().map(|t| mz_ore::now::to_datetime(t.into())),
                plan_until = ?dataflow.until.elements(),
                until = ?until.elements(),
                "creating dataflow",
            );
        };

        let subscribe_copy_ids: BTreeSet<_> = dataflow
            .subscribe_ids()
            .chain(dataflow.copy_to_ids())
            .collect();

        // Initialize compute and logging state for each object.
        for object_id in dataflow.export_ids() {
            let is_subscribe_or_copy = subscribe_copy_ids.contains(&object_id);
            let metrics = self.compute_state.metrics.for_collection(object_id);
            let mut collection = CollectionState::new(is_subscribe_or_copy, as_of.clone(), metrics);

            if let Some(logger) = self.compute_state.compute_logger.clone() {
                let logging = CollectionLogging::new(
                    object_id,
                    logger,
                    dataflow_index,
                    dataflow.import_ids(),
                );
                collection.logging = Some(logging);
            }

            collection.reset_reported_frontiers(ReportedFrontier::NotReported {
                lower: as_of.clone(),
            });

            let existing = self.compute_state.collections.insert(object_id, collection);
            if existing.is_some() {
                error!(
                    id = ?object_id,
                    "existing collection for newly created dataflow",
                );
            }
        }

        let (start_signal, suspension_token) = StartSignal::new();
        for id in dataflow.export_ids() {
            self.compute_state
                .suspended_collections
                .insert(id, Rc::clone(&suspension_token));
        }

        crate::render::build_compute_dataflow(
            self.timely_worker,
            self.compute_state,
            dataflow,
            start_signal,
            until,
            dataflow_expiration,
        );
    }

    fn handle_schedule(&mut self, id: GlobalId) {
        // A `Schedule` command instructs us to begin dataflow computation for a collection, so
        // we should unsuspend it by dropping the corresponding suspension token. Note that a
        // dataflow can export multiple collections and they all share one suspension token, so the
        // computation of a dataflow will only start once all its exported collections have been
        // scheduled.
        let suspension_token = self.compute_state.suspended_collections.remove(&id);
        drop(suspension_token);
    }

    fn handle_allow_compaction(&mut self, id: GlobalId, frontier: Antichain<Timestamp>) {
        if frontier.is_empty() {
            // Indicates that we may drop `id`, as there are no more valid times to read.
            self.drop_collection(id);
        } else {
            self.compute_state
                .traces
                .allow_compaction(id, frontier.borrow());
        }
    }

    #[mz_ore::instrument(level = "debug")]
    fn handle_peek(&mut self, peek: Peek) {
        let pending = match &peek.target {
            PeekTarget::Index { id } => {
                // Acquire a copy of the trace suitable for fulfilling the peek.
                let trace_bundle = self.compute_state.traces.get(id).unwrap().clone();
                PendingPeek::index(peek, trace_bundle)
            }
            PeekTarget::Persist { metadata, .. } => {
                let metadata = metadata.clone();
                PendingPeek::persist(
                    peek,
                    Arc::clone(&self.compute_state.persist_clients),
                    metadata,
                    usize::cast_from(self.compute_state.max_result_size),
                    self.timely_worker,
                )
            }
        };

        // Log the receipt of the peek.
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&pending.as_log_event(true));
        }

        self.process_peek(&mut Antichain::new(), pending);
    }

    fn handle_cancel_peek(&mut self, uuid: Uuid) {
        if let Some(peek) = self.compute_state.pending_peeks.remove(&uuid) {
            self.send_peek_response(peek, PeekResponse::Canceled);
        }
    }

    /// Arrange for the given collection to be dropped.
    ///
    /// Collection dropping occurs in three phases:
    ///
    ///  1. This method removes the collection from the [`ComputeState`] and drops its
    ///     [`CollectionState`], including its held dataflow tokens. It then adds the dropped
    ///     collection to `dropped_collections`.
    ///  2. The next step of the Timely worker lets the source operators observe the token drops
    ///     and shut themselves down.
    ///  3. `report_dropped_collections` removes the entry from `dropped_collections` and emits any
    ///     outstanding final responses required by the compute protocol.
    ///
    /// These steps ensure that we don't report a collection as dropped to the controller before it
    /// has stopped reading from its inputs. Doing so would allow the controller to release its
    /// read holds on the inputs, which could lead to panics from the replica trying to read
    /// already compacted times.
    fn drop_collection(&mut self, id: GlobalId) {
        let collection = self
            .compute_state
            .collections
            .remove(&id)
            .expect("dropped untracked collection");

        // If this collection is an index, remove its trace.
        self.compute_state.traces.remove(&id);
        // If the collection is unscheduled, remove it from the list of waiting collections.
        self.compute_state.suspended_collections.remove(&id);

        // Remember the collection as dropped, for emission of outstanding final compute responses.
        let dropped = DroppedCollection {
            reported_frontiers: collection.reported_frontiers,
            is_subscribe_or_copy: collection.is_subscribe_or_copy,
        };
        self.compute_state.dropped_collections.push((id, dropped));
    }

    /// Initializes timely dataflow logging and publishes as a view.
    pub fn initialize_logging(&mut self, config: LoggingConfig) {
        if self.compute_state.compute_logger.is_some() {
            panic!("dataflow server has already initialized logging");
        }

        let LoggingTraces {
            traces,
            dataflow_index,
            compute_logger: logger,
        } = logging::initialize(self.timely_worker, &config);

        let mut log_index_ids = config.index_logs;
        for (log, trace) in traces {
            // Install trace as maintained index.
            let id = log_index_ids
                .remove(&log)
                .expect("`logging::initialize` does not invent logs");
            self.compute_state.traces.set(id, trace);

            // Initialize compute and logging state for the logging index.
            let is_subscribe_or_copy = false;
            let as_of = Antichain::from_elem(Timestamp::MIN);
            let metrics = self.compute_state.metrics.for_collection(id);
            let mut collection = CollectionState::new(is_subscribe_or_copy, as_of, metrics);

            let logging =
                CollectionLogging::new(id, logger.clone(), dataflow_index, std::iter::empty());
            collection.logging = Some(logging);

            let existing = self.compute_state.collections.insert(id, collection);
            if existing.is_some() {
                error!(
                    id = ?id,
                    "existing collection for newly initialized logging export",
                );
            }
        }

        // Sanity check.
        assert!(
            log_index_ids.is_empty(),
            "failed to create requested logging indexes: {log_index_ids:?}",
        );

        self.compute_state.compute_logger = Some(logger);
    }

    /// Send progress information to the controller.
    pub fn report_frontiers(&mut self) {
        let mut responses = Vec::new();

        // Maintain a single allocation for `new_frontier` to avoid allocating on every iteration.
        let mut new_frontier = Antichain::new();

        for (&id, collection) in self.compute_state.collections.iter_mut() {
            // The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
            // collections (database-issues#4701).
            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers();

            // Collect the write frontier and check for progress.
            new_frontier.clear();
            if let Some(traces) = self.compute_state.traces.get_mut(&id) {
                assert!(
                    collection.sink_write_frontier.is_none(),
                    "collection {id} has multiple frontiers"
                );
                traces.oks_mut().read_upper(&mut new_frontier);
            } else if let Some(frontier) = &collection.sink_write_frontier {
                new_frontier.clone_from(&frontier.borrow());
            } else {
                error!(id = ?id, "collection without write frontier");
                continue;
            }
            let new_write_frontier = reported
                .write_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the output frontier and check for progress.
            //
            // By default, the output frontier equals the write frontier (which is still stored in
            // `new_frontier`). If the collection provides a compute frontier, we construct the
            // output frontier by taking the meet of write and compute frontier, to avoid:
            //  * reporting progress through times we have not yet written
            //  * reporting progress through times we have not yet fully processed, for
            //    collections that jump their write frontiers into the future
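            //
            // Note: extending an `Antichain` keeps only the minimal elements, so extending
            // `new_frontier` (currently the write frontier) with the compute frontier below
            // yields exactly that meet.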
            if let Some(probe) = &collection.compute_probe {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_output_frontier = reported
                .output_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            // Collect the input frontier and check for progress.
            new_frontier.clear();
            for probe in collection.input_probes.values() {
                probe.with_frontier(|frontier| new_frontier.extend(frontier.iter().copied()));
            }
            let new_input_frontier = reported
                .input_frontier
                .allows_reporting(&new_frontier)
                .then(|| new_frontier.clone());

            if let Some(frontier) = &new_write_frontier {
                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_input_frontier {
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(frontier.clone()));
            }
            if let Some(frontier) = &new_output_frontier {
                collection
                    .set_reported_output_frontier(ReportedFrontier::Reported(frontier.clone()));
            }

            let response = FrontiersResponse {
                write_frontier: new_write_frontier,
                input_frontier: new_input_frontier,
                output_frontier: new_output_frontier,
            };
            if response.has_updates() {
                responses.push((id, response));
            }
        }

        for (id, frontiers) in responses {
            self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
        }
    }

    /// Report dropped collections to the controller.
    pub fn report_dropped_collections(&mut self) {
        let dropped_collections = std::mem::take(&mut self.compute_state.dropped_collections);

        for (id, collection) in dropped_collections {
            // The compute protocol requires us to send a `Frontiers` response with empty frontiers
            // when a collection was dropped, unless:
            //  * The frontier was already reported as empty previously, or
            //  * The collection is a subscribe or copy-to.

            if collection.is_subscribe_or_copy {
                continue;
            }

            let reported = collection.reported_frontiers;
            let write_frontier = (!reported.write_frontier.is_empty()).then(Antichain::new);
            let input_frontier = (!reported.input_frontier.is_empty()).then(Antichain::new);
            let output_frontier = (!reported.output_frontier.is_empty()).then(Antichain::new);

            let frontiers = FrontiersResponse {
                write_frontier,
                input_frontier,
                output_frontier,
            };
            if frontiers.has_updates() {
                self.send_compute_response(ComputeResponse::Frontiers(id, frontiers));
            }
        }
    }

    /// Report operator hydration events.
    pub fn report_operator_hydration(&self) {
        let worker_id = self.timely_worker.index();
        for event in self.compute_state.hydration_rx.try_iter() {
            // The compute protocol forbids reporting `Status` about collections that have advanced
            // to the empty frontier, so we ignore updates for those.
            let collection = self.compute_state.collections.get(&event.export_id);
            if collection.map_or(true, |c| c.reported_frontiers().all_empty()) {
                continue;
            }

            let status = OperatorHydrationStatus {
                collection_id: event.export_id,
                lir_id: event.lir_id,
                worker_id,
                hydrated: event.hydrated,
            };
            let response = ComputeResponse::Status(StatusResponse::OperatorHydration(status));
            self.send_compute_response(response);
        }
    }

    /// Report per-worker metrics.
    pub(crate) fn report_metrics(&self) {
        if let Some(expiration) = self.compute_state.replica_expiration.as_option() {
            let now = Duration::from_millis(mz_ore::now::SYSTEM_TIME()).as_secs_f64();
            let expiration = Duration::from_millis(<u64>::from(expiration)).as_secs_f64();
            let remaining = expiration - now;
            self.compute_state
                .metrics
                .replica_expiration_remaining_seconds
                .set(remaining)
        }
    }

    /// Either complete the peek (and send the response) or put it in the pending set.
    fn process_peek(&mut self, upper: &mut Antichain<Timestamp>, mut peek: PendingPeek) {
        let response = match &mut peek {
            PendingPeek::Index(peek) => {
                peek.seek_fulfillment(upper, self.compute_state.max_result_size)
            }
            PendingPeek::Persist(peek) => peek.result.try_recv().ok().map(|(result, duration)| {
                self.compute_state
                    .metrics
                    .persist_peek_seconds
                    .observe(duration.as_secs_f64());
                result
            }),
        };

        if let Some(response) = response {
            let _span = span!(parent: peek.span(), Level::DEBUG, "process_peek").entered();
            self.send_peek_response(peek, response)
        } else {
            let uuid = peek.peek().uuid;
            self.compute_state.pending_peeks.insert(uuid, peek);
        }
    }

    /// Scan pending peeks and attempt to retire each.
    pub fn process_peeks(&mut self) {
        let mut upper = Antichain::new();
        let pending_peeks = std::mem::take(&mut self.compute_state.pending_peeks);
        for (_uuid, peek) in pending_peeks {
            self.process_peek(&mut upper, peek);
        }
    }

    /// Sends a response for this peek's resolution to the coordinator.
    ///
    /// Note that this function takes ownership of the `PendingPeek`, which is
    /// meant to prevent multiple responses to the same peek.
    #[mz_ore::instrument(level = "debug")]
    fn send_peek_response(&mut self, peek: PendingPeek, response: PeekResponse) {
        let log_event = peek.as_log_event(false);
        // Respond with the response.
        self.send_compute_response(ComputeResponse::PeekResponse(
            peek.peek().uuid,
            response,
            OpenTelemetryContext::obtain(),
        ));

        // Log responding to the peek request.
        if let Some(logger) = self.compute_state.compute_logger.as_mut() {
            logger.log(&log_event);
        }
    }

    /// Scan the shared subscribe response buffer, and forward results along.
    pub fn process_subscribes(&mut self) {
        let mut subscribe_responses = self.compute_state.subscribe_response_buffer.borrow_mut();
        for (sink_id, mut response) in subscribe_responses.drain(..) {
            // Update frontier logging for this subscribe.
            if let Some(collection) = self.compute_state.collections.get_mut(&sink_id) {
                let new_frontier = match &response {
                    SubscribeResponse::Batch(b) => b.upper.clone(),
                    SubscribeResponse::DroppedAt(_) => Antichain::new(),
                };

                let reported = collection.reported_frontiers();
                assert!(
                    reported.write_frontier.allows_reporting(&new_frontier),
                    "subscribe write frontier regression: {:?} -> {:?}",
                    reported.write_frontier,
                    new_frontier,
                );
                assert!(
                    reported.input_frontier.allows_reporting(&new_frontier),
                    "subscribe input frontier regression: {:?} -> {:?}",
                    reported.input_frontier,
                    new_frontier,
                );

                collection
                    .set_reported_write_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection
                    .set_reported_input_frontier(ReportedFrontier::Reported(new_frontier.clone()));
                collection.set_reported_output_frontier(ReportedFrontier::Reported(new_frontier));
            } else {
                // Presumably tracking state for this subscribe was already dropped by
                // `drop_collection`. There is nothing left to do for logging.
            }

            response
                .to_error_if_exceeds(usize::try_from(self.compute_state.max_result_size).unwrap());
            self.send_compute_response(ComputeResponse::SubscribeResponse(sink_id, response));
        }
    }

    /// Scan the shared copy to response buffer, and forward results along.
    pub fn process_copy_tos(&self) {
        let mut responses = self.compute_state.copy_to_response_buffer.borrow_mut();
        for (sink_id, response) in responses.drain(..) {
            self.send_compute_response(ComputeResponse::CopyToResponse(sink_id, response));
        }
    }

    /// Send a response to the coordinator.
    fn send_compute_response(&self, response: ComputeResponse) {
        // Ignore send errors because the coordinator is free to ignore our
        // responses. This happens during shutdown.
        let _ = self.response_tx.send(response);
    }

    /// Checks for dataflow expiration. Panics if we're past the replica expiration time.
    pub(crate) fn check_expiration(&self) {
        let now = mz_ore::now::SYSTEM_TIME();
        if self.compute_state.replica_expiration.less_than(&now.into()) {
            let now_datetime = mz_ore::now::to_datetime(now);
            let expiration_datetime = self
                .compute_state
                .replica_expiration
                .as_option()
                .map(Into::into)
                .map(mz_ore::now::to_datetime);

            // We error and assert separately to produce structured logs in anything that depends
            // on tracing.
            error!(
                now,
                now_datetime = ?now_datetime,
                expiration = ?self.compute_state.replica_expiration.elements(),
                expiration_datetime = ?expiration_datetime,
                "replica expired"
            );

            // Repeat condition for better error message.
            assert!(
                !self.compute_state.replica_expiration.less_than(&now.into()),
                "replica expired. now: {now} ({now_datetime:?}), expiration: {:?} ({expiration_datetime:?})",
                self.compute_state.replica_expiration.elements(),
            );
        }
    }

    /// Returns the dataflow expiration, i.e., the timestamp beyond which diffs can be
    /// dropped.
    ///
    /// Returns an empty frontier if `replica_expiration` is unset or matches conditions under
    /// which dataflow expiration should be disabled.
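    ///
    /// For example (illustrative): with `replica_expiration = {E}` and a dataflow that follows
    /// wall-clock time, the result is roughly `{E + 1}`, unless `until` is already `<= E + 1`,
    /// in which case the result is empty and expiration is disabled for this dataflow.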
    pub fn determine_dataflow_expiration(
        &self,
        time_dependence: &TimeDependence,
        until: &Antichain<mz_repr::Timestamp>,
    ) -> Antichain<mz_repr::Timestamp> {
        // Evaluate time dependence with respect to the expiration time.
        // * Step time forward to ensure the expiration time is distinct from any time a dataflow
        //   can legitimately jump to.
        // * We cannot expire a dataflow whose `until` is less than or equal to the expiration time.
        let iter = self
            .compute_state
            .replica_expiration
            .iter()
            .filter_map(|t| time_dependence.apply(*t))
            .filter_map(|t| mz_repr::Timestamp::try_step_forward(&t))
            .filter(|expiration| !until.less_equal(expiration));
        Antichain::from_iter(iter)
    }
}

/// A peek against either an index or a Persist collection.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub enum PendingPeek {
    /// A peek against an index. (Possibly a temporary index created for the purpose.)
    Index(IndexPeek),
    /// A peek against a Persist-backed collection.
    Persist(PersistPeek),
}

impl PendingPeek {
    /// Produces a corresponding log event.
    pub fn as_log_event(&self, installed: bool) -> ComputeEvent {
        let peek = self.peek();
        let (id, peek_type) = match &peek.target {
            PeekTarget::Index { id } => (id, logging::compute::PeekType::Index),
            PeekTarget::Persist { id, .. } => (id, logging::compute::PeekType::Persist),
        };
        ComputeEvent::Peek(PeekEvent {
            peek: logging::compute::Peek::new(*id, peek.timestamp, peek.uuid),
            peek_type,
            installed,
        })
    }

    fn index(peek: Peek, mut trace_bundle: TraceBundle) -> Self {
        let empty_frontier = Antichain::new();
        let timestamp_frontier = Antichain::from_elem(peek.timestamp);
        trace_bundle
            .oks_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_logical_compaction(timestamp_frontier.borrow());
        trace_bundle
            .oks_mut()
            .set_physical_compaction(empty_frontier.borrow());
        trace_bundle
            .errs_mut()
            .set_physical_compaction(empty_frontier.borrow());

        PendingPeek::Index(IndexPeek {
            peek,
            trace_bundle,
            span: tracing::Span::current(),
        })
    }

    fn persist<A: Allocate>(
        peek: Peek,
        persist_clients: Arc<PersistClientCache>,
        metadata: CollectionMetadata,
        max_result_size: usize,
        timely_worker: &TimelyWorker<A>,
    ) -> Self {
        let active_worker = {
            // Choose the worker that does the actual peek arbitrarily but consistently.
            let chosen_index = usize::cast_from(peek.uuid.hashed()) % timely_worker.peers();
            chosen_index == timely_worker.index()
        };
        let activator = timely_worker.sync_activator_for([].into());
        let peek_uuid = peek.uuid;

        let (result_tx, result_rx) = oneshot::channel();
        let timestamp = peek.timestamp;
        let mfp_plan = peek.map_filter_project.clone();
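        // The peek needs at most LIMIT + OFFSET rows before the finishing is applied;
        // e.g. (illustrative), LIMIT 10 OFFSET 5 caps this at 15 rows.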
        let max_results_needed = peek
            .finishing
            .limit
            .map(|l| usize::cast_from(u64::from(l)))
            .unwrap_or(usize::MAX)
            + peek.finishing.offset;
        let order_by = peek.finishing.order_by.clone();

        // Persist peeks can include at most one literal constraint.
        let literal_constraint = peek
            .literal_constraints
            .clone()
            .map(|rows| rows.into_element());

        let task_handle = mz_ore::task::spawn(|| "persist::peek", async move {
            let start = Instant::now();
            let result = if active_worker {
                PersistPeek::do_peek(
                    &persist_clients,
                    metadata,
                    timestamp,
                    literal_constraint,
                    mfp_plan,
                    max_result_size,
                    max_results_needed,
                )
                .await
            } else {
                Ok(vec![])
            };
            let result = match result {
                Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &order_by)),
                Err(e) => PeekResponse::Error(e.to_string()),
            };
            match result_tx.send((result, start.elapsed())) {
                Ok(()) => {}
                Err((_result, elapsed)) => {
                    debug!(duration = ?elapsed, "dropping result for cancelled peek {peek_uuid}")
                }
            }
            match activator.activate() {
                Ok(()) => {}
                Err(_) => {
                    debug!("unable to wake timely after completed peek {peek_uuid}");
                }
            }
        });
        PendingPeek::Persist(PersistPeek {
            peek,
            _abort_handle: task_handle.abort_on_drop(),
            result: result_rx,
            span: tracing::Span::current(),
        })
    }

    fn span(&self) -> &tracing::Span {
        match self {
            PendingPeek::Index(p) => &p.span,
            PendingPeek::Persist(p) => &p.span,
        }
    }

    pub(crate) fn peek(&self) -> &Peek {
        match self {
            PendingPeek::Index(p) => &p.peek,
            PendingPeek::Persist(p) => &p.peek,
        }
    }
}

/// An in-progress Persist peek.
///
/// Note that `PendingPeek` intentionally does not implement or derive `Clone`,
/// as each `PendingPeek` is meant to be dropped after it's responded to.
pub struct PersistPeek {
    pub(crate) peek: Peek,
    /// A background task that's responsible for producing the peek results.
    /// If we're no longer interested in the results, we abort the task.
    _abort_handle: AbortOnDropHandle<()>,
    /// The result of the background task, eventually.
    result: oneshot::Receiver<(PeekResponse, Duration)>,
    /// The `tracing::Span` tracking this peek's operation
    span: tracing::Span,
}

impl PersistPeek {
    async fn do_peek(
        persist_clients: &PersistClientCache,
        metadata: CollectionMetadata,
        as_of: Timestamp,
        literal_constraint: Option<Row>,
        mfp_plan: SafeMfpPlan,
        max_result_size: usize,
        mut limit_remaining: usize,
    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
        let client = persist_clients
            .open(metadata.persist_location)
            .await
            .map_err(|e| e.to_string())?;

        let mut reader: ReadHandle<SourceData, (), Timestamp, StorageDiff> = client
            .open_leased_reader(
                metadata.data_shard,
                Arc::new(metadata.relation_desc.clone()),
                Arc::new(UnitSchema),
                Diagnostics::from_purpose("persist::peek"),
                USE_CRITICAL_SINCE_SNAPSHOT.get(client.dyncfgs()),
            )
            .await
            .map_err(|e| e.to_string())?;

        // If we are using txn-wal for this collection, then the upper might
        // be advanced lazily and we have to go through txn-wal for reads.
        //
        // TODO: If/when we have a process-wide TxnsRead worker for clusterd,
        // use it here (instead of opening a new TxnsCache) to save a persist
        // reader registration and some txns shard read traffic.
        let mut txns_read = if let Some(txns_id) = metadata.txns_shard {
            Some(TxnsCache::open(&client, txns_id, Some(metadata.data_shard)).await)
        } else {
            None
        };

        let metrics = client.metrics();

        let mut cursor = StatsCursor::new(
            &mut reader,
            txns_read.as_mut(),
            metrics,
            &mfp_plan,
            &metadata.relation_desc,
            Antichain::from_elem(as_of),
        )
        .await
        .map_err(|since| {
            format!("attempted to peek at {as_of}, but the since has advanced to {since:?}")
        })?;

        // Re-used state for processing and building rows.
        let mut result = vec![];
        let mut datum_vec = DatumVec::new();
        let mut row_builder = Row::default();
        let arena = RowArena::new();
        let mut total_size = 0usize;

        let literal_len = match &literal_constraint {
            None => 0,
            Some(row) => row.iter().count(),
        };

        'collect: while limit_remaining > 0 {
            let Some(batch) = cursor.next().await else {
                break;
            };
            for (data, _, d) in batch {
                let row = data.map_err(|e| e.to_string())?;

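                // Apply the literal constraint (if any) to the row's leading datums: `Less`
                // rows are skipped, `Equal` rows are kept, and the first `Greater` row ends
                // the scan, since no later row can match.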
                if let Some(literal) = &literal_constraint {
                    match row.iter().take(literal_len).cmp(literal.iter()) {
                        Ordering::Less => continue,
                        Ordering::Equal => {}
                        Ordering::Greater => break 'collect,
                    }
                }

                let count: usize = d.try_into().map_err(|_| {
                    format!(
                        "Invalid data in source, saw retractions ({}) for a row that does not exist: {:?}",
1268                        -d,
1269                        row,
1270                    )
1271                })?;
1272                let Some(count) = NonZeroUsize::new(count) else {
1273                    continue;
1274                };
1275                let mut datum_local = datum_vec.borrow_with(&row);
1276                let eval_result = mfp_plan
1277                    .evaluate_into(&mut datum_local, &arena, &mut row_builder)
1278                    .map(|row| row.cloned())
1279                    .map_err(|e| e.to_string())?;
1280                if let Some(row) = eval_result {
1281                    total_size = total_size
1282                        .saturating_add(row.byte_len())
1283                        .saturating_add(std::mem::size_of::<NonZeroUsize>());
1284                    if total_size > max_result_size {
1285                        return Err(format!(
1286                            "result exceeds max size of {}",
1287                            ByteSize::b(u64::cast_from(max_result_size))
1288                        ));
1289                    }
1290                    result.push((row, count));
1291                    limit_remaining = limit_remaining.saturating_sub(count.get());
1292                    if limit_remaining == 0 {
1293                        break;
1294                    }
1295                }
1296            }
1297        }
1298
1299        Ok(result)
1300    }
1301}
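
// The early exit in the persist peek above assumes the snapshot is yielded in
// key order: the literal constraint is compared against only the first
// `literal_len` columns of each row, so rows that sort before the literal can
// be skipped and the scan can stop at the first row that sorts after it. A
// minimal, self-contained sketch of that comparison (illustrative only; the
// module and test names below are not part of the original code):
#[cfg(test)]
mod literal_prefix_example {
    use std::cmp::Ordering;

    use mz_repr::{Datum, Row};

    #[test]
    fn prefix_comparison_drives_skip_and_stop() {
        let literal = Row::pack_slice(&[Datum::Int64(5)]);
        let literal_len = literal.iter().count();

        // Rows in the order a key-sorted scan would yield them.
        let rows = [
            Row::pack_slice(&[Datum::Int64(3), Datum::String("skip")]),
            Row::pack_slice(&[Datum::Int64(5), Datum::String("keep")]),
            Row::pack_slice(&[Datum::Int64(7), Datum::String("stop")]),
        ];

        let mut kept = Vec::new();
        for row in &rows {
            match row.iter().take(literal_len).cmp(literal.iter()) {
                // Sorts before the literal: skip this row.
                Ordering::Less => continue,
                // Prefix matches the literal: keep the row.
                Ordering::Equal => kept.push(row),
                // Sorts after the literal: no further row can match.
                Ordering::Greater => break,
            }
        }
        assert_eq!(kept.len(), 1);
    }
}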
1302
1303/// An in-progress index-backed peek, and data to eventually fulfill it.
1304pub struct IndexPeek {
1305    peek: Peek,
1306    /// The data from which the trace derives.
1307    trace_bundle: TraceBundle,
1308    /// The `tracing::Span` tracking this peek's operation.
1309    span: tracing::Span,
1310}
1311
1312impl IndexPeek {
1313    /// Attempts to fulfill the peek, returning a response if it can be answered.
1314    ///
1315    /// To produce output at `peek.timestamp`, we must be certain that
1316    /// it is no longer changing. A trace guarantees that all future
1317    /// changes will be greater than or equal to an element of `upper`.
1318    ///
1319    /// If an element of `upper` is less than or equal to `peek.timestamp`,
1320    /// then there can be further updates that would change the output.
1321    /// If no element of `upper` is less than or equal to `peek.timestamp`,
1322    /// then for any time `t` less than or equal to `peek.timestamp` it is
1323    /// not the case that `upper` is less than or equal to `t`,
1324    /// and so the result cannot further evolve.
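    ///
    /// A minimal sketch of the readiness check, using plain antichains rather
    /// than a real trace (illustrative only):
    ///
    /// ```ignore
    /// use timely::progress::frontier::Antichain;
    ///
    /// let upper = Antichain::from_elem(10u64);
    /// assert!(!upper.less_equal(&9)); // no element of `upper` is <= 9: ready
    /// assert!(upper.less_equal(&10)); // updates at 10 may still arrive: wait
    /// ```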
1325    fn seek_fulfillment(
1326        &mut self,
1327        upper: &mut Antichain<Timestamp>,
1328        max_result_size: u64,
1329    ) -> Option<PeekResponse> {
1330        self.trace_bundle.oks_mut().read_upper(upper);
1331        if upper.less_equal(&self.peek.timestamp) {
1332            return None;
1333        }
1334        self.trace_bundle.errs_mut().read_upper(upper);
1335        if upper.less_equal(&self.peek.timestamp) {
1336            return None;
1337        }
1338
1339        let read_frontier = self.trace_bundle.compaction_frontier();
1340        if !read_frontier.less_equal(&self.peek.timestamp) {
1341            let error = format!(
1342                "Arrangement compaction frontier ({:?}) is beyond the time of the attempted read ({})",
1343                read_frontier.elements(),
1344                self.peek.timestamp,
1345            );
1346            return Some(PeekResponse::Error(error));
1347        }
1348
1349        let response = match self.collect_finished_data(max_result_size) {
1350            Ok(rows) => PeekResponse::Rows(RowCollection::new(rows, &self.peek.finishing.order_by)),
1351            Err(text) => PeekResponse::Error(text),
1352        };
1353        Some(response)
1354    }
1355
1356    /// Collects data for a known-complete peek, surfacing the first error from the errs stream, if any.
1357    fn collect_finished_data(
1358        &mut self,
1359        max_result_size: u64,
1360    ) -> Result<Vec<(Row, NonZeroUsize)>, String> {
1361        // Check if there exist any errors and, if so, return whatever one we
1362        // find first.
1363        let (mut cursor, storage) = self.trace_bundle.errs_mut().cursor();
1364        while cursor.key_valid(&storage) {
1365            let mut copies = Diff::ZERO;
1366            cursor.map_times(&storage, |time, diff| {
1367                if time.less_equal(&self.peek.timestamp) {
1368                    copies += diff;
1369                }
1370            });
1371            if copies.is_negative() {
1372                return Err(format!(
1373                    "Invalid data in source errors, saw retractions ({}) for row that does not exist: {}",
1374                    -copies,
1375                    cursor.key(&storage),
1376                ));
1377            }
1378            if copies.is_positive() {
1379                return Err(cursor.key(&storage).to_string());
1380            }
1381            cursor.step_key(&storage);
1382        }
1383
1384        Self::collect_ok_finished_data(&mut self.peek, self.trace_bundle.oks_mut(), max_result_size)
1385    }
1386
1387    /// Collects data for a known-complete peek from the ok stream.
1388    fn collect_ok_finished_data<Tr>(
1389        peek: &mut Peek<Timestamp>,
1390        oks_handle: &mut Tr,
1391        max_result_size: u64,
1392    ) -> Result<Vec<(Row, NonZeroUsize)>, String>
1393    where
1394        for<'a> Tr: TraceReader<DiffGat<'a> = &'a Diff>,
1395        for<'a> Tr::Key<'a>: ToDatumIter + IntoOwned<'a, Owned = Row> + Eq,
1396        for<'a> Tr::Val<'a>: ToDatumIter,
1397        for<'a> Tr::TimeGat<'a>: PartialOrder<mz_repr::Timestamp>,
1398    {
1399        let max_result_size = usize::cast_from(max_result_size);
1400        let count_byte_size = std::mem::size_of::<NonZeroUsize>();
1401
1402        // Cursor into the backing trace, and the storage that bounds the lifetime of its `Row` data.
1403        let (mut cursor, storage) = oks_handle.cursor();
1404        // Accumulated `Vec<(row, count)>` results that we are likely to return.
1405        let mut results = Vec::new();
1406        let mut total_size: usize = 0;
1407
1408        // When set, a bound on the number of records we need to return.
1409        // The requirements on the records are driven by the finishing's
1410        // `order_by` field. Further limiting will happen when the results
1411        // are collected, so we don't need to have exactly this many results,
1412        // just at least those results that would have been returned.
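        // For example, a finishing with `limit = 10` and `offset = 5` yields
        // `max_results = 15`: the first 5 records in the global order are
        // discarded downstream, so at least 15 must survive local thinning.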
1413        let max_results = peek
1414            .finishing
1415            .limit
1416            .map(|l| usize::cast_from(u64::from(l)) + peek.finishing.offset);
1417
1418        use mz_ore::result::ResultExt;
1419
1420        let mut row_builder = Row::default();
1421        let mut datum_vec = DatumVec::new();
1422        let mut l_datum_vec = DatumVec::new();
1423        let mut r_datum_vec = DatumVec::new();
1424
1425        // We have to sort the literal constraints because cursor.seek_key can seek only forward.
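        // For example, literal keys `[7, 3]` must be visited in the order `[3, 7]`:
        // once `seek_key` has positioned the cursor at key `7`, it can no longer
        // reach key `3`.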
1426        peek.literal_constraints
1427            .iter_mut()
1428            .for_each(|vec| vec.sort());
1429        let has_literal_constraints = peek.literal_constraints.is_some();
1430        let mut literals = peek.literal_constraints.iter().flatten();
1431        let mut current_literal = None;
1432
1433        while cursor.key_valid(&storage) {
1434            if has_literal_constraints {
1435                loop {
1436                    // Go to the next literal constraint.
1437                    // (i.e., to the next OR argument in something like `c=3 OR c=7 OR c=9`)
1438                    current_literal = literals.next();
1439                    match current_literal {
1440                        None => return Ok(results),
1441                        Some(current_literal) => {
1442                            // NOTE(vmarcos): We expect the extra allocations below to be manageable
1443                            // since we only perform as many of them as there are literals.
1444                            cursor.seek_key(&storage, IntoOwned::borrow_as(current_literal));
1445                            if !cursor.key_valid(&storage) {
1446                                return Ok(results);
1447                            }
1448                            if cursor.get_key(&storage).unwrap()
1449                                == IntoOwned::borrow_as(current_literal)
1450                            {
1451                                // The cursor found a record whose key matches the current literal.
1452                                // We break from the inner loop, and process this key.
1453                                break;
1454                            }
1455                            // The cursor landed on a record that has a different key, meaning that there is
1456                            // no record whose key would match the current literal.
1457                        }
1458                    }
1459                }
1460            }
1461
1462            while cursor.val_valid(&storage) {
1463                // TODO: This arena could be maintained and reused for longer,
1464                // but it wasn't clear at what interval we should flush
1465                // it to ensure we don't accidentally spike our memory use.
1466                // This choice is conservative, and not the end of the world
1467                // from a performance perspective.
1468                let arena = RowArena::new();
1469
1470                let key_item = cursor.key(&storage);
1471                let key = key_item.to_datum_iter();
1472                let row_item = cursor.val(&storage);
1473                let row = row_item.to_datum_iter();
1474
1475                let mut borrow = datum_vec.borrow();
1476                borrow.extend(key);
1477                borrow.extend(row);
1478
1479                if has_literal_constraints {
1480                    // The peek was created from an IndexedFilter join. We have to add those columns
1481                    // here that the join would add in a dataflow.
1482                    let datum_vec = borrow.deref_mut();
1483                    // The unwrap is OK: `current_literal` can be `None` only if
1484                    // `!has_literal_constraints` or if the literal iteration has finished, and
1485                    // in the latter case we have already returned from this function.
1486                    datum_vec.extend(current_literal.unwrap().iter());
1487                }
1488                if let Some(result) = peek
1489                    .map_filter_project
1490                    .evaluate_into(&mut borrow, &arena, &mut row_builder)
1491                    .map(|row| row.cloned())
1492                    .map_err_to_string_with_causes()?
1493                {
1494                    let mut copies = Diff::ZERO;
1495                    cursor.map_times(&storage, |time, diff| {
1496                        if time.less_equal(&peek.timestamp) {
1497                            copies += diff;
1498                        }
1499                    });
1500                    let copies: usize = if copies.is_negative() {
1501                        return Err(format!(
1502                            "Invalid data in source, saw retractions ({}) for row that does not exist: {:?}",
1503                            -copies, &*borrow,
1504                        ));
1505                    } else {
1506                        copies.into_inner().try_into().unwrap()
1507                    };
1508                    // Keep only rows with a positive multiplicity; rows whose diffs sum to zero are skipped.
1509                    if let Some(copies) = NonZeroUsize::new(copies) {
1510                        total_size = total_size
1511                            .saturating_add(result.byte_len())
1512                            .saturating_add(count_byte_size);
1513                        if total_size > max_result_size {
1514                            return Err(format!(
1515                                "result exceeds max size of {}",
1516                                ByteSize::b(u64::cast_from(max_result_size))
1517                            ));
1518                        }
1519                        results.push((result, copies));
1520                    }
1521
1522                    // If we hold many more than `max_results` records, we can thin down
1523                    // `results` using `peek.finishing.order_by`.
1524                    if let Some(max_results) = max_results {
1525                        // We use a threshold twice what we intend, to amortize the work
1526                        // across all of the insertions. We could tighten this, but it
1527                        // works for the moment.
1528                        if results.len() >= 2 * max_results {
1529                            if peek.finishing.order_by.is_empty() {
1530                                results.truncate(max_results);
1531                                return Ok(results);
1532                            } else {
1533                                // We can sort `results` and then truncate to `max_results`.
1534                                // This has an effect similar to a priority queue, without
1535                                // its interactive dequeueing properties.
1536                                // TODO: Had we left these as `Vec<Datum>` we would avoid
1537                                // the unpacking; we should consider doing that, although
1538                                // it will require a re-pivot of the code to branch on this
1539                                // inner test (as we prefer not to maintain `Vec<Datum>`
1540                                // in the other case).
1541                                results.sort_by(|left, right| {
1542                                    let left_datums = l_datum_vec.borrow_with(&left.0);
1543                                    let right_datums = r_datum_vec.borrow_with(&right.0);
1544                                    mz_expr::compare_columns(
1545                                        &peek.finishing.order_by,
1546                                        &left_datums,
1547                                        &right_datums,
1548                                        || left.0.cmp(&right.0),
1549                                    )
1550                                });
1551                                let dropped = results.drain(max_results..);
1552                                let dropped_size =
1553                                    dropped.into_iter().fold(0, |acc: usize, (row, _count)| {
1554                                        acc.saturating_add(
1555                                            row.byte_len().saturating_add(count_byte_size),
1556                                        )
1557                                    });
1558                                total_size = total_size.saturating_sub(dropped_size);
1559                            }
1560                        }
1561                    }
1562                }
1563                cursor.step_val(&storage);
1564            }
1565            // The cursor doesn't have anything more to say for the current key.
1566
1567            if !has_literal_constraints {
1568                // We are simply stepping through all the keys that the index has.
1569                cursor.step_key(&storage);
1570            }
1571        }
1572
1573        Ok(results)
1574    }
1575}
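
// The thinning in `collect_ok_finished_data` above keeps at most roughly twice
// the requested number of records in memory: whenever the buffer reaches
// `2 * max_results` it is sorted and truncated, which amortizes the sorting
// work across insertions while never discarding a record that could still
// appear among the final `max_results`. A minimal sketch of that invariant
// with plain integers (illustrative only; not part of the peek code):
#[cfg(test)]
mod peek_thinning_example {
    #[test]
    fn amortized_truncation_preserves_top_k() {
        let max_results = 3;
        let input: Vec<u64> = vec![9, 1, 8, 2, 7, 3, 6, 4, 5, 0];

        let mut thinned = Vec::new();
        for x in &input {
            thinned.push(*x);
            // Thin down once we hold twice the records we will ever return.
            if thinned.len() >= 2 * max_results {
                thinned.sort();
                thinned.truncate(max_results);
            }
        }
        // Model the finishing applied downstream.
        thinned.sort();
        thinned.truncate(max_results);

        // The result matches what a full sort of the input would have produced.
        let mut expected = input.clone();
        expected.sort();
        expected.truncate(max_results);
        assert_eq!(thinned, expected);
    }
}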
1576
1577/// The frontiers we have reported to the controller for a collection.
1578#[derive(Debug)]
1579struct ReportedFrontiers {
1580    /// The reported write frontier.
1581    write_frontier: ReportedFrontier,
1582    /// The reported input frontier.
1583    input_frontier: ReportedFrontier,
1584    /// The reported output frontier.
1585    output_frontier: ReportedFrontier,
1586}
1587
1588impl ReportedFrontiers {
1589    /// Creates a new `ReportedFrontiers` instance.
1590    fn new() -> Self {
1591        Self {
1592            write_frontier: ReportedFrontier::new(),
1593            input_frontier: ReportedFrontier::new(),
1594            output_frontier: ReportedFrontier::new(),
1595        }
1596    }
1597
1598    /// Returns whether all reported frontiers are empty.
1599    fn all_empty(&self) -> bool {
1600        self.write_frontier.is_empty()
1601            && self.input_frontier.is_empty()
1602            && self.output_frontier.is_empty()
1603    }
1604}
1605
1606/// A frontier we have reported to the controller, or the least frontier we are allowed to report.
1607#[derive(Clone, Debug)]
1608pub enum ReportedFrontier {
1609    /// A frontier has been previously reported.
1610    Reported(Antichain<Timestamp>),
1611    /// No frontier has been reported yet.
1612    NotReported {
1613        /// A lower bound for frontiers that may be reported in the future.
1614        lower: Antichain<Timestamp>,
1615    },
1616}
1617
1618impl ReportedFrontier {
1619    /// Create a new `ReportedFrontier` enforcing the minimum lower bound.
1620    pub fn new() -> Self {
1621        let lower = Antichain::from_elem(timely::progress::Timestamp::minimum());
1622        Self::NotReported { lower }
1623    }
1624
1625    /// Whether the reported frontier is the empty frontier.
1626    pub fn is_empty(&self) -> bool {
1627        match self {
1628            Self::Reported(frontier) => frontier.is_empty(),
1629            Self::NotReported { .. } => false,
1630        }
1631    }
1632
1633    /// Whether this `ReportedFrontier` allows reporting the given frontier.
1634    ///
1635    /// A `ReportedFrontier` allows reporting of another frontier if:
1636    ///  * a frontier was already reported and the other frontier is strictly greater, or
1637    ///  * no frontier was reported yet and the other frontier is at or above the lower bound.
1638    fn allows_reporting(&self, other: &Antichain<Timestamp>) -> bool {
1639        match self {
1640            Self::Reported(frontier) => PartialOrder::less_than(frontier, other),
1641            Self::NotReported { lower } => PartialOrder::less_equal(lower, other),
1642        }
1643    }
1644}
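
// The reporting rule above is what keeps frontier reports monotonic: once a
// frontier has been reported, only strictly greater frontiers may follow, while
// an unreported collection may report anything at or above its lower bound. A
// minimal sketch of the two cases (illustrative only; the module and test names
// are not part of the original code):
#[cfg(test)]
mod reported_frontier_example {
    use mz_repr::Timestamp;
    use timely::progress::frontier::Antichain;

    use super::ReportedFrontier;

    #[test]
    fn reporting_rules() {
        let frontier = |t: u64| Antichain::from_elem(Timestamp::from(t));

        // Nothing reported yet: the lower bound itself may be reported.
        let not_reported = ReportedFrontier::new();
        assert!(not_reported.allows_reporting(&frontier(0)));

        // Already reported at 5: only strictly greater frontiers are allowed.
        let reported = ReportedFrontier::Reported(frontier(5));
        assert!(!reported.allows_reporting(&frontier(5)));
        assert!(reported.allows_reporting(&frontier(6)));
    }
}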
1645
1646/// State maintained for a compute collection.
1647pub struct CollectionState {
1648    /// Tracks the frontiers that have been reported to the controller.
1649    reported_frontiers: ReportedFrontiers,
1650    /// Whether this collection is a subscribe or copy-to.
1651    ///
1652    /// The compute protocol does not allow `Frontiers` responses for subscribe and copy-to
1653    /// collections, so we need to be able to recognize them. This is something we would like to
1654    /// change in the future (database-issues#4701).
1655    pub is_subscribe_or_copy: bool,
1656    /// The collection's initial as-of frontier.
1657    ///
1658    /// Used to determine hydration status.
1659    as_of: Antichain<Timestamp>,
1660
1661    /// A token that should be dropped when this collection is dropped to clean up associated
1662    /// sink state.
1663    ///
1664    /// Only `Some` if the collection is a sink.
1665    pub sink_token: Option<SinkToken>,
1666    /// Frontier of sink writes.
1667    ///
1668    /// Only `Some` if the collection is a sink and *not* a subscribe.
1669    pub sink_write_frontier: Option<Rc<RefCell<Antichain<Timestamp>>>>,
1670    /// Frontier probes for every input to the collection.
1671    pub input_probes: BTreeMap<GlobalId, probe::Handle<Timestamp>>,
1672    /// A probe reporting the frontier of times through which all collection outputs have been
1673    /// computed (but not necessarily written).
1674    ///
1675    /// `None` for collections with compute frontiers equal to their write frontiers.
1676    pub compute_probe: Option<probe::Handle<Timestamp>>,
1677    /// Logging state maintained for this collection.
1678    logging: Option<CollectionLogging>,
1679    /// Metrics tracked for this collection.
1680    metrics: CollectionMetrics,
1681}
1682
1683impl CollectionState {
1684    fn new(
1685        is_subscribe_or_copy: bool,
1686        as_of: Antichain<Timestamp>,
1687        metrics: CollectionMetrics,
1688    ) -> Self {
1689        Self {
1690            reported_frontiers: ReportedFrontiers::new(),
1691            is_subscribe_or_copy,
1692            as_of,
1693            sink_token: None,
1694            sink_write_frontier: None,
1695            input_probes: Default::default(),
1696            compute_probe: None,
1697            logging: None,
1698            metrics,
1699        }
1700    }
1701
1702    /// Return the frontiers that have been reported to the controller.
1703    fn reported_frontiers(&self) -> &ReportedFrontiers {
1704        &self.reported_frontiers
1705    }
1706
1707    /// Reset all reported frontiers to the given value.
1708    pub fn reset_reported_frontiers(&mut self, frontier: ReportedFrontier) {
1709        self.reported_frontiers.write_frontier = frontier.clone();
1710        self.reported_frontiers.input_frontier = frontier.clone();
1711        self.reported_frontiers.output_frontier = frontier;
1712    }
1713
1714    /// Set the write frontier that has been reported to the controller.
1715    fn set_reported_write_frontier(&mut self, frontier: ReportedFrontier) {
1716        if let Some(logging) = &mut self.logging {
1717            let time = match &frontier {
1718                ReportedFrontier::Reported(frontier) => frontier.get(0).copied(),
1719                ReportedFrontier::NotReported { .. } => Some(Timestamp::MIN),
1720            };
1721            logging.set_frontier(time);
1722        }
1723
1724        self.reported_frontiers.write_frontier = frontier;
1725    }
1726
1727    /// Set the input frontier that has been reported to the controller.
1728    fn set_reported_input_frontier(&mut self, frontier: ReportedFrontier) {
1729        // Use this opportunity to update our input frontier logging.
1730        if let Some(logging) = &mut self.logging {
1731            for (id, probe) in &self.input_probes {
1732                let new_time = probe.with_frontier(|frontier| frontier.as_option().copied());
1733                logging.set_import_frontier(*id, new_time);
1734            }
1735        }
1736
1737        self.reported_frontiers.input_frontier = frontier;
1738    }
1739
1740    /// Set the output frontier that has been reported to the controller.
1741    fn set_reported_output_frontier(&mut self, frontier: ReportedFrontier) {
1742        let already_hydrated = self.hydrated();
1743
1744        self.reported_frontiers.output_frontier = frontier;
1745
1746        if !already_hydrated && self.hydrated() {
1747            if let Some(logging) = &mut self.logging {
1748                logging.set_hydrated();
1749            }
1750            self.metrics.record_collection_hydrated();
1751        }
1752    }
1753
1754    /// Return whether this collection is hydrated.
1755    fn hydrated(&self) -> bool {
1756        match &self.reported_frontiers.output_frontier {
1757            ReportedFrontier::Reported(frontier) => PartialOrder::less_than(&self.as_of, frontier),
1758            ReportedFrontier::NotReported { .. } => false,
1759        }
1760    }
1761}
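
// Hydration is defined purely in terms of frontiers: a collection counts as
// hydrated once its reported output frontier has moved strictly beyond its
// as-of, i.e. once the dataflow has fully processed its initial snapshot. A
// minimal sketch of the underlying predicate (illustrative only):
#[cfg(test)]
mod hydration_predicate_example {
    use timely::order::PartialOrder;
    use timely::progress::frontier::Antichain;

    #[test]
    fn hydrated_once_output_passes_as_of() {
        let as_of = Antichain::from_elem(10u64);

        // Output frontier still at the as-of: the snapshot is incomplete.
        assert!(!PartialOrder::less_than(&as_of, &Antichain::from_elem(10)));
        // Output frontier advanced past the as-of: hydrated.
        assert!(PartialOrder::less_than(&as_of, &Antichain::from_elem(11)));
    }
}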
1762
1763/// State remembered about a dropped compute collection.
1764///
1765/// This is the subset of the full [`CollectionState`] that survives the invocation of
1766/// `drop_collection`, until it is finally dropped in `report_dropped_collections`. It includes any
1767/// information required to report the dropping of a collection to the controller.
1768///
1769/// Note that this state must _not_ store any state (such as tokens) whose dropping releases
1770/// resources elsewhere in the system. A `DroppedCollection` for a collection dropped during
1771/// reconciliation might be alive at the same time as the [`CollectionState`] for the re-created
1772/// collection, and if the dropped collection hasn't released all its held resources by the time
1773/// the new one is created, conflicts can ensue.
1774pub struct DroppedCollection {
1775    reported_frontiers: ReportedFrontiers,
1776    is_subscribe_or_copy: bool,
1777}
1778
1779/// An event reporting the hydration status of an LIR node in a dataflow.
1780pub struct HydrationEvent {
1781    /// The ID of the export this dataflow maintains.
1782    pub export_id: GlobalId,
1783    /// The ID of the LIR node.
1784    pub lir_id: LirId,
1785    /// Whether the node is hydrated.
1786    pub hydrated: bool,
1787}