1use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use mz_adapter::{AppendWebhookError, AppendWebhookResponse, WebhookAppenderCache};
16use mz_ore::cast::CastFrom;
17use mz_ore::retry::{Retry, RetryResult};
18use mz_ore::str::StrExt;
19use mz_repr::adt::jsonb::Jsonb;
20use mz_repr::{Datum, Diff, Row, RowPacker, SqlScalarType};
21use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
22use mz_storage_types::controller::StorageError;
23
24use axum::extract::{Path, State};
25use axum::response::IntoResponse;
26use bytes::Bytes;
27use http::StatusCode;
28use thiserror::Error;
29
30use crate::http::WebhookState;
31
32pub async fn handle_webhook(
33 State(WebhookState {
34 adapter_client_rx,
35 webhook_cache,
36 }): State<WebhookState>,
37 Path((database, schema, name)): Path<(String, String, String)>,
38 headers: http::HeaderMap,
39 body: Bytes,
40) -> impl IntoResponse {
41 let adapter_client = adapter_client_rx.clone().await.expect("sender not dropped");
42 let mut headers_s = BTreeMap::new();
44 for (name, val) in headers.iter() {
45 if let Ok(val_s) = val.to_str().map(|s| s.to_string()) {
46 let existing = headers_s.insert(name.as_str().to_string(), val_s);
48 if existing.is_some() {
49 let msg = format!("{} provided more than once", name.as_str());
50 return Err(WebhookError::InvalidHeaders(msg));
51 }
52 }
53 }
54 let headers = Arc::new(headers_s);
55
56 Retry::default()
58 .max_tries(2)
59 .retry_async(|_| async {
60 let result = append_webhook(
61 &adapter_client,
62 &webhook_cache,
63 &database,
64 &schema,
65 &name,
66 &body,
67 &headers,
68 )
69 .await;
70
71 match result {
74 Ok(()) => RetryResult::Ok(()),
75 Err(e @ AppendWebhookError::ChannelClosed) => RetryResult::RetryableErr(e),
76 Err(e) => RetryResult::FatalErr(e),
77 }
78 })
79 .await?;
80
81 Ok::<_, WebhookError>(())
82}
83
84async fn append_webhook(
87 adapter_client: &mz_adapter::Client,
88 webhook_cache: &WebhookAppenderCache,
89 database: &str,
90 schema: &str,
91 name: &str,
92 body: &Bytes,
93 headers: &Arc<BTreeMap<String, String>>,
94) -> Result<(), AppendWebhookError> {
95 let (database, schema, name) = (database.to_string(), schema.to_string(), name.to_string());
97
98 let received_at = adapter_client.now();
100
101 let AppendWebhookResponse {
103 tx,
104 body_format,
105 header_tys,
106 validator,
107 } = async {
108 let mut guard = webhook_cache.entries.lock().await;
109
110 match guard.remove(&(database.clone(), schema.clone(), name.clone())) {
112 Some(appender) if !appender.tx.is_closed() => {
113 guard.insert((database, schema, name), appender.clone());
114 Ok::<_, AppendWebhookError>(appender)
115 }
116 _ => {
120 tracing::info!(?database, ?schema, ?name, "fetching webhook appender");
121 adapter_client.metrics().webhook_get_appender.inc();
122
123 let appender = adapter_client
125 .get_webhook_appender(database.clone(), schema.clone(), name.clone())
126 .await?;
127
128 guard.insert((database, schema, name), appender.clone());
129
130 Ok(appender)
131 }
132 }
133 }
134 .await?;
135
136 tx.increment_messages_received(1);
139 tx.increment_bytes_received(u64::cast_from(body.len()));
140
141 if let Some(validator) = validator {
143 let valid = validator
144 .eval(Bytes::clone(body), Arc::clone(headers), received_at)
145 .await?;
146 if !valid {
147 return Err(AppendWebhookError::ValidationFailed);
148 }
149 }
150
151 let rows = pack_rows(body, &body_format, headers, &header_tys)?;
153
154 tx.append(rows).await?;
156
157 Ok(())
158}
159
160fn pack_rows(
165 body: &[u8],
166 body_format: &WebhookBodyFormat,
167 headers: &BTreeMap<String, String>,
168 header_tys: &WebhookHeaders,
169) -> Result<Vec<(Row, Diff)>, AppendWebhookError> {
170 let rows = transform_body(body, body_format)?
173 .into_iter()
174 .map(|row| pack_header(row, headers, header_tys).map(|row| (row, Diff::ONE)))
175 .collect::<Result<_, _>>()?;
176 Ok(rows)
177}
178
179fn transform_body(
181 body: &[u8],
182 format: &WebhookBodyFormat,
183) -> Result<Vec<BodyRow>, AppendWebhookError> {
184 let rows = match format {
185 WebhookBodyFormat::Bytes => {
186 vec![Row::pack_slice(&[Datum::Bytes(body)])]
187 }
188 WebhookBodyFormat::Text => {
189 let s = std::str::from_utf8(body)
190 .map_err(|m| AppendWebhookError::InvalidUtf8Body { msg: m.to_string() })?;
191 vec![Row::pack_slice(&[Datum::String(s)])]
192 }
193 WebhookBodyFormat::Json { array } => {
194 let objects = serde_json::Deserializer::from_slice(body)
195 .into_iter::<serde_json::Value>()
198 .flat_map(|value| match value {
200 Ok(serde_json::Value::Array(inners)) if *array => {
201 itertools::Either::Left(inners.into_iter().map(Result::Ok))
202 }
203 value => itertools::Either::Right(std::iter::once(value)),
204 })
205 .collect::<Result<Vec<_>, _>>()
206 .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?;
207
208 let rows = objects
211 .into_iter()
212 .map(|o| {
214 let row = Jsonb::from_serde_json(o)
215 .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?
216 .into_row();
217 Ok::<_, AppendWebhookError>(row)
218 })
219 .collect::<Result<_, _>>()?;
220
221 rows
222 }
223 };
224
225 let body_rows = rows.into_iter().map(BodyRow).collect();
228
229 Ok(body_rows)
230}
231
232fn pack_header(
234 mut body_row: BodyRow,
235 headers: &BTreeMap<String, String>,
236 header_tys: &WebhookHeaders,
237) -> Result<Row, AppendWebhookError> {
238 let num_cols = 1 + header_tys.num_columns();
240 let mut num_cols_written = 1;
242
243 let mut packer = RowPacker::for_existing_row(body_row.inner_mut());
244
245 if let Some(filters) = &header_tys.header_column {
247 packer.push_dict(
248 filter_headers(headers, filters).map(|(name, val)| (name, Datum::String(val))),
249 );
250 num_cols_written += 1;
251 }
252
253 for idx in num_cols_written..num_cols {
255 let (header_name, use_bytes) = header_tys
256 .mapped_headers
257 .get(&idx)
258 .ok_or_else(|| anyhow::anyhow!("Invalid header column index {idx}"))?;
259 let header = headers.get(header_name);
260 let datum = match header {
261 Some(h) if *use_bytes => Datum::Bytes(h.as_bytes()),
262 Some(h) => Datum::String(h),
263 None => Datum::Null,
264 };
265 packer.push(datum);
266 }
267
268 Ok(body_row.into_inner())
269}
270
271fn filter_headers<'a: 'b, 'b>(
272 headers: &'a BTreeMap<String, String>,
273 filters: &'b WebhookHeaderFilters,
274) -> impl Iterator<Item = (&'a str, &'a str)> + 'b {
275 headers
276 .iter()
277 .filter(|(header_name, _val)| {
278 filters.block.is_empty() || !filters.block.contains(*header_name)
280 })
281 .filter(|(header_name, _val)| {
282 filters.allow.is_empty() || filters.allow.contains(*header_name)
284 })
285 .map(|(key, val)| (key.as_str(), val.as_str()))
286}
287
288#[repr(transparent)]
293struct BodyRow(Row);
294
295impl BodyRow {
296 fn inner_mut(&mut self) -> &mut Row {
298 &mut self.0
299 }
300
301 fn into_inner(self) -> Row {
303 self.0
304 }
305}
306
/// Errors the webhook HTTP handler can return to a client.
///
/// Most values are produced by converting an [`AppendWebhookError`] (see the `From` impl
/// in this file); the `IntoResponse` impl in this file decides the HTTP status for each
/// variant.
#[derive(Error, Debug)]
pub enum WebhookError {
    /// No object exists for the requested path or identifier. Carries the text that is
    /// quoted into the error message.
    #[error("no object was found at the path {}", .0.quoted())]
    NotFound(String),
    /// Converted from `AppendWebhookError::MissingSecret`.
    #[error("the required auth could not be found")]
    SecretMissing,
    /// The request headers were rejected; the handler produces this when a header name is
    /// provided more than once.
    #[error("headers of request were invalid: {0}")]
    InvalidHeaders(String),
    /// The body could not be decoded as the type `ty`; `msg` holds the decoder's message.
    #[error("failed to deserialize body as {ty:?}: {msg}")]
    InvalidBody { ty: SqlScalarType, msg: String },
    /// The source's validator ran and evaluated to `false`.
    #[error("failed to validate the request")]
    ValidationFailed,
    /// Converted from `AppendWebhookError::ValidationError`.
    #[error("error occurred while running validation")]
    ValidationError,
    /// Converted from `StorageError::ShuttingDown`.
    #[error("service unavailable")]
    Unavailable,
    /// Any storage error that is not mapped to `NotFound` or `Unavailable`.
    #[error("internal storage failure! {0:?}")]
    InternalStorageError(StorageError),
    /// Any other internal failure; also what a closed append channel is converted to.
    #[error("internal failure! {0:?}")]
    Internal(#[from] anyhow::Error),
}
338
339impl From<AppendWebhookError> for WebhookError {
340 fn from(err: AppendWebhookError) -> Self {
341 match err {
342 AppendWebhookError::MissingSecret => WebhookError::SecretMissing,
343 AppendWebhookError::ValidationError => WebhookError::ValidationError,
344 AppendWebhookError::InvalidUtf8Body { msg } => WebhookError::InvalidBody {
345 ty: SqlScalarType::String,
346 msg,
347 },
348 AppendWebhookError::InvalidJsonBody { msg } => WebhookError::InvalidBody {
349 ty: SqlScalarType::Jsonb,
350 msg,
351 },
352 AppendWebhookError::UnknownWebhook {
353 database,
354 schema,
355 name,
356 } => WebhookError::NotFound(format!("'{database}.{schema}.{name}'")),
357 AppendWebhookError::ValidationFailed => WebhookError::ValidationFailed,
358 AppendWebhookError::ChannelClosed => {
359 WebhookError::Internal(anyhow::anyhow!("channel closed"))
360 }
361 AppendWebhookError::StorageError(storage_err) => {
362 match storage_err {
363 StorageError::IdentifierMissing(id) | StorageError::IdentifierInvalid(id) => {
365 WebhookError::NotFound(id.to_string())
366 }
367 StorageError::ShuttingDown(_) => WebhookError::Unavailable,
368 e => WebhookError::InternalStorageError(e),
369 }
370 }
371 AppendWebhookError::InternalError(err) => WebhookError::Internal(err),
372 }
373 }
374}
375
376impl IntoResponse for WebhookError {
377 fn into_response(self) -> axum::response::Response {
378 match self {
379 e @ WebhookError::NotFound(_) | e @ WebhookError::SecretMissing => {
380 (StatusCode::NOT_FOUND, e.to_string()).into_response()
381 }
382 e @ WebhookError::InvalidBody { .. }
383 | e @ WebhookError::ValidationFailed
384 | e @ WebhookError::ValidationError => {
385 (StatusCode::BAD_REQUEST, e.to_string()).into_response()
386 }
387 e @ WebhookError::InvalidHeaders(_) => {
388 (StatusCode::UNAUTHORIZED, e.to_string()).into_response()
389 }
390 e @ WebhookError::Unavailable => {
391 (StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
392 }
393 e @ WebhookError::InternalStorageError(_) => {
394 (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
395 }
396 WebhookError::Internal(e) => (
397 StatusCode::INTERNAL_SERVER_ERROR,
398 e.root_cause().to_string(),
399 )
400 .into_response(),
401 }
402 }
403}
404
/// Unit and property tests for header filtering, row packing and error-to-status mapping.
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use axum::response::IntoResponse;
    use bytes::Bytes;
    use http::StatusCode;
    use mz_adapter::AppendWebhookError;
    use mz_ore::assert_none;
    use mz_repr::{GlobalId, Row};
    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
    use mz_storage_types::controller::StorageError;
    use proptest::prelude::*;
    use proptest::strategy::Union;

    use super::{WebhookError, filter_headers, pack_rows};

    /// Strategy producing arbitrary JSON values: null/bool/i64/string leaves, nested up to
    /// four levels deep inside arrays and objects of fewer than 16 entries each.
    fn arbitrary_json() -> impl Strategy<Value = serde_json::Value> {
        let json_leaf = Union::new(vec![
            any::<()>().prop_map(|_| serde_json::Value::Null).boxed(),
            any::<bool>().prop_map(serde_json::Value::Bool).boxed(),
            any::<i64>()
                .prop_map(|x| serde_json::Value::Number(x.into()))
                .boxed(),
            any::<String>().prop_map(serde_json::Value::String).boxed(),
        ]);

        json_leaf.prop_recursive(4, 32, 8, |element| {
            Union::new(vec![
                prop::collection::vec(element.clone(), 0..16)
                    .prop_map(serde_json::Value::Array)
                    .boxed(),
                prop::collection::hash_map(".*", element, 0..16)
                    .prop_map(|map| serde_json::Value::Object(map.into_iter().collect()))
                    .boxed(),
            ])
        })
    }

    /// Asserts that `rows` has `expected_rows` entries and that every row unpacks to
    /// exactly `expected_cols` columns.
    #[track_caller]
    fn check_rows(rows: &[(Row, mz_repr::Diff)], expected_rows: usize, expected_cols: usize) {
        assert_eq!(rows.len(), expected_rows);
        for (row, _diff) in rows {
            assert_eq!(row.unpack().len(), expected_cols);
        }
    }

    /// A missing storage identifier must surface as a 404.
    #[mz_ore::test]
    fn smoke_test_storage_error_response_status() {
        let resp = WebhookError::from(AppendWebhookError::StorageError(
            StorageError::IdentifierMissing(GlobalId::User(42)),
        ))
        .into_response();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    /// Exercises a block list alone, an allow list alone, and both together.
    #[mz_ore::test]
    fn smoke_test_filter_headers() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = BTreeSet::from(["bar".to_string()]);

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);

        // Only a block list: everything except "foo" passes.
        let filters = WebhookHeaderFilters {
            block: block.clone(),
            ..Default::default()
        };

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_eq!(h.next().unwrap().0, "baz");
        assert_none!(h.next());

        // Only an allow list: nothing but "bar" passes.
        let filters = WebhookHeaderFilters {
            allow: allow.clone(),
            ..Default::default()
        };

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());

        // Both lists: still only "bar".
        let filters = WebhookHeaderFilters { block, allow };

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());
    }

    /// A header named in both lists is filtered out.
    #[mz_ore::test]
    fn filter_headers_block_overrides_allow() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = block.clone();

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);
        let filters = WebhookHeaderFilters { block, allow };

        let mut h = filter_headers(&headers, &filters);
        assert_none!(h.next());
    }

    /// A single JSON object yields one row whether or not array expansion is enabled.
    #[mz_ore::test]
    fn test_json_array_single() {
        let single_raw = r#"
        {
            "event_type": "i am a single object",
            "another_field": 42
        }
        "#;

        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);

        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);
    }

    /// A top-level JSON array is one row without expansion and one row per element with it.
    #[mz_ore::test]
    fn test_json_deserializer_multi() {
        let multi_raw = r#"
            [
                { "event_type": "smol" },
                { "event_type": "dog" }
            ]
        "#;

        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);

        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 2);
    }

    proptest! {
        /// `pack_rows` may fail on arbitrary input, but it must not panic.
        #[mz_ore::test]
        fn proptest_pack_row_never_panics(
            body: Vec<u8>,
            body_ty: WebhookBodyFormat,
            headers: BTreeMap<String, String>,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);

            let filters = WebhookHeaderFilters { block, allow };
            // Alternate between the bytes and string representation of mapped headers.
            let mut use_bytes = false;
            // Columns 0 and 1 are the body and the headers dictionary, so mapped headers
            // start at index 2.
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    use_bytes = !use_bytes;
                    (idx + 2, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: Some(filters),
                mapped_headers,
            };

            let _ = pack_rows(&body[..], &body_ty, &headers, &header_tys);
        }

        /// Any byte body packs into exactly one row with the expected column count.
        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_bytes(
            body: Vec<u8>,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Bytes;
            let header_tys = WebhookHeaders {
                header_column: include_headers.then(Default::default),
                ..Default::default()
            };

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        /// Any string body packs into exactly one row with the expected column count.
        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_strings(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Text;
            let header_tys = WebhookHeaders {
                header_column: include_headers.then(Default::default),
                ..Default::default()
            };

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        /// Mapped headers, present or absent in the request, always pack successfully.
        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_selective_headers(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);
            let body_ty = WebhookBodyFormat::Text;

            let filters = WebhookHeaderFilters { block, allow };
            // Alternate between the bytes and string representation of mapped headers.
            let mut use_bytes = false;
            // Mapped headers start after the body and, if present, the headers dictionary.
            let column_offset = if include_headers { 2 } else { 1 };
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    use_bytes = !use_bytes;
                    (idx + column_offset, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: include_headers.then_some(filters),
                mapped_headers,
            };

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        /// Serialized JSON packs into one row, or one row per element for an expanded
        /// top-level array.
        #[mz_ore::test]
        fn proptest_pack_json_with_array_expansion(
            body in arbitrary_json(),
            expand_array: bool,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let json_raw = serde_json::to_vec(&body).unwrap();
            let header_tys = WebhookHeaders {
                header_column: include_headers.then(Default::default),
                ..Default::default()
            };

            let rows = pack_rows(
                &json_raw[..],
                &WebhookBodyFormat::Json { array: expand_array },
                &headers,
                &header_tys,
            )
            .unwrap();

            let expected_num_rows = match body {
                serde_json::Value::Array(inner) if expand_array => inner.len(),
                _ => 1,
            };
            check_rows(&rows, expected_num_rows, header_tys.num_columns() + 1);
        }
    }
}