Skip to main content

mz_adapter/coord/
sequencer.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10// Prevents anyone from accidentally exporting a method from the `inner` module.
11#![allow(clippy::pub_use)]
12
13//! Logic for executing a planned SQL query.
14
15use std::collections::{BTreeMap, BTreeSet};
16use std::str::FromStr;
17use std::sync::Arc;
18
19use futures::FutureExt;
20use futures::future::LocalBoxFuture;
21use futures::stream::FuturesOrdered;
22use http::Uri;
23use inner::return_if_err;
24use maplit::btreemap;
25use mz_catalog::memory::objects::Cluster;
26use mz_controller_types::ReplicaId;
27use mz_expr::row::RowCollection;
28use mz_expr::{MapFilterProject, MirRelationExpr, ResultSpec, RowSetFinishing};
29use mz_ore::cast::CastFrom;
30use mz_ore::tracing::OpenTelemetryContext;
31use mz_persist_client::stats::SnapshotPartStats;
32use mz_repr::explain::{ExprHumanizerExt, TransientItem};
33use mz_repr::{CatalogItemId, Datum, Diff, GlobalId, IntoRowIterator, Row, RowArena, Timestamp};
34use mz_sql::catalog::{CatalogError, SessionCatalog};
35use mz_sql::names::ResolvedIds;
36use mz_sql::plan::{
37    self, AbortTransactionPlan, CommitTransactionPlan, CopyFromSource, CreateRolePlan,
38    CreateSourcePlanBundle, FetchPlan, HirScalarExpr, MutationKind, Params, Plan, PlanKind,
39    RaisePlan,
40};
41use mz_sql::rbac;
42use mz_sql::session::metadata::SessionMetadata;
43use mz_sql::session::vars;
44use mz_sql::session::vars::SessionVars;
45use mz_sql_parser::ast::{Raw, Statement};
46use mz_storage_client::client::TableData;
47use mz_storage_client::storage_collections::StorageCollections;
48use mz_storage_types::connections::inline::IntoInlineConnection;
49use mz_storage_types::controller::StorageError;
50use mz_storage_types::stats::RelationPartStats;
51use mz_transform::dataflow::DataflowMetainfo;
52use mz_transform::notice::{OptimizerNoticeApi, OptimizerNoticeKind, RawOptimizerNotice};
53use mz_transform::{EmptyStatisticsOracle, StatisticsOracle};
54use timely::progress::Antichain;
55use tokio::sync::oneshot;
56use tracing::{Instrument, Level, Span, event, warn};
57
58use crate::ExecuteContext;
59use crate::catalog::{Catalog, CatalogState};
60use crate::command::{Command, ExecuteResponse, Response};
61use crate::coord::appends::{DeferredOp, DeferredPlan};
62use crate::coord::validity::PlanValidity;
63use crate::coord::{
64    Coordinator, DeferredPlanStatement, ExplainPlanContext, Message, PlanStatement, TargetCluster,
65    catalog_serving,
66};
67use crate::error::AdapterError;
68use crate::explain::insights::PlanInsightsContext;
69use crate::notice::AdapterNotice;
70use crate::optimize::dataflows::{EvalTime, ExprPrep, ExprPrepOneShot};
71use crate::optimize::peek;
72use crate::session::{
73    EndTransactionAction, Session, StateRevision, TransactionOps, TransactionStatus, WriteOp,
74};
75use crate::util::ClientTransmitter;
76
77// DO NOT make this visible in any way, i.e. do not add any version of
78// `pub` to this mod. The inner `sequence_X` methods are hidden in this
79// private module to prevent anyone from calling them directly. All
80// sequencing should be done through the `sequence_plan` method.
81// This allows us to add catch-all logic that should be applied to all
82// plans in `sequence_plan` and guarantee that no caller can circumvent
83// that logic.
84//
85// The exceptions are:
86//
87// - Creating a role during connection startup. In this scenario, the session has not been properly
88// initialized and we need to skip directly to creating role. We have a specific method,
89// `sequence_create_role_for_startup` for this purpose.
90// - Methods that continue the execution of some plan that was being run asynchronously, such as
91// `sequence_peek_stage` and `sequence_create_connection_stage_finish`.
92// - The frontend peek sequencing temporarily reaches into this module for things that are needed
93//   by both the old and new peek sequencing. TODO(peek-seq): We plan to eliminate this with a
94//   big refactoring after the old peek sequencing is removed.
95
96mod inner;
97
98impl Coordinator {
99    /// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 34KB. This would
100    /// get stored on the stack which is bad for runtime performance, and blow up our stack usage.
101    /// Because of that we purposefully move this Future onto the heap (i.e. Box it).
102    pub(crate) fn sequence_plan(
103        &mut self,
104        mut ctx: ExecuteContext,
105        plan: Plan,
106        resolved_ids: ResolvedIds,
107    ) -> LocalBoxFuture<'_, ()> {
108        async move {
109            let responses = ExecuteResponse::generated_from(&PlanKind::from(&plan));
110            ctx.tx_mut().set_allowed(responses);
111
112            if self.controller.read_only() && !plan.allowed_in_read_only() {
113                ctx.retire(Err(AdapterError::ReadOnly));
114                return;
115            }
116
117            // Check if we're still waiting for any of the builtin table appends from when we
118            // started the Session to complete.
119            if let Some((dependencies, wait_future)) =
120                super::appends::waiting_on_startup_appends(self.catalog(), ctx.session_mut(), &plan)
121            {
122                let conn_id = ctx.session().conn_id();
123                tracing::debug!(%conn_id, "deferring plan for startup appends");
124
125                let role_metadata = ctx.session().role_metadata().clone();
126                let validity = PlanValidity::new(
127                    self.catalog.transient_revision(),
128                    dependencies,
129                    None,
130                    None,
131                    role_metadata,
132                );
133                let deferred_plan = DeferredPlan {
134                    ctx,
135                    plan,
136                    validity,
137                    requires_locks: BTreeSet::default(),
138                };
139                // Defer op accepts an optional write lock, but there aren't any writes occurring
140                // here, since the map to `None`.
141                let acquire_future = wait_future.map(|()| None);
142
143                self.defer_op(acquire_future, DeferredOp::Plan(deferred_plan));
144
145                // Return early because our op is deferred on waiting for the builtin writes to
146                // complete.
147                return;
148            };
149
150            // Scope the borrow of the Catalog because we need to mutate the Coordinator state below.
151            let target_cluster = match ctx.session().transaction().cluster() {
152                // Use the current transaction's cluster.
153                Some(cluster_id) => TargetCluster::Transaction(cluster_id),
154                // If there isn't a current cluster set for a transaction, then try to auto route.
155                None => {
156                    let session_catalog = self.catalog.for_session(ctx.session());
157                    catalog_serving::auto_run_on_catalog_server(
158                        &session_catalog,
159                        ctx.session(),
160                        &plan,
161                    )
162                }
163            };
164            let (target_cluster_id, target_cluster_name) = match self
165                .catalog()
166                .resolve_target_cluster(target_cluster, ctx.session())
167            {
168                Ok(cluster) => (Some(cluster.id), Some(cluster.name.clone())),
169                Err(_) => (None, None),
170            };
171
172            if let (Some(cluster_id), Some(statement_id)) =
173                (target_cluster_id, ctx.extra().contents())
174            {
175                self.set_statement_execution_cluster(statement_id, cluster_id);
176            }
177
178            let session_catalog = self.catalog.for_session(ctx.session());
179
180            if let Some(cluster_name) = &target_cluster_name {
181                if let Err(e) = catalog_serving::check_cluster_restrictions(
182                    cluster_name,
183                    &session_catalog,
184                    &plan,
185                ) {
186                    return ctx.retire(Err(e));
187                }
188            }
189
190            if let Err(e) = rbac::check_plan(
191                &session_catalog,
192                Some(|id| {
193                    // We use linear search through active connections if needed, which is fine
194                    // because the RBAC check will call the closure at most once.
195                    self.active_conns()
196                        .into_iter()
197                        .find(|(conn_id, _)| conn_id.unhandled() == id)
198                        .map(|(_, conn_meta)| *conn_meta.authenticated_role_id())
199                }),
200                ctx.session(),
201                &plan,
202                target_cluster_id,
203                &resolved_ids,
204            ) {
205                return ctx.retire(Err(e.into()));
206            }
207
208            match plan {
209                Plan::CreateSource(plan) => {
210                    let (item_id, global_id) = return_if_err!(self.allocate_user_id().await, ctx);
211                    let result = self
212                        .sequence_create_source(
213                            &mut ctx,
214                            vec![CreateSourcePlanBundle {
215                                item_id,
216                                global_id,
217                                plan,
218                                resolved_ids,
219                                available_source_references: None,
220                            }],
221                        )
222                        .await;
223                    ctx.retire(result);
224                }
225                Plan::CreateSources(plans) => {
226                    assert!(
227                        resolved_ids.is_empty(),
228                        "each plan has separate resolved_ids"
229                    );
230                    let result = self.sequence_create_source(&mut ctx, plans).await;
231                    ctx.retire(result);
232                }
233                Plan::CreateConnection(plan) => {
234                    self.sequence_create_connection(ctx, plan, resolved_ids)
235                        .await;
236                }
237                Plan::CreateDatabase(plan) => {
238                    let result = self.sequence_create_database(ctx.session_mut(), plan).await;
239                    ctx.retire(result);
240                }
241                Plan::CreateSchema(plan) => {
242                    let result = self.sequence_create_schema(ctx.session_mut(), plan).await;
243                    ctx.retire(result);
244                }
245                Plan::CreateRole(plan) => {
246                    let result = self
247                        .sequence_create_role(Some(ctx.session().conn_id()), plan)
248                        .await;
249                    if let Some(notice) = self.should_emit_rbac_notice(ctx.session()) {
250                        ctx.session().add_notice(notice);
251                    }
252                    ctx.retire(result);
253                }
254                Plan::CreateCluster(plan) => {
255                    let result = self.sequence_create_cluster(ctx.session(), plan).await;
256                    ctx.retire(result);
257                }
258                Plan::CreateClusterReplica(plan) => {
259                    let result = self
260                        .sequence_create_cluster_replica(ctx.session(), plan)
261                        .await;
262                    ctx.retire(result);
263                }
264                Plan::CreateTable(plan) => {
265                    let result = self
266                        .sequence_create_table(&mut ctx, plan, resolved_ids)
267                        .await;
268                    ctx.retire(result);
269                }
270                Plan::CreateSecret(plan) => {
271                    self.sequence_create_secret(ctx, plan).await;
272                }
273                Plan::CreateSink(plan) => {
274                    self.sequence_create_sink(ctx, plan, resolved_ids).await;
275                }
276                Plan::CreateView(plan) => {
277                    self.sequence_create_view(ctx, plan, resolved_ids).await;
278                }
279                Plan::CreateMaterializedView(plan) => {
280                    self.sequence_create_materialized_view(ctx, plan, resolved_ids)
281                        .await;
282                }
283                Plan::CreateContinualTask(plan) => {
284                    let res = self
285                        .sequence_create_continual_task(&mut ctx, plan, resolved_ids)
286                        .await;
287                    ctx.retire(res);
288                }
289                Plan::CreateIndex(plan) => {
290                    self.sequence_create_index(ctx, plan, resolved_ids).await;
291                }
292                Plan::CreateType(plan) => {
293                    let result = self
294                        .sequence_create_type(ctx.session(), plan, resolved_ids)
295                        .await;
296                    ctx.retire(result);
297                }
298                Plan::CreateNetworkPolicy(plan) => {
299                    let res = self
300                        .sequence_create_network_policy(ctx.session(), plan)
301                        .await;
302                    ctx.retire(res);
303                }
304                Plan::Comment(plan) => {
305                    let result = self.sequence_comment_on(ctx.session(), plan).await;
306                    ctx.retire(result);
307                }
308                Plan::CopyTo(plan) => {
309                    self.sequence_copy_to(ctx, plan, target_cluster).await;
310                }
311                Plan::DropObjects(plan) => {
312                    let result = self.sequence_drop_objects(&mut ctx, plan).await;
313                    ctx.retire(result);
314                }
315                Plan::DropOwned(plan) => {
316                    let result = self.sequence_drop_owned(ctx.session_mut(), plan).await;
317                    ctx.retire(result);
318                }
319                Plan::EmptyQuery => {
320                    ctx.retire(Ok(ExecuteResponse::EmptyQuery));
321                }
322                Plan::ShowAllVariables => {
323                    let result = self.sequence_show_all_variables(ctx.session());
324                    ctx.retire(result);
325                }
326                Plan::ShowVariable(plan) => {
327                    let result = self.sequence_show_variable(ctx.session(), plan);
328                    ctx.retire(result);
329                }
330                Plan::InspectShard(plan) => {
331                    // TODO: Ideally, this await would happen off the main thread.
332                    let result = self.sequence_inspect_shard(ctx.session(), plan).await;
333                    ctx.retire(result);
334                }
335                Plan::SetVariable(plan) => {
336                    let result = self.sequence_set_variable(ctx.session_mut(), plan);
337                    ctx.retire(result);
338                }
339                Plan::ResetVariable(plan) => {
340                    let result = self.sequence_reset_variable(ctx.session_mut(), plan);
341                    ctx.retire(result);
342                }
343                Plan::SetTransaction(plan) => {
344                    let result = self.sequence_set_transaction(ctx.session_mut(), plan);
345                    ctx.retire(result);
346                }
347                Plan::StartTransaction(plan) => {
348                    if matches!(
349                        ctx.session().transaction(),
350                        TransactionStatus::InTransaction(_)
351                    ) {
352                        ctx.session()
353                            .add_notice(AdapterNotice::ExistingTransactionInProgress);
354                    }
355                    let result = ctx.session_mut().start_transaction(
356                        self.now_datetime(),
357                        plan.access,
358                        plan.isolation_level,
359                    );
360                    ctx.retire(result.map(|_| ExecuteResponse::StartedTransaction))
361                }
362                Plan::CommitTransaction(CommitTransactionPlan {
363                    ref transaction_type,
364                })
365                | Plan::AbortTransaction(AbortTransactionPlan {
366                    ref transaction_type,
367                }) => {
368                    // Serialize DDL transactions. Statements that use this mode must return false
369                    // in `must_serialize_ddl()`.
370                    if ctx.session().transaction().is_ddl() {
371                        if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
372                            let prev = self
373                                .active_conns
374                                .get_mut(ctx.session().conn_id())
375                                .expect("connection must exist")
376                                .deferred_lock
377                                .replace(guard);
378                            assert!(
379                                prev.is_none(),
380                                "connections should have at most one lock guard"
381                            );
382                        } else {
383                            self.serialized_ddl.push_back(DeferredPlanStatement {
384                                ctx,
385                                ps: PlanStatement::Plan { plan, resolved_ids },
386                            });
387                            return;
388                        }
389                    }
390
391                    let action = match &plan {
392                        Plan::CommitTransaction(_) => EndTransactionAction::Commit,
393                        Plan::AbortTransaction(_) => EndTransactionAction::Rollback,
394                        _ => unreachable!(),
395                    };
396                    if ctx.session().transaction().is_implicit() && !transaction_type.is_implicit()
397                    {
398                        // In Postgres, if a user sends a COMMIT or ROLLBACK in an
399                        // implicit transaction, a warning is sent warning them.
400                        // (The transaction is still closed and a new implicit
401                        // transaction started, though.)
402                        ctx.session().add_notice(
403                            AdapterNotice::ExplicitTransactionControlInImplicitTransaction,
404                        );
405                    }
406                    self.sequence_end_transaction(ctx, action).await;
407                }
408                Plan::Select(plan) => {
409                    let max = Some(ctx.session().vars().max_query_result_size());
410                    self.sequence_peek(ctx, plan, target_cluster, max).await;
411                }
412                Plan::Subscribe(plan) => {
413                    self.sequence_subscribe(ctx, plan, target_cluster).await;
414                }
415                Plan::SideEffectingFunc(plan) => {
416                    self.sequence_side_effecting_func(ctx, plan).await;
417                }
418                Plan::ShowCreate(plan) => {
419                    ctx.retire(Ok(Self::send_immediate_rows(plan.row)));
420                }
421                Plan::ShowColumns(show_columns_plan) => {
422                    let max = Some(ctx.session().vars().max_query_result_size());
423                    self.sequence_peek(ctx, show_columns_plan.select_plan, target_cluster, max)
424                        .await;
425                }
426                Plan::CopyFrom(plan) => match plan.source {
427                    CopyFromSource::Stdin => {
428                        let (tx, _, session, ctx_extra) = ctx.into_parts();
429                        tx.send(
430                            Ok(ExecuteResponse::CopyFrom {
431                                target_id: plan.target_id,
432                                target_name: plan.target_name,
433                                columns: plan.columns,
434                                params: plan.params,
435                                ctx_extra,
436                            }),
437                            session,
438                        );
439                    }
440                    CopyFromSource::Url(_) | CopyFromSource::AwsS3 { .. } => {
441                        self.sequence_copy_from(ctx, plan, target_cluster).await;
442                    }
443                },
444                Plan::ExplainPlan(plan) => {
445                    self.sequence_explain_plan(ctx, plan, target_cluster).await;
446                }
447                Plan::ExplainPushdown(plan) => {
448                    self.sequence_explain_pushdown(ctx, plan, target_cluster)
449                        .await;
450                }
451                Plan::ExplainSinkSchema(plan) => {
452                    let result = self.sequence_explain_schema(plan);
453                    ctx.retire(result);
454                }
455                Plan::ExplainTimestamp(plan) => {
456                    self.sequence_explain_timestamp(ctx, plan, target_cluster)
457                        .await;
458                }
459                Plan::Insert(plan) => {
460                    self.sequence_insert(ctx, plan).await;
461                }
462                Plan::ReadThenWrite(plan) => {
463                    self.sequence_read_then_write(ctx, plan).await;
464                }
465                Plan::AlterNoop(plan) => {
466                    ctx.retire(Ok(ExecuteResponse::AlteredObject(plan.object_type)));
467                }
468                Plan::AlterCluster(plan) => {
469                    self.sequence_alter_cluster_staged(ctx, plan).await;
470                }
471                Plan::AlterClusterRename(plan) => {
472                    let result = self.sequence_alter_cluster_rename(&mut ctx, plan).await;
473                    ctx.retire(result);
474                }
475                Plan::AlterClusterSwap(plan) => {
476                    let result = self.sequence_alter_cluster_swap(&mut ctx, plan).await;
477                    ctx.retire(result);
478                }
479                Plan::AlterClusterReplicaRename(plan) => {
480                    let result = self
481                        .sequence_alter_cluster_replica_rename(ctx.session(), plan)
482                        .await;
483                    ctx.retire(result);
484                }
485                Plan::AlterConnection(plan) => {
486                    self.sequence_alter_connection(ctx, plan).await;
487                }
488                Plan::AlterSetCluster(plan) => {
489                    let result = self.sequence_alter_set_cluster(ctx.session(), plan).await;
490                    ctx.retire(result);
491                }
492                Plan::AlterRetainHistory(plan) => {
493                    let result = self.sequence_alter_retain_history(&mut ctx, plan).await;
494                    ctx.retire(result);
495                }
496                Plan::AlterSourceTimestampInterval(plan) => {
497                    let result = self
498                        .sequence_alter_source_timestamp_interval(&mut ctx, plan)
499                        .await;
500                    ctx.retire(result);
501                }
502                Plan::AlterItemRename(plan) => {
503                    let result = self.sequence_alter_item_rename(&mut ctx, plan).await;
504                    ctx.retire(result);
505                }
506                Plan::AlterSchemaRename(plan) => {
507                    let result = self.sequence_alter_schema_rename(&mut ctx, plan).await;
508                    ctx.retire(result);
509                }
510                Plan::AlterSchemaSwap(plan) => {
511                    let result = self.sequence_alter_schema_swap(&mut ctx, plan).await;
512                    ctx.retire(result);
513                }
514                Plan::AlterRole(plan) => {
515                    let result = self.sequence_alter_role(ctx.session_mut(), plan).await;
516                    ctx.retire(result);
517                }
518                Plan::AlterSecret(plan) => {
519                    self.sequence_alter_secret(ctx, plan).await;
520                }
521                Plan::AlterSink(plan) => {
522                    self.sequence_alter_sink_prepare(ctx, plan).await;
523                }
524                Plan::AlterSource(plan) => {
525                    let result = self.sequence_alter_source(ctx.session_mut(), plan).await;
526                    ctx.retire(result);
527                }
528                Plan::AlterSystemSet(plan) => {
529                    let result = self.sequence_alter_system_set(ctx.session(), plan).await;
530                    ctx.retire(result);
531                }
532                Plan::AlterSystemReset(plan) => {
533                    let result = self.sequence_alter_system_reset(ctx.session(), plan).await;
534                    ctx.retire(result);
535                }
536                Plan::AlterSystemResetAll(plan) => {
537                    let result = self
538                        .sequence_alter_system_reset_all(ctx.session(), plan)
539                        .await;
540                    ctx.retire(result);
541                }
542                Plan::AlterTableAddColumn(plan) => {
543                    let result = self.sequence_alter_table(&mut ctx, plan).await;
544                    ctx.retire(result);
545                }
546                Plan::AlterMaterializedViewApplyReplacement(plan) => {
547                    self.sequence_alter_materialized_view_apply_replacement_prepare(ctx, plan)
548                        .await;
549                }
550                Plan::AlterNetworkPolicy(plan) => {
551                    let res = self
552                        .sequence_alter_network_policy(ctx.session(), plan)
553                        .await;
554                    ctx.retire(res);
555                }
556                Plan::DiscardTemp => {
557                    self.drop_temp_items(ctx.session().conn_id()).await;
558                    ctx.retire(Ok(ExecuteResponse::DiscardedTemp));
559                }
560                Plan::DiscardAll => {
561                    let ret = if let TransactionStatus::Started(_) = ctx.session().transaction() {
562                        self.clear_transaction(ctx.session_mut()).await;
563                        self.drop_temp_items(ctx.session().conn_id()).await;
564                        ctx.session_mut().reset();
565                        Ok(ExecuteResponse::DiscardedAll)
566                    } else {
567                        Err(AdapterError::OperationProhibitsTransaction(
568                            "DISCARD ALL".into(),
569                        ))
570                    };
571                    ctx.retire(ret);
572                }
573                Plan::Declare(plan) => {
574                    self.declare(ctx, plan.name, plan.stmt, plan.sql, plan.params);
575                }
576                Plan::Fetch(FetchPlan {
577                    name,
578                    count,
579                    timeout,
580                }) => {
581                    let ctx_extra = std::mem::take(ctx.extra_mut());
582                    ctx.retire(Ok(ExecuteResponse::Fetch {
583                        name,
584                        count,
585                        timeout,
586                        ctx_extra,
587                    }));
588                }
589                Plan::Close(plan) => {
590                    if ctx.session_mut().remove_portal(&plan.name) {
591                        ctx.retire(Ok(ExecuteResponse::ClosedCursor));
592                    } else {
593                        ctx.retire(Err(AdapterError::UnknownCursor(plan.name)));
594                    }
595                }
596                Plan::Prepare(plan) => {
597                    if ctx
598                        .session()
599                        .get_prepared_statement_unverified(&plan.name)
600                        .is_some()
601                    {
602                        ctx.retire(Err(AdapterError::PreparedStatementExists(plan.name)));
603                    } else {
604                        let state_revision = StateRevision {
605                            catalog_revision: self.catalog().transient_revision(),
606                            session_state_revision: ctx.session().state_revision(),
607                        };
608                        ctx.session_mut().set_prepared_statement(
609                            plan.name,
610                            Some(plan.stmt),
611                            plan.sql,
612                            plan.desc,
613                            state_revision,
614                            self.now(),
615                        );
616                        ctx.retire(Ok(ExecuteResponse::Prepare));
617                    }
618                }
619                Plan::Execute(plan) => {
620                    match self.sequence_execute(ctx.session_mut(), plan) {
621                        Ok(portal_name) => {
622                            let (tx, _, session, extra) = ctx.into_parts();
623                            self.internal_cmd_tx
624                                .send(Message::Command(
625                                    OpenTelemetryContext::obtain(),
626                                    Command::Execute {
627                                        portal_name,
628                                        session,
629                                        tx: tx.take(),
630                                        outer_ctx_extra: Some(extra),
631                                    },
632                                ))
633                                .expect("sending to self.internal_cmd_tx cannot fail");
634                        }
635                        Err(err) => ctx.retire(Err(err)),
636                    };
637                }
638                Plan::Deallocate(plan) => match plan.name {
639                    Some(name) => {
640                        if ctx.session_mut().remove_prepared_statement(&name) {
641                            ctx.retire(Ok(ExecuteResponse::Deallocate { all: false }));
642                        } else {
643                            ctx.retire(Err(AdapterError::UnknownPreparedStatement(name)));
644                        }
645                    }
646                    None => {
647                        ctx.session_mut().remove_all_prepared_statements();
648                        ctx.retire(Ok(ExecuteResponse::Deallocate { all: true }));
649                    }
650                },
651                Plan::Raise(RaisePlan { severity }) => {
652                    ctx.session()
653                        .add_notice(AdapterNotice::UserRequested { severity });
654                    ctx.retire(Ok(ExecuteResponse::Raised));
655                }
656                Plan::GrantPrivileges(plan) => {
657                    let result = self
658                        .sequence_grant_privileges(ctx.session_mut(), plan)
659                        .await;
660                    ctx.retire(result);
661                }
662                Plan::RevokePrivileges(plan) => {
663                    let result = self
664                        .sequence_revoke_privileges(ctx.session_mut(), plan)
665                        .await;
666                    ctx.retire(result);
667                }
668                Plan::AlterDefaultPrivileges(plan) => {
669                    let result = self
670                        .sequence_alter_default_privileges(ctx.session_mut(), plan)
671                        .await;
672                    ctx.retire(result);
673                }
674                Plan::GrantRole(plan) => {
675                    let result = self.sequence_grant_role(ctx.session_mut(), plan).await;
676                    ctx.retire(result);
677                }
678                Plan::RevokeRole(plan) => {
679                    let result = self.sequence_revoke_role(ctx.session_mut(), plan).await;
680                    ctx.retire(result);
681                }
682                Plan::AlterOwner(plan) => {
683                    let result = self.sequence_alter_owner(ctx.session_mut(), plan).await;
684                    ctx.retire(result);
685                }
686                Plan::ReassignOwned(plan) => {
687                    let result = self.sequence_reassign_owned(ctx.session_mut(), plan).await;
688                    ctx.retire(result);
689                }
690                Plan::ValidateConnection(plan) => {
691                    let connection = plan
692                        .connection
693                        .into_inline_connection(self.catalog().state());
694                    let current_storage_configuration = self.controller.storage.config().clone();
695                    mz_ore::task::spawn(|| "coord::validate_connection", async move {
696                        let res = match connection
697                            .validate(plan.id, &current_storage_configuration)
698                            .await
699                        {
700                            Ok(()) => Ok(ExecuteResponse::ValidatedConnection),
701                            Err(err) => Err(err.into()),
702                        };
703                        ctx.retire(res);
704                    });
705                }
706            }
707        }
708        .instrument(tracing::debug_span!("coord::sequencer::sequence_plan"))
709        .boxed_local()
710    }
711
712    #[mz_ore::instrument(level = "debug")]
713    pub(crate) async fn sequence_execute_single_statement_transaction(
714        &mut self,
715        ctx: ExecuteContext,
716        stmt: Arc<Statement<Raw>>,
717        params: Params,
718    ) {
719        // Put the session into single statement implicit so anything can execute.
720        let (tx, internal_cmd_tx, mut session, extra) = ctx.into_parts();
721        assert!(matches!(session.transaction(), TransactionStatus::Default));
722        session.start_transaction_single_stmt(self.now_datetime());
723        let conn_id = session.conn_id().unhandled();
724
725        // Execute the saved statement in a temp transmitter so we can run COMMIT.
726        let (sub_tx, sub_rx) = oneshot::channel();
727        let sub_ct = ClientTransmitter::new(sub_tx, self.internal_cmd_tx.clone());
728        let sub_ctx = ExecuteContext::from_parts(sub_ct, internal_cmd_tx, session, extra);
729        self.handle_execute_inner(stmt, params, sub_ctx).await;
730
731        // The response can need off-thread processing. Wait for it elsewhere so the coordinator can
732        // continue processing.
733        let internal_cmd_tx = self.internal_cmd_tx.clone();
734        mz_ore::task::spawn(
735            || format!("execute_single_statement:{conn_id}"),
736            async move {
737                let Ok(Response {
738                    result,
739                    session,
740                    otel_ctx,
741                }) = sub_rx.await
742                else {
743                    // Coordinator went away.
744                    return;
745                };
746                otel_ctx.attach_as_parent();
747                let (sub_tx, sub_rx) = oneshot::channel();
748                let _ = internal_cmd_tx.send(Message::Command(
749                    otel_ctx,
750                    Command::Commit {
751                        action: EndTransactionAction::Commit,
752                        session,
753                        tx: sub_tx,
754                    },
755                ));
756                let Ok(commit_response) = sub_rx.await else {
757                    // Coordinator went away.
758                    return;
759                };
760                assert!(matches!(
761                    commit_response.session.transaction(),
762                    TransactionStatus::Default
763                ));
764                // The fake, generated response was already sent to the user and we don't need to
765                // ever send an `Ok(result)` to the user, because they are expecting a response from
766                // a `COMMIT`. So, always send the `COMMIT`'s result if the original statement
767                // succeeded. If it failed, we can send an error and don't need to wrap it or send a
768                // later COMMIT or ROLLBACK.
769                let result = match (result, commit_response.result) {
770                    (Ok(_), commit) => commit,
771                    (Err(result), _) => Err(result),
772                };
773                // We ignore the resp.result because it's not clear what to do if it failed since we
774                // can only send a single ExecuteResponse to tx.
775                tx.send(result, commit_response.session);
776            }
777            .instrument(Span::current()),
778        );
779    }
780
781    /// Creates a role during connection startup.
782    ///
783    /// This should not be called from anywhere except connection startup.
784    #[mz_ore::instrument(level = "debug")]
785    pub(crate) async fn sequence_create_role_for_startup(
786        &mut self,
787        plan: CreateRolePlan,
788    ) -> Result<ExecuteResponse, AdapterError> {
789        // This does not set conn_id because it's not yet in active_conns. That is because we can't
790        // make a ConnMeta until we have a role id which we don't have until after the catalog txn
791        // is committed. Passing None here means the audit log won't have a user set in the event's
792        // user field. This seems fine because it is indeed the system that is creating this role,
793        // not a user request, and the user name is still recorded in the plan, so we aren't losing
794        // information.
795        self.sequence_create_role(None, plan).await
796    }
797
798    pub(crate) fn allocate_transient_id(&self) -> (CatalogItemId, GlobalId) {
799        self.transient_id_gen.allocate_id()
800    }
801
802    fn should_emit_rbac_notice(&self, session: &Session) -> Option<AdapterNotice> {
803        if !rbac::is_rbac_enabled_for_session(self.catalog.system_config(), session) {
804            Some(AdapterNotice::RbacUserDisabled)
805        } else {
806            None
807        }
808    }
809
810    /// Inserts the rows from `constants` into the table identified by `target_id`.
811    ///
812    /// # Panics
813    ///
814    /// Panics if `target_id` doesn't refer to a table.
815    /// Panics if `constants` is not an `MirRelationExpr::Constant`.
816    pub(crate) fn insert_constant(
817        catalog: &Catalog,
818        session: &mut Session,
819        target_id: CatalogItemId,
820        constants: MirRelationExpr,
821    ) -> Result<ExecuteResponse, AdapterError> {
822        // Insert can be queued, so we need to re-verify the id exists.
823        let desc = match catalog.try_get_entry(&target_id) {
824            Some(table) => {
825                // Inserts always happen at the latest version of a table.
826                table.relation_desc_latest().expect("table has desc")
827            }
828            None => {
829                return Err(AdapterError::Catalog(mz_catalog::memory::error::Error {
830                    kind: mz_catalog::memory::error::ErrorKind::Sql(CatalogError::UnknownItem(
831                        target_id.to_string(),
832                    )),
833                }));
834            }
835        };
836
837        match constants.as_const() {
838            Some((rows, ..)) => {
839                let rows = rows.clone()?;
840                for (row, _) in &rows {
841                    for (i, datum) in row.iter().enumerate() {
842                        desc.constraints_met(i, &datum)?;
843                    }
844                }
845                let diffs_plan = plan::SendDiffsPlan {
846                    id: target_id,
847                    updates: rows,
848                    kind: MutationKind::Insert,
849                    returning: Vec::new(),
850                    max_result_size: catalog.system_config().max_result_size(),
851                };
852                Self::send_diffs(session, diffs_plan)
853            }
854            None => panic!(
855                "tried using sequence_insert_constant on non-constant MirRelationExpr\n{}",
856                constants.pretty(),
857            ),
858        }
859    }
860
861    #[mz_ore::instrument(level = "debug")]
862    pub(crate) fn send_diffs(
863        session: &mut Session,
864        mut plan: plan::SendDiffsPlan,
865    ) -> Result<ExecuteResponse, AdapterError> {
866        let affected_rows = {
867            let mut affected_rows = Diff::from(0);
868            let mut all_positive_diffs = true;
869            // If all diffs are positive, the number of affected rows is just the
870            // sum of all unconsolidated diffs.
871            for (_, diff) in plan.updates.iter() {
872                if diff.is_negative() {
873                    all_positive_diffs = false;
874                    break;
875                }
876
877                affected_rows += diff;
878            }
879
880            if !all_positive_diffs {
881                // Consolidate rows. This is useful e.g. for an UPDATE where the row
882                // doesn't change, and we need to reflect that in the number of
883                // affected rows.
884                differential_dataflow::consolidation::consolidate(&mut plan.updates);
885
886                affected_rows = Diff::ZERO;
887                // With retractions, the number of affected rows is not the number
888                // of rows we see, but the sum of the absolute value of their diffs,
889                // e.g. if one row is retracted and another is added, the total
890                // number of rows affected is 2.
891                for (_, diff) in plan.updates.iter() {
892                    affected_rows += diff.abs();
893                }
894            }
895
896            usize::try_from(affected_rows.into_inner()).expect("positive Diff must fit")
897        };
898        event!(
899            Level::TRACE,
900            affected_rows,
901            id = format!("{:?}", plan.id),
902            kind = format!("{:?}", plan.kind),
903            updates = plan.updates.len(),
904            returning = plan.returning.len(),
905        );
906
907        session.add_transaction_ops(TransactionOps::Writes(vec![WriteOp {
908            id: plan.id,
909            rows: TableData::Rows(plan.updates),
910        }]))?;
911        if !plan.returning.is_empty() {
912            let finishing = RowSetFinishing {
913                order_by: Vec::new(),
914                limit: None,
915                offset: 0,
916                project: (0..plan.returning[0].0.iter().count()).collect(),
917            };
918            let max_returned_query_size = session.vars().max_query_result_size();
919            let duration_histogram = session.metrics().row_set_finishing_seconds();
920
921            return match finishing.finish(
922                RowCollection::new(plan.returning, &finishing.order_by),
923                plan.max_result_size,
924                Some(max_returned_query_size),
925                duration_histogram,
926            ) {
927                Ok((rows, _size_bytes)) => Ok(Self::send_immediate_rows(rows)),
928                Err(e) => Err(AdapterError::ResultSize(e)),
929            };
930        }
931        Ok(match plan.kind {
932            MutationKind::Delete => ExecuteResponse::Deleted(affected_rows),
933            MutationKind::Insert => ExecuteResponse::Inserted(affected_rows),
934            MutationKind::Update => ExecuteResponse::Updated(affected_rows / 2),
935        })
936    }
937}
938
939/// Checks whether we should emit diagnostic
940/// information associated with reading per-replica sources.
941///
942/// If an unrecoverable error is found (today: an untargeted read on a
943/// cluster with a non-1 number of replicas), return that.  Otherwise,
944/// return a list of associated notices (today: we always emit exactly
945/// one notice if there are any per-replica log dependencies and if
946/// `emit_introspection_query_notice` is set, and none otherwise.)
947pub(crate) fn check_log_reads(
948    catalog: &Catalog,
949    cluster: &Cluster,
950    source_ids: &BTreeSet<GlobalId>,
951    target_replica: &mut Option<ReplicaId>,
952    vars: &SessionVars,
953) -> Result<impl IntoIterator<Item = AdapterNotice>, AdapterError>
954where
955{
956    let log_names = source_ids
957        .iter()
958        .map(|gid| catalog.resolve_item_id(gid))
959        .flat_map(|item_id| catalog.introspection_dependencies(item_id))
960        .map(|item_id| catalog.get_entry(&item_id).name().item.clone())
961        .collect::<Vec<_>>();
962
963    if log_names.is_empty() {
964        return Ok(None);
965    }
966
967    // Reading from log sources on replicated clusters is only allowed if a
968    // target replica is selected. Otherwise, we have no way of knowing which
969    // replica we read the introspection data from.
970    let num_replicas = cluster.replicas().count();
971    if target_replica.is_none() {
972        if num_replicas == 1 {
973            *target_replica = cluster.replicas().map(|r| r.replica_id).next();
974        } else {
975            return Err(AdapterError::UntargetedLogRead { log_names });
976        }
977    }
978
979    // Ensure that logging is initialized for the target replica, lest
980    // we try to read from a non-existing arrangement.
981    let replica_id = target_replica.expect("set to `Some` above");
982    let replica = &cluster.replica(replica_id).expect("Replica must exist");
983    if !replica.config.compute.logging.enabled() {
984        return Err(AdapterError::IntrospectionDisabled { log_names });
985    }
986
987    Ok(vars
988        .emit_introspection_query_notice()
989        .then_some(AdapterNotice::PerReplicaLogRead { log_names }))
990}
991
992/// Forward notices that we got from the optimizer.
993pub(crate) fn emit_optimizer_notices(
994    catalog: &Catalog,
995    session: &Session,
996    notices: &Vec<RawOptimizerNotice>,
997) {
998    // `for_session` below is expensive, so return early if there's nothing to do.
999    if notices.is_empty() {
1000        return;
1001    }
1002    let humanizer = catalog.for_session(session);
1003    let system_vars = catalog.system_config();
1004    for notice in notices {
1005        let kind = OptimizerNoticeKind::from(notice);
1006        let notice_enabled = match kind {
1007            OptimizerNoticeKind::EqualsNull => system_vars.enable_notices_for_equals_null(),
1008            OptimizerNoticeKind::IndexAlreadyExists => {
1009                system_vars.enable_notices_for_index_already_exists()
1010            }
1011            OptimizerNoticeKind::IndexTooWideForLiteralConstraints => {
1012                system_vars.enable_notices_for_index_too_wide_for_literal_constraints()
1013            }
1014            OptimizerNoticeKind::IndexKeyEmpty => system_vars.enable_notices_for_index_empty_key(),
1015        };
1016        if notice_enabled {
1017            // We don't need to redact the notice parts because
1018            // `emit_optimizer_notices` is only called by the `sequence_~`
1019            // method for the statement that produces that notice.
1020            session.add_notice(AdapterNotice::OptimizerNotice {
1021                notice: notice.message(&humanizer, false).to_string(),
1022                hint: notice.hint(&humanizer, false).to_string(),
1023            });
1024        }
1025        session
1026            .metrics()
1027            .optimization_notices(&[kind.metric_label()])
1028            .inc_by(1);
1029    }
1030}
1031
1032/// Evaluates a COPY TO target URI expression and validates it.
1033///
1034/// This function is shared between the old peek sequencing (sequence_copy_to)
1035/// and the new frontend peek sequencing to avoid code duplication.
1036pub fn eval_copy_to_uri(
1037    to: HirScalarExpr,
1038    session: &Session,
1039    catalog_state: &CatalogState,
1040) -> Result<Uri, AdapterError> {
1041    let style = ExprPrepOneShot {
1042        logical_time: EvalTime::NotAvailable,
1043        session,
1044        catalog_state,
1045    };
1046    let mut to = to.lower_uncorrelated(catalog_state.system_config())?;
1047    style.prep_scalar_expr(&mut to)?;
1048    let temp_storage = RowArena::new();
1049    let evaled = to.eval(&[], &temp_storage)?;
1050    if evaled == Datum::Null {
1051        coord_bail!("COPY TO target value can not be null");
1052    }
1053    let to_url = match Uri::from_str(evaled.unwrap_str()) {
1054        Ok(url) => {
1055            if url.scheme_str() != Some("s3") && url.scheme_str() != Some("gs") {
1056                coord_bail!("only 's3://...' and 'gs://...' urls are supported as COPY TO target");
1057            }
1058            url
1059        }
1060        Err(e) => coord_bail!("could not parse COPY TO target url: {}", e),
1061    };
1062    Ok(to_url)
1063}
1064
1065/// Returns a future that will execute EXPLAIN FILTER PUSHDOWN, i.e., compute the filter pushdown
1066/// statistics for the given collections with the given MFPs.
1067///
1068/// (Shared helper fn between the old and new sequencing. This doesn't take the Coordinator as a
1069/// parameter, but instead just the specifically necessary things are passed in, so that the
1070/// frontend peek sequencing can also call it.)
1071pub(crate) async fn explain_pushdown_future_inner<
1072    I: IntoIterator<Item = (GlobalId, MapFilterProject)>,
1073>(
1074    session: &Session,
1075    catalog: &Catalog,
1076    storage_collections: &Arc<dyn StorageCollections + Send + Sync>,
1077    as_of: Antichain<Timestamp>,
1078    mz_now: ResultSpec<'static>,
1079    imports: I,
1080) -> impl Future<Output = Result<ExecuteResponse, AdapterError>> + use<I> {
1081    let explain_timeout = *session.vars().statement_timeout();
1082    let mut futures = FuturesOrdered::new();
1083    for (id, mfp) in imports {
1084        let catalog_entry = catalog.get_entry_by_global_id(&id);
1085        let full_name = catalog
1086            .for_session(session)
1087            .resolve_full_name(&catalog_entry.name);
1088        let name = format!("{}", full_name);
1089        let relation_desc = catalog_entry
1090            .relation_desc()
1091            .expect("source should have a proper desc")
1092            .into_owned();
1093        let stats_future = storage_collections
1094            .snapshot_parts_stats(id, as_of.clone())
1095            .await;
1096
1097        let mz_now = mz_now.clone();
1098        // These futures may block if the source is not yet readable at the as-of;
1099        // stash them in `futures` and only block on them in a separate task.
1100        // TODO(peek-seq): This complication won't be needed once this function will only be called
1101        // from the new peek sequencing, in which case it will be fine to block the current task.
1102        futures.push_back(async move {
1103            let snapshot_stats = match stats_future.await {
1104                Ok(stats) => stats,
1105                Err(e) => return Err(e),
1106            };
1107            let mut total_bytes = 0;
1108            let mut total_parts = 0;
1109            let mut selected_bytes = 0;
1110            let mut selected_parts = 0;
1111            for SnapshotPartStats {
1112                encoded_size_bytes: bytes,
1113                stats,
1114            } in &snapshot_stats.parts
1115            {
1116                let bytes = u64::cast_from(*bytes);
1117                total_bytes += bytes;
1118                total_parts += 1u64;
1119                let selected = match stats {
1120                    None => true,
1121                    Some(stats) => {
1122                        let stats = stats.decode();
1123                        let stats = RelationPartStats::new(
1124                            name.as_str(),
1125                            &snapshot_stats.metrics.pushdown.part_stats,
1126                            &relation_desc,
1127                            &stats,
1128                        );
1129                        stats.may_match_mfp(mz_now.clone(), &mfp)
1130                    }
1131                };
1132
1133                if selected {
1134                    selected_bytes += bytes;
1135                    selected_parts += 1u64;
1136                }
1137            }
1138            Ok(Row::pack_slice(&[
1139                name.as_str().into(),
1140                total_bytes.into(),
1141                selected_bytes.into(),
1142                total_parts.into(),
1143                selected_parts.into(),
1144            ]))
1145        });
1146    }
1147
1148    let fut = async move {
1149        match tokio::time::timeout(
1150            explain_timeout,
1151            futures::TryStreamExt::try_collect::<Vec<_>>(futures),
1152        )
1153        .await
1154        {
1155            Ok(Ok(rows)) => Ok(ExecuteResponse::SendingRowsImmediate {
1156                rows: Box::new(rows.into_row_iter()),
1157            }),
1158            Ok(Err(err)) => Err(err.into()),
1159            Err(_) => Err(AdapterError::StatementTimeout),
1160        }
1161    };
1162    fut
1163}
1164
1165/// Generates EXPLAIN PLAN output.
1166/// (Shared helper fn between the old and new sequencing.)
1167pub(crate) async fn explain_plan_inner(
1168    session: &Session,
1169    catalog: &Catalog,
1170    df_meta: DataflowMetainfo,
1171    explain_ctx: ExplainPlanContext,
1172    optimizer: peek::Optimizer,
1173    insights_ctx: Option<Box<PlanInsightsContext>>,
1174) -> Result<Vec<Row>, AdapterError> {
1175    let ExplainPlanContext {
1176        config,
1177        format,
1178        stage,
1179        desc,
1180        optimizer_trace,
1181        ..
1182    } = explain_ctx;
1183
1184    let desc = desc.expect("RelationDesc for SelectPlan in EXPLAIN mode");
1185
1186    let session_catalog = catalog.for_session(session);
1187    let expr_humanizer = {
1188        let transient_items = btreemap! {
1189            optimizer.select_id() => TransientItem::new(
1190                Some(vec![GlobalId::Explain.to_string()]),
1191                Some(desc.iter_names().map(|c| c.to_string()).collect()),
1192            )
1193        };
1194        ExprHumanizerExt::new(transient_items, &session_catalog)
1195    };
1196
1197    let finishing = if optimizer.finishing().is_trivial(desc.arity()) {
1198        None
1199    } else {
1200        Some(optimizer.finishing().clone())
1201    };
1202
1203    let target_cluster = catalog.get_cluster(optimizer.cluster_id());
1204    let features = optimizer.config().features.clone();
1205
1206    let rows = optimizer_trace
1207        .into_rows(
1208            format,
1209            &config,
1210            &features,
1211            &expr_humanizer,
1212            finishing,
1213            Some(target_cluster),
1214            df_meta,
1215            stage,
1216            plan::ExplaineeStatementKind::Select,
1217            insights_ctx,
1218        )
1219        .await?;
1220
1221    Ok(rows)
1222}
1223
1224/// Creates a statistics oracle for query optimization.
1225///
1226/// This is a free-standing function that can be called from both the old peek sequencing
1227/// and the new frontend peek sequencing.
1228pub(crate) async fn statistics_oracle(
1229    session: &Session,
1230    source_ids: &BTreeSet<GlobalId>,
1231    query_as_of: &Antichain<Timestamp>,
1232    is_oneshot: bool,
1233    system_config: &vars::SystemVars,
1234    storage_collections: &dyn StorageCollections,
1235) -> Result<Box<dyn StatisticsOracle>, AdapterError> {
1236    if !session.vars().enable_session_cardinality_estimates() {
1237        let stats: Box<dyn StatisticsOracle> = Box::new(EmptyStatisticsOracle);
1238        return Ok(stats);
1239    }
1240
1241    let timeout = if is_oneshot {
1242        // TODO(mgree): ideally, we would shorten the timeout even more if we think the query could take the fast path
1243        system_config.optimizer_oneshot_stats_timeout()
1244    } else {
1245        system_config.optimizer_stats_timeout()
1246    };
1247
1248    let cached_stats = mz_ore::future::timeout(
1249        timeout,
1250        CachedStatisticsOracle::new(source_ids, query_as_of, storage_collections),
1251    )
1252    .await;
1253
1254    match cached_stats {
1255        Ok(stats) => Ok(Box::new(stats)),
1256        Err(mz_ore::future::TimeoutError::DeadlineElapsed) => {
1257            warn!(
1258                is_oneshot = is_oneshot,
1259                "optimizer statistics collection timed out after {}ms",
1260                timeout.as_millis()
1261            );
1262
1263            Ok(Box::new(EmptyStatisticsOracle))
1264        }
1265        Err(mz_ore::future::TimeoutError::Inner(e)) => Err(AdapterError::Storage(e)),
1266    }
1267}
1268
1269#[derive(Debug)]
1270struct CachedStatisticsOracle {
1271    cache: BTreeMap<GlobalId, usize>,
1272}
1273
1274impl CachedStatisticsOracle {
1275    pub async fn new(
1276        ids: &BTreeSet<GlobalId>,
1277        as_of: &Antichain<Timestamp>,
1278        storage_collections: &dyn StorageCollections,
1279    ) -> Result<Self, StorageError> {
1280        let mut cache = BTreeMap::new();
1281
1282        for id in ids {
1283            let stats = storage_collections.snapshot_stats(*id, as_of.clone()).await;
1284
1285            match stats {
1286                Ok(stats) => {
1287                    cache.insert(*id, stats.num_updates);
1288                }
1289                Err(StorageError::IdentifierMissing(id)) => {
1290                    ::tracing::debug!("no statistics for {id}")
1291                }
1292                Err(e) => return Err(e),
1293            }
1294        }
1295
1296        Ok(Self { cache })
1297    }
1298}
1299
1300impl StatisticsOracle for CachedStatisticsOracle {
1301    fn cardinality_estimate(&self, id: GlobalId) -> Option<usize> {
1302        self.cache.get(&id).map(|estimate| *estimate)
1303    }
1304
1305    fn as_map(&self) -> BTreeMap<GlobalId, usize> {
1306        self.cache.clone()
1307    }
1308}