mz_interchange/
confluent.rs1use anyhow::{Result, bail};
11use byteorder::{BigEndian, ByteOrder};
12
13fn extract_schema_id<'buf>(buf: &'buf [u8], protocol: &str) -> Result<(i32, &'buf [u8])> {
18 let expected_len = 5;
28
29 if buf.len() < expected_len {
30 bail!(
31 "Confluent-style {} datum is too few bytes: expected at least {} bytes, got {}",
32 protocol,
33 expected_len,
34 buf.len()
35 );
36 }
37 let magic = buf[0];
38 let schema_id = BigEndian::read_i32(&buf[1..5]);
39
40 if magic != 0 {
41 bail!(
42 "wrong Confluent-style {} serialization magic: expected 0, got {}",
43 protocol,
44 magic
45 );
46 }
47
48 Ok((schema_id, &buf[expected_len..]))
49}
50
51pub fn extract_avro_header(buf: &[u8]) -> Result<(i32, &[u8])> {
52 extract_schema_id(buf, "avro")
53}
54
55pub fn extract_protobuf_header(buf: &[u8]) -> Result<(i32, &[u8])> {
56 let (schema_id, buf) = extract_schema_id(buf, "protobuf")?;
57
58 match buf.get(0) {
59 Some(0) => Ok((schema_id, &buf[1..])),
60 Some(message_id) => bail!(
61 "unsupported Confluent-style protobuf message descriptor id: \
62 expected 0, but found: {}",
63 message_id
64 ),
65 None => bail!(
66 "Confluent-style protobuf datum is too few bytes: expected a message id after magic \
67 and schema id, got a buffer of length {}",
68 buf.len()
69 ),
70 }
71}