mz_testdrive/action/
glue.rs1use anyhow::{Context, anyhow, bail};
11use aws_sdk_glue::types::{Compatibility, DataFormat, RegistryId, SchemaId};
12
13use crate::action::{ControlFlow, State};
14use crate::parser::BuiltinCommand;
15
16pub async fn run_create_schema(
49 mut cmd: BuiltinCommand,
50 state: &mut State,
51) -> Result<ControlFlow, anyhow::Error> {
52 let name = cmd.args.string("name")?;
53 let version_id_var = cmd.args.opt_string("set-version-id-var");
54 let registry = cmd.args.opt_string("registry");
55 let schema_arg = cmd.args.opt_string("schema");
56 let data_format = match cmd
57 .args
58 .opt_string("data-format")
59 .unwrap_or_else(|| "avro".into())
60 .to_lowercase()
61 .as_str()
62 {
63 "avro" => DataFormat::Avro,
64 "json" => DataFormat::Json,
65 "protobuf" => DataFormat::Protobuf,
66 other => bail!("unknown data-format: {}", other),
67 };
68 let compatibility = match cmd
69 .args
70 .opt_string("compatibility")
71 .unwrap_or_else(|| "backward".into())
72 .to_lowercase()
73 .as_str()
74 {
75 "backward" => Compatibility::Backward,
76 "backward_all" => Compatibility::BackwardAll,
77 "forward" => Compatibility::Forward,
78 "forward_all" => Compatibility::ForwardAll,
79 "full" => Compatibility::Full,
80 "full_all" => Compatibility::FullAll,
81 "none" => Compatibility::None,
82 "disabled" => Compatibility::Disabled,
83 other => bail!("unknown compatibility: {}", other),
84 };
85 cmd.args.done()?;
86
87 let definition = match schema_arg {
91 Some(schema) => schema,
92 None => cmd.input.join("\n"),
93 };
94 if definition.trim().is_empty() {
95 bail!("glue-create-schema requires a `schema=` argument or a schema definition body");
96 }
97
98 println!(
99 "Registering Glue schema {:?} (registry {:?})...",
100 name,
101 registry.as_deref().unwrap_or("<default>"),
102 );
103
104 let glue = aws_sdk_glue::Client::new(&state.aws_config);
105
106 let mut create = glue
107 .create_schema()
108 .schema_name(&name)
109 .data_format(data_format)
110 .compatibility(compatibility)
111 .schema_definition(&definition);
112 if let Some(registry) = ®istry {
113 create = create.registry_id(RegistryId::builder().registry_name(registry).build());
114 }
115
116 let version_id = match create.send().await {
117 Ok(resp) => resp.schema_version_id().map(|s| s.to_string()),
118 Err(err) => {
119 let svc = err.into_service_error();
122 if svc.is_already_exists_exception() {
123 let mut schema_id = SchemaId::builder().schema_name(&name);
124 if let Some(registry) = ®istry {
125 schema_id = schema_id.registry_name(registry);
126 }
127 let resp = glue
128 .register_schema_version()
129 .schema_id(schema_id.build())
130 .schema_definition(&definition)
131 .send()
132 .await
133 .context("registering new Glue schema version")?;
134 resp.schema_version_id().map(|s| s.to_string())
135 } else {
136 return Err(anyhow::Error::new(svc).context("creating Glue schema"));
137 }
138 }
139 };
140 if let Some(var) = version_id_var {
141 let version_id =
142 version_id.ok_or_else(|| anyhow!("Glue did not return a schema version id"))?;
143 state.cmd_vars.insert(var, version_id);
144 }
145 Ok(ControlFlow::Continue)
146}