mz_ccsr/
client.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10use std::collections::BTreeSet;
11use std::error::Error;
12use std::fmt;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::bail;
17use proptest_derive::Arbitrary;
18use reqwest::{Method, Response, Url};
19use serde::de::DeserializeOwned;
20use serde::{Deserialize, Serialize};
21
22use crate::config::Auth;
23
24/// An API client for a Confluent-compatible schema registry.
25#[derive(Clone)]
26pub struct Client {
27    inner: reqwest::Client,
28    url: Arc<dyn Fn() -> Url + Send + Sync + 'static>,
29    auth: Option<Auth>,
30    timeout: Duration,
31}
32
33impl fmt::Debug for Client {
34    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35        f.debug_struct("Client")
36            .field("inner", &self.inner)
37            .field("url", &"...")
38            .field("auth", &self.auth)
39            .finish()
40    }
41}
42
43impl Client {
44    pub(crate) fn new(
45        inner: reqwest::Client,
46        url: Arc<dyn Fn() -> Url + Send + Sync + 'static>,
47        auth: Option<Auth>,
48        timeout: Duration,
49    ) -> Result<Self, anyhow::Error> {
50        if url().cannot_be_a_base() {
51            bail!("cannot construct a CCSR client with a cannot-be-a-base URL");
52        }
53        Ok(Client {
54            inner,
55            url,
56            auth,
57            timeout,
58        })
59    }
60
61    fn make_request<P>(&self, method: Method, path: P) -> reqwest::RequestBuilder
62    where
63        P: IntoIterator,
64        P::Item: AsRef<str>,
65    {
66        let mut url = (self.url)();
67        url.path_segments_mut()
68            .expect("constructor validated URL can be a base")
69            .clear()
70            .extend(path);
71
72        let mut request = self.inner.request(method, url);
73        if let Some(auth) = &self.auth {
74            request = request.basic_auth(&auth.username, auth.password.as_ref());
75        }
76        request
77    }
78
79    pub fn timeout(&self) -> Duration {
80        self.timeout
81    }
82
83    /// Gets the schema with the associated ID.
84    pub async fn get_schema_by_id(&self, id: i32) -> Result<Schema, GetByIdError> {
85        let req = self.make_request(Method::GET, &["schemas", "ids", &id.to_string()]);
86        let res: GetByIdResponse = send_request(req).await?;
87        Ok(Schema {
88            id,
89            raw: res.schema,
90        })
91    }
92
93    /// Gets the latest schema for the specified subject.
94    pub async fn get_schema_by_subject(&self, subject: &str) -> Result<Schema, GetBySubjectError> {
95        self.get_subject_latest(subject).await.map(|s| s.schema)
96    }
97
98    /// Gets the latest version of the specified subject.
99    pub async fn get_subject_latest(&self, subject: &str) -> Result<Subject, GetBySubjectError> {
100        let req = self.make_request(Method::GET, &["subjects", subject, "versions", "latest"]);
101        let res: GetBySubjectResponse = send_request(req).await?;
102        Ok(Subject {
103            schema: Schema {
104                id: res.id,
105                raw: res.schema,
106            },
107            version: res.version,
108            name: res.subject,
109        })
110    }
111
112    /// Gets the config set for the specified subject
113    pub async fn get_subject_config(
114        &self,
115        subject: &str,
116    ) -> Result<SubjectConfig, GetSubjectConfigError> {
117        let req = self.make_request(Method::GET, &["config", subject]);
118        let res: SubjectConfig = send_request(req).await?;
119        Ok(res)
120    }
121
122    /// Gets the latest version of the specified subject as well as all other
123    /// subjects referenced by that subject (recursively).
124    ///
125    /// The dependencies are returned in alphabetical order by subject name.
126    pub async fn get_subject_and_references(
127        &self,
128        subject: &str,
129    ) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
130        self.get_subject_and_references_by_version(subject, "latest".to_owned())
131            .await
132    }
133
134    /// Gets a subject and all other subjects referenced by that subject (recursively)
135    ///
136    /// The dependencies are returned in alphabetical order by subject name.
137    async fn get_subject_and_references_by_version(
138        &self,
139        subject: &str,
140        version: String,
141    ) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
142        let mut subjects = vec![];
143        let mut seen = BTreeSet::new();
144        let mut subjects_queue = vec![(subject.to_owned(), version)];
145        while let Some((subject, version)) = subjects_queue.pop() {
146            let req = self.make_request(Method::GET, &["subjects", &subject, "versions", &version]);
147            let res: GetBySubjectResponse = send_request(req).await?;
148            subjects.push(Subject {
149                schema: Schema {
150                    id: res.id,
151                    raw: res.schema,
152                },
153                version: res.version,
154                name: res.subject.clone(),
155            });
156            seen.insert(res.subject);
157            subjects_queue.extend(
158                res.references
159                    .into_iter()
160                    .filter(|r| !seen.contains(&r.subject))
161                    .map(|r| (r.subject, r.version.to_string())),
162            );
163        }
164        assert!(subjects.len() > 0, "Request should error if no subjects");
165
166        let primary = subjects.remove(0);
167        subjects.sort_by(|a, b| a.name.cmp(&b.name));
168        Ok((primary, subjects))
169    }
170
171    /// Publishes a new schema for the specified subject. The ID of the new
172    /// schema is returned.
173    ///
174    /// Note that if a schema that is identical to an existing schema for the
175    /// same subject is published, the ID of the existing schema will be
176    /// returned.
177    pub async fn publish_schema(
178        &self,
179        subject: &str,
180        schema: &str,
181        schema_type: SchemaType,
182        references: &[SchemaReference],
183    ) -> Result<i32, PublishError> {
184        let req = self.make_request(Method::POST, &["subjects", subject, "versions"]);
185        let req = req.json(&PublishRequest {
186            schema,
187            schema_type,
188            references,
189        });
190        let res: PublishResponse = send_request(req).await?;
191        Ok(res.id)
192    }
193
194    /// Sets the compatibility level for the specified subject.
195    pub async fn set_subject_compatibility_level(
196        &self,
197        subject: &str,
198        compatibility_level: CompatibilityLevel,
199    ) -> Result<(), SetCompatibilityLevelError> {
200        let req = self.make_request(Method::PUT, &["config", subject]);
201        let req = req.json(&CompatibilityLevelRequest {
202            compatibility: compatibility_level,
203        });
204        send_request_raw(req).await?;
205        Ok(())
206    }
207
208    /// Lists the names of all subjects that the schema registry is aware of.
209    pub async fn list_subjects(&self) -> Result<Vec<String>, ListError> {
210        let req = self.make_request(Method::GET, &["subjects"]);
211        Ok(send_request(req).await?)
212    }
213
214    /// Deletes all schema versions associated with the specified subject.
215    ///
216    /// This API is only intended to be used in development environments.
217    /// Deleting schemas only allows new, potentially incompatible schemas to
218    /// be registered under the same subject. It does not allow the schema ID
219    /// to be reused.
220    pub async fn delete_subject(&self, subject: &str) -> Result<(), DeleteError> {
221        let req = self.make_request(Method::DELETE, &["subjects", subject]);
222        send_request_raw(req).await?;
223        Ok(())
224    }
225
226    /// Gets the latest version of the first subject found associated with the scheme with
227    /// the given id, as well as all other subjects referenced by that subject (recursively).
228    ///
229    /// The dependencies are returned in alphabetical order by subject name.
230    pub async fn get_subject_and_references_by_id(
231        &self,
232        id: i32,
233    ) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
234        let req = self.make_request(
235            Method::GET,
236            &["schemas", "ids", &id.to_string(), "versions"],
237        );
238        let res: Vec<SubjectVersion> = send_request(req).await?;
239
240        // NOTE NOTE NOTE
241        // We take the FIRST subject that matches this schema id. This could be DIFFERENT
242        // than the actual subject we are interested in (it could even be from a different test
243        // run), but we are trusting the schema registry to only output the same schema id for
244        // identical subjects.
245        // This was validated by publishing 2 empty schemas (i.e., identical), with different
246        // references (one empty, one with a random reference), and they were not linked to the
247        // same schema id.
248        //
249        // See https://docs.confluent.io/platform/current/schema-registry/develop/api.html#post--subjects-(string-%20subject)-versions
250        // for more info.
251        match res.as_slice() {
252            [first, ..] => {
253                self.get_subject_and_references_by_version(
254                    &first.subject,
255                    first.version.to_string(),
256                )
257                .await
258            }
259            _ => Err(GetBySubjectError::SubjectNotFound),
260        }
261    }
262}
263
264async fn send_request<T>(req: reqwest::RequestBuilder) -> Result<T, UnhandledError>
265where
266    T: DeserializeOwned,
267{
268    let res = send_request_raw(req).await?;
269    Ok(res.json().await?)
270}
271
272async fn send_request_raw(req: reqwest::RequestBuilder) -> Result<Response, UnhandledError> {
273    let res = req.send().await?;
274    let status = res.status();
275    if status.is_success() {
276        Ok(res)
277    } else {
278        match res.json::<ErrorResponse>().await {
279            Ok(err_res) => Err(UnhandledError::Api {
280                code: err_res.error_code,
281                message: err_res.message,
282            }),
283            Err(_) => Err(UnhandledError::Api {
284                code: i32::from(status.as_u16()),
285                message: "unable to decode error details".into(),
286            }),
287        }
288    }
289}
290
291/// The type of a schema stored by a schema registry.
292#[derive(Clone, Copy, Debug, Serialize)]
293#[serde(rename_all = "UPPERCASE")]
294pub enum SchemaType {
295    /// An Avro schema.
296    Avro,
297    /// A Protobuf schema.
298    Protobuf,
299    /// A JSON schema.
300    Json,
301}
302
303impl SchemaType {
304    fn is_default(&self) -> bool {
305        matches!(self, SchemaType::Avro)
306    }
307}
308
309/// A schema stored by a schema registry.
310#[derive(Debug, Eq, PartialEq)]
311pub struct Schema {
312    /// The ID of the schema.
313    pub id: i32,
314    /// The raw text representing the schema.
315    pub raw: String,
316}
317
318/// A subject stored by a schema registry.
319#[derive(Debug, Eq, PartialEq)]
320pub struct Subject {
321    /// The version of the schema.
322    pub version: i32,
323    /// The name of the schema.
324    pub name: String,
325    /// The schema of the `version` of the `Subject`.
326    pub schema: Schema,
327}
328
329/// A reference from one schema in a schema registry to another.
330#[derive(Debug, Serialize, Deserialize)]
331#[serde(rename_all = "camelCase")]
332pub struct SchemaReference {
333    /// The name of the reference.
334    pub name: String,
335    /// The subject under which the referenced schema is registered.
336    pub subject: String,
337    /// The version of the referenced schema.
338    pub version: i32,
339}
340
341#[derive(Debug, Deserialize)]
342struct GetByIdResponse {
343    schema: String,
344}
345
346/// Errors for schema lookups by ID.
347#[derive(Debug)]
348pub enum GetByIdError {
349    /// No schema with the requested ID exists.
350    SchemaNotFound,
351    /// The underlying HTTP transport failed.
352    Transport(reqwest::Error),
353    /// An internal server error occurred.
354    Server { code: i32, message: String },
355}
356
357#[derive(Debug, Deserialize)]
358struct SubjectVersion {
359    /// The name of the subject
360    pub subject: String,
361    /// The version of the schema
362    pub version: i32,
363}
364
365impl From<UnhandledError> for GetByIdError {
366    fn from(err: UnhandledError) -> GetByIdError {
367        match err {
368            UnhandledError::Transport(err) => GetByIdError::Transport(err),
369            UnhandledError::Api { code, message } => match code {
370                40403 => GetByIdError::SchemaNotFound,
371                _ => GetByIdError::Server { code, message },
372            },
373        }
374    }
375}
376
377impl Error for GetByIdError {
378    fn source(&self) -> Option<&(dyn Error + 'static)> {
379        match self {
380            GetByIdError::SchemaNotFound | GetByIdError::Server { .. } => None,
381            GetByIdError::Transport(err) => Some(err),
382        }
383    }
384}
385
386impl fmt::Display for GetByIdError {
387    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388        match self {
389            GetByIdError::SchemaNotFound => write!(f, "schema not found"),
390            GetByIdError::Transport(err) => write!(f, "transport: {}", err),
391            GetByIdError::Server { code, message } => {
392                write!(f, "server error {}: {}", code, message)
393            }
394        }
395    }
396}
397
398#[derive(Debug, Deserialize)]
399#[serde(rename_all = "camelCase")]
400pub struct SubjectConfig {
401    pub compatibility_level: CompatibilityLevel,
402    // There are other fields to include if we need them.
403}
404
405/// Errors for schema lookups by subject.
406#[derive(Debug)]
407pub enum GetSubjectConfigError {
408    /// The requested subject does not exist.
409    SubjectNotFound,
410    /// The compatibility level for the subject has not been set.
411    SubjectCompatibilityLevelNotSet,
412    /// The underlying HTTP transport failed.
413    Transport(reqwest::Error),
414    /// An internal server error occurred.
415    Server { code: i32, message: String },
416}
417
418impl From<UnhandledError> for GetSubjectConfigError {
419    fn from(err: UnhandledError) -> GetSubjectConfigError {
420        match err {
421            UnhandledError::Transport(err) => GetSubjectConfigError::Transport(err),
422            UnhandledError::Api { code, message } => match code {
423                404 => GetSubjectConfigError::SubjectNotFound,
424                40408 => GetSubjectConfigError::SubjectCompatibilityLevelNotSet,
425                _ => GetSubjectConfigError::Server { code, message },
426            },
427        }
428    }
429}
430
431impl Error for GetSubjectConfigError {
432    fn source(&self) -> Option<&(dyn Error + 'static)> {
433        match self {
434            GetSubjectConfigError::SubjectNotFound
435            | GetSubjectConfigError::SubjectCompatibilityLevelNotSet
436            | GetSubjectConfigError::Server { .. } => None,
437            GetSubjectConfigError::Transport(err) => Some(err),
438        }
439    }
440}
441
442impl fmt::Display for GetSubjectConfigError {
443    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
444        match self {
445            GetSubjectConfigError::SubjectNotFound => write!(f, "subject not found"),
446            GetSubjectConfigError::SubjectCompatibilityLevelNotSet => {
447                write!(f, "subject level compatibility not set")
448            }
449            GetSubjectConfigError::Transport(err) => write!(f, "transport: {}", err),
450            GetSubjectConfigError::Server { code, message } => {
451                write!(f, "server error {}: {}", code, message)
452            }
453        }
454    }
455}
456
457#[derive(Debug, Deserialize)]
458#[serde(rename_all = "camelCase")]
459struct GetBySubjectResponse {
460    id: i32,
461    schema: String,
462    version: i32,
463    subject: String,
464    #[serde(default)]
465    references: Vec<SchemaReference>,
466}
467
468/// Errors for schema lookups by subject.
469#[derive(Debug)]
470pub enum GetBySubjectError {
471    /// The requested subject does not exist.
472    SubjectNotFound,
473    /// The requested version does not exist.
474    VersionNotFound(String),
475    /// The underlying HTTP transport failed.
476    Transport(reqwest::Error),
477    /// An internal server error occurred.
478    Server { code: i32, message: String },
479}
480
481impl From<UnhandledError> for GetBySubjectError {
482    fn from(err: UnhandledError) -> GetBySubjectError {
483        match err {
484            UnhandledError::Transport(err) => GetBySubjectError::Transport(err),
485            UnhandledError::Api { code, message } => match code {
486                40401 => GetBySubjectError::SubjectNotFound,
487                40402 => GetBySubjectError::VersionNotFound(message),
488                _ => GetBySubjectError::Server { code, message },
489            },
490        }
491    }
492}
493
494impl Error for GetBySubjectError {
495    fn source(&self) -> Option<&(dyn Error + 'static)> {
496        match self {
497            GetBySubjectError::SubjectNotFound
498            | GetBySubjectError::VersionNotFound(_)
499            | GetBySubjectError::Server { .. } => None,
500            GetBySubjectError::Transport(err) => Some(err),
501        }
502    }
503}
504
505impl fmt::Display for GetBySubjectError {
506    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
507        match self {
508            GetBySubjectError::SubjectNotFound => write!(f, "subject not found"),
509            GetBySubjectError::VersionNotFound(message) => {
510                write!(f, "version not found: {}", message)
511            }
512            GetBySubjectError::Transport(err) => write!(f, "transport: {}", err),
513            GetBySubjectError::Server { code, message } => {
514                write!(f, "server error {}: {}", code, message)
515            }
516        }
517    }
518}
519
520#[derive(Debug, Serialize)]
521#[serde(rename_all = "camelCase")]
522struct PublishRequest<'a> {
523    schema: &'a str,
524    // Omitting the following fields when they're set to their defaults provides
525    // compatibility with old versions of the schema registry that don't
526    // understand these fields.
527    #[serde(skip_serializing_if = "SchemaType::is_default")]
528    schema_type: SchemaType,
529    #[serde(skip_serializing_if = "<[_]>::is_empty")]
530    references: &'a [SchemaReference],
531}
532
533#[derive(Debug, Deserialize)]
534#[serde(rename_all = "camelCase")]
535struct PublishResponse {
536    id: i32,
537}
538
539#[derive(Debug, Serialize)]
540#[serde(rename_all = "camelCase")]
541struct CompatibilityLevelRequest {
542    compatibility: CompatibilityLevel,
543}
544
545#[derive(Arbitrary, Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
546#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
547pub enum CompatibilityLevel {
548    Backward,
549    BackwardTransitive,
550    Forward,
551    ForwardTransitive,
552    Full,
553    FullTransitive,
554    None,
555}
556
557impl TryFrom<&str> for CompatibilityLevel {
558    type Error = String;
559
560    fn try_from(value: &str) -> Result<Self, Self::Error> {
561        match value {
562            "BACKWARD" => Ok(CompatibilityLevel::Backward),
563            "BACKWARD_TRANSITIVE" => Ok(CompatibilityLevel::BackwardTransitive),
564            "FORWARD" => Ok(CompatibilityLevel::Forward),
565            "FORWARD_TRANSITIVE" => Ok(CompatibilityLevel::ForwardTransitive),
566            "FULL" => Ok(CompatibilityLevel::Full),
567            "FULL_TRANSITIVE" => Ok(CompatibilityLevel::FullTransitive),
568            "NONE" => Ok(CompatibilityLevel::None),
569            _ => Err(format!("invalid compatibility level: {}", value)),
570        }
571    }
572}
573
574impl fmt::Display for CompatibilityLevel {
575    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
576        match self {
577            CompatibilityLevel::Backward => write!(f, "BACKWARD"),
578            CompatibilityLevel::BackwardTransitive => write!(f, "BACKWARD_TRANSITIVE"),
579            CompatibilityLevel::Forward => write!(f, "FORWARD"),
580            CompatibilityLevel::ForwardTransitive => write!(f, "FORWARD_TRANSITIVE"),
581            CompatibilityLevel::Full => write!(f, "FULL"),
582            CompatibilityLevel::FullTransitive => write!(f, "FULL_TRANSITIVE"),
583            CompatibilityLevel::None => write!(f, "NONE"),
584        }
585    }
586}
587
588/// Errors for publish operations.
589#[derive(Debug)]
590pub enum PublishError {
591    /// The provided schema was not compatible with existing schemas for that
592    /// subject, according to the subject's forwards- or backwards-compatibility
593    /// requirements.
594    IncompatibleSchema,
595    /// The provided schema was invalid.
596    InvalidSchema { message: String },
597    /// The underlying HTTP transport failed.
598    Transport(reqwest::Error),
599    /// An internal server error occurred.
600    Server { code: i32, message: String },
601}
602
603impl From<UnhandledError> for PublishError {
604    fn from(err: UnhandledError) -> PublishError {
605        match err {
606            UnhandledError::Transport(err) => PublishError::Transport(err),
607            UnhandledError::Api { code, message } => match code {
608                409 => PublishError::IncompatibleSchema,
609                42201 => PublishError::InvalidSchema { message },
610                _ => PublishError::Server { code, message },
611            },
612        }
613    }
614}
615
616impl Error for PublishError {
617    fn source(&self) -> Option<&(dyn Error + 'static)> {
618        match self {
619            PublishError::IncompatibleSchema
620            | PublishError::InvalidSchema { .. }
621            | PublishError::Server { .. } => None,
622            PublishError::Transport(err) => Some(err),
623        }
624    }
625}
626
627impl fmt::Display for PublishError {
628    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
629        match self {
630            // The error descriptions for IncompatibleSchema and InvalidSchema
631            // are copied from the schema registry itself.
632            PublishError::IncompatibleSchema => write!(
633                f,
634                "schema being registered is incompatible with an earlier schema"
635            ),
636            PublishError::InvalidSchema { message } => write!(f, "{}", message),
637            PublishError::Transport(err) => write!(f, "transport: {}", err),
638            PublishError::Server { code, message } => {
639                write!(f, "server error {}: {}", code, message)
640            }
641        }
642    }
643}
644
645/// Errors for list operations.
646#[derive(Debug)]
647pub enum ListError {
648    /// The underlying HTTP transport failed.
649    Transport(reqwest::Error),
650    /// An internal server error occurred.
651    Server { code: i32, message: String },
652}
653
654impl From<UnhandledError> for ListError {
655    fn from(err: UnhandledError) -> ListError {
656        match err {
657            UnhandledError::Transport(err) => ListError::Transport(err),
658            UnhandledError::Api { code, message } => ListError::Server { code, message },
659        }
660    }
661}
662
663impl Error for ListError {
664    fn source(&self) -> Option<&(dyn Error + 'static)> {
665        match self {
666            ListError::Server { .. } => None,
667            ListError::Transport(err) => Some(err),
668        }
669    }
670}
671
672impl fmt::Display for ListError {
673    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
674        match self {
675            ListError::Transport(err) => write!(f, "transport: {}", err),
676            ListError::Server { code, message } => write!(f, "server error {}: {}", code, message),
677        }
678    }
679}
680
681/// Errors for delete operations.
682#[derive(Debug)]
683pub enum DeleteError {
684    /// The specified subject does not exist.
685    SubjectNotFound,
686    /// The underlying HTTP transport failed.
687    Transport(reqwest::Error),
688    /// An internal server error occurred.
689    Server { code: i32, message: String },
690}
691
692impl From<UnhandledError> for DeleteError {
693    fn from(err: UnhandledError) -> DeleteError {
694        match err {
695            UnhandledError::Transport(err) => DeleteError::Transport(err),
696            UnhandledError::Api { code, message } => match code {
697                40401 => DeleteError::SubjectNotFound,
698                _ => DeleteError::Server { code, message },
699            },
700        }
701    }
702}
703
704impl Error for DeleteError {
705    fn source(&self) -> Option<&(dyn Error + 'static)> {
706        match self {
707            DeleteError::SubjectNotFound | DeleteError::Server { .. } => None,
708            DeleteError::Transport(err) => Some(err),
709        }
710    }
711}
712
713impl fmt::Display for DeleteError {
714    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
715        match self {
716            DeleteError::SubjectNotFound => write!(f, "subject not found"),
717            DeleteError::Transport(err) => write!(f, "transport: {}", err),
718            DeleteError::Server { code, message } => {
719                write!(f, "server error {}: {}", code, message)
720            }
721        }
722    }
723}
724
725/// Errors for setting compatibility level operations.
726#[derive(Debug)]
727pub enum SetCompatibilityLevelError {
728    /// The compatibility level is invalid.
729    InvalidCompatibilityLevel,
730    /// The underlying HTTP transport failed.
731    Transport(reqwest::Error),
732    /// An internal server error occurred.
733    Server { code: i32, message: String },
734}
735
736impl From<UnhandledError> for SetCompatibilityLevelError {
737    fn from(err: UnhandledError) -> SetCompatibilityLevelError {
738        match err {
739            UnhandledError::Transport(err) => SetCompatibilityLevelError::Transport(err),
740            UnhandledError::Api { code, message } => match code {
741                42203 => SetCompatibilityLevelError::InvalidCompatibilityLevel,
742                _ => SetCompatibilityLevelError::Server { code, message },
743            },
744        }
745    }
746}
747
748impl Error for SetCompatibilityLevelError {
749    fn source(&self) -> Option<&(dyn Error + 'static)> {
750        match self {
751            SetCompatibilityLevelError::InvalidCompatibilityLevel
752            | SetCompatibilityLevelError::Server { .. } => None,
753            SetCompatibilityLevelError::Transport(err) => Some(err),
754        }
755    }
756}
757
758impl fmt::Display for SetCompatibilityLevelError {
759    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
760        match self {
761            SetCompatibilityLevelError::InvalidCompatibilityLevel => {
762                write!(f, "invalid compatibility level")
763            }
764            SetCompatibilityLevelError::Transport(err) => write!(f, "transport: {}", err),
765            SetCompatibilityLevelError::Server { code, message } => {
766                write!(f, "server error {}: {}", code, message)
767            }
768        }
769    }
770}
771
772#[derive(Debug, Deserialize)]
773struct ErrorResponse {
774    error_code: i32,
775    message: String,
776}
777
778#[derive(Debug)]
779enum UnhandledError {
780    Transport(reqwest::Error),
781    Api { code: i32, message: String },
782}
783
784impl From<reqwest::Error> for UnhandledError {
785    fn from(err: reqwest::Error) -> UnhandledError {
786        UnhandledError::Transport(err)
787    }
788}