Skip to main content

mz_environmentd/http/
webhook.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for handling events from a Webhook source.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use mz_adapter::{AppendWebhookError, AppendWebhookResponse, WebhookAppenderCache};
16use mz_ore::cast::CastFrom;
17use mz_ore::retry::{Retry, RetryResult};
18use mz_ore::str::StrExt;
19use mz_repr::adt::jsonb::Jsonb;
20use mz_repr::{Datum, Diff, Row, RowPacker, SqlScalarType};
21use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
22use mz_storage_types::controller::StorageError;
23
24use axum::extract::{Path, State};
25use axum::response::IntoResponse;
26use bytes::Bytes;
27use http::StatusCode;
28use thiserror::Error;
29
30use crate::http::WebhookState;
31
32pub async fn handle_webhook(
33    State(WebhookState {
34        adapter_client_rx,
35        webhook_cache,
36    }): State<WebhookState>,
37    Path((database, schema, name)): Path<(String, String, String)>,
38    headers: http::HeaderMap,
39    body: Bytes,
40) -> impl IntoResponse {
41    let adapter_client = adapter_client_rx.clone().await.expect("sender not dropped");
42    // Collect headers into a map, while converting them into strings.
43    let mut headers_s = BTreeMap::new();
44    for (name, val) in headers.iter() {
45        if let Ok(val_s) = val.to_str().map(|s| s.to_string()) {
46            // If a header is included more than once, bail returning an error to the user.
47            let existing = headers_s.insert(name.as_str().to_string(), val_s);
48            if existing.is_some() {
49                let msg = format!("{} provided more than once", name.as_str());
50                return Err(WebhookError::InvalidHeaders(msg));
51            }
52        }
53    }
54    let headers = Arc::new(headers_s);
55
56    // Append to the webhook source, retrying if we race with a concurrent `ALTER SOURCE` op.
57    Retry::default()
58        .max_tries(2)
59        .retry_async(|_| async {
60            let result = append_webhook(
61                &adapter_client,
62                &webhook_cache,
63                &database,
64                &schema,
65                &name,
66                &body,
67                &headers,
68            )
69            .await;
70
71            // Note: think carefully before adding more errors here, we need to make sure we don't
72            // append data more than once.
73            match result {
74                Ok(()) => RetryResult::Ok(()),
75                Err(e @ AppendWebhookError::ChannelClosed) => RetryResult::RetryableErr(e),
76                Err(e) => RetryResult::FatalErr(e),
77            }
78        })
79        .await?;
80
81    Ok::<_, WebhookError>(())
82}
83
/// Append the provided `body` and `headers` to the webhook source identified via `database`,
/// `schema`, and `name`.
///
/// The steps, in order:
///
/// 1. Look up (or fetch and cache) the appender for the source in `webhook_cache`.
/// 2. Bump the appender's received-message and received-byte counters.
/// 3. If the source has a validator, evaluate it against the body and headers.
/// 4. Pack the body and headers into rows via [`pack_rows`] and send them to the appender.
///
/// # Errors
///
/// Returns [`AppendWebhookError::ValidationFailed`] if the validator evaluates to `false`, and
/// otherwise propagates any error from fetching the appender, running the validator, packing
/// the rows, or appending them.
async fn append_webhook(
    adapter_client: &mz_adapter::Client,
    webhook_cache: &WebhookAppenderCache,
    database: &str,
    schema: &str,
    name: &str,
    body: &Bytes,
    headers: &Arc<BTreeMap<String, String>>,
) -> Result<(), AppendWebhookError> {
    // Shenanigans to get the types working for the async retry.
    let (database, schema, name) = (database.to_string(), schema.to_string(), name.to_string());

    // Record the time we receive the request, for use if validation checks the current timestamp.
    let received_at = adapter_client.now();

    // Get an appender for the provided object, if that object exists.
    //
    // The lookup runs in its own `async` block so the cache lock guard is dropped as soon as we
    // have an appender in hand, before any validation or appending happens.
    let AppendWebhookResponse {
        tx,
        body_format,
        header_tys,
        validator,
    } = async {
        let mut guard = webhook_cache.entries.lock().await;

        // Remove the appender from our map, only re-insert it, if it's valid.
        match guard.remove(&(database.clone(), schema.clone(), name.clone())) {
            // The cached appender's channel is still open, so put a copy back and use it.
            Some(appender) if !appender.tx.is_closed() => {
                guard.insert((database, schema, name), appender.clone());
                Ok::<_, AppendWebhookError>(appender)
            }
            // We don't have a valid appender, so we need to get one.
            //
            // Note: we hold the lock while we acquire an appender to prevent a dogpile.
            _ => {
                tracing::info!(?database, ?schema, ?name, "fetching webhook appender");
                adapter_client.metrics().webhook_get_appender.inc();

                // Acquire and cache a new appender. If this fails we return early and the
                // cache is left without an entry for this source.
                let appender = adapter_client
                    .get_webhook_appender(database.clone(), schema.clone(), name.clone())
                    .await?;

                guard.insert((database, schema, name), appender.clone());

                Ok(appender)
            }
        }
    }
    .await?;

    // These must happen before validation as we do not know if validation or
    // packing will succeed and appending will begin
    tx.increment_messages_received(1);
    tx.increment_bytes_received(u64::cast_from(body.len()));

    // If this source requires validation, then validate!
    if let Some(validator) = validator {
        // The validator takes owned handles, so hand it clones of the body and header map.
        let valid = validator
            .eval(Bytes::clone(body), Arc::clone(headers), received_at)
            .await?;
        if !valid {
            return Err(AppendWebhookError::ValidationFailed);
        }
    }

    // Pack our body and headers into a Row.
    let rows = pack_rows(body, &body_format, headers, &header_tys)?;

    // Send the row to get appended.
    tx.append(rows).await?;

    Ok(())
}
159
160/// Packs the body and headers of a webhook request into as many rows as necessary.
161///
162/// TODO(parkmycar): Should we be consolidating the returned Rows here? Presumably something in
163/// storage would already be doing it, so no need to do it twice?
164fn pack_rows(
165    body: &[u8],
166    body_format: &WebhookBodyFormat,
167    headers: &BTreeMap<String, String>,
168    header_tys: &WebhookHeaders,
169) -> Result<Vec<(Row, Diff)>, AppendWebhookError> {
170    // This method isn't that "deep" but it reflects the way we intend for the packing process to
171    // work and makes testing easier.
172    let rows = transform_body(body, body_format)?
173        .into_iter()
174        .map(|row| pack_header(row, headers, header_tys).map(|row| (row, Diff::ONE)))
175        .collect::<Result<_, _>>()?;
176    Ok(rows)
177}
178
179/// Transforms the body of a webhook request into a `Vec<BodyRow>`.
180fn transform_body(
181    body: &[u8],
182    format: &WebhookBodyFormat,
183) -> Result<Vec<BodyRow>, AppendWebhookError> {
184    let rows = match format {
185        WebhookBodyFormat::Bytes => {
186            vec![Row::pack_slice(&[Datum::Bytes(body)])]
187        }
188        WebhookBodyFormat::Text => {
189            let s = std::str::from_utf8(body)
190                .map_err(|m| AppendWebhookError::InvalidUtf8Body { msg: m.to_string() })?;
191            vec![Row::pack_slice(&[Datum::String(s)])]
192        }
193        WebhookBodyFormat::Json { array } => {
194            let objects = serde_json::Deserializer::from_slice(body)
195                // Automatically expand multiple JSON objects delimited by whitespace, e.g.
196                // newlines, into a single batch.
197                .into_iter::<serde_json::Value>()
198                // Optionally expand a JSON array into separate rows, if requested.
199                .flat_map(|value| match value {
200                    Ok(serde_json::Value::Array(inners)) if *array => {
201                        itertools::Either::Left(inners.into_iter().map(Result::Ok))
202                    }
203                    value => itertools::Either::Right(std::iter::once(value)),
204                })
205                .collect::<Result<Vec<_>, _>>()
206                .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?;
207
208            // Note: `into_iter()` should be re-using the underlying allocation of the `objects`
209            // vector, and it's more readable to split these into separate iterators.
210            let rows = objects
211                .into_iter()
212                // Map a JSON object into a Row.
213                .map(|o| {
214                    let row = Jsonb::from_serde_json(o)
215                        .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?
216                        .into_row();
217                    Ok::<_, AppendWebhookError>(row)
218                })
219                .collect::<Result<_, _>>()?;
220
221            rows
222        }
223    };
224
225    // A `Row` cannot describe its schema without unpacking it. To add some safety we wrap the
226    // returned `Row`s in a newtype to signify they already have the "body" column packed.
227    let body_rows = rows.into_iter().map(BodyRow).collect();
228
229    Ok(body_rows)
230}
231
232/// Pack the headers of a request into a [`Row`].
233fn pack_header(
234    mut body_row: BodyRow,
235    headers: &BTreeMap<String, String>,
236    header_tys: &WebhookHeaders,
237) -> Result<Row, AppendWebhookError> {
238    // 1 column for the body plus however many are needed for the headers.
239    let num_cols = 1 + header_tys.num_columns();
240    // The provided Row already has the Body written.
241    let mut num_cols_written = 1;
242
243    let mut packer = RowPacker::for_existing_row(body_row.inner_mut());
244
245    // Pack the headers into our row, if required.
246    if let Some(filters) = &header_tys.header_column {
247        packer.push_dict(
248            filter_headers(headers, filters).map(|(name, val)| (name, Datum::String(val))),
249        );
250        num_cols_written += 1;
251    }
252
253    // Pack the mapped headers.
254    for idx in num_cols_written..num_cols {
255        let (header_name, use_bytes) = header_tys
256            .mapped_headers
257            .get(&idx)
258            .ok_or_else(|| anyhow::anyhow!("Invalid header column index {idx}"))?;
259        let header = headers.get(header_name);
260        let datum = match header {
261            Some(h) if *use_bytes => Datum::Bytes(h.as_bytes()),
262            Some(h) => Datum::String(h),
263            None => Datum::Null,
264        };
265        packer.push(datum);
266    }
267
268    Ok(body_row.into_inner())
269}
270
271fn filter_headers<'a: 'b, 'b>(
272    headers: &'a BTreeMap<String, String>,
273    filters: &'b WebhookHeaderFilters,
274) -> impl Iterator<Item = (&'a str, &'a str)> + 'b {
275    headers
276        .iter()
277        .filter(|(header_name, _val)| {
278            // If our block list is empty, then don't filter anything.
279            filters.block.is_empty() || !filters.block.contains(*header_name)
280        })
281        .filter(|(header_name, _val)| {
282            // If our allow list is empty, then don't filter anything.
283            filters.allow.is_empty() || filters.allow.contains(*header_name)
284        })
285        .map(|(key, val)| (key.as_str(), val.as_str()))
286}
287
288/// A [`Row`] that has the body of a request already packed into it.
289///
290/// Note: if you're constructing a [`BodyRow`] you need to guarantee the only column packed into
291/// the [`Row`] is a single "body" column.
292#[repr(transparent)]
293struct BodyRow(Row);
294
295impl BodyRow {
296    /// Obtain a mutable reference to the inner [`Row`].
297    fn inner_mut(&mut self) -> &mut Row {
298        &mut self.0
299    }
300
301    /// Return the inner [`Row`].
302    fn into_inner(self) -> Row {
303        self.0
304    }
305}
306
/// Errors we can encounter when appending data to a Webhook Source.
///
/// Webhook sources are a bit special since they are handled by `environmentd` (all other sources
/// are handled by `clusterd`) and data is "pushed" to them (all other sources pull data). The
/// errors also generally need to map to HTTP status codes that we can use to respond to a webhook
/// request. As such, webhook errors don't cleanly map to any existing error type, hence the
/// existence of this error type.
///
/// Note: 429 Too Many Requests is handled at the HTTP layer via Tower's
/// `GlobalConcurrencyLimitLayer`, not through these error variants.
#[derive(Error, Debug)]
pub enum WebhookError {
    /// No webhook source exists at the requested path, or storage reported the identifier as
    /// missing or invalid. Responds with `404 Not Found`.
    #[error("no object was found at the path {}", .0.quoted())]
    NotFound(String),
    /// Converted from [`AppendWebhookError::MissingSecret`]. Responds with `404 Not Found`.
    #[error("the required auth could not be found")]
    SecretMissing,
    /// The request headers were rejected, e.g. because a header was provided more than once.
    /// Responds with `401 Unauthorized`.
    #[error("headers of request were invalid: {0}")]
    InvalidHeaders(String),
    /// The body could not be decoded as the type (`ty`) the source expects; `msg` carries the
    /// underlying decode error. Responds with `400 Bad Request`.
    #[error("failed to deserialize body as {ty:?}: {msg}")]
    InvalidBody { ty: SqlScalarType, msg: String },
    /// The source's validator ran and rejected the request. Responds with `400 Bad Request`.
    #[error("failed to validate the request")]
    ValidationFailed,
    /// Converted from [`AppendWebhookError::ValidationError`]. Responds with `400 Bad Request`.
    #[error("error occurred while running validation")]
    ValidationError,
    /// Storage reported that it is shutting down. Responds with `503 Service Unavailable`.
    #[error("service unavailable")]
    Unavailable,
    /// Any other storage error. Responds with `500 Internal Server Error`.
    #[error("internal storage failure! {0:?}")]
    InternalStorageError(StorageError),
    /// Any other internal failure. Responds with `500 Internal Server Error`, using the root
    /// cause of the wrapped error as the response body.
    #[error("internal failure! {0:?}")]
    Internal(#[from] anyhow::Error),
}
338
339impl From<AppendWebhookError> for WebhookError {
340    fn from(err: AppendWebhookError) -> Self {
341        match err {
342            AppendWebhookError::MissingSecret => WebhookError::SecretMissing,
343            AppendWebhookError::ValidationError => WebhookError::ValidationError,
344            AppendWebhookError::InvalidUtf8Body { msg } => WebhookError::InvalidBody {
345                ty: SqlScalarType::String,
346                msg,
347            },
348            AppendWebhookError::InvalidJsonBody { msg } => WebhookError::InvalidBody {
349                ty: SqlScalarType::Jsonb,
350                msg,
351            },
352            AppendWebhookError::UnknownWebhook {
353                database,
354                schema,
355                name,
356            } => WebhookError::NotFound(format!("'{database}.{schema}.{name}'")),
357            AppendWebhookError::ValidationFailed => WebhookError::ValidationFailed,
358            AppendWebhookError::ChannelClosed => {
359                WebhookError::Internal(anyhow::anyhow!("channel closed"))
360            }
361            AppendWebhookError::StorageError(storage_err) => {
362                match storage_err {
363                    // TODO(parkmycar): Maybe map this to a HTTP 410 Gone instead of 404?
364                    StorageError::IdentifierMissing(id) | StorageError::IdentifierInvalid(id) => {
365                        WebhookError::NotFound(id.to_string())
366                    }
367                    StorageError::ShuttingDown(_) => WebhookError::Unavailable,
368                    e => WebhookError::InternalStorageError(e),
369                }
370            }
371            AppendWebhookError::InternalError(err) => WebhookError::Internal(err),
372        }
373    }
374}
375
376impl IntoResponse for WebhookError {
377    fn into_response(self) -> axum::response::Response {
378        match self {
379            e @ WebhookError::NotFound(_) | e @ WebhookError::SecretMissing => {
380                (StatusCode::NOT_FOUND, e.to_string()).into_response()
381            }
382            e @ WebhookError::InvalidBody { .. }
383            | e @ WebhookError::ValidationFailed
384            | e @ WebhookError::ValidationError => {
385                (StatusCode::BAD_REQUEST, e.to_string()).into_response()
386            }
387            e @ WebhookError::InvalidHeaders(_) => {
388                (StatusCode::UNAUTHORIZED, e.to_string()).into_response()
389            }
390            e @ WebhookError::Unavailable => {
391                (StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
392            }
393            e @ WebhookError::InternalStorageError(_) => {
394                (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
395            }
396            WebhookError::Internal(e) => (
397                StatusCode::INTERNAL_SERVER_ERROR,
398                e.root_cause().to_string(),
399            )
400                .into_response(),
401        }
402    }
403}
404
/// Unit and property tests for header filtering, row packing, and error-to-status mapping.
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use axum::response::IntoResponse;
    use bytes::Bytes;
    use http::StatusCode;
    use mz_adapter::AppendWebhookError;
    use mz_ore::assert_none;
    use mz_repr::{GlobalId, Row};
    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
    use mz_storage_types::controller::StorageError;
    use proptest::prelude::*;
    use proptest::strategy::Union;

    use super::{WebhookError, filter_headers, pack_rows};

    /// Strategy producing arbitrary JSON values: nulls, bools, integers, and strings at the
    /// leaves, recursively nested inside arrays and objects.
    // TODO(parkmycar): Move this strategy to `ore`?
    fn arbitrary_json() -> impl Strategy<Value = serde_json::Value> {
        let json_leaf = Union::new(vec![
            any::<()>().prop_map(|_| serde_json::Value::Null).boxed(),
            any::<bool>().prop_map(serde_json::Value::Bool).boxed(),
            any::<i64>()
                .prop_map(|x| serde_json::Value::Number(x.into()))
                .boxed(),
            any::<String>().prop_map(serde_json::Value::String).boxed(),
        ]);

        json_leaf.prop_recursive(4, 32, 8, |element| {
            Union::new(vec![
                prop::collection::vec(element.clone(), 0..16)
                    .prop_map(serde_json::Value::Array)
                    .boxed(),
                prop::collection::hash_map(".*", element, 0..16)
                    .prop_map(|map| serde_json::Value::Object(map.into_iter().collect()))
                    .boxed(),
            ])
        })
    }

    /// Asserts that `rows` contains `expected_rows` rows, each with `expected_cols` columns.
    ///
    /// Takes a slice rather than `&Vec` so any contiguous collection of rows can be checked.
    #[track_caller]
    fn check_rows(rows: &[(Row, mz_repr::Diff)], expected_rows: usize, expected_cols: usize) {
        assert_eq!(rows.len(), expected_rows);
        for (row, _diff) in rows {
            assert_eq!(row.unpack().len(), expected_cols);
        }
    }

    #[mz_ore::test]
    fn smoke_test_storage_error_response_status() {
        // IdentifierMissing should get mapped to a specific status code.
        let resp = WebhookError::from(AppendWebhookError::StorageError(
            StorageError::IdentifierMissing(GlobalId::User(42)),
        ))
        .into_response();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[mz_ore::test]
    fn smoke_test_filter_headers() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = BTreeSet::from(["bar".to_string()]);

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);

        // Block list only: everything except the blocked header comes through.
        let mut filters = WebhookHeaderFilters::default();
        filters.block.clone_from(&block);

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_eq!(h.next().unwrap().0, "baz");
        assert_none!(h.next());

        // Allow list only: nothing but the allowed header comes through.
        let mut filters = WebhookHeaderFilters::default();
        filters.allow.clone_from(&allow);

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());

        // Both lists set: the header must be allowed and not blocked.
        let mut filters = WebhookHeaderFilters::default();
        filters.allow = allow;
        filters.block = block;

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());
    }

    #[mz_ore::test]
    fn filter_headers_block_overrides_allow() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = block.clone();

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);
        let filters = WebhookHeaderFilters { block, allow };

        // We should yield nothing since we block the only thing we allow.
        let mut h = filter_headers(&headers, &filters);
        assert_none!(h.next());
    }

    #[mz_ore::test]
    fn test_json_array_single() {
        let single_raw = r#"
        {
            "event_type": "i am a single object",
            "another_field": 42
        }
        "#;

        // We should get a single Row regardless of whether or not we're requested to expand.
        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);

        // Same body with array expansion requested: a top-level object still yields one Row.
        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);
    }

    #[mz_ore::test]
    fn test_json_deserializer_multi() {
        let multi_raw = r#"
            [
                { "event_type": "smol" },
                { "event_type": "dog" }
            ]
        "#;

        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        // If we don't expand the body, we should have a single row.
        assert_eq!(rows.len(), 1);

        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        // If we _do_ expand the body, we should have two rows.
        assert_eq!(rows.len(), 2);
    }

    proptest! {
        #[mz_ore::test]
        fn proptest_pack_row_never_panics(
            body: Vec<u8>,
            body_ty: WebhookBodyFormat,
            headers: BTreeMap<String, String>,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);

            // Include the headers column with a random set of block and allow.
            let filters = WebhookHeaderFilters { block, allow };
            // Include half of the existing headers, append on some non-existing ones too.
            //
            // Mapped columns start at index 2: column 0 is the body and column 1 is the headers
            // dict, which this test always includes.
            let mut use_bytes = false;
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    // Alternate between packing the header as bytes and as text.
                    use_bytes = !use_bytes;
                    (idx + 2, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: Some(filters),
                mapped_headers,
            };

            // Call this method to make sure it doesn't panic.
            let _ = pack_rows(&body[..], &body_ty, &headers, &header_tys);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_bytes(
            body: Vec<u8>,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Bytes;
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            // Any byte sequence is a valid `Bytes` body and always packs into exactly one row.
            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_strings(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Text;
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            // The body started life as a `String`, so it is valid UTF-8 and must pack as text.
            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_selective_headers(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);
            let body_ty = WebhookBodyFormat::Text;

            // Include the headers column with a random set of block and allow.
            let filters = WebhookHeaderFilters { block, allow };
            // Include half of the existing headers, append on some non-existing ones too.
            let mut use_bytes = false;
            // Mapped columns come after the body and, when present, the headers dict column.
            let column_offset = if include_headers { 2 } else { 1 };
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    // Alternate between packing the header as bytes and as text.
                    use_bytes = !use_bytes;
                    (idx + column_offset, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: include_headers.then_some(filters),
                mapped_headers,
            };

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_json_with_array_expansion(
            body in arbitrary_json(),
            expand_array: bool,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let json_raw = serde_json::to_vec(&body).unwrap();
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            let rows = pack_rows(
                &json_raw[..],
                &WebhookBodyFormat::Json { array: expand_array },
                &headers,
                &header_tys,
            )
            .unwrap();

            // Only a top-level array with expansion requested fans out into multiple rows.
            let expected_num_rows = match body {
                serde_json::Value::Array(inner) if expand_array => inner.len(),
                _ => 1,
            };
            check_rows(&rows, expected_num_rows, header_tys.num_columns() + 1);
        }
    }
}
705}