1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use anyhow::{bail, Result};
use byteorder::{BigEndian, ByteOrder};

/// Splits the Confluent wire-format header off the front of a serialized datum.
///
/// The header is a single magic byte, which must be `0`, followed by a
/// big-endian 32-bit schema ID. `protocol` names the payload format and is only
/// used to label error messages.
///
/// Returns the schema ID together with the subslice of `buf` that follows the
/// header.
///
/// # Errors
///
/// Fails if `buf` holds fewer than five bytes, or if the magic byte is not `0`.
fn extract_schema_id<'buf>(buf: &'buf [u8], protocol: &str) -> Result<(i32, &'buf [u8])> {
    // Wire format reference:
    // https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html#wire-format
    //
    // Formats such as protobuf carry additional framing after this header
    // (identifying the message within the proto file); callers handle that part.

    // One magic byte plus four schema ID bytes.
    const HEADER_LEN: usize = 5;

    if buf.len() < HEADER_LEN {
        bail!(
            "Confluent-style {} datum is too few bytes: expected at least {} bytes, got {}",
            protocol,
            HEADER_LEN,
            buf.len()
        );
    }

    // The length check above guarantees `split_at` cannot panic.
    let (header, payload) = buf.split_at(HEADER_LEN);
    match header[0] {
        0 => Ok((BigEndian::read_i32(&header[1..]), payload)),
        magic => bail!(
            "wrong Confluent-style {} serialization magic: expected 0, got {}",
            protocol,
            magic
        ),
    }
}

/// Extracts the Confluent header from an Avro datum.
///
/// Delegates to `extract_schema_id` with the protocol label `"avro"`, returning
/// the schema ID and the bytes that follow the five-byte header (one magic byte
/// plus a big-endian 32-bit schema ID).
///
/// # Errors
///
/// Fails if `buf` is shorter than five bytes or its first byte is not `0`.
pub fn extract_avro_header(buf: &[u8]) -> Result<(i32, &[u8])> {
    extract_schema_id(buf, "avro")
}

/// Extracts the Confluent header from a protobuf datum.
///
/// After the common header (magic byte plus schema ID) parsed by
/// `extract_schema_id`, one further byte identifying the message descriptor is
/// expected. Only the value `0` is accepted.
///
/// Returns the schema ID and the bytes that follow the descriptor byte.
///
/// # Errors
///
/// Fails if the common header is invalid, if no byte follows it, or if the
/// descriptor byte is anything other than `0`.
pub fn extract_protobuf_header(buf: &[u8]) -> Result<(i32, &[u8])> {
    let (schema_id, rest) = extract_schema_id(buf, "protobuf")?;

    // Peel the descriptor byte off the front of whatever follows the header.
    match rest.split_first() {
        Some((0, payload)) => Ok((schema_id, payload)),
        Some((message_id, _)) => bail!(
            "unsupported Confluent-style protobuf message descriptor id: \
                expected 0, but found: {}. \
                See https://github.com/MaterializeInc/materialize/issues/9250",
            message_id
        ),
        None => bail!(
            "Confluent-style protobuf datum is too few bytes: expected a message id after magic \
            and schema id, got a buffer of length {}",
            rest.len()
        ),
    }
}