use std::collections::{BTreeMap, BTreeSet};
use std::fmt;
use std::num::NonZeroUsize;
use differential_dataflow::consolidation::consolidate;
use futures::TryFutureExt;
use mz_adapter_types::compaction::CompactionWindow;
use mz_adapter_types::connection::ConnectionId;
use mz_cluster_client::ReplicaId;
use mz_compute_client::controller::PeekNotification;
use mz_compute_client::protocol::command::PeekTarget;
use mz_compute_client::protocol::response::PeekResponse;
use mz_compute_types::dataflows::{DataflowDescription, IndexImport};
use mz_compute_types::ComputeInstanceId;
use mz_controller_types::ClusterId;
use mz_expr::explain::{fmt_text_constant_rows, HumanizedExplain, HumanizerMode};
use mz_expr::{
permutation_for_arrangement, EvalError, Id, MirRelationExpr, MirScalarExpr,
OptimizedMirRelationExpr, RowSetFinishing,
};
use mz_ore::cast::CastFrom;
use mz_ore::str::{separated, StrExt};
use mz_ore::tracing::OpenTelemetryContext;
use mz_repr::explain::text::DisplayText;
use mz_repr::explain::{CompactScalars, IndexUsageType, PlanRenderingContext, UsedIndexes};
use mz_repr::{Diff, GlobalId, IntoRowIterator, RelationType, Row, RowCollection, RowIterator};
use serde::{Deserialize, Serialize};
use timely::progress::Timestamp;
use uuid::Uuid;
use crate::coord::timestamp_selection::TimestampDetermination;
use crate::optimize::OptimizerError;
use crate::statement_logging::{StatementEndedExecutionReason, StatementExecutionStrategy};
use crate::util::ResultExt;
use crate::{AdapterError, ExecuteContextExtra, ExecuteResponse};
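
/// A peek that the coordinator has issued but not yet received a response for.
///
/// Tracks enough state to route the response back to the right connection and
/// to retire the statement execution once the peek completes or is canceled.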
#[derive(Debug)]
pub(crate) struct PendingPeek {
pub(crate) conn_id: ConnectionId,
pub(crate) cluster_id: ClusterId,
pub(crate) depends_on: BTreeSet<GlobalId>,
pub(crate) ctx_extra: ExecuteContextExtra,
pub(crate) is_fast_path: bool,
}
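
/// The single response to a peek, with row multiplicities represented in unary
/// (i.e. the iterator yields each row once per unit of multiplicity).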
#[derive(Debug)]
pub enum PeekResponseUnary {
Rows(Box<dyn RowIterator + Send + Sync>),
Error(String),
Canceled,
}
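
/// A dataflow that can be installed to answer a slow-path peek, together with
/// the key, permutation, and thinned arity of the arrangement the peek will read.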
#[derive(Clone, Debug)]
pub struct PeekDataflowPlan<T = mz_repr::Timestamp> {
pub(crate) desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
pub(crate) id: GlobalId,
key: Vec<MirScalarExpr>,
permutation: BTreeMap<usize, usize>,
thinned_arity: usize,
}
impl<T> PeekDataflowPlan<T> {
pub fn new(
desc: DataflowDescription<mz_compute_types::plan::Plan<T>, (), T>,
id: GlobalId,
typ: &RelationType,
) -> Self {
let arity = typ.arity();
let key = typ
.default_key()
.into_iter()
.map(MirScalarExpr::Column)
.collect::<Vec<_>>();
let (permutation, thinning) = permutation_for_arrangement(&key, arity);
Self {
desc,
id,
key,
permutation,
thinned_arity: thinning.len(),
}
}
}
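
/// Possible ways in which the coordinator can produce a peek result without
/// building a new dataflow:
///
/// * `Constant`: the query evaluates to a constant collection (or an error).
/// * `PeekExisting`: read from an existing arrangement, optionally restricted to
///   a set of literal key values, applying the given map/filter/project.
/// * `PeekPersist`: read directly from a Persist collection, applying the given
///   map/filter/project.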
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq, Eq, Ord, PartialOrd)]
pub enum FastPathPlan {
Constant(Result<Vec<(Row, Diff)>, EvalError>, RelationType),
PeekExisting(GlobalId, GlobalId, Option<Vec<Row>>, mz_expr::SafeMfpPlan),
PeekPersist(GlobalId, mz_expr::SafeMfpPlan),
}
impl<'a, T: 'a> DisplayText<PlanRenderingContext<'a, T>> for FastPathPlan {
fn fmt_text(
&self,
f: &mut fmt::Formatter<'_>,
ctx: &mut PlanRenderingContext<'a, T>,
) -> fmt::Result {
let redacted = ctx.config.redacted;
let mode = HumanizedExplain::new(ctx.config.redacted);
match self {
FastPathPlan::Constant(Ok(rows), _) => {
if !rows.is_empty() {
writeln!(f, "{}Constant", ctx.indent)?;
*ctx.as_mut() += 1;
fmt_text_constant_rows(
f,
rows.iter().map(|(row, diff)| (row, diff)),
ctx.as_mut(),
redacted,
)?;
*ctx.as_mut() -= 1;
} else {
writeln!(f, "{}Constant <empty>", ctx.as_mut())?;
}
Ok(())
}
FastPathPlan::Constant(Err(err), _) => {
if redacted {
writeln!(f, "{}Error █", ctx.as_mut())
} else {
writeln!(f, "{}Error {}", ctx.as_mut(), err.to_string().escaped())
}
}
FastPathPlan::PeekExisting(coll_id, idx_id, literal_constraints, mfp) => {
ctx.as_mut().set();
let (map, filter, project) = mfp.as_map_filter_project();
let cols = if !ctx.config.humanized_exprs {
None
} else if let Some(cols) = ctx.humanizer.column_names_for_id(*idx_id) {
let cols = itertools::chain(
cols.iter().cloned(),
std::iter::repeat(String::new()).take(map.len()),
)
.collect();
Some(cols)
} else {
None
};
if project.len() != mfp.input_arity + map.len()
|| !project.iter().enumerate().all(|(i, o)| i == *o)
{
let outputs = mode.seq(&project, cols.as_ref());
let outputs = CompactScalars(outputs);
writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
*ctx.as_mut() += 1;
}
if !filter.is_empty() {
let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
*ctx.as_mut() += 1;
}
if !map.is_empty() {
let scalars = mode.seq(&map, cols.as_ref());
let scalars = CompactScalars(scalars);
writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
*ctx.as_mut() += 1;
}
MirRelationExpr::fmt_indexed_filter(
f,
ctx,
coll_id,
idx_id,
literal_constraints.clone(),
None,
)?;
writeln!(f)?;
ctx.as_mut().reset();
Ok(())
}
FastPathPlan::PeekPersist(gid, mfp) => {
ctx.as_mut().set();
let (map, filter, project) = mfp.as_map_filter_project();
let cols = if !ctx.config.humanized_exprs {
None
} else if let Some(cols) = ctx.humanizer.column_names_for_id(*gid) {
let cols = itertools::chain(
cols.iter().cloned(),
std::iter::repeat(String::new()).take(map.len()),
)
.collect::<Vec<_>>();
Some(cols)
} else {
None
};
if project.len() != mfp.input_arity + map.len()
|| !project.iter().enumerate().all(|(i, o)| i == *o)
{
let outputs = mode.seq(&project, cols.as_ref());
let outputs = CompactScalars(outputs);
writeln!(f, "{}Project ({})", ctx.as_mut(), outputs)?;
*ctx.as_mut() += 1;
}
if !filter.is_empty() {
let predicates = separated(" AND ", mode.seq(&filter, cols.as_ref()));
writeln!(f, "{}Filter {}", ctx.as_mut(), predicates)?;
*ctx.as_mut() += 1;
}
if !map.is_empty() {
let scalars = mode.seq(&map, cols.as_ref());
let scalars = CompactScalars(scalars);
writeln!(f, "{}Map ({})", ctx.as_mut(), scalars)?;
*ctx.as_mut() += 1;
}
let human_id = ctx
.humanizer
.humanize_id(*gid)
.unwrap_or_else(|| gid.to_string());
writeln!(f, "{}PeekPersist {human_id}", ctx.as_mut())?;
ctx.as_mut().reset();
Ok(())
}
}?;
Ok(())
}
}
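
/// A peek plan together with the timestamp determination and metadata needed to
/// execute it.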
#[derive(Debug)]
pub struct PlannedPeek {
pub plan: PeekPlan,
pub determination: TimestampDetermination<mz_repr::Timestamp>,
pub conn_id: ConnectionId,
pub source_arity: usize,
pub source_ids: BTreeSet<GlobalId>,
}
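
/// How a peek will be executed: via a fast path, or by installing the dataflow
/// described by a [`PeekDataflowPlan`].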
#[derive(Clone, Debug)]
pub enum PeekPlan<T = mz_repr::Timestamp> {
FastPath(FastPathPlan),
SlowPath(PeekDataflowPlan<T>),
}
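
/// Converts a [`mz_expr::MapFilterProject`] into a [`mz_expr::SafeMfpPlan`],
/// returning an error if the expression contains temporal operators.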
fn mfp_to_safe_plan(
mfp: mz_expr::MapFilterProject,
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
mfp.into_plan()
.map_err(OptimizerError::Internal)?
.into_nontemporal()
.map_err(|_e| OptimizerError::UnsafeMfpPlan)
}
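
/// Permutes `mfp` so that it can be applied directly to rows read out of an
/// arrangement keyed by `key`.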
fn permute_oneshot_mfp_around_index(
mfp: mz_expr::MapFilterProject,
key: &[MirScalarExpr],
) -> Result<mz_expr::SafeMfpPlan, OptimizerError> {
let input_arity = mfp.input_arity;
let mut safe_mfp = mfp_to_safe_plan(mfp)?;
let (permute, thinning) = mz_expr::permutation_for_arrangement(key, input_arity);
safe_mfp.permute(permute, key.len() + thinning.len());
Ok(safe_mfp)
}
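
/// Determines whether the dataflow for `view_id` can be answered without
/// installing a new dataflow, i.e. via a "fast path":
///
/// * a constant result (`FastPathPlan::Constant`),
/// * a lookup or scan of an existing index (`FastPathPlan::PeekExisting`), or
/// * a read directly from Persist (`FastPathPlan::PeekPersist`), provided there
///   are no filters and the finishing has no ordering and requests fewer than
///   `persist_fast_path_limit` rows (limit plus offset).
///
/// Returns `Ok(None)` when no fast path applies and a dataflow must be built.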
pub fn create_fast_path_plan<T: Timestamp>(
dataflow_plan: &mut DataflowDescription<OptimizedMirRelationExpr, (), T>,
view_id: GlobalId,
finishing: Option<&RowSetFinishing>,
persist_fast_path_limit: usize,
) -> Result<Option<FastPathPlan>, OptimizerError> {
if dataflow_plan.objects_to_build.len() >= 1 && dataflow_plan.objects_to_build[0].id == view_id
{
let mut mir = &*dataflow_plan.objects_to_build[0].plan.as_inner_mut();
if let Some((rows, ..)) = mir.as_const() {
return Ok(Some(FastPathPlan::Constant(
rows.clone()
.map(|rows| rows.into_iter().map(|(row, diff)| (row, diff)).collect()),
mir.typ(),
)));
} else {
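            // If a TopK would be entirely absorbed by the row-set finishing,
            // peek through it and let the finishing do the work.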
if let MirRelationExpr::TopK {
input,
group_key,
order_key,
limit,
offset,
monotonic: _,
expected_group_size: _,
} = mir
{
if let Some(finishing) = finishing {
if group_key.is_empty() && *order_key == finishing.order_by && *offset == 0 {
let finishing_limits_at_least_as_topk = match (limit, finishing.limit) {
(None, _) => true,
(Some(..), None) => false,
(Some(topk_limit), Some(finishing_limit)) => {
if let Some(l) = topk_limit.as_literal_int64() {
l >= *finishing_limit
} else {
false
}
}
};
if finishing_limits_at_least_as_topk {
mir = input;
}
}
}
}
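            // Strip off any outer map/filter/project so the remaining expression
            // can be matched against a `Get` or an indexed-filter join below.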
let (mfp, mir) = mz_expr::MapFilterProject::extract_from_expression(mir);
match mir {
MirRelationExpr::Get {
id: Id::Global(get_id),
..
} => {
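                    // Prefer serving the peek from an existing index on this
                    // collection.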
for (index_id, IndexImport { desc, .. }) in dataflow_plan.index_imports.iter() {
if desc.on_id == *get_id {
return Ok(Some(FastPathPlan::PeekExisting(
*get_id,
*index_id,
None,
permute_oneshot_mfp_around_index(mfp, &desc.key)?,
)));
}
}
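                    // No usable index: consider reading directly from Persist
                    // when there are no filters and the finishing keeps the
                    // result small.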
let safe_mfp = mfp_to_safe_plan(mfp)?;
let (_m, filters, _p) = safe_mfp.as_map_filter_project();
let small_finish = match &finishing {
None => false,
Some(RowSetFinishing {
order_by,
limit,
offset,
..
}) => {
order_by.is_empty()
&& limit.iter().any(|l| {
usize::cast_from(*l) + *offset < persist_fast_path_limit
})
}
};
if filters.is_empty() && small_finish {
return Ok(Some(FastPathPlan::PeekPersist(*get_id, safe_mfp)));
}
}
MirRelationExpr::Join { implementation, .. } => {
if let mz_expr::JoinImplementation::IndexedFilter(coll_id, idx_id, key, vals) =
implementation
{
return Ok(Some(FastPathPlan::PeekExisting(
*coll_id,
*idx_id,
Some(vals.clone()),
permute_oneshot_mfp_around_index(mfp, key)?,
)));
}
}
_ => {}
}
}
}
Ok(None)
}
impl FastPathPlan {
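    /// Reports the index (if any) that this plan reads from, along with how it
    /// is used: a key lookup, a fast-path limit, or a full scan.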
pub fn used_indexes(&self, finishing: Option<&RowSetFinishing>) -> UsedIndexes {
match self {
FastPathPlan::Constant(..) => UsedIndexes::default(),
FastPathPlan::PeekExisting(_coll_id, idx_id, literal_constraints, _mfp) => {
if literal_constraints.is_some() {
UsedIndexes::new([(*idx_id, vec![IndexUsageType::Lookup(*idx_id)])].into())
} else if finishing.map_or(false, |f| f.limit.is_some() && f.order_by.is_empty()) {
UsedIndexes::new([(*idx_id, vec![IndexUsageType::FastPathLimit])].into())
} else {
UsedIndexes::new([(*idx_id, vec![IndexUsageType::FullScan])].into())
}
}
FastPathPlan::PeekPersist(..) => UsedIndexes::default(),
}
}
}
impl crate::coord::Coordinator {
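    /// Implements a planned peek.
    ///
    /// Constant results are finished and returned to the client immediately.
    /// Fast-path peeks are issued against an existing index or a Persist
    /// collection. Slow-path peeks first install the transient dataflow
    /// described by the plan, peek its arrangement, and drop the dataflow once
    /// the peek command has been issued.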
#[mz_ore::instrument(level = "debug")]
pub async fn implement_peek_plan(
&mut self,
ctx_extra: &mut ExecuteContextExtra,
plan: PlannedPeek,
finishing: RowSetFinishing,
compute_instance: ComputeInstanceId,
target_replica: Option<ReplicaId>,
max_result_size: u64,
max_returned_query_size: Option<u64>,
) -> Result<crate::ExecuteResponse, AdapterError> {
let PlannedPeek {
plan: fast_path,
determination,
conn_id,
source_arity,
source_ids,
} = plan;
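        // A constant result can be finished and returned to the client
        // immediately, without issuing a peek at all.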
if let PeekPlan::FastPath(FastPathPlan::Constant(rows, _)) = fast_path {
let mut rows = match rows {
Ok(rows) => rows,
Err(e) => return Err(e.into()),
};
consolidate(&mut rows);
let mut results = Vec::new();
for (row, count) in rows {
if count < 0 {
Err(EvalError::InvalidParameterValue(
format!("Negative multiplicity in constant result: {}", count).into(),
))?
};
if count > 0 {
let count = usize::cast_from(
u64::try_from(count).expect("known to be positive from check above"),
);
results.push((
row,
NonZeroUsize::new(count).expect("known to be non-zero from check above"),
));
}
}
let row_collection = RowCollection::new(&results);
let duration_histogram = self.metrics.row_set_finishing_seconds();
let (ret, reason) = match finishing.finish(
row_collection,
max_result_size,
max_returned_query_size,
&duration_histogram,
) {
Ok(rows) => {
let rows_returned = u64::cast_from(rows.count());
(
Ok(Self::send_immediate_rows(rows)),
StatementEndedExecutionReason::Success {
rows_returned: Some(rows_returned),
execution_strategy: Some(StatementExecutionStrategy::Constant),
},
)
}
Err(error) => (
Err(AdapterError::ResultSize(error.clone())),
StatementEndedExecutionReason::Errored { error },
),
};
self.retire_execution(reason, std::mem::take(ctx_extra));
return ret;
}
let timestamp = determination.timestamp_context.timestamp_or_default();
if let Some(id) = ctx_extra.contents() {
self.set_statement_execution_timestamp(id, timestamp)
}
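        // Build the peek command and target for each plan shape. Slow-path
        // plans additionally install a transient dataflow, recorded in
        // `drop_dataflow` so it can be dropped after the peek is issued.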
let (peek_command, drop_dataflow, is_fast_path, peek_target, strategy) = match fast_path {
PeekPlan::FastPath(FastPathPlan::PeekExisting(
_coll_id,
idx_id,
literal_constraints,
map_filter_project,
)) => (
(literal_constraints, timestamp, map_filter_project),
None,
true,
PeekTarget::Index { id: idx_id },
StatementExecutionStrategy::FastPath,
),
PeekPlan::FastPath(FastPathPlan::PeekPersist(coll_id, map_filter_project)) => {
let peek_command = (None, timestamp, map_filter_project);
let metadata = self
.controller
.storage
.collection_metadata(coll_id)
.expect("storage collection for fast-path peek")
.clone();
(
peek_command,
None,
true,
PeekTarget::Persist {
id: coll_id,
metadata,
},
StatementExecutionStrategy::PersistFastPath,
)
}
PeekPlan::SlowPath(PeekDataflowPlan {
desc: dataflow,
id: index_id,
key: index_key,
permutation: index_permutation,
thinned_arity: index_thinned_arity,
}) => {
let output_ids = dataflow.export_ids().collect();
self.controller
.compute
.create_dataflow(compute_instance, dataflow, None)
.unwrap_or_terminate("cannot fail to create dataflows");
self.initialize_compute_read_policies(
output_ids,
compute_instance,
CompactionWindow::DisableCompaction,
)
.await;
let mut map_filter_project = mz_expr::MapFilterProject::new(source_arity);
map_filter_project
.permute(index_permutation, index_key.len() + index_thinned_arity);
let map_filter_project = mfp_to_safe_plan(map_filter_project)?;
(
(None, timestamp, map_filter_project),
Some(index_id),
false,
PeekTarget::Index { id: index_id },
StatementExecutionStrategy::Standard,
)
}
_ => {
unreachable!()
}
};
let (rows_tx, rows_rx) = tokio::sync::oneshot::channel();
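        // Generate a UUID that is unique among the pending peeks; collisions
        // are simply retried.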
let mut uuid = Uuid::new_v4();
while self.pending_peeks.contains_key(&uuid) {
uuid = Uuid::new_v4();
}
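        // Stash the pending peek so its response (or cancellation) can be
        // routed back to this connection.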
self.pending_peeks.insert(
uuid,
PendingPeek {
conn_id: conn_id.clone(),
cluster_id: compute_instance,
depends_on: source_ids,
ctx_extra: std::mem::take(ctx_extra),
is_fast_path,
},
);
self.client_pending_peeks
.entry(conn_id)
.or_default()
.insert(uuid, compute_instance);
let (literal_constraints, timestamp, map_filter_project) = peek_command;
self.controller
.compute
.peek(
compute_instance,
peek_target,
literal_constraints,
uuid,
timestamp,
finishing.clone(),
map_filter_project,
target_replica,
rows_tx,
)
.unwrap_or_terminate("cannot fail to peek");
let duration_histogram = self.metrics.row_set_finishing_seconds();
let rows_rx = rows_rx.map_ok_or_else(
|e| PeekResponseUnary::Error(e.to_string()),
move |resp| match resp {
PeekResponse::Rows(rows) => {
match finishing.finish(
rows,
max_result_size,
max_returned_query_size,
&duration_histogram,
) {
Ok(rows) => PeekResponseUnary::Rows(Box::new(rows)),
Err(e) => PeekResponseUnary::Error(e),
}
}
PeekResponse::Canceled => PeekResponseUnary::Canceled,
PeekResponse::Error(e) => PeekResponseUnary::Error(e),
},
);
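        // A dataflow installed just for this peek is no longer needed once the
        // peek command has been issued, so drop it eagerly.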
if let Some(index_id) = drop_dataflow {
self.remove_compute_ids_from_timeline(vec![(compute_instance, index_id)]);
self.drop_indexes(vec![(compute_instance, index_id)]);
}
Ok(crate::ExecuteResponse::SendingRows {
future: Box::pin(rows_rx),
instance_id: compute_instance,
strategy,
})
}
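
    /// Cancels and removes all pending peeks that were initiated by the client
    /// with `conn_id`, notifying the compute controller and retiring the
    /// corresponding statement executions as canceled.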
#[mz_ore::instrument(level = "debug")]
pub(crate) fn cancel_pending_peeks(&mut self, conn_id: &ConnectionId) {
if let Some(uuids) = self.client_pending_peeks.remove(conn_id) {
self.metrics
.canceled_peeks
.with_label_values(&[])
.inc_by(u64::cast_from(uuids.len()));
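            // Group the peeks by compute instance so each instance is told to
            // cancel its own set of peeks.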
let mut inverse: BTreeMap<ComputeInstanceId, BTreeSet<Uuid>> = Default::default();
for (uuid, compute_instance) in &uuids {
inverse.entry(*compute_instance).or_default().insert(*uuid);
}
for (compute_instance, uuids) in inverse {
for uuid in uuids {
let _ = self.controller.compute.cancel_peek(
compute_instance,
uuid,
PeekResponse::Canceled,
);
}
}
let peeks = uuids
.iter()
.filter_map(|(uuid, _)| self.pending_peeks.remove(uuid))
.collect::<Vec<_>>();
for peek in peeks {
self.retire_execution(StatementEndedExecutionReason::Canceled, peek.ctx_extra);
}
}
}
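
    /// Handles a notification from the compute controller that a peek has
    /// finished, retiring the corresponding statement execution. Does nothing
    /// if the peek is no longer pending (e.g. because it was canceled).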
pub(crate) fn handle_peek_notification(
&mut self,
uuid: Uuid,
notification: PeekNotification,
otel_ctx: OpenTelemetryContext,
) {
if let Some(PendingPeek {
conn_id: _,
cluster_id: _,
depends_on: _,
ctx_extra,
is_fast_path,
}) = self.remove_pending_peek(&uuid)
{
let reason = match notification {
PeekNotification::Success { rows: num_rows } => {
let strategy = if is_fast_path {
StatementExecutionStrategy::FastPath
} else {
StatementExecutionStrategy::Standard
};
StatementEndedExecutionReason::Success {
rows_returned: Some(num_rows),
execution_strategy: Some(strategy),
}
}
PeekNotification::Error(error) => StatementEndedExecutionReason::Errored { error },
PeekNotification::Canceled => StatementEndedExecutionReason::Canceled,
};
otel_ctx.attach_as_parent();
self.retire_execution(reason, ctx_extra);
}
}
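
    /// Removes the pending peek with the given `uuid`, also cleaning up the
    /// per-connection bookkeeping in `client_pending_peeks`. Returns the
    /// removed peek, if it was still pending.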
pub(crate) fn remove_pending_peek(&mut self, uuid: &Uuid) -> Option<PendingPeek> {
let pending_peek = self.pending_peeks.remove(uuid);
if let Some(pending_peek) = &pending_peek {
let uuids = self
.client_pending_peeks
.get_mut(&pending_peek.conn_id)
.expect("coord peek state is inconsistent");
uuids.remove(uuid);
if uuids.is_empty() {
self.client_pending_peeks.remove(&pending_peek.conn_id);
}
}
pending_peek
}
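
    /// Constructs an [`ExecuteResponse`] that sends the given rows to the
    /// client immediately, without involving the dataflow layer.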
pub(crate) fn send_immediate_rows<I>(rows: I) -> ExecuteResponse
where
I: IntoRowIterator,
I::Iter: Send + Sync + 'static,
{
let rows = Box::new(rows.into_row_iter());
ExecuteResponse::SendingRowsImmediate { rows }
}
}
#[cfg(test)]
mod tests {
use mz_expr::func::IsNull;
use mz_expr::{MapFilterProject, UnaryFunc};
use mz_ore::str::Indent;
use mz_repr::explain::text::text_string_at;
use mz_repr::explain::{DummyHumanizer, ExplainConfig, PlanRenderingContext};
use mz_repr::{ColumnType, Datum, ScalarType};
use super::*;
#[mz_ore::test]
    #[cfg_attr(miri, ignore)]
    fn test_fast_path_plan_as_text() {
let typ = RelationType::new(vec![ColumnType {
scalar_type: ScalarType::String,
nullable: false,
}]);
let constant_err = FastPathPlan::Constant(Err(EvalError::DivisionByZero), typ.clone());
let no_lookup = FastPathPlan::PeekExisting(
GlobalId::User(8),
GlobalId::User(10),
None,
MapFilterProject::new(4)
.map(Some(MirScalarExpr::column(0).or(MirScalarExpr::column(2))))
.project([1, 4])
.into_plan()
.expect("invalid plan")
.into_nontemporal()
.expect("invalid nontemporal"),
);
let lookup = FastPathPlan::PeekExisting(
GlobalId::User(9),
GlobalId::User(11),
Some(vec![Row::pack(Some(Datum::Int32(5)))]),
MapFilterProject::new(3)
.filter(Some(
MirScalarExpr::column(0).call_unary(UnaryFunc::IsNull(IsNull)),
))
.into_plan()
.expect("invalid plan")
.into_nontemporal()
.expect("invalid nontemporal"),
);
let humanizer = DummyHumanizer;
let config = ExplainConfig {
redacted: false,
..Default::default()
};
let ctx_gen = || {
let indent = Indent::default();
let annotations = BTreeMap::new();
PlanRenderingContext::<FastPathPlan>::new(indent, &humanizer, annotations, &config)
};
let constant_err_exp = "Error \"division by zero\"\n";
        let no_lookup_exp =
            "Project (#1, #4)\n  Map ((#0 OR #2))\n    ReadIndex on=u8 [DELETED INDEX]=[*** full scan ***]\n";
        let lookup_exp =
            "Filter (#0) IS NULL\n  ReadIndex on=u9 [DELETED INDEX]=[lookup value=(5)]\n";
assert_eq!(text_string_at(&constant_err, ctx_gen), constant_err_exp);
assert_eq!(text_string_at(&no_lookup, ctx_gen), no_lookup_exp);
assert_eq!(text_string_at(&lookup, ctx_gen), lookup_exp);
let mut constant_rows = vec![
(Row::pack(Some(Datum::String("hello"))), 1),
(Row::pack(Some(Datum::String("world"))), 2),
(Row::pack(Some(Datum::String("star"))), 500),
];
        let constant_exp1 =
            "Constant\n  - (\"hello\")\n  - ((\"world\") x 2)\n  - ((\"star\") x 500)\n";
assert_eq!(
text_string_at(
&FastPathPlan::Constant(Ok(constant_rows.clone()), typ.clone()),
ctx_gen
),
constant_exp1
);
constant_rows.extend((0..20).map(|i| (Row::pack(Some(Datum::String(&i.to_string()))), 1)));
        let constant_exp2 =
            "Constant\n  total_rows (diffs absed): 523\n  first_rows:\n    - (\"hello\")\
            \n    - ((\"world\") x 2)\n    - ((\"star\") x 500)\n    - (\"0\")\n    - (\"1\")\
            \n    - (\"2\")\n    - (\"3\")\n    - (\"4\")\n    - (\"5\")\n    - (\"6\")\
            \n    - (\"7\")\n    - (\"8\")\n    - (\"9\")\n    - (\"10\")\n    - (\"11\")\
            \n    - (\"12\")\n    - (\"13\")\n    - (\"14\")\n    - (\"15\")\n    - (\"16\")\n";
assert_eq!(
text_string_at(&FastPathPlan::Constant(Ok(constant_rows), typ), ctx_gen),
constant_exp2
);
}
}