use std::collections::BTreeSet;
use std::error::Error;
use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use anyhow::bail;
use proptest_derive::Arbitrary;
use reqwest::{Method, Response, Url};
use serde::de::DeserializeOwned;
use serde::{Deserialize, Serialize};
use crate::config::Auth;
/// An HTTP client for the schema registry API (the constructor's error
/// message refers to it as a "CCSR client").
#[derive(Clone)]
pub struct Client {
/// Underlying HTTP client used to build every request.
inner: reqwest::Client,
/// Callback producing the base URL; it is invoked each time a request is built.
url: Arc<dyn Fn() -> Url + Send + Sync + 'static>,
/// Optional credentials; when present they are sent as HTTP basic auth.
auth: Option<Auth>,
/// Timeout value reported by `timeout()`.
/// NOTE(review): nothing in this file applies it to requests; presumably it
/// is configured on `inner` by whoever constructs the client. Confirm.
timeout: Duration,
}
impl fmt::Debug for Client {
    /// Formats the client for diagnostics.
    ///
    /// The `url` field is a closure and has no `Debug` representation, so a
    /// `"..."` placeholder is printed in its place. Every other field,
    /// including `timeout`, is printed with its own `Debug` implementation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Client")
            .field("inner", &self.inner)
            .field("url", &"...")
            .field("auth", &self.auth)
            .field("timeout", &self.timeout)
            .finish()
    }
}
impl Client {
/// Assembles a client from an HTTP client, a base-URL callback, optional
/// credentials and a timeout.
///
/// # Errors
///
/// Fails when the URL currently produced by `url` cannot be a base: request
/// paths are later written into that URL as path segments, which requires a
/// base-capable URL.
pub(crate) fn new(
    inner: reqwest::Client,
    url: Arc<dyn Fn() -> Url + Send + Sync + 'static>,
    auth: Option<Auth>,
    timeout: Duration,
) -> Result<Self, anyhow::Error> {
    // Probe the callback once up front; `make_request` relies on this check.
    let probe = url();
    if probe.cannot_be_a_base() {
        bail!("cannot construct a CCSR client with a cannot-be-a-base URL");
    }
    let client = Client {
        inner,
        url,
        auth,
        timeout,
    };
    Ok(client)
}
/// Starts a request for `method` against the base URL with its path replaced
/// by the segments in `path`.
///
/// Any path already present on the base URL is discarded before the new
/// segments are appended. Basic-auth credentials are attached when the client
/// has them.
fn make_request<P>(&self, method: Method, path: P) -> reqwest::RequestBuilder
where
    P: IntoIterator,
    P::Item: AsRef<str>,
{
    let mut target = (self.url)();
    {
        // Scoped so the segment editor is released before `target` is used.
        let mut segments = target
            .path_segments_mut()
            .expect("constructor validated URL can be a base");
        segments.clear().extend(path);
    }
    let builder = self.inner.request(method, target);
    match &self.auth {
        Some(creds) => builder.basic_auth(&creds.username, creds.password.as_ref()),
        None => builder,
    }
}
/// Returns the timeout value this client was constructed with.
pub fn timeout(&self) -> Duration {
self.timeout
}
pub async fn get_schema_by_id(&self, id: i32) -> Result<Schema, GetByIdError> {
let req = self.make_request(Method::GET, &["schemas", "ids", &id.to_string()]);
let res: GetByIdResponse = send_request(req).await?;
Ok(Schema {
id,
raw: res.schema,
})
}
/// Fetches the latest schema registered under `subject`.
///
/// This is `get_subject_latest` with everything but the schema discarded.
pub async fn get_schema_by_subject(&self, subject: &str) -> Result<Schema, GetBySubjectError> {
    let latest = self.get_subject_latest(subject).await?;
    Ok(latest.schema)
}
pub async fn get_subject_latest(&self, subject: &str) -> Result<Subject, GetBySubjectError> {
let req = self.make_request(Method::GET, &["subjects", subject, "versions", "latest"]);
let res: GetBySubjectResponse = send_request(req).await?;
Ok(Subject {
schema: Schema {
id: res.id,
raw: res.schema,
},
version: res.version,
name: res.subject,
})
}
pub async fn get_subject_config(
&self,
subject: &str,
) -> Result<SubjectConfig, GetSubjectConfigError> {
let req = self.make_request(Method::GET, &["config", subject]);
let res: SubjectConfig = send_request(req).await?;
Ok(res)
}
/// Fetches the latest version of `subject` along with the subjects it
/// references.
///
/// Delegates to `get_subject_and_references_by_version` with the version
/// `"latest"`.
pub async fn get_subject_and_references(
    &self,
    subject: &str,
) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
    let version = String::from("latest");
    self.get_subject_and_references_by_version(subject, version)
        .await
}
/// Fetches `subject` at `version` together with every subject reachable
/// through its references.
///
/// References are followed depth-first. Each subject name is fetched at most
/// once, even when several schemas reference it or one reference list names
/// it more than once.
///
/// Returns the requested subject first, followed by the referenced subjects
/// sorted by name.
///
/// # Errors
///
/// Returns the error of the first request that fails.
async fn get_subject_and_references_by_version(
    &self,
    subject: &str,
    version: String,
) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
    let mut subjects = vec![];
    // Names of subjects that have already been fetched.
    let mut seen = BTreeSet::new();
    // Stack of (subject, version) pairs still to fetch.
    let mut subjects_queue = vec![(subject.to_owned(), version)];
    while let Some((subject, version)) = subjects_queue.pop() {
        // The filter below only sees subjects fetched so far, so the same
        // subject can be queued twice before its first fetch (e.g. two schemas
        // referencing it). Re-check here so it is not fetched and returned
        // twice.
        if seen.contains(&subject) {
            continue;
        }
        let req = self.make_request(Method::GET, &["subjects", &subject, "versions", &version]);
        let res: GetBySubjectResponse = send_request(req).await?;
        subjects.push(Subject {
            schema: Schema {
                id: res.id,
                raw: res.schema,
            },
            version: res.version,
            name: res.subject.clone(),
        });
        // Record both the name we asked for and the name the server reported,
        // so later references match under either.
        seen.insert(subject);
        seen.insert(res.subject);
        subjects_queue.extend(
            res.references
                .into_iter()
                .filter(|r| !seen.contains(&r.subject))
                .map(|r| (r.subject, r.version.to_string())),
        );
    }
    // The first iteration always issues a request (nothing is seen yet) and
    // any failure returns early, so at least one subject is present here.
    assert!(!subjects.is_empty(), "Request should error if no subjects");
    let primary = subjects.remove(0);
    subjects.sort_by(|a, b| a.name.cmp(&b.name));
    Ok((primary, subjects))
}
pub async fn publish_schema(
&self,
subject: &str,
schema: &str,
schema_type: SchemaType,
references: &[SchemaReference],
) -> Result<i32, PublishError> {
let req = self.make_request(Method::POST, &["subjects", subject, "versions"]);
let req = req.json(&PublishRequest {
schema,
schema_type,
references,
});
let res: PublishResponse = send_request(req).await?;
Ok(res.id)
}
pub async fn set_subject_compatibility_level(
&self,
subject: &str,
compatibility_level: CompatibilityLevel,
) -> Result<(), SetCompatibilityLevelError> {
let req = self.make_request(Method::PUT, &["config", subject]);
let req = req.json(&CompatibilityLevelRequest {
compatibility: compatibility_level,
});
send_request_raw(req).await?;
Ok(())
}
/// Lists subject names via `GET subjects`.
pub async fn list_subjects(&self) -> Result<Vec<String>, ListError> {
    let req = self.make_request(Method::GET, &["subjects"]);
    let names: Vec<String> = send_request(req).await?;
    Ok(names)
}
pub async fn delete_subject(&self, subject: &str) -> Result<(), DeleteError> {
let req = self.make_request(Method::DELETE, &["subjects", subject]);
send_request_raw(req).await?;
Ok(())
}
pub async fn get_subject_and_references_by_id(
&self,
id: i32,
) -> Result<(Subject, Vec<Subject>), GetBySubjectError> {
let req = self.make_request(
Method::GET,
&["schemas", "ids", &id.to_string(), "versions"],
);
let res: Vec<SubjectVersion> = send_request(req).await?;
match res.as_slice() {
[first, ..] => {
self.get_subject_and_references_by_version(
&first.subject,
first.version.to_string(),
)
.await
}
_ => Err(GetBySubjectError::SubjectNotFound),
}
}
}
/// Sends `req` and decodes a successful response body as JSON into `T`.
///
/// Non-success responses are turned into errors by `send_request_raw`; a body
/// that fails to decode surfaces as a transport error.
async fn send_request<T>(req: reqwest::RequestBuilder) -> Result<T, UnhandledError>
where
    T: DeserializeOwned,
{
    let response = send_request_raw(req).await?;
    let decoded = response.json::<T>().await?;
    Ok(decoded)
}
async fn send_request_raw(req: reqwest::RequestBuilder) -> Result<Response, UnhandledError> {
let res = req.send().await?;
let status = res.status();
if status.is_success() {
Ok(res)
} else {
match res.json::<ErrorResponse>().await {
Ok(err_res) => Err(UnhandledError::Api {
code: err_res.error_code,
message: err_res.message,
}),
Err(_) => Err(UnhandledError::Api {
code: i32::from(status.as_u16()),
message: "unable to decode error details".into(),
}),
}
}
}
/// The schema formats that can be named when publishing a schema.
///
/// Variants serialize as their upper-case names (`AVRO`, `PROTOBUF`, `JSON`).
#[derive(Clone, Copy, Debug, Serialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum SchemaType {
/// Treated as the default by `is_default`, so it is omitted from publish requests.
Avro,
Protobuf,
Json,
}
impl SchemaType {
    /// Reports whether this is the default schema type (`Avro`).
    ///
    /// Used as the `skip_serializing_if` predicate on `PublishRequest`.
    fn is_default(&self) -> bool {
        match self {
            SchemaType::Avro => true,
            SchemaType::Protobuf | SchemaType::Json => false,
        }
    }
}
/// A schema together with its identifier.
#[derive(Debug, Eq, PartialEq)]
pub struct Schema {
/// Identifier of the schema.
pub id: i32,
/// Schema text, taken verbatim from the `schema` field of a response.
pub raw: String,
}
/// A named, versioned schema as reported by a `subjects/...` lookup.
#[derive(Debug, Eq, PartialEq)]
pub struct Subject {
/// Version reported in the response.
pub version: i32,
/// Subject name reported in the response.
pub name: String,
/// The schema registered under this subject and version.
pub schema: Schema,
}
/// A reference from a schema to a specific version of another subject.
///
/// Sent in publish requests and read from subject lookups; field names are
/// camelCase on the wire.
#[derive(Debug, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SchemaReference {
/// Name of the reference.
pub name: String,
/// Subject that the reference points at.
pub subject: String,
/// Version of `subject` that the reference points at.
pub version: i32,
}
/// Response body of `GET schemas/ids/{id}`; only the `schema` field is read.
#[derive(Debug, Deserialize)]
struct GetByIdResponse {
// Raw schema text.
schema: String,
}
/// Errors returned by `Client::get_schema_by_id`.
#[derive(Debug)]
pub enum GetByIdError {
/// The API reported error code 40403.
SchemaNotFound,
/// The request failed at the `reqwest` level.
Transport(reqwest::Error),
/// Any other API error, with its code and message.
Server { code: i32, message: String },
}
/// One entry of the response to `GET schemas/ids/{id}/versions`: a subject
/// name paired with a version.
#[derive(Debug, Deserialize)]
struct SubjectVersion {
pub subject: String,
pub version: i32,
}
impl From<UnhandledError> for GetByIdError {
fn from(err: UnhandledError) -> GetByIdError {
match err {
UnhandledError::Transport(err) => GetByIdError::Transport(err),
UnhandledError::Api { code, message } => match code {
40403 => GetByIdError::SchemaNotFound,
_ => GetByIdError::Server { code, message },
},
}
}
}
impl Error for GetByIdError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
GetByIdError::SchemaNotFound | GetByIdError::Server { .. } => None,
GetByIdError::Transport(err) => Some(err),
}
}
}
impl fmt::Display for GetByIdError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
GetByIdError::SchemaNotFound => write!(f, "schema not found"),
GetByIdError::Transport(err) => write!(f, "transport: {}", err),
GetByIdError::Server { code, message } => {
write!(f, "server error {}: {}", code, message)
}
}
}
}
/// Response body of `GET config/{subject}`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct SubjectConfig {
/// Read from the `compatibilityLevel` field of the response.
pub compatibility_level: CompatibilityLevel,
}
/// Errors returned by `Client::get_subject_config`.
#[derive(Debug)]
pub enum GetSubjectConfigError {
/// The API reported code 404.
SubjectNotFound,
/// The API reported error code 40408.
SubjectCompatibilityLevelNotSet,
/// The request failed at the `reqwest` level.
Transport(reqwest::Error),
/// Any other API error, with its code and message.
Server { code: i32, message: String },
}
impl From<UnhandledError> for GetSubjectConfigError {
fn from(err: UnhandledError) -> GetSubjectConfigError {
match err {
UnhandledError::Transport(err) => GetSubjectConfigError::Transport(err),
UnhandledError::Api { code, message } => match code {
404 => GetSubjectConfigError::SubjectNotFound,
40408 => GetSubjectConfigError::SubjectCompatibilityLevelNotSet,
_ => GetSubjectConfigError::Server { code, message },
},
}
}
}
impl Error for GetSubjectConfigError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
GetSubjectConfigError::SubjectNotFound
| GetSubjectConfigError::SubjectCompatibilityLevelNotSet
| GetSubjectConfigError::Server { .. } => None,
GetSubjectConfigError::Transport(err) => Some(err),
}
}
}
impl fmt::Display for GetSubjectConfigError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
GetSubjectConfigError::SubjectNotFound => write!(f, "subject not found"),
GetSubjectConfigError::SubjectCompatibilityLevelNotSet => {
write!(f, "subject level compatibility not set")
}
GetSubjectConfigError::Transport(err) => write!(f, "transport: {}", err),
GetSubjectConfigError::Server { code, message } => {
write!(f, "server error {}: {}", code, message)
}
}
}
}
/// Response body of `GET subjects/{subject}/versions/{version}`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct GetBySubjectResponse {
// Identifier of the schema.
id: i32,
// Raw schema text.
schema: String,
// Version of the subject that was returned.
version: i32,
// Name of the subject that was returned.
subject: String,
// Schemas referenced by this one; an absent field decodes as an empty list.
#[serde(default)]
references: Vec<SchemaReference>,
}
/// Errors returned by the subject lookup methods of `Client`.
#[derive(Debug)]
pub enum GetBySubjectError {
/// The API reported error code 40401, or a lookup by schema id returned no
/// subject/version pairs.
SubjectNotFound,
/// The API reported error code 40402; carries the server's message.
VersionNotFound(String),
/// The request failed at the `reqwest` level.
Transport(reqwest::Error),
/// Any other API error, with its code and message.
Server { code: i32, message: String },
}
impl From<UnhandledError> for GetBySubjectError {
fn from(err: UnhandledError) -> GetBySubjectError {
match err {
UnhandledError::Transport(err) => GetBySubjectError::Transport(err),
UnhandledError::Api { code, message } => match code {
40401 => GetBySubjectError::SubjectNotFound,
40402 => GetBySubjectError::VersionNotFound(message),
_ => GetBySubjectError::Server { code, message },
},
}
}
}
impl Error for GetBySubjectError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
GetBySubjectError::SubjectNotFound
| GetBySubjectError::VersionNotFound(_)
| GetBySubjectError::Server { .. } => None,
GetBySubjectError::Transport(err) => Some(err),
}
}
}
impl fmt::Display for GetBySubjectError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
GetBySubjectError::SubjectNotFound => write!(f, "subject not found"),
GetBySubjectError::VersionNotFound(message) => {
write!(f, "version not found: {}", message)
}
GetBySubjectError::Transport(err) => write!(f, "transport: {}", err),
GetBySubjectError::Server { code, message } => {
write!(f, "server error {}: {}", code, message)
}
}
}
}
/// JSON body sent by `Client::publish_schema`; field names are camelCase on
/// the wire.
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct PublishRequest<'a> {
// Schema text to register.
schema: &'a str,
// Omitted from the body when it is the default type (`SchemaType::Avro`).
#[serde(skip_serializing_if = "SchemaType::is_default")]
schema_type: SchemaType,
// Omitted from the body when the slice is empty.
#[serde(skip_serializing_if = "<[_]>::is_empty")]
references: &'a [SchemaReference],
}
/// Response body of a publish request; `Client::publish_schema` returns its `id`.
#[derive(Debug, Deserialize)]
#[serde(rename_all = "camelCase")]
struct PublishResponse {
id: i32,
}
/// JSON body sent by `Client::set_subject_compatibility_level`
/// (`PUT config/{subject}`).
#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct CompatibilityLevelRequest {
// The compatibility level to set.
compatibility: CompatibilityLevel,
}
/// Compatibility levels that can be read from or set on a subject.
///
/// Variants are (de)serialized in SCREAMING_SNAKE_CASE, e.g.
/// `BACKWARD_TRANSITIVE`; the `TryFrom<&str>` and `Display` impls in this
/// file use the same spellings.
#[derive(Arbitrary, Clone, Copy, Debug, Serialize, Deserialize, Eq, PartialEq)]
#[serde(rename_all = "SCREAMING_SNAKE_CASE")]
pub enum CompatibilityLevel {
Backward,
BackwardTransitive,
Forward,
ForwardTransitive,
Full,
FullTransitive,
None,
}
impl TryFrom<&str> for CompatibilityLevel {
type Error = String;
fn try_from(value: &str) -> Result<Self, Self::Error> {
match value {
"BACKWARD" => Ok(CompatibilityLevel::Backward),
"BACKWARD_TRANSITIVE" => Ok(CompatibilityLevel::BackwardTransitive),
"FORWARD" => Ok(CompatibilityLevel::Forward),
"FORWARD_TRANSITIVE" => Ok(CompatibilityLevel::ForwardTransitive),
"FULL" => Ok(CompatibilityLevel::Full),
"FULL_TRANSITIVE" => Ok(CompatibilityLevel::FullTransitive),
"NONE" => Ok(CompatibilityLevel::None),
_ => Err(format!("invalid compatibility level: {}", value)),
}
}
}
impl fmt::Display for CompatibilityLevel {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
CompatibilityLevel::Backward => write!(f, "BACKWARD"),
CompatibilityLevel::BackwardTransitive => write!(f, "BACKWARD_TRANSITIVE"),
CompatibilityLevel::Forward => write!(f, "FORWARD"),
CompatibilityLevel::ForwardTransitive => write!(f, "FORWARD_TRANSITIVE"),
CompatibilityLevel::Full => write!(f, "FULL"),
CompatibilityLevel::FullTransitive => write!(f, "FULL_TRANSITIVE"),
CompatibilityLevel::None => write!(f, "NONE"),
}
}
}
/// Errors returned by `Client::publish_schema`.
#[derive(Debug)]
pub enum PublishError {
/// The API reported code 409.
IncompatibleSchema,
/// The API reported error code 42201; carries the server's message.
InvalidSchema { message: String },
/// The request failed at the `reqwest` level.
Transport(reqwest::Error),
/// Any other API error, with its code and message.
Server { code: i32, message: String },
}
impl From<UnhandledError> for PublishError {
fn from(err: UnhandledError) -> PublishError {
match err {
UnhandledError::Transport(err) => PublishError::Transport(err),
UnhandledError::Api { code, message } => match code {
409 => PublishError::IncompatibleSchema,
42201 => PublishError::InvalidSchema { message },
_ => PublishError::Server { code, message },
},
}
}
}
impl Error for PublishError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
PublishError::IncompatibleSchema
| PublishError::InvalidSchema { .. }
| PublishError::Server { .. } => None,
PublishError::Transport(err) => Some(err),
}
}
}
impl fmt::Display for PublishError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
PublishError::IncompatibleSchema => write!(
f,
"schema being registered is incompatible with an earlier schema"
),
PublishError::InvalidSchema { message } => write!(f, "{}", message),
PublishError::Transport(err) => write!(f, "transport: {}", err),
PublishError::Server { code, message } => {
write!(f, "server error {}: {}", code, message)
}
}
}
}
/// Errors returned by `Client::list_subjects`.
#[derive(Debug)]
pub enum ListError {
/// The request failed at the `reqwest` level.
Transport(reqwest::Error),
/// The API reported an error, with its code and message.
Server { code: i32, message: String },
}
impl From<UnhandledError> for ListError {
fn from(err: UnhandledError) -> ListError {
match err {
UnhandledError::Transport(err) => ListError::Transport(err),
UnhandledError::Api { code, message } => ListError::Server { code, message },
}
}
}
impl Error for ListError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
ListError::Server { .. } => None,
ListError::Transport(err) => Some(err),
}
}
}
impl fmt::Display for ListError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ListError::Transport(err) => write!(f, "transport: {}", err),
ListError::Server { code, message } => write!(f, "server error {}: {}", code, message),
}
}
}
/// Errors returned by `Client::delete_subject`.
#[derive(Debug)]
pub enum DeleteError {
/// The API reported error code 40401.
SubjectNotFound,
/// The request failed at the `reqwest` level.
Transport(reqwest::Error),
/// Any other API error, with its code and message.
Server { code: i32, message: String },
}
impl From<UnhandledError> for DeleteError {
fn from(err: UnhandledError) -> DeleteError {
match err {
UnhandledError::Transport(err) => DeleteError::Transport(err),
UnhandledError::Api { code, message } => match code {
40401 => DeleteError::SubjectNotFound,
_ => DeleteError::Server { code, message },
},
}
}
}
impl Error for DeleteError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
DeleteError::SubjectNotFound | DeleteError::Server { .. } => None,
DeleteError::Transport(err) => Some(err),
}
}
}
impl fmt::Display for DeleteError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
DeleteError::SubjectNotFound => write!(f, "subject not found"),
DeleteError::Transport(err) => write!(f, "transport: {}", err),
DeleteError::Server { code, message } => {
write!(f, "server error {}: {}", code, message)
}
}
}
}
/// Errors returned by `Client::set_subject_compatibility_level`.
#[derive(Debug)]
pub enum SetCompatibilityLevelError {
/// The API reported error code 42203.
InvalidCompatibilityLevel,
/// The request failed at the `reqwest` level.
Transport(reqwest::Error),
/// Any other API error, with its code and message.
Server { code: i32, message: String },
}
impl From<UnhandledError> for SetCompatibilityLevelError {
fn from(err: UnhandledError) -> SetCompatibilityLevelError {
match err {
UnhandledError::Transport(err) => SetCompatibilityLevelError::Transport(err),
UnhandledError::Api { code, message } => match code {
42203 => SetCompatibilityLevelError::InvalidCompatibilityLevel,
_ => SetCompatibilityLevelError::Server { code, message },
},
}
}
}
impl Error for SetCompatibilityLevelError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
match self {
SetCompatibilityLevelError::InvalidCompatibilityLevel
| SetCompatibilityLevelError::Server { .. } => None,
SetCompatibilityLevelError::Transport(err) => Some(err),
}
}
}
impl fmt::Display for SetCompatibilityLevelError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
SetCompatibilityLevelError::InvalidCompatibilityLevel => {
write!(f, "invalid compatibility level")
}
SetCompatibilityLevelError::Transport(err) => write!(f, "transport: {}", err),
SetCompatibilityLevelError::Server { code, message } => {
write!(f, "server error {}: {}", code, message)
}
}
}
}
/// Error body that `send_request_raw` tries to decode from any non-success
/// response.
#[derive(Debug, Deserialize)]
struct ErrorResponse {
// API error code; the per-operation error types match on this value.
error_code: i32,
// Message accompanying the error code.
message: String,
}
/// Intermediate error produced by the request helpers, before it is converted
/// into an operation-specific error type.
#[derive(Debug)]
enum UnhandledError {
// The request could not be sent, or a body could not be read or decoded.
Transport(reqwest::Error),
// A non-success response. `code` is the `error_code` from the response body,
// or the HTTP status code when the body could not be decoded.
Api { code: i32, message: String },
}
impl From<reqwest::Error> for UnhandledError {
fn from(err: reqwest::Error) -> UnhandledError {
UnhandledError::Transport(err)
}
}