Skip to main content

mz_testdrive/action/
glue.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use anyhow::{Context, anyhow, bail};
11use aws_sdk_glue::types::{Compatibility, DataFormat, RegistryId, SchemaId};
12
13use crate::action::{ControlFlow, State};
14use crate::parser::BuiltinCommand;
15
16/// Register a schema in AWS Glue Schema Registry and stash its version UUID in a
17/// testdrive variable.
18///
19/// This keeps a single source of truth in the `.td` file: define the schema
20/// once as a pretty-printed `$ set` variable, then reference it both here (to
21/// register it) and in `kafka-ingest` (to encode records) — so the body is
22/// never written twice.
23///
24/// ```text
25/// $ set my-schema={
26///     "type": "record", "name": "row",
27///     "fields": [{"name": "a", "type": "long"}]
28///   }
29///
30/// $ glue-create-schema registry=my-registry name=my-schema set-version-id-var=my-version-id schema=${my-schema}
31/// ```
32///
33/// Arguments:
34///   * `name` (required): the schema name.
35///   * `schema` (required unless given as the command body): the schema
36///     definition. Typically a `${...}` reference to a `$ set` variable.
37///   * `set-version-id-var` (optional): testdrive variable to receive the
38///     returned `SchemaVersionId`. Omit when the schema is referenced only by
39///     name (e.g. a negative test that registers a schema just to be rejected).
40///   * `registry` (optional): registry name. Omit to target Glue's implicit
41///     default registry.
42///   * `data-format` (optional, default `avro`): one of `avro`, `json`,
43///     `protobuf`.
44///   * `compatibility` (optional, default `backward`).
45///
46/// If a schema with this name already exists, a new version is registered
47/// instead — which is how schema-evolution tests register v2 atop v1.
48pub async fn run_create_schema(
49    mut cmd: BuiltinCommand,
50    state: &mut State,
51) -> Result<ControlFlow, anyhow::Error> {
52    let name = cmd.args.string("name")?;
53    let version_id_var = cmd.args.opt_string("set-version-id-var");
54    let registry = cmd.args.opt_string("registry");
55    let schema_arg = cmd.args.opt_string("schema");
56    let data_format = match cmd
57        .args
58        .opt_string("data-format")
59        .unwrap_or_else(|| "avro".into())
60        .to_lowercase()
61        .as_str()
62    {
63        "avro" => DataFormat::Avro,
64        "json" => DataFormat::Json,
65        "protobuf" => DataFormat::Protobuf,
66        other => bail!("unknown data-format: {}", other),
67    };
68    let compatibility = match cmd
69        .args
70        .opt_string("compatibility")
71        .unwrap_or_else(|| "backward".into())
72        .to_lowercase()
73        .as_str()
74    {
75        "backward" => Compatibility::Backward,
76        "backward_all" => Compatibility::BackwardAll,
77        "forward" => Compatibility::Forward,
78        "forward_all" => Compatibility::ForwardAll,
79        "full" => Compatibility::Full,
80        "full_all" => Compatibility::FullAll,
81        "none" => Compatibility::None,
82        "disabled" => Compatibility::Disabled,
83        other => bail!("unknown compatibility: {}", other),
84    };
85    cmd.args.done()?;
86
87    // The schema definition comes from the `schema=` argument (typically a
88    // `${...}` reference to a `$ set` variable) or, failing that, the command
89    // body.
90    let definition = match schema_arg {
91        Some(schema) => schema,
92        None => cmd.input.join("\n"),
93    };
94    if definition.trim().is_empty() {
95        bail!("glue-create-schema requires a `schema=` argument or a schema definition body");
96    }
97
98    println!(
99        "Registering Glue schema {:?} (registry {:?})...",
100        name,
101        registry.as_deref().unwrap_or("<default>"),
102    );
103
104    let glue = aws_sdk_glue::Client::new(&state.aws_config);
105
106    let mut create = glue
107        .create_schema()
108        .schema_name(&name)
109        .data_format(data_format)
110        .compatibility(compatibility)
111        .schema_definition(&definition);
112    if let Some(registry) = &registry {
113        create = create.registry_id(RegistryId::builder().registry_name(registry).build());
114    }
115
116    let version_id = match create.send().await {
117        Ok(resp) => resp.schema_version_id().map(|s| s.to_string()),
118        Err(err) => {
119            // The schema already exists — register a new version of it. This is
120            // the schema-evolution path (v2 atop v1).
121            let svc = err.into_service_error();
122            if svc.is_already_exists_exception() {
123                let mut schema_id = SchemaId::builder().schema_name(&name);
124                if let Some(registry) = &registry {
125                    schema_id = schema_id.registry_name(registry);
126                }
127                let resp = glue
128                    .register_schema_version()
129                    .schema_id(schema_id.build())
130                    .schema_definition(&definition)
131                    .send()
132                    .await
133                    .context("registering new Glue schema version")?;
134                resp.schema_version_id().map(|s| s.to_string())
135            } else {
136                return Err(anyhow::Error::new(svc).context("creating Glue schema"));
137            }
138        }
139    };
140    if let Some(var) = version_id_var {
141        let version_id =
142            version_id.ok_or_else(|| anyhow!("Glue did not return a schema version id"))?;
143        state.cmd_vars.insert(var, version_id);
144    }
145    Ok(ControlFlow::Continue)
146}