mz_interchange/glue.rs
1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Wire-format helpers for the AWS Glue Schema Registry framing.
11//!
12//! Glue prepends an 18-byte header to each Kafka record payload:
13//!
14//! | Offset | Bytes | Meaning |
15//! |--------|-------|------------------------------------------------------|
16//! | 0 | 1 | Header version. Glue currently emits `0x03`. |
17//! | 1 | 1 | Compression byte. `0x00` = none, `0x05` = zlib. |
18//! | 2..18 | 16 | Schema-version UUID, big-endian. |
19//! | 18.. | N | The serialized record payload. |
20//!
//! This layout is not documented in any spec we could find; it is taken from
//! the reference implementation,
//! [serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java](https://github.com/awslabs/aws-glue-schema-registry/blob/4b9cac477d6876a883e2a8893738a30c072694dc/serializer-deserializer/src/main/java/com/amazonaws/services/schemaregistry/serializers/SerializationDataEncoder.java#L54-L70).
//!
23//! Materialize only supports the uncompressed framing (`compression =
24//! 0x00`). Compressed records are rejected — supporting zlib at the wire
25//! layer is straightforward but no consumer in Materialize asks for it yet,
26//! and silently decompressing would mask producer misconfiguration.
27//!
28//! The Confluent analogue lives in [`crate::confluent`].
29
30use anyhow::{Result, bail};
31use uuid::Uuid;
32
/// Header-version byte that Glue places at offset 0 of every framed record.
const HEADER_VERSION: u8 = 0x03;

/// Compression marker (offset 1) meaning the payload follows as-is.
const COMPRESSION_NONE: u8 = 0x00;

/// Width in bytes of the schema-version UUID that follows the two marker bytes.
const UUID_LEN: usize = 16;

/// Total size in bytes of the Glue header: one version byte, one compression
/// byte, then the schema-version UUID.
pub const HEADER_LEN: usize = 2 + UUID_LEN;
41
42/// Parse the Glue Avro header from the front of `buf`, returning the
43/// schema-version UUID and a subslice covering the record payload.
44///
45/// Returns an error if the buffer is shorter than the fixed header, if the
46/// header-version byte is not `0x03`, or if the compression byte is
47/// anything other than `0x00`.
48pub fn extract_avro_header(buf: &[u8]) -> Result<(Uuid, &[u8])> {
49 if buf.len() < HEADER_LEN {
50 bail!(
51 "Glue-style avro datum is too few bytes: expected at least {} bytes, got {}",
52 HEADER_LEN,
53 buf.len()
54 );
55 }
56 let version = buf[0];
57 if version != HEADER_VERSION {
58 bail!(
59 "wrong Glue-style avro serialization header version: expected {:#04x}, got {:#04x}",
60 HEADER_VERSION,
61 version
62 );
63 }
64 let compression = buf[1];
65 if compression != COMPRESSION_NONE {
66 bail!(
67 "unsupported Glue-style avro compression byte: \
68 expected {:#04x} (uncompressed), got {:#04x}",
69 COMPRESSION_NONE,
70 compression
71 );
72 }
73 // `Uuid::from_slice` only fails on length mismatch, which we've already
74 // validated above; the unwrap is sound.
75 let uuid = Uuid::from_slice(&buf[2..HEADER_LEN]).expect("18-byte header validated above");
76 Ok((uuid, &buf[HEADER_LEN..]))
77}
78
79/// Frame `payload` with the Glue Avro header, producing a buffer suitable
80/// to publish to Kafka. The header is laid down using the uncompressed
81/// framing (`compression = 0x00`).
82pub fn prepend_avro_header(schema_version_id: Uuid, payload: &[u8]) -> Vec<u8> {
83 let mut out = Vec::with_capacity(HEADER_LEN + payload.len());
84 out.push(HEADER_VERSION);
85 out.push(COMPRESSION_NONE);
86 out.extend_from_slice(schema_version_id.as_bytes());
87 out.extend_from_slice(payload);
88 out
89}
90
#[cfg(test)]
mod tests {
    use super::*;

    fn fixture_uuid() -> Uuid {
        // Fixed value so the encoded byte layout is exact in assertions.
        Uuid::parse_str("12345678-1234-5678-1234-567812345678").unwrap()
    }

    #[mz_ore::test]
    fn roundtrip() {
        let uuid = fixture_uuid();
        let payload = b"avro-bytes-here";
        let framed = prepend_avro_header(uuid, payload);
        assert_eq!(framed.len(), HEADER_LEN + payload.len());
        let (parsed_uuid, rest) = extract_avro_header(&framed).unwrap();
        assert_eq!(parsed_uuid, uuid);
        assert_eq!(rest, payload);
    }

    #[mz_ore::test]
    fn header_byte_layout() {
        let framed = prepend_avro_header(fixture_uuid(), &[]);
        // Spell out the wire bytes literally instead of re-deriving them from
        // the module constants and `Uuid::as_bytes`. Otherwise this test would
        // still pass if a constant or the UUID byte order drifted. Expected:
        // version 0x03, compression 0x00, then the UUID in big-endian order.
        let expected: [u8; HEADER_LEN] = [
            0x03, 0x00, // header version, compression
            0x12, 0x34, 0x56, 0x78, 0x12, 0x34, 0x56, 0x78, // UUID, high half
            0x12, 0x34, 0x56, 0x78, 0x12, 0x34, 0x56, 0x78, // UUID, low half
        ];
        assert_eq!(framed, expected);
    }

    #[mz_ore::test]
    fn rejects_buffer_too_short() {
        // 17 bytes — one short of the minimum header.
        let buf = [0u8; HEADER_LEN - 1];
        let err = extract_avro_header(&buf).unwrap_err();
        assert!(err.to_string().contains("too few bytes"), "{err}");
    }

    #[mz_ore::test]
    fn rejects_empty_buffer() {
        let err = extract_avro_header(&[]).unwrap_err();
        assert!(err.to_string().contains("too few bytes"), "{err}");
    }

    #[mz_ore::test]
    fn rejects_wrong_header_version() {
        let mut buf = prepend_avro_header(fixture_uuid(), b"payload");
        buf[0] = 0x02;
        let err = extract_avro_header(&buf).unwrap_err();
        assert!(
            err.to_string()
                .contains("wrong Glue-style avro serialization header version"),
            "{err}"
        );
    }

    #[mz_ore::test]
    fn rejects_compressed_payload() {
        let mut buf = prepend_avro_header(fixture_uuid(), b"payload");
        buf[1] = 0x05; // zlib
        let err = extract_avro_header(&buf).unwrap_err();
        assert!(
            err.to_string()
                .contains("unsupported Glue-style avro compression byte"),
            "{err}"
        );
    }

    #[mz_ore::test]
    fn rejects_every_nonzero_compression_byte() {
        // Only the uncompressed framing is accepted; any other marker,
        // including values Glue does not define today, must be rejected.
        let mut buf = prepend_avro_header(fixture_uuid(), b"payload");
        for byte in 1..=u8::MAX {
            buf[1] = byte;
            assert!(
                extract_avro_header(&buf).is_err(),
                "compression byte {byte:#04x} was accepted"
            );
        }
    }

    #[mz_ore::test]
    fn empty_payload_is_legal() {
        let uuid = fixture_uuid();
        let framed = prepend_avro_header(uuid, &[]);
        let (parsed_uuid, rest) = extract_avro_header(&framed).unwrap();
        assert_eq!(parsed_uuid, uuid);
        assert!(rest.is_empty());
    }
}