mz_storage_operators/oneshot_source/
http_source.rs1use bytes::Bytes;
13use derivative::Derivative;
14use futures::TryStreamExt;
15use futures::stream::{BoxStream, StreamExt};
16use reqwest::Client;
17use serde::{Deserialize, Serialize};
18use url::Url;
19
20use crate::oneshot_source::util::IntoRangeHeaderValue;
21use crate::oneshot_source::{
22 Encoding, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, StorageErrorXKind,
23};
24
25#[derive(Clone, Derivative)]
27#[derivative(Debug)]
28pub struct HttpOneshotSource {
29 #[derivative(Debug = "ignore")]
30 client: Client,
31 origin: Url,
32}
33
34impl HttpOneshotSource {
35 pub fn new(client: Client, origin: Url) -> Self {
36 HttpOneshotSource { client, origin }
37 }
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct HttpObject {
43 url: Url,
45 filename: String,
47 size: usize,
49 content_encoding: Vec<Encoding>,
51}
52
53impl OneshotObject for HttpObject {
54 fn name(&self) -> &str {
55 &self.filename
56 }
57
58 fn path(&self) -> &str {
59 &self.filename
60 }
61
62 fn size(&self) -> usize {
63 self.size
64 }
65
66 fn encodings(&self) -> &[Encoding] {
67 &self.content_encoding
68 }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub enum HttpChecksum {
73 None,
75 ETag(String),
77 LastModified(String),
79}
80
81impl OneshotSource for HttpOneshotSource {
82 type Object = HttpObject;
83 type Checksum = HttpChecksum;
84
85 async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
86 let response = self
91 .client
92 .head(self.origin.clone())
93 .send()
94 .await
95 .context("HEAD request")?;
96
97 let headers = match response.error_for_status() {
100 Ok(response) => response.headers().clone(),
101 Err(err) => {
102 tracing::warn!(status = ?err.status(), "HEAD request failed");
103
104 let response = self
105 .client
106 .get(self.origin.clone())
107 .send()
108 .await
109 .context("GET request")?;
110 let headers = response.headers().clone();
111
112 drop(response);
114
115 headers
116 }
117 };
118
119 let get_header = |name: &reqwest::header::HeaderName| {
120 let header = headers.get(name)?;
121 match header.to_str() {
122 Err(e) => {
123 tracing::warn!("failed to deserialize header '{name}', err: {e}");
124 None
125 }
126 Ok(value) => Some(value),
127 }
128 };
129
130 let checksum = if let Some(etag) = get_header(&reqwest::header::ETAG) {
132 HttpChecksum::ETag(etag.to_string())
133 } else if let Some(last_modified) = get_header(&reqwest::header::LAST_MODIFIED) {
134 let last_modified = last_modified.to_string();
135 HttpChecksum::LastModified(last_modified.to_string())
136 } else {
137 HttpChecksum::None
138 };
139
140 let size = get_header(&reqwest::header::CONTENT_LENGTH)
142 .ok_or(StorageErrorXKind::MissingSize)
143 .and_then(|s| s.parse::<usize>().map_err(StorageErrorXKind::generic))
144 .context("content-length header")?;
145
146 let filename = self
149 .origin
150 .path_segments()
151 .and_then(|segments| segments.rev().next())
152 .map(|s| s.to_string())
153 .unwrap_or_default();
154 let object = HttpObject {
155 url: self.origin.clone(),
156 filename,
157 size,
158 content_encoding: vec![],
159 };
160 tracing::info!(?object, "found objects");
161
162 Ok(vec![(object, checksum)])
163 }
164
165 fn get<'s>(
166 &'s self,
167 object: Self::Object,
168 _checksum: Self::Checksum,
169 range: Option<std::ops::RangeInclusive<usize>>,
170 ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
171 let initial_response = async move {
174 let mut request = self.client.get(object.url);
175
176 if let Some(range) = &range {
177 let value = range.into_range_header_value();
178 request = request.header(&reqwest::header::RANGE, value);
179 }
180
181 let response = request.send().await.context("get")?;
186 let bytes_stream = response.bytes_stream().err_into();
187
188 Ok::<_, StorageErrorX>(bytes_stream)
189 };
190
191 futures::stream::once(initial_response)
192 .try_flatten()
193 .boxed()
194 }
195}