Skip to main content

mz_adapter/coord/
sequencer.rs

// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

// Prevents anyone from accidentally exporting a method from the `inner` module.
11#![allow(clippy::pub_use)]
12
//! Logic for executing a planned SQL query.

15use std::collections::{BTreeMap, BTreeSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::FutureExt;
20use futures::future::LocalBoxFuture;
21use futures::stream::FuturesOrdered;
22use http::Uri;
23use inner::return_if_err;
24use maplit::btreemap;
25use mz_catalog::memory::objects::Cluster;
26use mz_controller_types::ReplicaId;
27use mz_expr::row::RowCollection;
28use mz_expr::{MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
29use mz_ore::cast::CastFrom;
30use mz_ore::tracing::OpenTelemetryContext;
31use mz_persist_client::stats::SnapshotPartStats;
32use mz_repr::explain::{ExprHumanizerExt, TransientItem};
33use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
34use mz_sql::catalog::{CatalogError, SessionCatalog};
35use mz_sql::names::ResolvedIds;
36use mz_sql::plan::{
37    self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
38    CreateSourcePlanBundle, FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind,
39    RaisePlan,
40};
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::vars;
44use mz_sql::session::vars::SessionVars;
45use mz_sql_parser::ast::{Raw, Statement};
46use mz_storage_client::client::TableData;
47use mz_storage_client::storage_collections::StorageCollections;
48use mz_storage_types::connections::inline::IntoInlineConnection;
49use mz_storage_types::controller::StorageError;
50use mz_storage_types::stats::RelationPartStats;
51use mz_transform::dataflow::DataflowMetainfo;
52use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
53use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
54use timely::progress::Antichain;
55use timely::progress::Timestamp as TimelyTimestamp;
56use tokio::sync::oneshot;
57use tracing::{Instrument, Level, Span, event, warn};
58
59use crate::ExecuteContext;
60use crate::catalog::{Catalog, CatalogState};
61use crate::command::{Command, ExecuteResponse, Response};
62use crate::coord::appends::{DeferredOp, DeferredPlan};
63use crate::coord::validity::PlanValidity;
64use crate::coord::{
65    Coordinator, DeferredPlanStatement, ExplainPlanContext, Message, PlanStatement, TargetCluster,
66    catalog_serving,
67};
68use crate::error::AdapterError;
69use crate::explain::insights::PlanInsightsContext;
70use crate::notice::AdapterNotice;
71use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
72use crate::optimize::peek;
73use crate::session::{
74    EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
75};
76use crate::util::ClientTransmitter;
77
// DO NOT make this visible in any way, i.e. do not add any version of
// `pub` to this mod. The inner `sequence_X` methods are hidden in this
// private module to prevent anyone from calling them directly. All
// sequencing should be done through the `sequence_plan` method.
// This allows us to add catch-all logic that should be applied to all
// plans in `sequence_plan` and guarantee that no caller can circumvent
// that logic.
//
// The exceptions are:
//
// - Creating a role during connection startup. In this scenario, the session has not been properly
// initialized and we need to skip directly to creating the role. We have a specific method,
// `sequence_create_role_for_startup`, for this purpose.
// - Methods that continue the execution of some plan that was being run asynchronously, such as
// `sequence_peek_stage` and `sequence_create_connection_stage_finish`.
// - The frontend peek sequencing temporarily reaches into this module for things that are needed
//   by both the old and new peek sequencing. TODO(peek-seq): We plan to eliminate this with a
//   big refactoring after the old peek sequencing is removed.
97mod inner;
98
99impl Coordinator {
100    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would
101    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
102    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
103    pub(crate) fn sequence_plan(
104        &mut self,
105        mut ctx: ExecuteContext,
106        plan: Plan,
107        resolved_ids: ResolvedIds,
108    ) -> LocalBoxFuture<'_, ()> {
109        async move {
110            let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
111            ctx.tx_mut().set_allowed(responses);
112
113            if self.controller.read_only() && !plan.allowed_in_read_only() {
114                ctx.retire(Err(AdapterError::ReadOnly));
115                return;
116            }
117
118            // Check if we're still waiting for any of the builtin table appends from when we
119            // started the Session to complete.
120            if let Some((dependencies, wait_future)) =
121                super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
122            {
123                let conn_id = ctx.session().conn_id();
124                tracing::debug!(%conn_id, "deferring plan for startup appends");
125
126                let role_metadata = ctx.session().role_metadata().clone();
127                let validity = PlanValidity::new(
128                    self.catalog.transient_revision(),
129                    dependencies,
130                    None,
131                    None,
132                    role_metadata,
133                );
134                let deferred_plan = DeferredPlan {
135                    ctx,
136                    plan,
137                    validity,
138                    requires_locks: BTreeSet::default(),
139                };
140                // Defer op accepts an optional write lock, but there aren't any writes occurring
141                // here, since the map to `None`.
142                let acquire_future = wait_future.map(|()| None);
143
144                self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
145
146                // Return early because our op is deferred on waiting for the builtin writes to
147                // complete.
148                return;
149            };
150
151            // Scope the borrow of the Catalog because we need to mutate the Coordinator state below.
152            let target_cluster = match ctx.session().transaction().cluster() {
153                // Use the current transaction's cluster.
154                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
155                // If there isn't a current cluster set for a transaction, then try to auto route.
156                None => {
157                    let session_catalog = self.catalog.for_session(ctx.session());
158                    catalog_serving::auto_run_on_catalog_server(
159                        &session_catalog,
160                        ctx.session(),
161                        &plan,
162                    )
163                }
164            };
165            let (target_cluster_id, target_cluster_name) = match self
166                .catalog()
167                .resolve_target_cluster(target_cluster, ctx.session())
168            {
169                Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
170                Err(_) => (None, None),
171            };
172
173            if let (Some(cluster_id), Some(statement_id)) =
174                (target_cluster_id, ctx.extra().contents())
175            {
176                self.set_statement_execution_cluster(statement_id, cluster_id);
177            }
178
179            let session_catalog = self.catalog.for_session(ctx.session());
180
181            if let Some(cluster_name) = &target_cluster_name {
182                if let Err(e) = catalog_serving::check_cluster_restrictions(
183                    cluster_name,
184                    &session_catalog,
185                    &plan,
186                ) {
187                    return ctx.retire(Err(e));
188                }
189            }
190
191            if let Err(e) = rbac::check_plan(
192                &session_catalog,
193                Some(|id| {
194                    // We use linear search through active connections if needed, which is fine
195                    // because the RBAC check will call the closure at most once.
196                    self.active_conns()
197                        .into_iter()
198                        .find(|(conn_id, _)| conn_id.unhandled() == id)
199                        .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
200                }),
201                ctx.session(),
202                &plan,
203                target_cluster_id,
204                &resolved_ids,
205            ) {
206                return ctx.retire(Err(e.into()));
207            }
208
209            match plan {
210                Plan::CreateSource(plan) => {
211                    let id_ts = self.get_catalog_write_ts().await;
212                    let (item_id, global_id) =
213                        return_if_err!(self.catalog().allocate_user_id(id_ts).await, ctx);
214                    let result = self
215                        .sequence_create_source(
216                            &mut ctx,
217                            vec![CreateSourcePlanBundle {
218                                item_id,
219                                global_id,
220                                plan,
221                                resolved_ids,
222                                available_source_references: None,
223                            }],
224                        )
225                        .await;
226                    ctx.retire(result);
227                }
228                Plan::CreateSources(plans) => {
229                    assert!(
230                        resolved_ids.is_empty(),
231                        "each plan has separate resolved_ids"
232                    );
233                    let result = self.sequence_create_source(&mut ctx, plans).await;
234                    ctx.retire(result);
235                }
236                Plan::CreateConnection(plan) => {
237                    self.sequence_create_connection(ctx, plan, resolved_ids)
238                        .await;
239                }
240                Plan::CreateDatabase(plan) => {
241                    let result = self.sequence_create_database(ctx.session_mut(), plan).await;
242                    ctx.retire(result);
243                }
244                Plan::CreateSchema(plan) => {
245                    let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
246                    ctx.retire(result);
247                }
248                Plan::CreateRole(plan) => {
249                    let result = self
250                        .sequence_create_role(Some(ctx.session().conn_id()), plan)
251                        .await;
252                    if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
253                        ctx.session().add_notice(notice);
254                    }
255                    ctx.retire(result);
256                }
257                Plan::CreateCluster(plan) => {
258                    let result = self.sequence_create_cluster(ctx.session(), plan).await;
259                    ctx.retire(result);
260                }
261                Plan::CreateClusterReplica(plan) => {
262                    let result = self
263                        .sequence_create_cluster_replica(ctx.session(), plan)
264                        .await;
265                    ctx.retire(result);
266                }
267                Plan::CreateTable(plan) => {
268                    let result = self
269                        .sequence_create_table(&mut ctx, plan, resolved_ids)
270                        .await;
271                    ctx.retire(result);
272                }
273                Plan::CreateSecret(plan) => {
274                    self.sequence_create_secret(ctx, plan).await;
275                }
276                Plan::CreateSink(plan) => {
277                    self.sequence_create_sink(ctx, plan, resolved_ids).await;
278                }
279                Plan::CreateView(plan) => {
280                    self.sequence_create_view(ctx, plan, resolved_ids).await;
281                }
282                Plan::CreateMaterializedView(plan) => {
283                    self.sequence_create_materialized_view(ctx, plan, resolved_ids)
284                        .await;
285                }
286                Plan::CreateContinualTask(plan) => {
287                    let res = self
288                        .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
289                        .await;
290                    ctx.retire(res);
291                }
292                Plan::CreateIndex(plan) => {
293                    self.sequence_create_index(ctx, plan, resolved_ids).await;
294                }
295                Plan::CreateType(plan) => {
296                    let result = self
297                        .sequence_create_type(ctx.session(), plan, resolved_ids)
298                        .await;
299                    ctx.retire(result);
300                }
301                Plan::CreateNetworkPolicy(plan) => {
302                    let res = self
303                        .sequence_create_network_policy(ctx.session(), plan)
304                        .await;
305                    ctx.retire(res);
306                }
307                Plan::Comment(plan) => {
308                    let result = self.sequence_comment_on(ctx.session(), plan).await;
309                    ctx.retire(result);
310                }
311                Plan::CopyTo(plan) => {
312                    self.sequence_copy_to(ctx, plan, target_cluster).await;
313                }
314                Plan::DropObjects(plan) => {
315                    let result = self.sequence_drop_objects(&mut ctx, plan).await;
316                    ctx.retire(result);
317                }
318                Plan::DropOwned(plan) => {
319                    let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
320                    ctx.retire(result);
321                }
322                Plan::EmptyQuery => {
323                    ctx.retire(Ok(ExecuteResponse::EmptyQuery));
324                }
325                Plan::ShowAllVariables => {
326                    let result = self.sequence_show_all_variables(ctx.session());
327                    ctx.retire(result);
328                }
329                Plan::ShowVariable(plan) => {
330                    let result = self.sequence_show_variable(ctx.session(), plan);
331                    ctx.retire(result);
332                }
333                Plan::InspectShard(plan) => {
334                    // TODO: Ideally, this await would happen off the main thread.
335                    let result = self.sequence_inspect_shard(ctx.session(), plan).await;
336                    ctx.retire(result);
337                }
338                Plan::SetVariable(plan) => {
339                    let result = self.sequence_set_variable(ctx.session_mut(), plan);
340                    ctx.retire(result);
341                }
342                Plan::ResetVariable(plan) => {
343                    let result = self.sequence_reset_variable(ctx.session_mut(), plan);
344                    ctx.retire(result);
345                }
346                Plan::SetTransaction(plan) => {
347                    let result = self.sequence_set_transaction(ctx.session_mut(), plan);
348                    ctx.retire(result);
349                }
350                Plan::StartTransaction(plan) => {
351                    if matches!(
352                        ctx.session().transaction(),
353                        TransactionStatus::InTransaction(_)
354                    ) {
355                        ctx.session()
356                            .add_notice(AdapterNotice::ExistingTransactionInProgress);
357                    }
358                    let result = ctx.session_mut().start_transaction(
359                        self.now_datetime(),
360                        plan.access,
361                        plan.isolation_level,
362                    );
363                    ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
364                }
365                Plan::CommitTransaction(CommitTransactionPlan {
366                    ref transaction_type,
367                })
368                | Plan::AbortTransaction(AbortTransactionPlan {
369                    ref transaction_type,
370                }) => {
371                    // Serialize DDL transactions. Statements that use this mode must return false
372                    // in `must_serialize_ddl()`.
373                    if ctx.session().transaction().is_ddl() {
374                        if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
375                            let prev = self
376                                .active_conns
377                                .get_mut(ctx.session().conn_id())
378                                .expect("connection must exist")
379                                .deferred_lock
380                                .replace(guard);
381                            assert!(
382                                prev.is_none(),
383                                "connections should have at most one lock guard"
384                            );
385                        } else {
386                            self.serialized_ddl.push_back(DeferredPlanStatement {
387                                ctx,
388                                ps: PlanStatement::Plan { plan, resolved_ids },
389                            });
390                            return;
391                        }
392                    }
393
394                    let action = match &plan {
395                        Plan::CommitTransaction(_) => EndTransactionAction::Commit,
396                        Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
397                        _ => unreachable!(),
398                    };
399                    if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
400                    {
401                        // In Postgres, if a user sends a COMMIT or ROLLBACK in an
402                        // implicit transaction, a warning is sent warning them.
403                        // (The transaction is still closed and a new implicit
404                        // transaction started, though.)
405                        ctx.session().add_notice(
406                            AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
407                        );
408                    }
409                    self.sequence_end_transaction(ctx, action).await;
410                }
411                Plan::Select(plan) => {
412                    let max = Some(ctx.session().vars().max_query_result_size());
413                    self.sequence_peek(ctx, plan, target_cluster, max).await;
414                }
415                Plan::Subscribe(plan) => {
416                    self.sequence_subscribe(ctx, plan, target_cluster).await;
417                }
418                Plan::SideEffectingFunc(plan) => {
419                    self.sequence_side_effecting_func(ctx, plan).await;
420                }
421                Plan::ShowCreate(plan) => {
422                    ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
423                }
424                Plan::ShowColumns(show_columns_plan) => {
425                    let max = Some(ctx.session().vars().max_query_result_size());
426                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
427                        .await;
428                }
429                Plan::CopyFrom(plan) => match plan.source {
430                    CopyFromSource::Stdin => {
431                        let (tx, _, session, ctx_extra) = ctx.into_parts();
432                        tx.send(
433                            Ok(ExecuteResponse::CopyFrom {
434                                target_id: plan.target_id,
435                                target_name: plan.target_name,
436                                columns: plan.columns,
437                                params: plan.params,
438                                ctx_extra,
439                            }),
440                            session,
441                        );
442                    }
443                    CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
444                        self.sequence_copy_from(ctx, plan, target_cluster).await;
445                    }
446                },
447                Plan::ExplainPlan(plan) => {
448                    self.sequence_explain_plan(ctx, plan, target_cluster).await;
449                }
450                Plan::ExplainPushdown(plan) => {
451                    self.sequence_explain_pushdown(ctx, plan, target_cluster)
452                        .await;
453                }
454                Plan::ExplainSinkSchema(plan) => {
455                    let result = self.sequence_explain_schema(plan);
456                    ctx.retire(result);
457                }
458                Plan::ExplainTimestamp(plan) => {
459                    self.sequence_explain_timestamp(ctx, plan, target_cluster)
460                        .await;
461                }
462                Plan::Insert(plan) => {
463                    self.sequence_insert(ctx, plan).await;
464                }
465                Plan::ReadThenWrite(plan) => {
466                    self.sequence_read_then_write(ctx, plan).await;
467                }
468                Plan::AlterNoop(plan) => {
469                    ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
470                }
471                Plan::AlterCluster(plan) => {
472                    self.sequence_alter_cluster_staged(ctx, plan).await;
473                }
474                Plan::AlterClusterRename(plan) => {
475                    let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
476                    ctx.retire(result);
477                }
478                Plan::AlterClusterSwap(plan) => {
479                    let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
480                    ctx.retire(result);
481                }
482                Plan::AlterClusterReplicaRename(plan) => {
483                    let result = self
484                        .sequence_alter_cluster_replica_rename(ctx.session(), plan)
485                        .await;
486                    ctx.retire(result);
487                }
488                Plan::AlterConnection(plan) => {
489                    self.sequence_alter_connection(ctx, plan).await;
490                }
491                Plan::AlterSetCluster(plan) => {
492                    let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
493                    ctx.retire(result);
494                }
495                Plan::AlterRetainHistory(plan) => {
496                    let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
497                    ctx.retire(result);
498                }
499                Plan::AlterItemRename(plan) => {
500                    let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
501                    ctx.retire(result);
502                }
503                Plan::AlterSchemaRename(plan) => {
504                    let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
505                    ctx.retire(result);
506                }
507                Plan::AlterSchemaSwap(plan) => {
508                    let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
509                    ctx.retire(result);
510                }
511                Plan::AlterRole(plan) => {
512                    let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
513                    ctx.retire(result);
514                }
515                Plan::AlterSecret(plan) => {
516                    self.sequence_alter_secret(ctx, plan).await;
517                }
518                Plan::AlterSink(plan) => {
519                    self.sequence_alter_sink_prepare(ctx, plan).await;
520                }
521                Plan::AlterSource(plan) => {
522                    let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
523                    ctx.retire(result);
524                }
525                Plan::AlterSystemSet(plan) => {
526                    let result = self.sequence_alter_system_set(ctx.session(), plan).await;
527                    ctx.retire(result);
528                }
529                Plan::AlterSystemReset(plan) => {
530                    let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
531                    ctx.retire(result);
532                }
533                Plan::AlterSystemResetAll(plan) => {
534                    let result = self
535                        .sequence_alter_system_reset_all(ctx.session(), plan)
536                        .await;
537                    ctx.retire(result);
538                }
539                Plan::AlterTableAddColumn(plan) => {
540                    let result = self.sequence_alter_table(&mut ctx, plan).await;
541                    ctx.retire(result);
542                }
543                Plan::AlterMaterializedViewApplyReplacement(plan) => {
544                    self.sequence_alter_materialized_view_apply_replacement_prepare(ctx, plan)
545                        .await;
546                }
547                Plan::AlterNetworkPolicy(plan) => {
548                    let res = self
549                        .sequence_alter_network_policy(ctx.session(), plan)
550                        .await;
551                    ctx.retire(res);
552                }
553                Plan::DiscardTemp => {
554                    self.drop_temp_items(ctx.session().conn_id()).await;
555                    ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
556                }
557                Plan::DiscardAll => {
558                    let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
559                        self.clear_transaction(ctx.session_mut()).await;
560                        self.drop_temp_items(ctx.session().conn_id()).await;
561                        ctx.session_mut().reset();
562                        Ok(ExecuteResponse::DiscardedAll)
563                    } else {
564                        Err(AdapterError::OperationProhibitsTransaction(
565                            "DISCARD ALL".into(),
566                        ))
567                    };
568                    ctx.retire(ret);
569                }
570                Plan::Declare(plan) => {
571                    self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
572                }
573                Plan::Fetch(FetchPlan {
574                    name,
575                    count,
576                    timeout,
577                }) => {
578                    let ctx_extra = std::mem::take(ctx.extra_mut());
579                    ctx.retire(Ok(ExecuteResponse::Fetch {
580                        name,
581                        count,
582                        timeout,
583                        ctx_extra,
584                    }));
585                }
586                Plan::Close(plan) => {
587                    if ctx.session_mut().remove_portal(&plan.name) {
588                        ctx.retire(Ok(ExecuteResponse::ClosedCursor));
589                    } else {
590                        ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
591                    }
592                }
593                Plan::Prepare(plan) => {
594                    if ctx
595                        .session()
596                        .get_prepared_statement_unverified(&plan.name)
597                        .is_some()
598                    {
599                        ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
600                    } else {
601                        let state_revision = StateRevision {
602                            catalog_revision: self.catalog().transient_revision(),
603                            session_state_revision: ctx.session().state_revision(),
604                        };
605                        ctx.session_mut().set_prepared_statement(
606                            plan.name,
607                            Some(plan.stmt),
608                            plan.sql,
609                            plan.desc,
610                            state_revision,
611                            self.now(),
612                        );
613                        ctx.retire(Ok(ExecuteResponse::Prepare));
614                    }
615                }
616                Plan::Execute(plan) => {
617                    match self.sequence_execute(ctx.session_mut(), plan) {
618                        Ok(portal_name) => {
619                            let (tx, _, session, extra) = ctx.into_parts();
620                            self.internal_cmd_tx
621                                .send(Message::Command(
622                                    OpenTelemetryContext::obtain(),
623                                    Command::Execute {
624                                        portal_name,
625                                        session,
626                                        tx: tx.take(),
627                                        outer_ctx_extra: Some(extra),
628                                    },
629                                ))
630                                .expect("sending to self.internal_cmd_tx cannot fail");
631                        }
632                        Err(err) => ctx.retire(Err(err)),
633                    };
634                }
635                Plan::Deallocate(plan) => match plan.name {
636                    Some(name) => {
637                        if ctx.session_mut().remove_prepared_statement(&name) {
638                            ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
639                        } else {
640                            ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
641                        }
642                    }
643                    None => {
644                        ctx.session_mut().remove_all_prepared_statements();
645                        ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
646                    }
647                },
648                Plan::Raise(RaisePlan { severity }) => {
649                    ctx.session()
650                        .add_notice(AdapterNotice::UserRequested { severity });
651                    ctx.retire(Ok(ExecuteResponse::Raised));
652                }
653                Plan::GrantPrivileges(plan) => {
654                    let result = self
655                        .sequence_grant_privileges(ctx.session_mut(), plan)
656                        .await;
657                    ctx.retire(result);
658                }
659                Plan::RevokePrivileges(plan) => {
660                    let result = self
661                        .sequence_revoke_privileges(ctx.session_mut(), plan)
662                        .await;
663                    ctx.retire(result);
664                }
665                Plan::AlterDefaultPrivileges(plan) => {
666                    let result = self
667                        .sequence_alter_default_privileges(ctx.session_mut(), plan)
668                        .await;
669                    ctx.retire(result);
670                }
671                Plan::GrantRole(plan) => {
672                    let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
673                    ctx.retire(result);
674                }
675                Plan::RevokeRole(plan) => {
676                    let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
677                    ctx.retire(result);
678                }
679                Plan::AlterOwner(plan) => {
680                    let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
681                    ctx.retire(result);
682                }
683                Plan::ReassignOwned(plan) => {
684                    let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
685                    ctx.retire(result);
686                }
687                Plan::ValidateConnection(plan) => {
688                    let connection = plan
689                        .connection
690                        .into_inline_connection(self.catalog().state());
691                    let current_storage_configuration = self.controller.storage.config().clone();
692                    mz_ore::task::spawn(|| "coord::validate_connection", async move {
693                        let res = match connection
694                            .validate(plan.id, &current_storage_configuration)
695                            .await
696                        {
697                            Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
698                            Err(err) => Err(err.into()),
699                        };
700                        ctx.retire(res);
701                    });
702                }
703            }
704        }
705        .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
706        .boxed_local()
707    }
708
    /// Executes `stmt` inside a freshly started single-statement implicit
    /// transaction and then issues a `COMMIT` on the session's behalf,
    /// forwarding the `COMMIT`'s result (or the statement's own error) to the
    /// client.
    ///
    /// The session must be in `TransactionStatus::Default` when this is
    /// called (asserted below). The commit round-trip happens on a spawned
    /// task so the coordinator loop is not blocked while the statement's
    /// response is produced.
    #[mz_ore::instrument(level = "debug")]
    pub(crate) async fn sequence_execute_single_statement_transaction(
        &mut self,
        ctx: ExecuteContext,
        stmt: Arc<Statement<Raw>>,
        params: Params,
    ) {
        // Put the session into single statement implicit so anything can execute.
        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
        assert!(matches!(session.transaction(), TransactionStatus::Default));
        session.start_transaction_single_stmt(self.now_datetime());
        let conn_id = session.conn_id().unhandled();

        // Execute the saved statement in a temp transmitter so we can run COMMIT.
        let (sub_tx, sub_rx) = oneshot::channel();
        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
        self.handle_execute_inner(stmt, params, sub_ctx).await;

        // The response can need off-thread processing. Wait for it elsewhere so the coordinator can
        // continue processing.
        let internal_cmd_tx = self.internal_cmd_tx.clone();
        mz_ore::task::spawn(
            || format!("execute_single_statement:{conn_id}"),
            async move {
                // Wait for the statement's response from the temp transmitter.
                let Ok(Response {
                    result,
                    session,
                    otel_ctx,
                }) = sub_rx.await
                else {
                    // Coordinator went away.
                    return;
                };
                otel_ctx.attach_as_parent();
                // Ask the coordinator to COMMIT the implicit transaction, and
                // wait for the commit's response on a second temp channel.
                let (sub_tx, sub_rx) = oneshot::channel();
                let _ = internal_cmd_tx.send(Message::Command(
                    otel_ctx,
                    Command::Commit {
                        action: EndTransactionAction::Commit,
                        session,
                        tx: sub_tx,
                    },
                ));
                let Ok(commit_response) = sub_rx.await else {
                    // Coordinator went away.
                    return;
                };
                assert!(matches!(
                    commit_response.session.transaction(),
                    TransactionStatus::Default
                ));
                // The fake, generated response was already sent to the user and we don't need to
                // ever send an `Ok(result)` to the user, because they are expecting a response from
                // a `COMMIT`. So, always send the `COMMIT`'s result if the original statement
                // succeeded. If it failed, we can send an error and don't need to wrap it or send a
                // later COMMIT or ROLLBACK.
                let result = match (result, commit_response.result) {
                    (Ok(_), commit) => commit,
                    (Err(result), _) => Err(result),
                };
                // We ignore the resp.result because it's not clear what to do if it failed since we
                // can only send a single ExecuteResponse to tx.
                tx.send(result, commit_response.session);
            }
            .instrument(Span::current()),
        );
    }
777
778    /// Creates a role during connection startup.
779    ///
780    /// This should not be called from anywhere except connection startup.
781    #[mz_ore::instrument(level = "debug")]
782    pub(crate) async fn sequence_create_role_for_startup(
783        &mut self,
784        plan: CreateRolePlan,
785    ) -> Result<ExecuteResponse, AdapterError> {
786        // This does not set conn_id because it's not yet in active_conns. That is because we can't
787        // make a ConnMeta until we have a role id which we don't have until after the catalog txn
788        // is committed. Passing None here means the audit log won't have a user set in the event's
789        // user field. This seems fine because it is indeed the system that is creating this role,
790        // not a user request, and the user name is still recorded in the plan, so we aren't losing
791        // information.
792        self.sequence_create_role(None, plan).await
793    }
794
795    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
796        self.transient_id_gen.allocate_id()
797    }
798
799    fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
800        if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
801            Some(AdapterNotice::RbacUserDisabled)
802        } else {
803            None
804        }
805    }
806
807    /// Inserts the rows from `constants` into the table identified by `target_id`.
808    ///
809    /// # Panics
810    ///
811    /// Panics if `target_id` doesn't refer to a table.
812    /// Panics if `constants` is not an `MirRelationExpr::Constant`.
813    pub(crate) fn insert_constant(
814        catalog: &Catalog,
815        session: &mut Session,
816        target_id: CatalogItemId,
817        constants: MirRelationExpr,
818    ) -> Result<ExecuteResponse, AdapterError> {
819        // Insert can be queued, so we need to re-verify the id exists.
820        let desc = match catalog.try_get_entry(&target_id) {
821            Some(table) => {
822                // Inserts always happen at the latest version of a table.
823                table.relation_desc_latest().expect("table has desc")
824            }
825            None => {
826                return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
827                    kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
828                        target_id.to_string(),
829                    )),
830                }));
831            }
832        };
833
834        match constants.as_const() {
835            Some((rows, ..)) => {
836                let rows = rows.clone()?;
837                for (row, _) in &rows {
838                    for (i, datum) in row.iter().enumerate() {
839                        desc.constraints_met(i, &datum)?;
840                    }
841                }
842                let diffs_plan = plan::SendDiffsPlan {
843                    id: target_id,
844                    updates: rows,
845                    kind: MutationKind::Insert,
846                    returning: Vec::new(),
847                    max_result_size: catalog.system_config().max_result_size(),
848                };
849                Self::send_diffs(session, diffs_plan)
850            }
851            None => panic!(
852                "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
853                constants.pretty(),
854            ),
855        }
856    }
857
858    #[mz_ore::instrument(level = "debug")]
859    pub(crate) fn send_diffs(
860        session: &mut Session,
861        mut plan: plan::SendDiffsPlan,
862    ) -> Result<ExecuteResponse, AdapterError> {
863        let affected_rows = {
864            let mut affected_rows = Diff::from(0);
865            let mut all_positive_diffs = true;
866            // If all diffs are positive, the number of affected rows is just the
867            // sum of all unconsolidated diffs.
868            for (_, diff) in plan.updates.iter() {
869                if diff.is_negative() {
870                    all_positive_diffs = false;
871                    break;
872                }
873
874                affected_rows += diff;
875            }
876
877            if !all_positive_diffs {
878                // Consolidate rows. This is useful e.g. for an UPDATE where the row
879                // doesn't change, and we need to reflect that in the number of
880                // affected rows.
881                differential_dataflow::consolidation::consolidate(&mut plan.updates);
882
883                affected_rows = Diff::ZERO;
884                // With retractions, the number of affected rows is not the number
885                // of rows we see, but the sum of the absolute value of their diffs,
886                // e.g. if one row is retracted and another is added, the total
887                // number of rows affected is 2.
888                for (_, diff) in plan.updates.iter() {
889                    affected_rows += diff.abs();
890                }
891            }
892
893            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
894        };
895        event!(
896            Level::TRACE,
897            affected_rows,
898            id = format!("{:?}", plan.id),
899            kind = format!("{:?}", plan.kind),
900            updates = plan.updates.len(),
901            returning = plan.returning.len(),
902        );
903
904        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
905            id: plan.id,
906            rows: TableData::Rows(plan.updates),
907        }]))?;
908        if !plan.returning.is_empty() {
909            let finishing = RowSetFinishing {
910                order_by: Vec::new(),
911                limit: None,
912                offset: 0,
913                project: (0..plan.returning[0].0.iter().count()).collect(),
914            };
915            let max_returned_query_size = session.vars().max_query_result_size();
916            let duration_histogram = session.metrics().row_set_finishing_seconds();
917
918            return match finishing.finish(
919                RowCollection::new(plan.returning, &finishing.order_by),
920                plan.max_result_size,
921                Some(max_returned_query_size),
922                duration_histogram,
923            ) {
924                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
925                Err(e) => Err(AdapterError::ResultSize(e)),
926            };
927        }
928        Ok(match plan.kind {
929            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
930            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
931            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
932        })
933    }
934}
935
936/// Checks whether we should emit diagnostic
937/// information associated with reading per-replica sources.
938///
939/// If an unrecoverable error is found (today: an untargeted read on a
940/// cluster with a non-1 number of replicas), return that.  Otherwise,
941/// return a list of associated notices (today: we always emit exactly
942/// one notice if there are any per-replica log dependencies and if
943/// `emit_introspection_query_notice` is set, and none otherwise.)
944pub(crate) fn check_log_reads(
945    catalog: &Catalog,
946    cluster: &Cluster,
947    source_ids: &BTreeSet<GlobalId>,
948    target_replica: &mut Option<ReplicaId>,
949    vars: &SessionVars,
950) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
951where
952{
953    let log_names = source_ids
954        .iter()
955        .map(|gid| catalog.resolve_item_id(gid))
956        .flat_map(|item_id| catalog.introspection_dependencies(item_id))
957        .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
958        .collect::<Vec<_>>();
959
960    if log_names.is_empty() {
961        return Ok(None);
962    }
963
964    // Reading from log sources on replicated clusters is only allowed if a
965    // target replica is selected. Otherwise, we have no way of knowing which
966    // replica we read the introspection data from.
967    let num_replicas = cluster.replicas().count();
968    if target_replica.is_none() {
969        if num_replicas == 1 {
970            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
971        } else {
972            return Err(AdapterError::UntargetedLogRead { log_names });
973        }
974    }
975
976    // Ensure that logging is initialized for the target replica, lest
977    // we try to read from a non-existing arrangement.
978    let replica_id = target_replica.expect("set to `Some` above");
979    let replica = &cluster.replica(replica_id).expect("Replica must exist");
980    if !replica.config.compute.logging.enabled() {
981        return Err(AdapterError::IntrospectionDisabled { log_names });
982    }
983
984    Ok(vars
985        .emit_introspection_query_notice()
986        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
987}
988
989/// Forward notices that we got from the optimizer.
990pub(crate) fn emit_optimizer_notices(
991    catalog: &Catalog,
992    session: &Session,
993    notices: &Vec<RawOptimizerNotice>,
994) {
995    // `for_session` below is expensive, so return early if there's nothing to do.
996    if notices.is_empty() {
997        return;
998    }
999    let humanizer = catalog.for_session(session);
1000    let system_vars = catalog.system_config();
1001    for notice in notices {
1002        let kind = OptimizerNoticeKind::from(notice);
1003        let notice_enabled = match kind {
1004            OptimizerNoticeKind::EqualsNull => system_vars.enable_notices_for_equals_null(),
1005            OptimizerNoticeKind::IndexAlreadyExists => {
1006                system_vars.enable_notices_for_index_already_exists()
1007            }
1008            OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
1009                system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
1010            }
1011            OptimizerNoticeKind::IndexKeyEmpty => system_vars.enable_notices_for_index_empty_key(),
1012        };
1013        if notice_enabled {
1014            // We don't need to redact the notice parts because
1015            // `emit_optimizer_notices` is only called by the `sequence_~`
1016            // method for the statement that produces that notice.
1017            session.add_notice(AdapterNotice::OptimizerNotice {
1018                notice: notice.message(&humanizer, false).to_string(),
1019                hint: notice.hint(&humanizer, false).to_string(),
1020            });
1021        }
1022        session
1023            .metrics()
1024            .optimization_notices(&[kind.metric_label()])
1025            .inc_by(1);
1026    }
1027}
1028
1029/// Evaluates a COPY TO target URI expression and validates it.
1030///
1031/// This function is shared between the old peek sequencing (sequence_copy_to)
1032/// and the new frontend peek sequencing to avoid code duplication.
1033pub fn eval_copy_to_uri(
1034    to: HirScalarExpr,
1035    session: &Session,
1036    catalog_state: &CatalogState,
1037) -> Result<Uri, AdapterError> {
1038    let style = ExprPrepOneShot {
1039        logical_time: EvalTime::NotAvailable,
1040        session,
1041        catalog_state,
1042    };
1043    let mut to = to.lower_uncorrelated(catalog_state.system_config())?;
1044    style.prep_scalar_expr(&mut to)?;
1045    let temp_storage = RowArena::new();
1046    let evaled = to.eval(&[], &temp_storage)?;
1047    if evaled == Datum::Null {
1048        coord_bail!("COPY TO target value can not be null");
1049    }
1050    let to_url = match Uri::from_str(evaled.unwrap_str()) {
1051        Ok(url) => {
1052            if url.scheme_str() != Some("s3") {
1053                coord_bail!("only 's3://...' urls are supported as COPY TO target");
1054            }
1055            url
1056        }
1057        Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1058    };
1059    Ok(to_url)
1060}
1061
/// Returns a future that will execute EXPLAIN FILTER PUSHDOWN, i.e., compute the filter pushdown
/// statistics for the given collections with the given MFPs.
///
/// Each output row packs `(name, total_bytes, selected_bytes, total_parts,
/// selected_parts)` for one import.
///
/// (Shared helper fn between the old and new sequencing. This doesn't take the Coordinator as a
/// parameter, but instead just the specifically necessary things are passed in, so that the
/// frontend peek sequencing can also call it.)
pub(crate) async fn explain_pushdown_future_inner<
    I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
>(
    session: &Session,
    catalog: &Catalog,
    storage_collections: &Arc<dyn StorageCollections<Timestamp = Timestamp> + Send + Sync>,
    as_of: Antichain<Timestamp>,
    mz_now: ResultSpec<'static>,
    imports: I,
) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
    // Bound the overall stats collection by the session's statement timeout
    // (applied below via `tokio::time::timeout`).
    let explain_timeout = *session.vars().statement_timeout();
    let mut futures = FuturesOrdered::new();
    for (id, mfp) in imports {
        // Resolve the import's humanized name and its relation description,
        // both needed to interpret the part stats.
        let catalog_entry = catalog.get_entry_by_global_id(&id);
        let full_name = catalog
            .for_session(session)
            .resolve_full_name(&catalog_entry.name);
        let name = format!("{}", full_name);
        let relation_desc = catalog_entry
            .relation_desc()
            .expect("source should have a proper desc")
            .into_owned();
        let stats_future = storage_collections
            .snapshot_parts_stats(id, as_of.clone())
            .await;

        let mz_now = mz_now.clone();
        // These futures may block if the source is not yet readable at the as-of;
        // stash them in `futures` and only block on them in a separate task.
        // TODO(peek-seq): This complication won't be needed once this function will only be called
        // from the new peek sequencing, in which case it will be fine to block the current task.
        futures.push_back(async move {
            let snapshot_stats = match stats_future.await {
                Ok(stats) => stats,
                Err(e) => return Err(e),
            };
            // Tally total vs. selected (i.e., not pruned by the MFP) bytes and
            // part counts across all parts of the snapshot.
            let mut total_bytes = 0;
            let mut total_parts = 0;
            let mut selected_bytes = 0;
            let mut selected_parts = 0;
            for SnapshotPartStats {
                encoded_size_bytes: bytes,
                stats,
            } in &snapshot_stats.parts
            {
                let bytes = u64::cast_from(*bytes);
                total_bytes += bytes;
                total_parts += 1u64;
                let selected = match stats {
                    // A part without stats cannot be ruled out, so it counts
                    // as selected.
                    None => true,
                    Some(stats) => {
                        let stats = stats.decode();
                        let stats = RelationPartStats::new(
                            name.as_str(),
                            &snapshot_stats.metrics.pushdown.part_stats,
                            &relation_desc,
                            &stats,
                        );
                        stats.may_match_mfp(mz_now.clone(), &mfp)
                    }
                };

                if selected {
                    selected_bytes += bytes;
                    selected_parts += 1u64;
                }
            }
            Ok(Row::pack_slice(&[
                name.as_str().into(),
                total_bytes.into(),
                selected_bytes.into(),
                total_parts.into(),
                selected_parts.into(),
            ]))
        });
    }

    let fut = async move {
        // Drive all per-import futures to completion (in order), subject to
        // the statement timeout.
        match tokio::time::timeout(
            explain_timeout,
            futures::TryStreamExt::try_collect::<Vec<_>>(futures),
        )
        .await
        {
            Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
                rows: Box::new(rows.into_row_iter()),
            }),
            Ok(Err(err)) => Err(err.into()),
            Err(_) => Err(AdapterError::StatementTimeout),
        }
    };
    fut
}
1161
1162/// Generates EXPLAIN PLAN output.
1163/// (Shared helper fn between the old and new sequencing.)
1164pub(crate) async fn explain_plan_inner(
1165    session: &Session,
1166    catalog: &Catalog,
1167    df_meta: DataflowMetainfo,
1168    explain_ctx: ExplainPlanContext,
1169    optimizer: peek::Optimizer,
1170    insights_ctx: Option<Box<PlanInsightsContext>>,
1171) -> Result<Vec<Row>, AdapterError> {
1172    let ExplainPlanContext {
1173        config,
1174        format,
1175        stage,
1176        desc,
1177        optimizer_trace,
1178        ..
1179    } = explain_ctx;
1180
1181    let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1182
1183    let session_catalog = catalog.for_session(session);
1184    let expr_humanizer = {
1185        let transient_items = btreemap! {
1186            optimizer.select_id() => TransientItem::new(
1187                Some(vec![GlobalId::Explain.to_string()]),
1188                Some(desc.iter_names().map(|c| c.to_string()).collect()),
1189            )
1190        };
1191        ExprHumanizerExt::new(transient_items, &session_catalog)
1192    };
1193
1194    let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1195        None
1196    } else {
1197        Some(optimizer.finishing().clone())
1198    };
1199
1200    let target_cluster = catalog.get_cluster(optimizer.cluster_id());
1201    let features = optimizer.config().features.clone();
1202
1203    let rows = optimizer_trace
1204        .into_rows(
1205            format,
1206            &config,
1207            &features,
1208            &expr_humanizer,
1209            finishing,
1210            Some(target_cluster),
1211            df_meta,
1212            stage,
1213            plan::ExplaineeStatementKind::Select,
1214            insights_ctx,
1215        )
1216        .await?;
1217
1218    Ok(rows)
1219}
1220
1221/// Creates a statistics oracle for query optimization.
1222///
1223/// This is a free-standing function that can be called from both the old peek sequencing
1224/// and the new frontend peek sequencing.
1225pub(crate) async fn statistics_oracle(
1226    session: &Session,
1227    source_ids: &BTreeSet<GlobalId>,
1228    query_as_of: &Antichain<Timestamp>,
1229    is_oneshot: bool,
1230    system_config: &vars::SystemVars,
1231    storage_collections: &dyn StorageCollections<Timestamp = Timestamp>,
1232) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1233    if !session.vars().enable_session_cardinality_estimates() {
1234        return Ok(Box::new(EmptyStatisticsOracle));
1235    }
1236
1237    let timeout = if is_oneshot {
1238        // TODO(mgree): ideally, we would shorten the timeout even more if we think the query could take the fast path
1239        system_config.optimizer_oneshot_stats_timeout()
1240    } else {
1241        system_config.optimizer_stats_timeout()
1242    };
1243
1244    let cached_stats = mz_ore::future::timeout(
1245        timeout,
1246        CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1247    )
1248    .await;
1249
1250    match cached_stats {
1251        Ok(stats) => Ok(Box::new(stats)),
1252        Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1253            warn!(
1254                is_oneshot = is_oneshot,
1255                "optimizer statistics collection timed out after {}ms",
1256                timeout.as_millis()
1257            );
1258
1259            Ok(Box::new(EmptyStatisticsOracle))
1260        }
1261        Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1262    }
1263}
1264
/// A [`StatisticsOracle`] backed by a point-in-time snapshot of per-collection
/// statistics, fetched once at construction time.
#[derive(Debug)]
struct CachedStatisticsOracle {
    // Maps each collection's `GlobalId` to its observed number of updates.
    cache: BTreeMap<GlobalId, usize>,
}
1269
1270impl CachedStatisticsOracle {
1271    pub async fn new<T: TimelyTimestamp>(
1272        ids: &BTreeSet<GlobalId>,
1273        as_of: &Antichain<T>,
1274        storage_collections: &dyn mz_storage_client::storage_collections::StorageCollections<Timestamp = T>,
1275    ) -> Result<Self, StorageError<T>> {
1276        let mut cache = BTreeMap::new();
1277
1278        for id in ids {
1279            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
1280
1281            match stats {
1282                Ok(stats) => {
1283                    cache.insert(*id, stats.num_updates);
1284                }
1285                Err(StorageError::IdentifierMissing(id)) => {
1286                    ::tracing::debug!("no statistics for {id}")
1287                }
1288                Err(e) => return Err(e),
1289            }
1290        }
1291
1292        Ok(Self { cache })
1293    }
1294}
1295
1296impl StatisticsOracle for CachedStatisticsOracle {
1297    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1298        self.cache.get(&id).map(|estimate| *estimate)
1299    }
1300
1301    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1302        self.cache.clone()
1303    }
1304}