// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.
//! Logic for processing client [`Command`]s. Each [`Command`] is initiated by a
//! client via some external Materialize API (ex: HTTP and psql).
use differential_dataflow::lattice::Lattice;
use mz_adapter_types::dyncfgs::ALLOW_USER_SESSIONS;
use mz_sql::session::metadata::SessionMetadata;
use std::collections::{BTreeMap, BTreeSet};
use std::net::IpAddr;
use std::sync::Arc;
use futures::future::LocalBoxFuture;
use futures::FutureExt;
use mz_adapter_types::connection::{ConnectionId, ConnectionIdType};
use mz_catalog::memory::objects::{CatalogItem, DataSourceDesc, Source};
use mz_catalog::SYSTEM_CONN_ID;
use mz_ore::task;
use mz_ore::tracing::OpenTelemetryContext;
use mz_ore::{instrument, soft_panic_or_log};
use mz_repr::role_id::RoleId;
use mz_repr::{ScalarType, Timestamp};
use mz_sql::ast::{
AlterConnectionAction, AlterConnectionStatement, AlterSourceAction, AstInfo, ConstantVisitor,
CopyRelation, CopyStatement, CreateSourceOptionName, Raw, Statement, SubscribeStatement,
};
use mz_sql::catalog::RoleAttributes;
use mz_sql::names::{Aug, PartialItemName, ResolvedIds};
use mz_sql::plan::{
AbortTransactionPlan, CommitTransactionPlan, CreateRolePlan, Params, Plan,
StatementClassification, TransactionType,
};
use mz_sql::pure::{
materialized_view_option_contains_temporal, purify_create_materialized_view_options,
};
use mz_sql::rbac;
use mz_sql::rbac::CREATE_ITEM_USAGE;
use mz_sql::session::user::User;
use mz_sql::session::vars::{
EndTransactionAction, OwnedVarInput, Value, Var, NETWORK_POLICY, STATEMENT_LOGGING_SAMPLE_RATE,
};
use mz_sql_parser::ast::display::AstDisplay;
use mz_sql_parser::ast::{
CreateMaterializedViewStatement, ExplainPlanStatement, Explainee, InsertStatement,
WithOptionValue,
};
use mz_storage_types::sources::Timeline;
use opentelemetry::trace::TraceContextExt;
use tokio::sync::{mpsc, oneshot};
use tracing::{debug_span, info, warn, Instrument};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::command::{
CatalogSnapshot, Command, ExecuteResponse, GetVariablesResponse, StartupResponse,
};
use crate::coord::appends::PendingWriteTxn;
use crate::coord::{
validate_ip_with_policy_rules, ConnMeta, Coordinator, DeferredPlanStatement, Message,
PendingTxn, PlanStatement, PlanValidity, PurifiedStatementReady,
};
use crate::error::AdapterError;
use crate::notice::AdapterNotice;
use crate::session::{Session, TransactionOps, TransactionStatus};
use crate::util::{ClientTransmitter, ResultExt};
use crate::webhook::{
AppendWebhookResponse, AppendWebhookValidator, WebhookAppender, WebhookAppenderInvalidator,
};
use crate::{catalog, metrics, AppendWebhookError, ExecuteContext, TimestampProvider};
use super::ExecuteContextExtra;
impl Coordinator {
/// BOXED FUTURE: As of Nov 2023 the returned Future from this function was 58KB. This would
/// get stored on the stack, which is bad for runtime performance, and blow up our stack usage.
/// Because of that we purposefully move this Future onto the heap (i.e. Box it).
pub(crate) fn handle_command(&mut self, mut cmd: Command) -> LocalBoxFuture<()> {
async move {
if let Some(session) = cmd.session_mut() {
session.apply_external_metadata_updates();
}
match cmd {
Command::Startup {
tx,
user,
conn_id,
secret_key,
uuid,
client_ip,
application_name,
notice_tx,
} => {
// Note: We purposefully do not use a ClientTransmitter here because startup
// handles errors and cleanup of sessions itself.
self.handle_startup(
tx,
user,
conn_id,
secret_key,
uuid,
client_ip,
application_name,
notice_tx,
)
.await;
}
Command::Execute {
portal_name,
session,
tx,
outer_ctx_extra,
} => {
let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
self.handle_execute(portal_name, session, tx, outer_ctx_extra)
.await;
}
Command::RetireExecute { data, reason } => self.retire_execution(reason, data),
Command::CancelRequest {
conn_id,
secret_key,
} => {
self.handle_cancel(conn_id, secret_key).await;
}
Command::PrivilegedCancelRequest { conn_id } => {
self.handle_privileged_cancel(conn_id).await;
}
Command::GetWebhook {
database,
schema,
name,
tx,
} => {
self.handle_get_webhook(database, schema, name, tx);
}
Command::GetSystemVars { conn_id, tx } => {
let conn = &self.active_conns[&conn_id];
let vars = GetVariablesResponse::new(
self.catalog.system_config().iter().filter(|var| {
var.visible(conn.user(), Some(self.catalog.system_config()))
.is_ok()
}),
);
let _ = tx.send(Ok(vars));
}
Command::SetSystemVars { vars, conn_id, tx } => {
let mut ops = Vec::with_capacity(vars.len());
let conn = &self.active_conns[&conn_id];
for (name, value) in vars {
if let Err(e) = self.catalog().system_config().get(&name).and_then(|var| {
var.visible(conn.user(), Some(self.catalog.system_config()))
}) {
let _ = tx.send(Err(e.into()));
return;
}
ops.push(catalog::Op::UpdateSystemConfiguration {
name,
value: OwnedVarInput::Flat(value),
});
}
let result = self.catalog_transact_conn(Some(&conn_id), ops).await;
let _ = tx.send(result);
}
Command::Terminate { conn_id, tx } => {
self.handle_terminate(conn_id).await;
// Note: We purposefully do not use a ClientTransmitter here because we're already
// terminating the provided session.
if let Some(tx) = tx {
let _ = tx.send(Ok(()));
}
}
Command::Commit {
action,
session,
tx,
} => {
let tx = ClientTransmitter::new(tx, self.internal_cmd_tx.clone());
// We reach here not through a statement execution, but from the
// "commit" pgwire command. Thus, we just generate a default statement
// execution context (once statement logging is implemented, this will cause nothing to be logged
// when the execution finishes).
let ctx = ExecuteContext::from_parts(
tx,
self.internal_cmd_tx.clone(),
session,
Default::default(),
);
let plan = match action {
EndTransactionAction::Commit => {
Plan::CommitTransaction(CommitTransactionPlan {
transaction_type: TransactionType::Implicit,
})
}
EndTransactionAction::Rollback => {
Plan::AbortTransaction(AbortTransactionPlan {
transaction_type: TransactionType::Implicit,
})
}
};
let conn_id = ctx.session().conn_id().clone();
self.sequence_plan(ctx, plan, ResolvedIds::empty()).await;
// Part of the Command::Commit contract is that the Coordinator guarantees that
// it has cleared its transaction state for the connection.
self.clear_connection(&conn_id).await;
}
Command::CatalogSnapshot { tx } => {
let _ = tx.send(CatalogSnapshot {
catalog: self.owned_catalog(),
});
}
Command::CheckConsistency { tx } => {
let _ = tx.send(self.check_consistency());
}
Command::Dump { tx } => {
let _ = tx.send(self.dump().await);
}
}
}
.instrument(debug_span!("handle_command"))
.boxed_local()
}
#[mz_ore::instrument(level = "debug")]
async fn handle_startup(
&mut self,
tx: oneshot::Sender<Result<StartupResponse, AdapterError>>,
user: User,
conn_id: ConnectionId,
secret_key: u32,
uuid: uuid::Uuid,
client_ip: Option<IpAddr>,
application_name: String,
notice_tx: mpsc::UnboundedSender<AdapterNotice>,
) {
// Early return if successful; otherwise clean up any possible state.
match self.handle_startup_inner(&user, &conn_id, &client_ip).await {
Ok((role_id, session_defaults)) => {
let session_type = metrics::session_type_label_value(&user);
self.metrics
.active_sessions
.with_label_values(&[session_type])
.inc();
let conn = ConnMeta {
secret_key,
notice_tx,
drop_sinks: BTreeSet::new(),
pending_cluster_alters: BTreeSet::new(),
connected_at: self.now(),
user,
application_name,
uuid,
client_ip,
conn_id: conn_id.clone(),
authenticated_role: role_id,
deferred_lock: None,
};
let update = self.catalog().state().pack_session_update(&conn, 1);
let update = self.catalog().state().resolve_builtin_table_update(update);
self.begin_session_for_statement_logging(&conn);
self.active_conns.insert(conn_id.clone(), conn);
// Note: Do NOT await the notify here; we pass this back to
// whatever requested the startup to prevent blocking the
// Coordinator on a builtin table update.
let notify = self.builtin_table_update().defer(vec![update]);
let resp = Ok(StartupResponse {
role_id,
write_notify: Box::pin(notify),
session_defaults,
catalog: self.owned_catalog(),
});
if tx.send(resp).is_err() {
// Failed to send to adapter, but everything is set up so we can terminate
// normally.
self.handle_terminate(conn_id).await;
}
}
Err(e) => {
// Error during startup or sending to adapter; clean up possible state created by
// handle_startup_inner. A user may have been created and it can stay; no need to
// delete it.
self.catalog_mut()
.drop_temporary_schema(&conn_id)
.unwrap_or_terminate("unable to drop temporary schema");
// Communicate the error back to the client. No need to
// handle failures to send the error back; we've already
// cleaned up all necessary state.
let _ = tx.send(Err(e));
}
}
}
// Fallible startup work that needs to be cleaned up on error.
async fn handle_startup_inner(
&mut self,
user: &User,
conn_id: &ConnectionId,
client_ip: &Option<IpAddr>,
) -> Result<(RoleId, BTreeMap<String, OwnedVarInput>), AdapterError> {
if self.catalog().try_get_role_by_name(&user.name).is_none() {
// If the user has made it to this point, that means they have been fully authenticated.
// This includes preventing any user, except a pre-defined set of system users, from
// connecting to an internal port. Therefore it's ok to always create a new role for the
// user.
let attributes = RoleAttributes::new();
let plan = CreateRolePlan {
name: user.name.to_string(),
attributes,
};
self.sequence_create_role_for_startup(plan).await?;
}
let role_id = self
.catalog()
.try_get_role_by_name(&user.name)
.expect("created above")
.id;
if role_id.is_user() && !ALLOW_USER_SESSIONS.get(self.catalog().system_config().dyncfgs()) {
return Err(AdapterError::UserSessionsDisallowed);
}
// Initialize the default session variables for this role.
let mut session_defaults = BTreeMap::new();
let system_config = self.catalog().state().system_config();
// Override the session with any system defaults.
session_defaults.extend(
system_config
.iter_session()
.map(|v| (v.name().to_string(), OwnedVarInput::Flat(v.value()))),
);
// Special case.
let statement_logging_default = system_config
.statement_logging_default_sample_rate()
.format();
session_defaults.insert(
STATEMENT_LOGGING_SAMPLE_RATE.name().to_string(),
OwnedVarInput::Flat(statement_logging_default),
);
// Override system defaults with role defaults.
session_defaults.extend(
self.catalog()
.get_role(&role_id)
.vars()
.map(|(name, val)| (name.to_string(), val.clone())),
);
// Validate network policies for external users. Internal users can only connect on the
// internal interfaces (internal HTTP/pgwire). It is up to the person deploying the system
// to ensure these internal interfaces are well secured.
//
// HACKY(parkmycar): We don't have a fully formed session yet for this role, but we want
// the default network policy for this role, so we read directly out of what the session
// will get initialized with.
if !user.is_internal() {
let network_policy_name = session_defaults
.get(NETWORK_POLICY.name())
.and_then(|value| match value {
OwnedVarInput::Flat(name) => Some(name.clone()),
OwnedVarInput::SqlSet(names) => {
tracing::error!(?names, "found multiple network policies");
None
}
})
.unwrap_or(system_config.default_network_policy_name());
let maybe_network_policy = self
.catalog()
.get_network_policy_by_name(&network_policy_name);
let Some(network_policy) = maybe_network_policy else {
// We should prevent dropping the default network policy, or setting the policy
// to something that doesn't exist, so complain loudly if this occurs.
tracing::error!(
network_policy_name,
"default network policy does not exist. All user traffic will be blocked"
);
let reason = match client_ip {
Some(ip) => super::NetworkPolicyError::AddressDenied(ip.clone()),
None => super::NetworkPolicyError::MissingIp,
};
return Err(AdapterError::NetworkPolicyDenied(reason));
};
if let Some(ip) = client_ip {
match validate_ip_with_policy_rules(ip, &network_policy.rules) {
Ok(_) => {}
Err(e) => return Err(AdapterError::NetworkPolicyDenied(e)),
}
} else {
// Only temporary and internal representations of a session
// should be missing a client_ip. These sessions should not be
// making requests or going through handle_startup.
return Err(AdapterError::NetworkPolicyDenied(
super::NetworkPolicyError::MissingIp,
));
}
}
self.catalog_mut()
.create_temporary_schema(conn_id, role_id)?;
Ok((role_id, session_defaults))
}
/// Handles an execute command.
#[instrument(name = "coord::handle_execute", fields(session = session.uuid().to_string()))]
pub(crate) async fn handle_execute(
&mut self,
portal_name: String,
mut session: Session,
tx: ClientTransmitter<ExecuteResponse>,
// If this command was part of another execute command
// (for example, executing a `FETCH` statement causes an execute to be
// issued for the cursor it references),
// then `outer_context` should be `Some`.
// This instructs the coordinator that the
// outer execute should be considered finished once the inner one is.
outer_context: Option<ExecuteContextExtra>,
) {
if session.vars().emit_trace_id_notice() {
let span_context = tracing::Span::current()
.context()
.span()
.span_context()
.clone();
if span_context.is_valid() {
session.add_notice(AdapterNotice::QueryTrace {
trace_id: span_context.trace_id(),
});
}
}
if let Err(err) = self.verify_portal(&mut session, &portal_name) {
// If statement logging hasn't started yet, we don't need
// to add any "end" event, so just make up a no-op
// `ExecuteContextExtra` here, via `Default::default`.
//
// It's a bit unfortunate because the edge case of failed
// portal verifications won't show up in statement
// logging, but there seems to be nothing else we can do,
// because we need access to the portal to begin logging.
//
// Another option would be to log a begin and end event, but just fill in NULLs
// for everything we get from the portal (prepared statement id, params).
let extra = outer_context.unwrap_or_else(Default::default);
let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
return ctx.retire(Err(err));
}
// The reference to `portal` can't outlive `session`, which we
// use to construct the context, so scope the reference to this block where we
// get everything we need from the portal for later.
let (stmt, ctx, params) = {
let portal = session
.get_portal_unverified(&portal_name)
.expect("known to exist");
let params = portal.parameters.clone();
let stmt = portal.stmt.clone();
let logging = Arc::clone(&portal.logging);
let extra = if let Some(extra) = outer_context {
// We are executing in the context of another SQL statement, so we don't
// want to begin statement logging anew. The context of the actual statement
// being executed is the one that should be retired once this finishes.
extra
} else {
// This is a new statement, log it and return the context
let maybe_uuid = self.begin_statement_execution(&mut session, &params, &logging);
ExecuteContextExtra::new(maybe_uuid)
};
let ctx = ExecuteContext::from_parts(tx, self.internal_cmd_tx.clone(), session, extra);
(stmt, ctx, params)
};
let stmt = match stmt {
Some(stmt) => stmt,
None => return ctx.retire(Ok(ExecuteResponse::EmptyQuery)),
};
let session_type = metrics::session_type_label_value(ctx.session().user());
let stmt_type = metrics::statement_type_label_value(&stmt);
self.metrics
.query_total
.with_label_values(&[session_type, stmt_type])
.inc();
match &*stmt {
Statement::Subscribe(SubscribeStatement { output, .. })
| Statement::Copy(CopyStatement {
relation: CopyRelation::Subscribe(SubscribeStatement { output, .. }),
..
}) => {
self.metrics
.subscribe_outputs
.with_label_values(&[
session_type,
metrics::subscribe_output_label_value(output),
])
.inc();
}
_ => {}
}
self.handle_execute_inner(stmt, params, ctx).await
}
#[instrument(name = "coord::handle_execute_inner", fields(stmt = stmt.to_ast_string_redacted()))]
pub(crate) async fn handle_execute_inner(
&mut self,
stmt: Arc<Statement<Raw>>,
params: Params,
mut ctx: ExecuteContext,
) {
// This comment describes the various ways DDL can execute (the ordered operations: name
// resolve, purify, plan, sequence), all of which are managed by this function. DDL has
// three notable properties that all partially interact.
//
// 1. Most DDL statements (and a few others) support single-statement transaction delayed
// execution. This occurs when a session executes `BEGIN`, a single DDL, then `COMMIT`.
// We announce success of the single DDL when it is executed, but do not attempt to plan
// or sequence it until `COMMIT`, which is able to error if needed while sequencing the
// DDL (this behavior is Postgres-compatible). We do this because some drivers or
// tools wrap all statements in `BEGIN` and `COMMIT`, and we would like them to work
// (see the sketch after this comment block). When the single DDL is announced as
// successful we also put the session's
// transaction ops into `SingleStatement` which will produce an error if any other
// statement is run in the transaction except `COMMIT`. Additionally, this will cause
// `handle_execute_inner` to stop further processing (no planning, etc.) of the
// statement.
// 2. A few other DDL statements (`ALTER .. RENAME/SWAP`) enter the `DDL` ops which allows
// any number of only these DDL statements to be executed in a transaction. At sequencing
// these generate the `Op::TransactionDryRun` catalog op. When applied with
// `catalog_transact`, that op will always produce the `TransactionDryRun` error. The
// `catalog_transact_with_ddl_transaction` function intercepts that error and reports
// success to the user, but nothing is yet committed to the real catalog. At `COMMIT`,
// all of the ops are applied, but without the dry run op. The purpose of this is to allow multiple,
// atomic renames in the same transaction.
// 3. Some DDLs do off-thread work during purification or sequencing that is expensive or
// makes network calls (interfacing with secrets, optimization of views/indexes, source
// purification). These must guarantee correctness when they return to the main
// coordinator thread because the catalog state could have changed while they were doing
// the off-thread work. Previously we would use `PlanValidity::Checks` to specify a bunch
// of IDs that we needed to exist. We discovered the way we were doing that was not
// always correct. Instead of attempting to get that completely right, we have opted to
// serialize DDL. Getting this right is difficult because catalog changes can affect name
// resolution, planning, sequencing, and optimization. Correctly writing logic that is
// aware of all possible catalog changes that would affect any of those parts is not
// something our current code is designed to support. Even if a DDL statement
// is doing off-thread work, another DDL must not yet execute at all. Executing these
// serially will guarantee that no off-thread work has affected the state of the catalog.
// This is done by adding a VecDeque of deferred statements and a lock to the
// Coordinator. When a DDL is run in `handle_execute_inner` (after applying whatever
// transaction ops are needed to the session as described above), it attempts to own the
// lock (a tokio Mutex). If acquired, it stashes the lock in the connection's `ConnMeta`
// struct in `active_conns` and proceeds. The lock is dropped at transaction end in
// `clear_transaction` and a message sent to the Coordinator to execute the next queued
// DDL. If the lock could not be acquired, the DDL is put into the VecDeque where it
// awaits dequeuing caused by the lock being released.
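//
// As an illustrative sketch of property (1) above (the object name here is made up,
// not taken from any real workload), a driver that wraps every statement in a
// transaction might issue:
//
//     BEGIN;
//     CREATE VIEW v AS SELECT 1;  -- announced as successful, but only recorded as a
//                                 -- `SingleStatement` transaction op
//     COMMIT;                     -- planning and sequencing happen here and may error
//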
// Verify that this statement type can be executed in the current
// transaction state.
match ctx.session().transaction() {
// By this point we should be in a running transaction.
TransactionStatus::Default => unreachable!(),
// Failed transactions have already been checked in pgwire for a safe statement
// (COMMIT, ROLLBACK, etc.) and can proceed.
TransactionStatus::Failed(_) => {}
// Started is a deceptive name, and means different things depending on which
// protocol was used. In the simple protocol it means exactly one statement (the
// parser parsed the entire string and it contained a single statement). In the
// extended protocol, it means *some* query is
// being executed, but there might be others after it before the Sync (commit)
// message. Postgres handles this by teaching Started to eagerly commit certain
// statements that can't be run in a transaction block.
TransactionStatus::Started(_) => {
if let Statement::Declare(_) = &*stmt {
// Declare is an exception. Although it's not against any spec to execute
// it, it will always result in nothing happening, since all portals will be
// immediately closed. Users don't know this detail, so this error helps them
// understand what's going wrong. Postgres does this too.
return ctx.retire(Err(AdapterError::OperationRequiresTransaction(
"DECLARE CURSOR".into(),
)));
}
}
// Implicit or explicit transactions.
//
// Implicit transactions happen when a multi-statement query is executed
// (a "simple query"). However if a "BEGIN" appears somewhere in there,
// then the existing implicit transaction will be upgraded to an explicit
// transaction. Thus, we should not separate what implicit and explicit
// transactions can do unless there's some additional checking to make sure
// something disallowed in explicit transactions did not previously take place
// in the implicit portion.
TransactionStatus::InTransactionImplicit(_) | TransactionStatus::InTransaction(_) => {
match &*stmt {
// Statements that are safe in a transaction. We still need to verify that we
// don't interleave reads and writes since we can't perform those serializably.
Statement::Close(_)
| Statement::Commit(_)
| Statement::Copy(_)
| Statement::Deallocate(_)
| Statement::Declare(_)
| Statement::Discard(_)
| Statement::Execute(_)
| Statement::ExplainPlan(_)
| Statement::ExplainPushdown(_)
| Statement::ExplainTimestamp(_)
| Statement::ExplainSinkSchema(_)
| Statement::Fetch(_)
| Statement::Prepare(_)
| Statement::Rollback(_)
| Statement::Select(_)
| Statement::SetTransaction(_)
| Statement::Show(_)
| Statement::SetVariable(_)
| Statement::ResetVariable(_)
| Statement::StartTransaction(_)
| Statement::Subscribe(_)
| Statement::Raise(_) => {
// Always safe.
}
Statement::Insert(InsertStatement {
source, returning, ..
}) if returning.is_empty() && ConstantVisitor::insert_source(source) => {
// Inserting from constant values statements that do not need to execute on
// any cluster (no RETURNING) is always safe.
}
// These statements must be kept in sync with `must_serialize_ddl()`.
Statement::AlterObjectRename(_) | Statement::AlterObjectSwap(_) => {
let state = self.catalog().for_session(ctx.session()).state().clone();
let revision = self.catalog().transient_revision();
// Initialize our transaction with a set of empty ops, or return an error
// if we can't run a DDL transaction
let txn_status = ctx.session_mut().transaction_mut();
if let Err(err) = txn_status.add_ops(TransactionOps::DDL {
ops: vec![],
state,
revision,
}) {
return ctx.retire(Err(err));
}
}
// Statements below must be run singly (in Started).
Statement::AlterCluster(_)
| Statement::AlterConnection(_)
| Statement::AlterDefaultPrivileges(_)
| Statement::AlterIndex(_)
| Statement::AlterSetCluster(_)
| Statement::AlterOwner(_)
| Statement::AlterRetainHistory(_)
| Statement::AlterRole(_)
| Statement::AlterSecret(_)
| Statement::AlterSink(_)
| Statement::AlterSource(_)
| Statement::AlterSystemReset(_)
| Statement::AlterSystemResetAll(_)
| Statement::AlterSystemSet(_)
| Statement::AlterTableAddColumn(_)
| Statement::AlterNetworkPolicy(_)
| Statement::CreateCluster(_)
| Statement::CreateClusterReplica(_)
| Statement::CreateConnection(_)
| Statement::CreateDatabase(_)
| Statement::CreateIndex(_)
| Statement::CreateMaterializedView(_)
| Statement::CreateContinualTask(_)
| Statement::CreateRole(_)
| Statement::CreateSchema(_)
| Statement::CreateSecret(_)
| Statement::CreateSink(_)
| Statement::CreateSource(_)
| Statement::CreateSubsource(_)
| Statement::CreateTable(_)
| Statement::CreateTableFromSource(_)
| Statement::CreateType(_)
| Statement::CreateView(_)
| Statement::CreateWebhookSource(_)
| Statement::CreateNetworkPolicy(_)
| Statement::Delete(_)
| Statement::DropObjects(_)
| Statement::DropOwned(_)
| Statement::GrantPrivileges(_)
| Statement::GrantRole(_)
| Statement::Insert(_)
| Statement::ReassignOwned(_)
| Statement::RevokePrivileges(_)
| Statement::RevokeRole(_)
| Statement::Update(_)
| Statement::ValidateConnection(_)
| Statement::Comment(_) => {
let txn_status = ctx.session_mut().transaction_mut();
// If we're not in an implicit transaction and we could generate exactly one
// valid ExecuteResponse, we can delay execution until commit.
if !txn_status.is_implicit() {
// Statements whose tag is trivial (known only from an unexecuted statement) can
// be run in a special single-statement explicit mode. In this mode (`BEGIN;
// <stmt>; COMMIT`), we generate the expected tag from a successful <stmt>, but
// delay execution until `COMMIT`.
if let Ok(resp) = ExecuteResponse::try_from(&*stmt) {
if let Err(err) = txn_status
.add_ops(TransactionOps::SingleStatement { stmt, params })
{
ctx.retire(Err(err));
return;
}
ctx.retire(Ok(resp));
return;
}
}
return ctx.retire(Err(AdapterError::OperationProhibitsTransaction(
stmt.to_string(),
)));
}
}
}
}
// DDLs must be planned and sequenced serially. We do not rely on PlanValidity checking
// various IDs because we have incorrectly done that in the past. Attempt to acquire the
// ddl lock. The lock is stashed in the ConnMeta which is dropped at transaction end. If
// acquired, proceed with sequencing. If not, enqueue and return. This logic assumes that
// Coordinator::clear_transaction is correctly called when session transactions are ended
// because that function will release the held lock from active_conns.
if Self::must_serialize_ddl(&stmt, &ctx) {
if let Ok(guard) = self.serialized_ddl.try_lock_owned() {
let prev = self
.active_conns
.get_mut(ctx.session().conn_id())
.expect("connection must exist")
.deferred_lock
.replace(guard);
assert!(
prev.is_none(),
"connections should have at most one lock guard"
);
} else {
if self
.active_conns
.get(ctx.session().conn_id())
.expect("connection must exist")
.deferred_lock
.is_some()
{
// This session *already* has the lock, and incorrectly tried to execute another
// DDL while still holding the lock, violating the assumption documented above.
// This is an internal error, probably in some AdapterClient user (pgwire or
// http). Because the session is now in some unexpected state, return an error
// which should cause the AdapterClient user to fail the transaction.
// (Terminating the connection is maybe what we would prefer to do, but is not
// currently a thing we can do from the coordinator: calling handle_terminate
// cleans up Coordinator state for the session but doesn't inform the
// AdapterClient that the session should terminate.)
soft_panic_or_log!(
"session {} attempted to get ddl lock while already owning it",
ctx.session().conn_id()
);
ctx.retire(Err(AdapterError::Internal(
"session attempted to get ddl lock while already owning it".to_string(),
)));
return;
}
self.serialized_ddl.push_back(DeferredPlanStatement {
ctx,
ps: PlanStatement::Statement { stmt, params },
});
return;
}
}
let catalog = self.catalog();
let catalog = catalog.for_session(ctx.session());
let original_stmt = Arc::clone(&stmt);
// `resolved_ids` should be derivable from `stmt`. If `stmt` is transformed to remove/add
// IDs, then `resolved_ids` should be updated to also remove/add those IDs.
let (stmt, mut resolved_ids) = match mz_sql::names::resolve(&catalog, (*stmt).clone()) {
Ok(resolved) => resolved,
Err(e) => return ctx.retire(Err(e.into())),
};
// N.B. The catalog can change during purification so we must validate that the dependencies still exist after
// purification. This should be done back on the main thread.
// We do the validation:
// - In the handler for `Message::PurifiedStatementReady`, before we handle the purified statement.
// If we add special handling for more types of `Statement`s, we'll need to ensure similar verification
// occurs.
let (stmt, resolved_ids) = match stmt {
// Various statements must be purified off the main coordinator thread of control.
stmt if Self::must_spawn_purification(&stmt) => {
let internal_cmd_tx = self.internal_cmd_tx.clone();
let conn_id = ctx.session().conn_id().clone();
let catalog = self.owned_catalog();
let now = self.now();
let otel_ctx = OpenTelemetryContext::obtain();
let current_storage_configuration = self.controller.storage.config().clone();
task::spawn(|| format!("purify:{conn_id}"), async move {
let transient_revision = catalog.transient_revision();
let catalog = catalog.for_session(ctx.session());
// Checks if the session is authorized to purify a statement. Usually
// authorization is checked after planning, however purification happens before
// planning, which may require the use of some connections and secrets.
if let Err(e) = rbac::check_usage(
&catalog,
ctx.session(),
&resolved_ids,
&CREATE_ITEM_USAGE,
) {
return ctx.retire(Err(e.into()));
}
let (result, cluster_id) = mz_sql::pure::purify_statement(
catalog,
now,
stmt,
&current_storage_configuration,
)
.await;
let result = result.map_err(|e| e.into());
let dependency_ids = resolved_ids.items().copied().collect();
let plan_validity = PlanValidity::new(
transient_revision,
dependency_ids,
cluster_id,
None,
ctx.session().role_metadata().clone(),
);
// It is not an error for purification to complete after `internal_cmd_rx` is dropped.
let result = internal_cmd_tx.send(Message::PurifiedStatementReady(
PurifiedStatementReady {
ctx,
result,
params,
plan_validity,
original_stmt,
otel_ctx,
},
));
if let Err(e) = result {
tracing::warn!("internal_cmd_rx dropped before we could send: {:?}", e);
}
});
return;
}
// `CREATE SUBSOURCE` statements are disallowed for users and are only generated
// automatically as part of purification
Statement::CreateSubsource(_) => {
ctx.retire(Err(AdapterError::Unsupported(
"CREATE SUBSOURCE statements",
)));
return;
}
Statement::CreateMaterializedView(mut cmvs) => {
// `CREATE MATERIALIZED VIEW ... AS OF ...` syntax is disallowed for users and is
// only used for storing initial frontiers in the catalog.
if cmvs.as_of.is_some() {
return ctx.retire(Err(AdapterError::Unsupported(
"CREATE MATERIALIZED VIEW ... AS OF statements",
)));
}
let mz_now = match self
.resolve_mz_now_for_create_materialized_view(
&cmvs,
&resolved_ids,
ctx.session_mut(),
true,
)
.await
{
Ok(mz_now) => mz_now,
Err(e) => return ctx.retire(Err(e)),
};
let owned_catalog = self.owned_catalog();
let catalog = owned_catalog.for_session(ctx.session());
purify_create_materialized_view_options(
catalog,
mz_now,
&mut cmvs,
&mut resolved_ids,
);
let purified_stmt =
Statement::CreateMaterializedView(CreateMaterializedViewStatement::<Aug> {
if_exists: cmvs.if_exists,
name: cmvs.name,
columns: cmvs.columns,
in_cluster: cmvs.in_cluster,
query: cmvs.query,
with_options: cmvs.with_options,
as_of: None,
});
// (Purifying CreateMaterializedView doesn't happen async, so no need to send
// `Message::PurifiedStatementReady` here.)
(purified_stmt, resolved_ids)
}
Statement::ExplainPlan(ExplainPlanStatement {
stage,
with_options,
format,
explainee: Explainee::CreateMaterializedView(box_cmvs, broken),
}) => {
let mut cmvs = *box_cmvs;
let mz_now = match self
.resolve_mz_now_for_create_materialized_view(
&cmvs,
&resolved_ids,
ctx.session_mut(),
false,
)
.await
{
Ok(mz_now) => mz_now,
Err(e) => return ctx.retire(Err(e)),
};
let owned_catalog = self.owned_catalog();
let catalog = owned_catalog.for_session(ctx.session());
purify_create_materialized_view_options(
catalog,
mz_now,
&mut cmvs,
&mut resolved_ids,
);
let purified_stmt = Statement::ExplainPlan(ExplainPlanStatement {
stage,
with_options,
format,
explainee: Explainee::CreateMaterializedView(Box::new(cmvs), broken),
});
(purified_stmt, resolved_ids)
}
// All other statements are handled immediately.
_ => (stmt, resolved_ids),
};
match self.plan_statement(ctx.session(), stmt, &params, &resolved_ids) {
Ok(plan) => self.sequence_plan(ctx, plan, resolved_ids).await,
Err(e) => ctx.retire(Err(e)),
}
}
/// Whether the statement must be serialized and is DDL.
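///
/// An illustrative sketch (the statement names are examples derived from the match
/// arms below, not an exhaustive list):
///
/// ```text
/// CREATE VIEW v AS SELECT 1;     -- serialized: takes (or queues on) the DDL lock
/// CREATE SECRET s AS 'hunter2';  -- exempt: secrets are special-cased below
/// CREATE SOURCE ...;             -- exempt: purified off the coordinator thread instead
/// ```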
fn must_serialize_ddl(stmt: &Statement<Raw>, ctx: &ExecuteContext) -> bool {
// Non-DDL is not serialized here.
if !StatementClassification::from(&*stmt).is_ddl() {
return false;
}
// Off-thread, pre-planning purification can perform arbitrarily slow network calls so must
// not be serialized. These all use PlanValidity for their checking, and we must ensure
// those checks are sufficient.
if Self::must_spawn_purification(stmt) {
return false;
}
// Statements that support multiple DDLs in a single transaction aren't serialized here.
// Their operations are serialized when applied to the catalog, guaranteeing that any
// off-thread DDLs concurrent with a multiple DDL transaction will have a serial order.
if ctx.session.transaction().is_ddl() {
return false;
}
// Some DDL is exempt. It is not great that we are matching on Statements here because
// different plans can be produced from the same top-level statement type (e.g., `ALTER
// CONNECTION ROTATE KEYS`). But the whole point of this is to prevent things from being
// planned in the first place, so we accept the abstraction leak.
match stmt {
// Secrets have a small and understood set of dependencies, and their off-thread work
// interacts with k8s.
Statement::AlterSecret(_) => false,
Statement::CreateSecret(_) => false,
Statement::AlterConnection(AlterConnectionStatement { actions, .. })
if actions
.iter()
.all(|action| matches!(action, AlterConnectionAction::RotateKeys)) =>
{
false
}
// The off-thread work that altering a cluster may do (waiting for replicas to spin up)
// does not affect its catalog names or IDs, so it is safe to not serialize. This could
// change the set of replicas that exist. For queries that name replicas or use the
// current_replica session var, the `replica_id` field of `PlanValidity` serves to
// ensure that those replicas exist during the query finish stage. Additionally, that
// work can take hours (configured by the user), so serializing it would also be a bad experience for
// users.
Statement::AlterCluster(_) => false,
// Everything else must be serialized.
_ => true,
}
}
/// Whether the statement must be purified off of the Coordinator thread.
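///
/// For example (illustrative, derived from the checks below): `CREATE SOURCE`,
/// `ALTER SOURCE`, `CREATE SINK`, and `CREATE TABLE ... FROM SOURCE` are purified
/// off-thread, except that an `ALTER SOURCE` that only touches `RETAIN HISTORY`
/// options stays on the coordinator thread.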
fn must_spawn_purification<A: AstInfo>(stmt: &Statement<A>) -> bool {
// `CREATE` and `ALTER` `SOURCE` and `SINK` statements must be purified off the main
// coordinator thread.
if !matches!(
stmt,
Statement::CreateSource(_)
| Statement::AlterSource(_)
| Statement::CreateSink(_)
| Statement::CreateTableFromSource(_)
) {
return false;
}
// However `ALTER SOURCE RETAIN HISTORY` should be excluded from off-thread purification.
if let Statement::AlterSource(stmt) = stmt {
let names: Vec<CreateSourceOptionName> = match &stmt.action {
AlterSourceAction::SetOptions(options) => {
options.iter().map(|o| o.name.clone()).collect()
}
AlterSourceAction::ResetOptions(names) => names.clone(),
_ => vec![],
};
if !names.is_empty()
&& names
.iter()
.all(|n| matches!(n, CreateSourceOptionName::RetainHistory))
{
return false;
}
}
true
}
/// Chooses a timestamp for `mz_now()`, if `mz_now()` occurs in a REFRESH option of the
/// materialized view. Additionally, if `acquire_read_holds` is true and the MV has any REFRESH
/// option, this function grabs read holds at the earliest possible time on input collections
/// that might be involved in the MV.
///
/// Note that this is NOT what handles `mz_now()` in the query part of the MV; this
/// function handles it only in `with_options`.
///
/// (Note that the chosen timestamp won't be the same timestamp as the system table inserts,
/// unfortunately.)
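///
/// Illustrative example (the exact SQL here is an assumption, not taken from this file):
/// `mz_now()` can appear in a refresh option such as
/// `CREATE MATERIALIZED VIEW mv WITH (REFRESH AT mz_now()) AS SELECT ...`, and this
/// function chooses the timestamp that such an occurrence resolves to.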
async fn resolve_mz_now_for_create_materialized_view<'a>(
&mut self,
cmvs: &CreateMaterializedViewStatement<Aug>,
resolved_ids: &ResolvedIds,
session: &Session,
acquire_read_holds: bool,
) -> Result<Option<Timestamp>, AdapterError> {
if cmvs
.with_options
.iter()
.any(|wo| matches!(wo.value, Some(WithOptionValue::Refresh(..))))
{
let catalog = self.catalog().for_session(session);
let cluster = mz_sql::plan::resolve_cluster_for_materialized_view(&catalog, cmvs)?;
let ids = self
.index_oracle(cluster)
.sufficient_collections(resolved_ids.collections().copied());
// If there is any REFRESH option, then acquire read holds. (Strictly speaking, we'd
// need this only if there is a `REFRESH AT`, not for `REFRESH EVERY`, because later
// we want to check the AT times against the read holds that we acquire here. But
// we do it for any REFRESH option, to avoid having so many code paths doing different
// things.)
//
// It's important that we acquire read holds _before_ we determine the least valid read.
// Otherwise, we're not guaranteed that the since frontier doesn't
// advance forward from underneath us.
let read_holds = self.acquire_read_holds(&ids);
// Does `mz_now()` occur?
let mz_now_ts = if cmvs
.with_options
.iter()
.any(materialized_view_option_contains_temporal)
{
let timeline_context =
self.validate_timeline_context(resolved_ids.collections().copied())?;
// We default to EpochMilliseconds, similarly to `determine_timestamp_for`,
// but even in the TimestampIndependent case.
// Note that we didn't accurately decide whether we are TimestampDependent
// or TimestampIndependent, because for this we'd need to also check whether
// `query.contains_temporal()`, similarly to how `peek_stage_validate` does.
// However, this doesn't matter here, as we are just going to default to
// EpochMilliseconds in both cases.
let timeline = timeline_context
.timeline()
.unwrap_or(&Timeline::EpochMilliseconds);
// Let's start with the timestamp oracle read timestamp.
let mut timestamp = self.get_timestamp_oracle(timeline).read_ts().await;
// If `least_valid_read` is later than the oracle, then advance to that time.
// If we didn't do this, then there would be a danger of missing the first refresh,
// which might cause the materialized view to be unreadable for hours. This might
// be what was happening here:
// https://github.com/MaterializeInc/database-issues/issues/7265#issuecomment-1931856361
//
// In the long term, it would be good to actually block the MV creation statement
// until `least_valid_read`. https://github.com/MaterializeInc/database-issues/issues/7504
// Without blocking, we have the problem that a REFRESH AT CREATION is not linearized
// with the CREATE MATERIALIZED VIEW statement, in the sense that a query from the MV
// after its creation might see input changes that happened after the CREATE MATERIALIZED
// VIEW statement returned.
let oracle_timestamp = timestamp;
let least_valid_read = self.least_valid_read(&read_holds);
timestamp.advance_by(least_valid_read.borrow());
if oracle_timestamp != timestamp {
warn!(%cmvs.name, %oracle_timestamp, %timestamp, "REFRESH MV's inputs are not readable at the oracle read ts");
}
info!("Resolved `mz_now()` to {timestamp} for REFRESH MV");
Ok(Some(timestamp))
} else {
Ok(None)
};
// NOTE: The Drop impl of ReadHolds makes sure that the hold is
// released when we don't use it.
if acquire_read_holds {
self.store_transaction_read_holds(session, read_holds);
}
mz_now_ts
} else {
Ok(None)
}
}
/// Instruct the dataflow layer to cancel any ongoing, interactive work for
/// the named `conn_id` if the correct secret key is specified.
///
/// Note: Here we take a [`ConnectionIdType`] as opposed to an owned
/// `ConnectionId` because this method gets called by external clients when
/// they request to cancel a request.
#[mz_ore::instrument(level = "debug")]
async fn handle_cancel(&mut self, conn_id: ConnectionIdType, secret_key: u32) {
if let Some((id_handle, conn_meta)) = self.active_conns.get_key_value(&conn_id) {
// If the secret key specified by the client doesn't match the
// actual secret key for the target connection, we treat this as a
// rogue cancellation request and ignore it.
if conn_meta.secret_key != secret_key {
return;
}
// Now that we've verified the secret key, this is a privileged
// cancellation request. We can upgrade the raw connection ID to a
// proper `IdHandle`.
self.handle_privileged_cancel(id_handle.clone()).await;
}
}
/// Unconditionally instructs the dataflow layer to cancel any ongoing,
/// interactive work for the named `conn_id`.
#[mz_ore::instrument(level = "debug")]
pub(crate) async fn handle_privileged_cancel(&mut self, conn_id: ConnectionId) {
let mut maybe_ctx = None;
// Cancel pending writes. There is at most one pending write per session.
if let Some(idx) = self.pending_writes.iter().position(|pending_write_txn| {
matches!(pending_write_txn, PendingWriteTxn::User {
pending_txn: PendingTxn { ctx, .. },
..
} if *ctx.session().conn_id() == conn_id)
}) {
if let PendingWriteTxn::User {
pending_txn: PendingTxn { ctx, .. },
..
} = self.pending_writes.remove(idx)
{
maybe_ctx = Some(ctx);
}
}
// Cancel deferred writes.
if let Some(write_op) = self.deferred_write_ops.remove(&conn_id) {
maybe_ctx = Some(write_op.into_ctx());
}
// Cancel deferred statements.
if let Some(idx) = self
.serialized_ddl
.iter()
.position(|deferred| *deferred.ctx.session().conn_id() == conn_id)
{
let deferred = self
.serialized_ddl
.remove(idx)
.expect("known to exist from call to `position` above");
maybe_ctx = Some(deferred.ctx);
}
// Cancel reads waiting on being linearized. There is at most one linearized read per
// session.
if let Some(pending_read_txn) = self.pending_linearize_read_txns.remove(&conn_id) {
let ctx = pending_read_txn.take_context();
maybe_ctx = Some(ctx);
}
if let Some(ctx) = maybe_ctx {
ctx.retire(Err(AdapterError::Canceled));
}
self.cancel_pending_peeks(&conn_id);
self.cancel_pending_watchsets(&conn_id);
self.cancel_compute_sinks_for_conn(&conn_id).await;
self.cancel_cluster_reconfigurations_for_conn(&conn_id)
.await;
if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
let _ = tx.send(true);
}
}
/// Handle termination of a client session.
///
/// This cleans up any state in the coordinator associated with the session.
#[mz_ore::instrument(level = "debug")]
async fn handle_terminate(&mut self, conn_id: ConnectionId) {
if !self.active_conns.contains_key(&conn_id) {
// If the session doesn't exist in `active_conns`, then this method will panic later on.
// Instead we explicitly panic here while dumping the entire Coord to the logs to help
// debug. This panic is very infrequent so we want as much information as possible.
// See https://github.com/MaterializeInc/database-issues/issues/5627.
panic!("unknown connection: {conn_id:?}\n\n{self:?}")
}
// We do not need to call clear_transaction here because there are no side effects to run
// based on any session transaction state.
self.clear_connection(&conn_id).await;
self.drop_temp_items(&conn_id).await;
self.catalog_mut()
.drop_temporary_schema(&conn_id)
.unwrap_or_terminate("unable to drop temporary schema");
let conn = self.active_conns.remove(&conn_id).expect("conn must exist");
let session_type = metrics::session_type_label_value(conn.user());
self.metrics
.active_sessions
.with_label_values(&[session_type])
.dec();
self.cancel_pending_peeks(conn.conn_id());
self.cancel_pending_watchsets(&conn_id);
self.end_session_for_statement_logging(conn.uuid());
// Queue the builtin table update, but do not wait for it to complete. We explicitly do
// this to prevent blocking the Coordinator in the case that a lot of connections are
// closed at once, which occurs regularly in some workflows.
let update = self.catalog().state().pack_session_update(&conn, -1);
let update = self.catalog().state().resolve_builtin_table_update(update);
let _builtin_update_notify = self.builtin_table_update().defer(vec![update]);
}
/// Returns the necessary metadata for appending to a webhook source, and a channel to send
/// rows.
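///
/// For example (illustrative syntax), a webhook source created with something like
/// `CREATE SOURCE my_webhook FROM WEBHOOK BODY FORMAT JSON` is looked up here by its
/// `database.schema.name` path before any rows are appended to it.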
#[mz_ore::instrument(level = "debug")]
fn handle_get_webhook(
&mut self,
database: String,
schema: String,
name: String,
tx: oneshot::Sender<Result<AppendWebhookResponse, AppendWebhookError>>,
) {
/// Attempts to resolve a Webhook source from a provided `database.schema.name` path.
///
/// Returns a struct that can be used to append data to the underlying storage collection, and the
/// types we should cast the request to.
fn resolve(
coord: &mut Coordinator,
database: String,
schema: String,
name: String,
) -> Result<AppendWebhookResponse, PartialItemName> {
// Resolve our collection.
let name = PartialItemName {
database: Some(database),
schema: Some(schema),
item: name,
};
let Ok(entry) = coord
.catalog()
.resolve_entry(None, &vec![], &name, &SYSTEM_CONN_ID)
else {
return Err(name);
};
let (body_format, header_tys, validator, global_id) = match entry.item() {
CatalogItem::Source(Source {
data_source:
DataSourceDesc::Webhook {
validate_using,
body_format,
headers,
..
},
desc,
global_id,
..
}) => {
// Assert we have one column for the body, and however many are required for
// the headers.
let num_columns = headers.num_columns() + 1;
mz_ore::soft_assert_or_log!(
desc.arity() <= num_columns,
"expected at most {} columns, but got {}",
num_columns,
desc.arity()
);
// Double check that the body column of the webhook source matches the type
// we're about to deserialize as.
let body_column = desc
.get_by_name(&"body".into())
.map(|(_idx, ty)| ty.clone())
.ok_or(name.clone())?;
assert!(!body_column.nullable, "webhook body column is nullable!?");
assert_eq!(body_column.scalar_type, ScalarType::from(*body_format));
// Create a validator that can be called to validate a webhook request.
let validator = validate_using.as_ref().map(|v| {
let validation = v.clone();
AppendWebhookValidator::new(
validation,
coord.caching_secrets_reader.clone(),
)
});
(*body_format, headers.clone(), validator, *global_id)
}
_ => return Err(name),
};
// Get a channel so we can queue updates to be written.
let row_tx = coord
.controller
.storage
.monotonic_appender(global_id)
.map_err(|_| name.clone())?;
let stats = coord
.controller
.storage
.webhook_statistics(global_id)
.map_err(|_| name)?;
let invalidator = coord
.active_webhooks
.entry(entry.id())
.or_insert_with(WebhookAppenderInvalidator::new);
let tx = WebhookAppender::new(row_tx, invalidator.guard(), stats);
Ok(AppendWebhookResponse {
tx,
body_format,
header_tys,
validator,
})
}
let response = resolve(self, database, schema, name).map_err(|name| {
AppendWebhookError::UnknownWebhook {
database: name.database.expect("provided"),
schema: name.schema.expect("provided"),
name: name.item,
}
});
let _ = tx.send(response);
}
}