mz_interchange/
confluent.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::{Result, bail};
11use byteorder::{BigEndian, ByteOrder};
12
13/// Extracts the schema_id placed in front of the serialized message by the confluent stack
14/// Optionally expect an empty
15///
16/// This function returns the schema_id and a subslice of the rest of the buffer
17fn extract_schema_id<'buf>(buf: &'buf [u8], protocol: &str) -> Result<(i32, &'buf [u8])> {
18    // The first byte is a magic byte (0) that indicates the Confluent
19    // serialization format version, and the next four bytes are a big
20    // endian 32-bit schema ID.
21    //
22    // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
23    //
24    // For formats like protobuf, confluent adds additional information related to
25    // which message in the proto file
26
27    let expected_len = 5;
28
29    if buf.len() < expected_len {
30        bail!(
31            "Confluent-style {} datum is too few bytes: expected at least {} bytes, got {}",
32            protocol,
33            expected_len,
34            buf.len()
35        );
36    }
37    let magic = buf[0];
38    let schema_id = BigEndian::read_i32(&buf[1..5]);
39
40    if magic != 0 {
41        bail!(
42            "wrong Confluent-style {} serialization magic: expected 0, got {}",
43            protocol,
44            magic
45        );
46    }
47
48    Ok((schema_id, &buf[expected_len..]))
49}
50
51pub fn extract_avro_header(buf: &[u8]) -> Result<(i32, &[u8])> {
52    extract_schema_id(buf, "avro")
53}
54
55pub fn extract_protobuf_header(buf: &[u8]) -> Result<(i32, &[u8])> {
56    let (schema_id, buf) = extract_schema_id(buf, "protobuf")?;
57
58    match buf.get(0) {
59        Some(0) => Ok((schema_id, &buf[1..])),
60        Some(message_id) => bail!(
61            "unsupported Confluent-style protobuf message descriptor id: \
62                expected 0, but found: {}",
63            message_id
64        ),
65        None => bail!(
66            "Confluent-style protobuf datum is too few bytes: expected a message id after magic \
67            and schema id, got a buffer of length {}",
68            buf.len()
69        ),
70    }
71}