Skip to main content

mz_interchange/
glue.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Wire-format helpers for the AWS Glue Schema Registry framing.
11//!
12//! Glue prepends an 18-byte header to each Kafka record payload:
13//!
14//! | Offset | Bytes | Meaning                                              |
15//! |--------|-------|------------------------------------------------------|
16//! | 0      | 1     | Header version. Glue currently emits `0x03`.         |
17//! | 1      | 1     | Compression byte. `0x00` = none, `0x05` = zlib.      |
18//! | 2..18  | 16    | Schema-version UUID, big-endian.                     |
19//! | 18..   | N     | The serialized record payload.                       |
20//!
//! This layout is not documented in any spec that I could find; it is taken
//! from the reference serializer source:
//! [serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java](https://github.com/awslabs/aws-glue-schema-registry/blob/4b9cac477d6876a883e2a8893738a30c072694dc/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java#L54-L70)
//!
23//! Materialize only supports the uncompressed framing (`compression =
24//! 0x00`). Compressed records are rejected — supporting zlib at the wire
25//! layer is straightforward but no consumer in Materialize asks for it yet,
26//! and silently decompressing would mask producer misconfiguration.
27//!
28//! The Confluent analogue lives in [`crate::confluent`].
29
30use anyhow::{Result, bail};
31use uuid::Uuid;
32
/// Value of the first header byte: the Glue wire-format header version.
const HEADER_VERSION: u8 = 0x03;

/// Value of the second header byte when the payload is not compressed.
const COMPRESSION_NONE: u8 = 0x00;

/// Size in bytes of the fixed Glue header: two single-byte fields (version
/// and compression) followed by the 16-byte schema-version UUID.
pub const HEADER_LEN: usize = 2 + 16;
41
42/// Parse the Glue Avro header from the front of `buf`, returning the
43/// schema-version UUID and a subslice covering the record payload.
44///
45/// Returns an error if the buffer is shorter than the fixed header, if the
46/// header-version byte is not `0x03`, or if the compression byte is
47/// anything other than `0x00`.
48pub fn extract_avro_header(buf: &[u8]) -> Result<(Uuid, &[u8])> {
49    if buf.len() < HEADER_LEN {
50        bail!(
51            "Glue-style avro datum is too few bytes: expected at least {} bytes, got {}",
52            HEADER_LEN,
53            buf.len()
54        );
55    }
56    let version = buf[0];
57    if version != HEADER_VERSION {
58        bail!(
59            "wrong Glue-style avro serialization header version: expected {:#04x}, got {:#04x}",
60            HEADER_VERSION,
61            version
62        );
63    }
64    let compression = buf[1];
65    if compression != COMPRESSION_NONE {
66        bail!(
67            "unsupported Glue-style avro compression byte: \
68             expected {:#04x} (uncompressed), got {:#04x}",
69            COMPRESSION_NONE,
70            compression
71        );
72    }
73    // `Uuid::from_slice` only fails on length mismatch, which we've already
74    // validated above; the unwrap is sound.
75    let uuid = Uuid::from_slice(&buf[2..HEADER_LEN]).expect("18-byte header validated above");
76    Ok((uuid, &buf[HEADER_LEN..]))
77}
78
79/// Frame `payload` with the Glue Avro header, producing a buffer suitable
80/// to publish to Kafka. The header is laid down using the uncompressed
81/// framing (`compression = 0x00`).
82pub fn prepend_avro_header(schema_version_id: Uuid, payload: &[u8]) -> Vec<u8> {
83    let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
84    out.push(HEADER_VERSION);
85    out.push(COMPRESSION_NONE);
86    out.extend_from_slice(schema_version_id.as_bytes());
87    out.extend_from_slice(payload);
88    out
89}
90
#[cfg(test)]
mod tests {
    use super::*;

    /// Returns a fixed UUID so that assertions can spell out the exact bytes
    /// expected on the wire.
    fn fixture_uuid() -> Uuid {
        Uuid::parse_str("12345678-1234-5678-1234-567812345678").unwrap()
    }

    /// Framing a payload and parsing it back yields the same UUID and payload.
    #[mz_ore::test]
    fn roundtrip() {
        let uuid = fixture_uuid();
        let payload = b"avro-bytes-here";
        let framed = prepend_avro_header(uuid, payload);
        assert_eq!(framed.len(), HEADER_LEN + payload.len());
        let (parsed_uuid, rest) = extract_avro_header(&framed).unwrap();
        assert_eq!(parsed_uuid, uuid);
        assert_eq!(rest, payload);
    }

    /// Pins the encoded header against literal bytes.
    ///
    /// Comparing only against `HEADER_VERSION`, `COMPRESSION_NONE` and
    /// `Uuid::as_bytes` would keep passing if any of those drifted, because
    /// the encoder uses the same values. The literal frame is what catches an
    /// accidental change to the wire format.
    #[mz_ore::test]
    fn header_byte_layout() {
        let uuid = fixture_uuid();
        let framed = prepend_avro_header(uuid, &[]);
        // Version byte, compression byte, then the 16 UUID bytes in the same
        // order as the textual form of `fixture_uuid()`.
        let expected: [u8; HEADER_LEN] = [
            0x03, 0x00, 0x12, 0x34, 0x56, 0x78, 0x12, 0x34, 0x56, 0x78, 0x12, 0x34, 0x56, 0x78,
            0x12, 0x34, 0x56, 0x78,
        ];
        assert_eq!(framed.as_slice(), expected.as_slice());
        // Ties the literal above back to the named constants and the fixture.
        assert_eq!(framed[0], HEADER_VERSION);
        assert_eq!(framed[1], COMPRESSION_NONE);
        assert_eq!(&framed[2..HEADER_LEN], uuid.as_bytes());
    }

    /// A buffer one byte shorter than the header is rejected.
    #[mz_ore::test]
    fn rejects_buffer_too_short() {
        // 17 bytes — one short of the minimum header.
        let buf = [0u8; HEADER_LEN - 1];
        let err = extract_avro_header(&buf).unwrap_err();
        assert!(err.to_string().contains("too few bytes"), "{err}");
    }

    /// Any header-version byte other than `0x03` is rejected.
    #[mz_ore::test]
    fn rejects_wrong_header_version() {
        let mut buf = prepend_avro_header(fixture_uuid(), b"payload");
        buf[0] = 0x02;
        let err = extract_avro_header(&buf).unwrap_err();
        assert!(
            err.to_string()
                .contains("wrong Glue-style avro serialization header version"),
            "{err}"
        );
    }

    /// A frame flagged as compressed is rejected rather than decompressed.
    #[mz_ore::test]
    fn rejects_compressed_payload() {
        let mut buf = prepend_avro_header(fixture_uuid(), b"payload");
        buf[1] = 0x05; // zlib
        let err = extract_avro_header(&buf).unwrap_err();
        assert!(
            err.to_string()
                .contains("unsupported Glue-style avro compression byte"),
            "{err}"
        );
    }

    /// A frame consisting of the header alone parses to an empty payload.
    #[mz_ore::test]
    fn empty_payload_is_legal() {
        let uuid = fixture_uuid();
        let framed = prepend_avro_header(uuid, &[]);
        let (parsed_uuid, rest) = extract_avro_header(&framed).unwrap();
        assert_eq!(parsed_uuid, uuid);
        assert!(rest.is_empty());
    }
}