1use std::collections::BTreeSet;
11use std::error::Error;
12use std::fmt;
13use std::sync::Arc;
14use std::time::Duration;
15
16use anyhow::bail;
17use proptest_derive::Arbitrary;
18use reqwest::{Method, Response, Url};
19use serde::de::DeserializeOwned;
20use serde::{Deserialize, Serialize};
21
22use crate::config::Auth;
23
24#[derive(Clone)]
26pub struct Client {
27 inner: reqwest::Client,
28 url: Arc<dyn Fn() -> Url + Send + Sync + 'static>,
29 auth: Option<Auth>,
30 timeout: Duration,
31}
32
33impl fmt::Debug for Client {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 f.debug_struct("Client")
36 .field("inner", &self.inner)
37 .field("url", &"...")
38 .field("auth", &self.auth)
39 .finish()
40 }
41}
42
43impl Client {
44 pub(crate) fn new(
45 inner: reqwest::Client,
46 url: Arc<dyn Fn() -> Url + Send + Sync + 'static>,
47 auth: Option<Auth>,
48 timeout: Duration,
49 ) -> Result<Self, anyhow::Error> {
50 if url().cannot_be_a_base() {
51 bail!("cannot construct a CCSR client with a cannot-be-a-base URL");
52 }
53 Ok(Client {
54 inner,
55 url,
56 auth,
57 timeout,
58 })
59 }
60
61 fn make_request<P>(&self, method: Method, path: P) -> reqwest::RequestBuilder
62 where
63 P: IntoIterator,
64 P::Item: AsRef<str>,
65 {
66 let mut url = (self.url)();
67 url.path_segments_mut()
68 .expect("constructor validated URL can be a base")
69 .clear()
70 .extend(path);
71
72 let mut request = self.inner.request(method, url);
73 if let Some(auth) = &self.auth {
74 request = request.basic_auth(&auth.username, auth.password.as_ref());
75 }
76 request
77 }
78
79 pub fn timeout(&self) -> Duration {
80 self.timeout
81 }
82
83 pub async fn get_schema_by_id(&self, id: i32) -> Result<Schema, GetByIdError> {
85 let req = self.make_request(Method::GET, &["schemas", "ids", &id.to_string()]);
86 let res: GetByIdResponse = send_request(req).await?;
87 Ok(Schema {
88 id,
89 raw: res.schema,
90 })
91 }
92
93 pub async fn get_schema_by_subject(&self, subject: &str) -> Result<Schema, GetBySubjectError> {
95 self.get_subject_latest(subject).await.map(|s| s.schema)
96 }
97
98 pub async fn get_subject_latest(&self, subject: &str) -> Result<Subject, GetBySubjectError> {
100 let req = self.make_request(Method::GET, &["subjects", subject, "versions", "latest"]);
101 let res: GetBySubjectResponse = send_request(req).await?;
102 Ok(Subject {
103 schema: Schema {
104 id: res.id,
105 raw: res.schema,
106 },
107 version: res.version,
108 name: res.subject,
109 })
110 }
111
112 pub async fn get_subject_config(
114 &self,
115 subject: &str,
116 ) -> Result<SubjectConfig, GetSubjectConfigError> {
117 let req = self.make_request(Method::GET, &["config", subject]);
118 let res: SubjectConfig = send_request(req).await?;
119 Ok(res)
120 }
121
122 pub async fn get_subject_and_references(
127 &self,
128 subject: &str,
129 ) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
130 self.get_subject_and_references_by_version(subject, "latest".to_owned())
131 .await
132 }
133
134 async fn get_subject_and_references_by_version(
138 &self,
139 subject: &str,
140 version: String,
141 ) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
142 let mut subjects = vec![];
143 let mut seen = BTreeSet::new();
144 let mut subjects_queue = vec![(subject.to_owned(), version)];
145 while let Some((subject, version)) = subjects_queue.pop() {
146 let req = self.make_request(Method::GET, &["subjects", &subject, "versions", &version]);
147 let res: GetBySubjectResponse = send_request(req).await?;
148 subjects.push(Subject {
149 schema: Schema {
150 id: res.id,
151 raw: res.schema,
152 },
153 version: res.version,
154 name: res.subject.clone(),
155 });
156 seen.insert(res.subject);
157 subjects_queue.extend(
158 res.references
159 .into_iter()
160 .filter(|r| !seen.contains(&r.subject))
161 .map(|r| (r.subject, r.version.to_string())),
162 );
163 }
164 assert!(subjects.len() > 0, "Request should error if no subjects");
165
166 let primary = subjects.remove(0);
167 subjects.sort_by(|a, b| a.name.cmp(&b.name));
168 Ok((primary, subjects))
169 }
170
171 pub async fn publish_schema(
178 &self,
179 subject: &str,
180 schema: &str,
181 schema_type: SchemaType,
182 references: &[SchemaReference],
183 ) -> Result<i32, PublishError> {
184 let req = self.make_request(Method::POST, &["subjects", subject, "versions"]);
185 let req = req.json(&PublishRequest {
186 schema,
187 schema_type,
188 references,
189 });
190 let res: PublishResponse = send_request(req).await?;
191 Ok(res.id)
192 }
193
194 pub async fn set_subject_compatibility_level(
196 &self,
197 subject: &str,
198 compatibility_level: CompatibilityLevel,
199 ) -> Result<(), SetCompatibilityLevelError> {
200 let req = self.make_request(Method::PUT, &["config", subject]);
201 let req = req.json(&CompatibilityLevelRequest {
202 compatibility: compatibility_level,
203 });
204 send_request_raw(req).await?;
205 Ok(())
206 }
207
208 pub async fn list_subjects(&self) -> Result<Vec<String>, ListError> {
210 let req = self.make_request(Method::GET, &["subjects"]);
211 Ok(send_request(req).await?)
212 }
213
214 pub async fn delete_subject(&self, subject: &str) -> Result<(), DeleteError> {
221 let req = self.make_request(Method::DELETE, &["subjects", subject]);
222 send_request_raw(req).await?;
223 Ok(())
224 }
225
226 pub async fn get_subject_and_references_by_id(
231 &self,
232 id: i32,
233 ) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
234 let req = self.make_request(
235 Method::GET,
236 &["schemas", "ids", &id.to_string(), "versions"],
237 );
238 let res: Vec<SubjectVersion> = send_request(req).await?;
239
240 match res.as_slice() {
252 [first, ..] => {
253 self.get_subject_and_references_by_version(
254 &first.subject,
255 first.version.to_string(),
256 )
257 .await
258 }
259 _ => Err(GetBySubjectError::SubjectNotFound),
260 }
261 }
262}
263
264async fn send_request<T>(req: reqwest::RequestBuilder) -> Result<T, UnhandledError>
265where
266 T: DeserializeOwned,
267{
268 let res = send_request_raw(req).await?;
269 Ok(res.json().await?)
270}
271
272async fn send_request_raw(req: reqwest::RequestBuilder) -> Result<Response, UnhandledError> {
273 let res = req.send().await?;
274 let status = res.status();
275 if status.is_success() {
276 Ok(res)
277 } else {
278 match res.json::<ErrorResponse>().await {
279 Ok(err_res) => Err(UnhandledError::Api {
280 code: err_res.error_code,
281 message: err_res.message,
282 }),
283 Err(_) => Err(UnhandledError::Api {
284 code: i32::from(status.as_u16()),
285 message: "unable to decode error details".into(),
286 }),
287 }
288 }
289}
290
291#[derive(Clone, Copy, Debug, Serialize)]
293#[serde(rename_all = "UPPERCASE")]
294pub enum SchemaType {
295 Avro,
297 Protobuf,
299 Json,
301}
302
303impl SchemaType {
304 fn is_default(&self) -> bool {
305 matches!(self, SchemaType::Avro)
306 }
307}
308
/// A schema as returned by the registry.
#[derive(Debug, PartialEq, Eq)]
pub struct Schema {
    /// The ID of the schema.
    pub id: i32,
    /// The schema text exactly as the registry returned it.
    pub raw: String,
}
317
318#[derive(Debug, Eq, PartialEq)]
320pub struct Subject {
321 pub version: i32,
323 pub name: String,
325 pub schema: Schema,
327}
328
329#[derive(Debug, Serialize, Deserialize)]
331#[serde(rename_all = "camelCase")]
332pub struct SchemaReference {
333 pub name: String,
335 pub subject: String,
337 pub version: i32,
339}
340
341#[derive(Debug, Deserialize)]
342struct GetByIdResponse {
343 schema: String,
344}
345
346#[derive(Debug)]
348pub enum GetByIdError {
349 SchemaNotFound,
351 Transport(reqwest::Error),
353 Server { code: i32, message: String },
355}
356
357#[derive(Debug, Deserialize)]
358struct SubjectVersion {
359 pub subject: String,
361 pub version: i32,
363}
364
365impl From<UnhandledError> for GetByIdError {
366 fn from(err: UnhandledError) -> GetByIdError {
367 match err {
368 UnhandledError::Transport(err) => GetByIdError::Transport(err),
369 UnhandledError::Api { code, message } => match code {
370 40403 => GetByIdError::SchemaNotFound,
371 _ => GetByIdError::Server { code, message },
372 },
373 }
374 }
375}
376
377impl Error for GetByIdError {
378 fn source(&self) -> Option<&(dyn Error + 'static)> {
379 match self {
380 GetByIdError::SchemaNotFound | GetByIdError::Server { .. } => None,
381 GetByIdError::Transport(err) => Some(err),
382 }
383 }
384}
385
386impl fmt::Display for GetByIdError {
387 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
388 match self {
389 GetByIdError::SchemaNotFound => write!(f, "schema not found"),
390 GetByIdError::Transport(err) => write!(f, "transport: {}", err),
391 GetByIdError::Server { code, message } => {
392 write!(f, "server error {}: {}", code, message)
393 }
394 }
395 }
396}
397
398#[derive(Debug, Deserialize)]
399#[serde(rename_all = "camelCase")]
400pub struct SubjectConfig {
401 pub compatibility_level: CompatibilityLevel,
402 }
404
405#[derive(Debug)]
407pub enum GetSubjectConfigError {
408 SubjectNotFound,
410 SubjectCompatibilityLevelNotSet,
412 Transport(reqwest::Error),
414 Server { code: i32, message: String },
416}
417
418impl From<UnhandledError> for GetSubjectConfigError {
419 fn from(err: UnhandledError) -> GetSubjectConfigError {
420 match err {
421 UnhandledError::Transport(err) => GetSubjectConfigError::Transport(err),
422 UnhandledError::Api { code, message } => match code {
423 404 => GetSubjectConfigError::SubjectNotFound,
424 40408 => GetSubjectConfigError::SubjectCompatibilityLevelNotSet,
425 _ => GetSubjectConfigError::Server { code, message },
426 },
427 }
428 }
429}
430
431impl Error for GetSubjectConfigError {
432 fn source(&self) -> Option<&(dyn Error + 'static)> {
433 match self {
434 GetSubjectConfigError::SubjectNotFound
435 | GetSubjectConfigError::SubjectCompatibilityLevelNotSet
436 | GetSubjectConfigError::Server { .. } => None,
437 GetSubjectConfigError::Transport(err) => Some(err),
438 }
439 }
440}
441
442impl fmt::Display for GetSubjectConfigError {
443 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
444 match self {
445 GetSubjectConfigError::SubjectNotFound => write!(f, "subject not found"),
446 GetSubjectConfigError::SubjectCompatibilityLevelNotSet => {
447 write!(f, "subject level compatibility not set")
448 }
449 GetSubjectConfigError::Transport(err) => write!(f, "transport: {}", err),
450 GetSubjectConfigError::Server { code, message } => {
451 write!(f, "server error {}: {}", code, message)
452 }
453 }
454 }
455}
456
457#[derive(Debug, Deserialize)]
458#[serde(rename_all = "camelCase")]
459struct GetBySubjectResponse {
460 id: i32,
461 schema: String,
462 version: i32,
463 subject: String,
464 #[serde(default)]
465 references: Vec<SchemaReference>,
466}
467
468#[derive(Debug)]
470pub enum GetBySubjectError {
471 SubjectNotFound,
473 VersionNotFound(String),
475 Transport(reqwest::Error),
477 Server { code: i32, message: String },
479}
480
481impl From<UnhandledError> for GetBySubjectError {
482 fn from(err: UnhandledError) -> GetBySubjectError {
483 match err {
484 UnhandledError::Transport(err) => GetBySubjectError::Transport(err),
485 UnhandledError::Api { code, message } => match code {
486 40401 => GetBySubjectError::SubjectNotFound,
487 40402 => GetBySubjectError::VersionNotFound(message),
488 _ => GetBySubjectError::Server { code, message },
489 },
490 }
491 }
492}
493
494impl Error for GetBySubjectError {
495 fn source(&self) -> Option<&(dyn Error + 'static)> {
496 match self {
497 GetBySubjectError::SubjectNotFound
498 | GetBySubjectError::VersionNotFound(_)
499 | GetBySubjectError::Server { .. } => None,
500 GetBySubjectError::Transport(err) => Some(err),
501 }
502 }
503}
504
505impl fmt::Display for GetBySubjectError {
506 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
507 match self {
508 GetBySubjectError::SubjectNotFound => write!(f, "subject not found"),
509 GetBySubjectError::VersionNotFound(message) => {
510 write!(f, "version not found: {}", message)
511 }
512 GetBySubjectError::Transport(err) => write!(f, "transport: {}", err),
513 GetBySubjectError::Server { code, message } => {
514 write!(f, "server error {}: {}", code, message)
515 }
516 }
517 }
518}
519
520#[derive(Debug, Serialize)]
521#[serde(rename_all = "camelCase")]
522struct PublishRequest<'a> {
523 schema: &'a str,
524 #[serde(skip_serializing_if = "SchemaType::is_default")]
528 schema_type: SchemaType,
529 #[serde(skip_serializing_if = "<[_]>::is_empty")]
530 references: &'a [SchemaReference],
531}
532
533#[derive(Debug, Deserialize)]
534#[serde(rename_all = "camelCase")]
535struct PublishResponse {
536 id: i32,
537}
538
539#[derive(Debug, Serialize)]
540#[serde(rename_all = "camelCase")]
541struct CompatibilityLevelRequest {
542 compatibility: CompatibilityLevel,
543}
544
545#[derive(Arbitrary, Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
546#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
547pub enum CompatibilityLevel {
548 Backward,
549 BackwardTransitive,
550 Forward,
551 ForwardTransitive,
552 Full,
553 FullTransitive,
554 None,
555}
556
557impl TryFrom<&str> for CompatibilityLevel {
558 type Error = String;
559
560 fn try_from(value: &str) -> Result<Self, Self::Error> {
561 match value {
562 "BACKWARD" => Ok(CompatibilityLevel::Backward),
563 "BACKWARD_TRANSITIVE" => Ok(CompatibilityLevel::BackwardTransitive),
564 "FORWARD" => Ok(CompatibilityLevel::Forward),
565 "FORWARD_TRANSITIVE" => Ok(CompatibilityLevel::ForwardTransitive),
566 "FULL" => Ok(CompatibilityLevel::Full),
567 "FULL_TRANSITIVE" => Ok(CompatibilityLevel::FullTransitive),
568 "NONE" => Ok(CompatibilityLevel::None),
569 _ => Err(format!("invalid compatibility level: {}", value)),
570 }
571 }
572}
573
574impl fmt::Display for CompatibilityLevel {
575 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
576 match self {
577 CompatibilityLevel::Backward => write!(f, "BACKWARD"),
578 CompatibilityLevel::BackwardTransitive => write!(f, "BACKWARD_TRANSITIVE"),
579 CompatibilityLevel::Forward => write!(f, "FORWARD"),
580 CompatibilityLevel::ForwardTransitive => write!(f, "FORWARD_TRANSITIVE"),
581 CompatibilityLevel::Full => write!(f, "FULL"),
582 CompatibilityLevel::FullTransitive => write!(f, "FULL_TRANSITIVE"),
583 CompatibilityLevel::None => write!(f, "NONE"),
584 }
585 }
586}
587
588#[derive(Debug)]
590pub enum PublishError {
591 IncompatibleSchema,
595 InvalidSchema { message: String },
597 Transport(reqwest::Error),
599 Server { code: i32, message: String },
601}
602
603impl From<UnhandledError> for PublishError {
604 fn from(err: UnhandledError) -> PublishError {
605 match err {
606 UnhandledError::Transport(err) => PublishError::Transport(err),
607 UnhandledError::Api { code, message } => match code {
608 409 => PublishError::IncompatibleSchema,
609 42201 => PublishError::InvalidSchema { message },
610 _ => PublishError::Server { code, message },
611 },
612 }
613 }
614}
615
616impl Error for PublishError {
617 fn source(&self) -> Option<&(dyn Error + 'static)> {
618 match self {
619 PublishError::IncompatibleSchema
620 | PublishError::InvalidSchema { .. }
621 | PublishError::Server { .. } => None,
622 PublishError::Transport(err) => Some(err),
623 }
624 }
625}
626
627impl fmt::Display for PublishError {
628 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
629 match self {
630 PublishError::IncompatibleSchema => write!(
633 f,
634 "schema being registered is incompatible with an earlier schema"
635 ),
636 PublishError::InvalidSchema { message } => write!(f, "{}", message),
637 PublishError::Transport(err) => write!(f, "transport: {}", err),
638 PublishError::Server { code, message } => {
639 write!(f, "server error {}: {}", code, message)
640 }
641 }
642 }
643}
644
645#[derive(Debug)]
647pub enum ListError {
648 Transport(reqwest::Error),
650 Server { code: i32, message: String },
652}
653
654impl From<UnhandledError> for ListError {
655 fn from(err: UnhandledError) -> ListError {
656 match err {
657 UnhandledError::Transport(err) => ListError::Transport(err),
658 UnhandledError::Api { code, message } => ListError::Server { code, message },
659 }
660 }
661}
662
663impl Error for ListError {
664 fn source(&self) -> Option<&(dyn Error + 'static)> {
665 match self {
666 ListError::Server { .. } => None,
667 ListError::Transport(err) => Some(err),
668 }
669 }
670}
671
672impl fmt::Display for ListError {
673 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
674 match self {
675 ListError::Transport(err) => write!(f, "transport: {}", err),
676 ListError::Server { code, message } => write!(f, "server error {}: {}", code, message),
677 }
678 }
679}
680
681#[derive(Debug)]
683pub enum DeleteError {
684 SubjectNotFound,
686 Transport(reqwest::Error),
688 Server { code: i32, message: String },
690}
691
692impl From<UnhandledError> for DeleteError {
693 fn from(err: UnhandledError) -> DeleteError {
694 match err {
695 UnhandledError::Transport(err) => DeleteError::Transport(err),
696 UnhandledError::Api { code, message } => match code {
697 40401 => DeleteError::SubjectNotFound,
698 _ => DeleteError::Server { code, message },
699 },
700 }
701 }
702}
703
704impl Error for DeleteError {
705 fn source(&self) -> Option<&(dyn Error + 'static)> {
706 match self {
707 DeleteError::SubjectNotFound | DeleteError::Server { .. } => None,
708 DeleteError::Transport(err) => Some(err),
709 }
710 }
711}
712
713impl fmt::Display for DeleteError {
714 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
715 match self {
716 DeleteError::SubjectNotFound => write!(f, "subject not found"),
717 DeleteError::Transport(err) => write!(f, "transport: {}", err),
718 DeleteError::Server { code, message } => {
719 write!(f, "server error {}: {}", code, message)
720 }
721 }
722 }
723}
724
725#[derive(Debug)]
727pub enum SetCompatibilityLevelError {
728 InvalidCompatibilityLevel,
730 Transport(reqwest::Error),
732 Server { code: i32, message: String },
734}
735
736impl From<UnhandledError> for SetCompatibilityLevelError {
737 fn from(err: UnhandledError) -> SetCompatibilityLevelError {
738 match err {
739 UnhandledError::Transport(err) => SetCompatibilityLevelError::Transport(err),
740 UnhandledError::Api { code, message } => match code {
741 42203 => SetCompatibilityLevelError::InvalidCompatibilityLevel,
742 _ => SetCompatibilityLevelError::Server { code, message },
743 },
744 }
745 }
746}
747
748impl Error for SetCompatibilityLevelError {
749 fn source(&self) -> Option<&(dyn Error + 'static)> {
750 match self {
751 SetCompatibilityLevelError::InvalidCompatibilityLevel
752 | SetCompatibilityLevelError::Server { .. } => None,
753 SetCompatibilityLevelError::Transport(err) => Some(err),
754 }
755 }
756}
757
758impl fmt::Display for SetCompatibilityLevelError {
759 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
760 match self {
761 SetCompatibilityLevelError::InvalidCompatibilityLevel => {
762 write!(f, "invalid compatibility level")
763 }
764 SetCompatibilityLevelError::Transport(err) => write!(f, "transport: {}", err),
765 SetCompatibilityLevelError::Server { code, message } => {
766 write!(f, "server error {}: {}", code, message)
767 }
768 }
769 }
770}
771
772#[derive(Debug, Deserialize)]
773struct ErrorResponse {
774 error_code: i32,
775 message: String,
776}
777
778#[derive(Debug)]
779enum UnhandledError {
780 Transport(reqwest::Error),
781 Api { code: i32, message: String },
782}
783
784impl From<reqwest::Error> for UnhandledError {
785 fn from(err: reqwest::Error) -> UnhandledError {
786 UnhandledError::Transport(err)
787 }
788}