mz_environmentd/http/
webhook.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Helpers for handling events from a Webhook source.
11
12use std::collections::BTreeMap;
13use std::sync::Arc;
14
15use mz_adapter::{AppendWebhookError, AppendWebhookResponse, WebhookAppenderCache};
16use mz_ore::cast::CastFrom;
17use mz_ore::retry::{Retry, RetryResult};
18use mz_ore::str::StrExt;
19use mz_repr::adt::jsonb::Jsonb;
20use mz_repr::{Datum, Diff, Row, RowPacker, ScalarType, Timestamp};
21use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
22use mz_storage_types::controller::StorageError;
23
24use axum::extract::{Path, State};
25use axum::response::IntoResponse;
26use bytes::Bytes;
27use http::StatusCode;
28use thiserror::Error;
29
30use crate::http::WebhookState;
31
32pub async fn handle_webhook(
33    State(WebhookState {
34        adapter_client,
35        webhook_cache,
36    }): State<WebhookState>,
37    Path((database, schema, name)): Path<(String, String, String)>,
38    headers: http::HeaderMap,
39    body: Bytes,
40) -> impl IntoResponse {
41    // Collect headers into a map, while converting them into strings.
42    let mut headers_s = BTreeMap::new();
43    for (name, val) in headers.iter() {
44        if let Ok(val_s) = val.to_str().map(|s| s.to_string()) {
45            // If a header is included more than once, bail returning an error to the user.
46            let existing = headers_s.insert(name.as_str().to_string(), val_s);
47            if existing.is_some() {
48                let msg = format!("{} provided more than once", name.as_str());
49                return Err(WebhookError::InvalidHeaders(msg));
50            }
51        }
52    }
53    let headers = Arc::new(headers_s);
54
55    // Append to the webhook source, retrying if we race with a concurrent `ALTER SOURCE` op.
56    Retry::default()
57        .max_tries(2)
58        .retry_async(|_| async {
59            let result = append_webhook(
60                &adapter_client,
61                &webhook_cache,
62                &database,
63                &schema,
64                &name,
65                &body,
66                &headers,
67            )
68            .await;
69
70            // Note: think carefully before adding more errors here, we need to make sure we don't
71            // append data more than once.
72            match result {
73                Ok(()) => RetryResult::Ok(()),
74                Err(e @ AppendWebhookError::ChannelClosed) => RetryResult::RetryableErr(e),
75                Err(e) => RetryResult::FatalErr(e),
76            }
77        })
78        .await?;
79
80    Ok::<_, WebhookError>(())
81}
82
83/// Append the provided `body` and `headers` to the webhook source identified via `database`,
84/// `schema`, and `name`.
85async fn append_webhook(
86    adapter_client: &mz_adapter::Client,
87    webhook_cache: &WebhookAppenderCache,
88    database: &str,
89    schema: &str,
90    name: &str,
91    body: &Bytes,
92    headers: &Arc<BTreeMap<String, String>>,
93) -> Result<(), AppendWebhookError> {
94    // Shenanigans to get the types working for the async retry.
95    let (database, schema, name) = (database.to_string(), schema.to_string(), name.to_string());
96
97    // Record the time we receive the request, for use if validation checks the current timestamp.
98    let received_at = adapter_client.now();
99
100    // Get an appender for the provided object, if that object exists.
101    let AppendWebhookResponse {
102        tx,
103        body_format,
104        header_tys,
105        validator,
106    } = async {
107        let mut guard = webhook_cache.entries.lock().await;
108
109        // Remove the appender from our map, only re-insert it, if it's valid.
110        match guard.remove(&(database.clone(), schema.clone(), name.clone())) {
111            Some(appender) if !appender.tx.is_closed() => {
112                guard.insert((database, schema, name), appender.clone());
113                Ok::<_, AppendWebhookError>(appender)
114            }
115            // We don't have a valid appender, so we need to get one.
116            //
117            // Note: we hold the lock while we acquire and appender to prevent a dogpile.
118            _ => {
119                tracing::info!(?database, ?schema, ?name, "fetching webhook appender");
120                adapter_client.metrics().webhook_get_appender.inc();
121
122                // Acquire and cache a new appender.
123                let appender = adapter_client
124                    .get_webhook_appender(database.clone(), schema.clone(), name.clone())
125                    .await?;
126
127                guard.insert((database, schema, name), appender.clone());
128
129                Ok(appender)
130            }
131        }
132    }
133    .await?;
134
135    // These must happen before validation as we do not know if validation or
136    // packing will succeed and appending will begin
137    tx.increment_messages_received(1);
138    tx.increment_bytes_received(u64::cast_from(body.len()));
139
140    // If this source requires validation, then validate!
141    if let Some(validator) = validator {
142        let valid = validator
143            .eval(Bytes::clone(body), Arc::clone(headers), received_at)
144            .await?;
145        if !valid {
146            return Err(AppendWebhookError::ValidationFailed);
147        }
148    }
149
150    // Pack our body and headers into a Row.
151    let rows = pack_rows(body, &body_format, headers, &header_tys)?;
152
153    // Send the row to get appended.
154    tx.append(rows).await?;
155
156    Ok(())
157}
158
159/// Packs the body and headers of a webhook request into as many rows as necessary.
160///
161/// TODO(parkmycar): Should we be consolidating the returned Rows here? Presumably something in
162/// storage would already be doing it, so no need to do it twice?
163fn pack_rows(
164    body: &[u8],
165    body_format: &WebhookBodyFormat,
166    headers: &BTreeMap<String, String>,
167    header_tys: &WebhookHeaders,
168) -> Result<Vec<(Row, Diff)>, AppendWebhookError> {
169    // This method isn't that "deep" but it reflects the way we intend for the packing process to
170    // work and makes testing easier.
171    let rows = transform_body(body, body_format)?
172        .into_iter()
173        .map(|row| pack_header(row, headers, header_tys).map(|row| (row, Diff::ONE)))
174        .collect::<Result<_, _>>()?;
175    Ok(rows)
176}
177
178/// Transforms the body of a webhook request into a `Vec<BodyRow>`.
179fn transform_body(
180    body: &[u8],
181    format: &WebhookBodyFormat,
182) -> Result<Vec<BodyRow>, AppendWebhookError> {
183    let rows = match format {
184        WebhookBodyFormat::Bytes => {
185            vec![Row::pack_slice(&[Datum::Bytes(body)])]
186        }
187        WebhookBodyFormat::Text => {
188            let s = std::str::from_utf8(body)
189                .map_err(|m| AppendWebhookError::InvalidUtf8Body { msg: m.to_string() })?;
190            vec![Row::pack_slice(&[Datum::String(s)])]
191        }
192        WebhookBodyFormat::Json { array } => {
193            let objects = serde_json::Deserializer::from_slice(body)
194                // Automatically expand multiple JSON objects delimited by whitespace, e.g.
195                // newlines, into a single batch.
196                .into_iter::<serde_json::Value>()
197                // Optionally expand a JSON array into separate rows, if requested.
198                .flat_map(|value| match value {
199                    Ok(serde_json::Value::Array(inners)) if *array => {
200                        itertools::Either::Left(inners.into_iter().map(Result::Ok))
201                    }
202                    value => itertools::Either::Right(std::iter::once(value)),
203                })
204                .collect::<Result<Vec<_>, _>>()
205                .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?;
206
207            // Note: `into_iter()` should be re-using the underlying allocation of the `objects`
208            // vector, and it's more readable to split these into separate iterators.
209            let rows = objects
210                .into_iter()
211                // Map a JSON object into a Row.
212                .map(|o| {
213                    let row = Jsonb::from_serde_json(o)
214                        .map_err(|m| AppendWebhookError::InvalidJsonBody { msg: m.to_string() })?
215                        .into_row();
216                    Ok::<_, AppendWebhookError>(row)
217                })
218                .collect::<Result<_, _>>()?;
219
220            rows
221        }
222    };
223
224    // A `Row` cannot describe its schema without unpacking it. To add some safety we wrap the
225    // returned `Row`s in a newtype to signify they already have the "body" column packed.
226    let body_rows = rows.into_iter().map(BodyRow).collect();
227
228    Ok(body_rows)
229}
230
231/// Pack the headers of a request into a [`Row`].
232fn pack_header(
233    mut body_row: BodyRow,
234    headers: &BTreeMap<String, String>,
235    header_tys: &WebhookHeaders,
236) -> Result<Row, AppendWebhookError> {
237    // 1 column for the body plus however many are needed for the headers.
238    let num_cols = 1 + header_tys.num_columns();
239    // The provided Row already has the Body written.
240    let mut num_cols_written = 1;
241
242    let mut packer = RowPacker::for_existing_row(body_row.inner_mut());
243
244    // Pack the headers into our row, if required.
245    if let Some(filters) = &header_tys.header_column {
246        packer.push_dict(
247            filter_headers(headers, filters).map(|(name, val)| (name, Datum::String(val))),
248        );
249        num_cols_written += 1;
250    }
251
252    // Pack the mapped headers.
253    for idx in num_cols_written..num_cols {
254        let (header_name, use_bytes) = header_tys
255            .mapped_headers
256            .get(&idx)
257            .ok_or_else(|| anyhow::anyhow!("Invalid header column index {idx}"))?;
258        let header = headers.get(header_name);
259        let datum = match header {
260            Some(h) if *use_bytes => Datum::Bytes(h.as_bytes()),
261            Some(h) => Datum::String(h),
262            None => Datum::Null,
263        };
264        packer.push(datum);
265    }
266
267    Ok(body_row.into_inner())
268}
269
270fn filter_headers<'a: 'b, 'b>(
271    headers: &'a BTreeMap<String, String>,
272    filters: &'b WebhookHeaderFilters,
273) -> impl Iterator<Item = (&'a str, &'a str)> + 'b {
274    headers
275        .iter()
276        .filter(|(header_name, _val)| {
277            // If our block list is empty, then don't filter anything.
278            filters.block.is_empty() || !filters.block.contains(*header_name)
279        })
280        .filter(|(header_name, _val)| {
281            // If our allow list is empty, then don't filter anything.
282            filters.allow.is_empty() || filters.allow.contains(*header_name)
283        })
284        .map(|(key, val)| (key.as_str(), val.as_str()))
285}
286
287/// A [`Row`] that has the body of a request already packed into it.
288///
289/// Note: if you're constructing a [`BodyRow`] you need to guarantee the only column packed into
290/// the [`Row`] is a single "body" column.
291#[repr(transparent)]
292struct BodyRow(Row);
293
294impl BodyRow {
295    /// Obtain a mutable reference to the inner [`Row`].
296    fn inner_mut(&mut self) -> &mut Row {
297        &mut self.0
298    }
299
300    /// Return the inner [`Row`].
301    fn into_inner(self) -> Row {
302        self.0
303    }
304}
305
/// Errors we can encounter when appending data to a Webhook Source.
///
/// Webhook sources are a bit special since they are handled by `environmentd` (all other sources
/// are handled by `clusterd`) and data is "pushed" to them (all other sources pull data). The
/// errors also generally need to map to HTTP status codes that we can use to respond to a webhook
/// request. As such, webhook errors don't cleanly map to any existing error type, hence the
/// existence of this error type.
#[derive(Error, Debug)]
pub enum WebhookError {
    /// The target object doesn't exist. The payload is a description of what was looked up,
    /// either the `database.schema.name` path or a storage identifier.
    #[error("no object was found at the path {}", .0.quoted())]
    NotFound(String),
    /// The secret needed to validate the request could not be found.
    #[error("the required auth could not be found")]
    SecretMissing,
    /// The request headers were unacceptable, e.g. a header was provided more than once.
    #[error("headers of request were invalid: {0}")]
    InvalidHeaders(String),
    /// The request body could not be decoded as `ty`; `msg` carries the decoder's error.
    #[error("failed to deserialize body as {ty:?}: {msg}")]
    InvalidBody { ty: ScalarType, msg: String },
    /// Validation ran successfully but rejected the request.
    #[error("failed to validate the request")]
    ValidationFailed,
    /// Validation itself failed to run to completion.
    #[error("error occurred while running validation")]
    ValidationError,
    /// The service can't accept data right now, e.g. storage is shutting down.
    #[error("service unavailable")]
    Unavailable,
    /// A storage error that has no more specific mapping.
    #[error("internal storage failure! {0:?}")]
    InternalStorageError(StorageError<Timestamp>),
    /// Any other internal failure.
    #[error("internal failure! {0:?}")]
    Internal(#[from] anyhow::Error),
}
334
335impl From<AppendWebhookError> for WebhookError {
336    fn from(err: AppendWebhookError) -> Self {
337        match err {
338            AppendWebhookError::MissingSecret => WebhookError::SecretMissing,
339            AppendWebhookError::ValidationError => WebhookError::ValidationError,
340            AppendWebhookError::InvalidUtf8Body { msg } => WebhookError::InvalidBody {
341                ty: ScalarType::String,
342                msg,
343            },
344            AppendWebhookError::InvalidJsonBody { msg } => WebhookError::InvalidBody {
345                ty: ScalarType::Jsonb,
346                msg,
347            },
348            AppendWebhookError::UnknownWebhook {
349                database,
350                schema,
351                name,
352            } => WebhookError::NotFound(format!("'{database}.{schema}.{name}'")),
353            AppendWebhookError::ValidationFailed => WebhookError::ValidationFailed,
354            AppendWebhookError::ChannelClosed => {
355                WebhookError::Internal(anyhow::anyhow!("channel closed"))
356            }
357            AppendWebhookError::StorageError(storage_err) => {
358                match storage_err {
359                    // TODO(parkmycar): Maybe map this to a HTTP 410 Gone instead of 404?
360                    StorageError::IdentifierMissing(id) | StorageError::IdentifierInvalid(id) => {
361                        WebhookError::NotFound(id.to_string())
362                    }
363                    StorageError::ShuttingDown(_) => WebhookError::Unavailable,
364                    e => WebhookError::InternalStorageError(e),
365                }
366            }
367            AppendWebhookError::InternalError(err) => WebhookError::Internal(err),
368        }
369    }
370}
371
372impl IntoResponse for WebhookError {
373    fn into_response(self) -> axum::response::Response {
374        match self {
375            e @ WebhookError::NotFound(_) | e @ WebhookError::SecretMissing => {
376                (StatusCode::NOT_FOUND, e.to_string()).into_response()
377            }
378            e @ WebhookError::InvalidBody { .. }
379            | e @ WebhookError::ValidationFailed
380            | e @ WebhookError::ValidationError => {
381                (StatusCode::BAD_REQUEST, e.to_string()).into_response()
382            }
383            e @ WebhookError::InvalidHeaders(_) => {
384                (StatusCode::UNAUTHORIZED, e.to_string()).into_response()
385            }
386            e @ WebhookError::Unavailable => {
387                (StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
388            }
389            e @ WebhookError::InternalStorageError(StorageError::ResourceExhausted(_)) => {
390                (StatusCode::TOO_MANY_REQUESTS, e.to_string()).into_response()
391            }
392            e @ WebhookError::InternalStorageError(_) => {
393                (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
394            }
395            WebhookError::Internal(e) => (
396                StatusCode::INTERNAL_SERVER_ERROR,
397                e.root_cause().to_string(),
398            )
399                .into_response(),
400        }
401    }
402}
403
#[cfg(test)]
mod tests {
    use std::collections::{BTreeMap, BTreeSet};

    use axum::response::IntoResponse;
    use bytes::Bytes;
    use http::StatusCode;
    use mz_adapter::AppendWebhookError;
    use mz_ore::assert_none;
    use mz_repr::{GlobalId, Row};
    use mz_sql::plan::{WebhookBodyFormat, WebhookHeaderFilters, WebhookHeaders};
    use mz_storage_types::controller::StorageError;
    use proptest::prelude::*;
    use proptest::strategy::Union;

    use super::{WebhookError, filter_headers, pack_rows};

    // TODO(parkmycar): Move this strategy to `ore`?
    fn arbitrary_json() -> impl Strategy<Value = serde_json::Value> {
        let json_leaf = Union::new(vec![
            any::<()>().prop_map(|_| serde_json::Value::Null).boxed(),
            any::<bool>().prop_map(serde_json::Value::Bool).boxed(),
            any::<i64>()
                .prop_map(|x| serde_json::Value::Number(x.into()))
                .boxed(),
            any::<String>().prop_map(serde_json::Value::String).boxed(),
        ]);

        json_leaf.prop_recursive(4, 32, 8, |element| {
            Union::new(vec![
                prop::collection::vec(element.clone(), 0..16)
                    .prop_map(serde_json::Value::Array)
                    .boxed(),
                prop::collection::hash_map(".*", element, 0..16)
                    .prop_map(|map| serde_json::Value::Object(map.into_iter().collect()))
                    .boxed(),
            ])
        })
    }

    /// Assert that `rows` has exactly `expected_rows` rows, each with `expected_cols` columns.
    #[track_caller]
    fn check_rows(rows: &[(Row, mz_repr::Diff)], expected_rows: usize, expected_cols: usize) {
        assert_eq!(rows.len(), expected_rows);
        for (row, _diff) in rows {
            assert_eq!(row.unpack().len(), expected_cols);
        }
    }

    #[mz_ore::test]
    fn smoke_test_storage_error_response_status() {
        // Resource exhausted should get mapped to a specific status code.
        let resp = WebhookError::from(AppendWebhookError::StorageError(
            StorageError::ResourceExhausted("test"),
        ))
        .into_response();
        assert_eq!(resp.status(), StatusCode::TOO_MANY_REQUESTS);

        // IdentifierMissing should also get mapped to a specific status code.
        let resp = WebhookError::from(AppendWebhookError::StorageError(
            StorageError::IdentifierMissing(GlobalId::User(42)),
        ))
        .into_response();
        assert_eq!(resp.status(), StatusCode::NOT_FOUND);
    }

    #[mz_ore::test]
    fn smoke_test_filter_headers() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = BTreeSet::from(["bar".to_string()]);

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);
        let mut filters = WebhookHeaderFilters::default();
        filters.block.clone_from(&block);

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_eq!(h.next().unwrap().0, "baz");
        assert_none!(h.next());

        let mut filters = WebhookHeaderFilters::default();
        filters.allow.clone_from(&allow);

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());

        let mut filters = WebhookHeaderFilters::default();
        filters.allow = allow;
        filters.block = block;

        let mut h = filter_headers(&headers, &filters);
        assert_eq!(h.next().unwrap().0, "bar");
        assert_none!(h.next());
    }

    #[mz_ore::test]
    fn filter_headers_block_overrides_allow() {
        let block = BTreeSet::from(["foo".to_string()]);
        let allow = block.clone();

        let headers = BTreeMap::from([
            ("foo".to_string(), "1".to_string()),
            ("bar".to_string(), "2".to_string()),
            ("baz".to_string(), "3".to_string()),
        ]);
        let filters = WebhookHeaderFilters { block, allow };

        // We should yield nothing since we block the only thing we allow.
        let mut h = filter_headers(&headers, &filters);
        assert_none!(h.next());
    }

    #[mz_ore::test]
    fn test_json_array_single() {
        let single_raw = r#"
        {
            "event_type": "i am a single object",
            "another_field": 42
        }
        "#;

        // We should get a single Row regardless of whether or not we're requested to expand.
        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);

        // We should get a single Row regardless of whether or not we're requested to expand.
        let rows = pack_rows(
            single_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 1);
    }

    #[mz_ore::test]
    fn test_json_deserializer_multi() {
        let multi_raw = r#"
            [
                { "event_type": "smol" },
                { "event_type": "dog" }
            ]
        "#;

        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        // If we don't expand the body, we should have a single row.
        assert_eq!(rows.len(), 1);

        let rows = pack_rows(
            multi_raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        // If we _do_ expand the body, we should have two rows.
        assert_eq!(rows.len(), 2);
    }

    #[mz_ore::test]
    fn test_json_whitespace_delimited_values() {
        let raw = "{\"a\": 1}\n{\"b\": 2}\n[1, 2, 3]";

        // Without array expansion, every top-level JSON value becomes its own row.
        let rows = pack_rows(
            raw.as_bytes(),
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 3);

        // With array expansion, the trailing array contributes one row per element.
        let rows = pack_rows(
            raw.as_bytes(),
            &WebhookBodyFormat::Json { array: true },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap();
        assert_eq!(rows.len(), 5);
    }

    #[mz_ore::test]
    fn test_invalid_json_body_is_rejected() {
        let err = pack_rows(
            b"{not json",
            &WebhookBodyFormat::Json { array: false },
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap_err();
        assert!(
            matches!(err, AppendWebhookError::InvalidJsonBody { .. }),
            "unexpected error: {err:?}"
        );
    }

    #[mz_ore::test]
    fn test_invalid_utf8_text_body_is_rejected() {
        let err = pack_rows(
            &[0xff, 0xfe, 0xfd],
            &WebhookBodyFormat::Text,
            &BTreeMap::default(),
            &WebhookHeaders::default(),
        )
        .unwrap_err();
        assert!(
            matches!(err, AppendWebhookError::InvalidUtf8Body { .. }),
            "unexpected error: {err:?}"
        );
    }

    proptest! {
        #[mz_ore::test]
        fn proptest_pack_row_never_panics(
            body: Vec<u8>,
            body_ty: WebhookBodyFormat,
            headers: BTreeMap<String, String>,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);

            // Include the headers column with a random set of block and allow.
            let filters = WebhookHeaderFilters { block, allow };
            // Include half of the existing headers, append on some non-existing ones too.
            let mut use_bytes = false;
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    use_bytes = !use_bytes;
                    (idx + 2, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: Some(filters),
                mapped_headers,
            };

            // Call this method to make sure it doesn't panic.
            let _ = pack_rows(&body[..], &body_ty, &headers, &header_tys);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_bytes(
            body: Vec<u8>,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Bytes;
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_strings(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let body = Bytes::from(body);

            let body_ty = WebhookBodyFormat::Text;
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_row_succeeds_for_selective_headers(
            body: String,
            headers: BTreeMap<String, String>,
            include_headers: bool,
            non_existent_headers: Vec<String>,
            block: BTreeSet<String>,
            allow: BTreeSet<String>,
        ) {
            let body = Bytes::from(body);
            let body_ty = WebhookBodyFormat::Text;

            // Include the headers column with a random set of block and allow.
            let filters = WebhookHeaderFilters { block, allow };
            // Include half of the existing headers, append on some non-existing ones too.
            let mut use_bytes = false;
            let column_offset = if include_headers { 2 } else { 1 };
            let mapped_headers = headers
                .keys()
                .take(headers.len() / 2)
                .chain(non_existent_headers.iter())
                .cloned()
                .enumerate()
                .map(|(idx, name)| {
                    use_bytes = !use_bytes;
                    (idx + column_offset, (name, use_bytes))
                })
                .collect();
            let header_tys = WebhookHeaders {
                header_column: include_headers.then_some(filters),
                mapped_headers,
            };

            let rows = pack_rows(&body[..], &body_ty, &headers, &header_tys).unwrap();
            check_rows(&rows, 1, header_tys.num_columns() + 1);
        }

        #[mz_ore::test]
        fn proptest_pack_json_with_array_expansion(
            body in arbitrary_json(),
            expand_array: bool,
            headers: BTreeMap<String, String>,
            include_headers: bool,
        ) {
            let json_raw = serde_json::to_vec(&body).unwrap();
            let mut header_tys = WebhookHeaders::default();
            header_tys.header_column = include_headers.then(Default::default);

            let rows = pack_rows(
                &json_raw[..],
                &WebhookBodyFormat::Json { array: expand_array },
                &headers,
                &header_tys,
            )
            .unwrap();

            let expected_num_rows = match body {
                serde_json::Value::Array(inner) if expand_array => inner.len(),
                _ => 1,
            };
            check_rows(&rows, expected_num_rows, header_tys.num_columns() + 1);
        }
    }
}