mz_storage_operators/oneshot_source/http_source.rs

1// Copyright Materialize, Inc. and contributors. All rights reserved.
2//
3// Use of this software is governed by the Business Source License
4// included in the LICENSE file.
5//
6// As of the Change Date specified in that file, in accordance with
7// the Business Source License, use of this software will be governed
8// by the Apache License, Version 2.0.
9
10//! Generic HTTP oneshot source that will fetch a file from the public internet.
11
12use bytes::Bytes;
13use derivative::Derivative;
14use futures::TryStreamExt;
15use futures::stream::{BoxStream, StreamExt};
16use reqwest::Client;
17use serde::{Deserialize, Serialize};
18use url::Url;
19
20use crate::oneshot_source::util::IntoRangeHeaderValue;
21use crate::oneshot_source::{
22    Encoding, OneshotObject, OneshotSource, StorageErrorX, StorageErrorXContext, StorageErrorXKind,
23};
24
25/// Generic oneshot source that fetches a file from a URL on the public internet.
26#[derive(Clone, Derivative)]
27#[derivative(Debug)]
28pub struct HttpOneshotSource {
29    #[derivative(Debug = "ignore")]
30    client: Client,
31    origin: Url,
32}
33
34impl HttpOneshotSource {
35    pub fn new(client: Client, origin: Url) -> Self {
36        HttpOneshotSource { client, origin }
37    }
38}
39
40/// Object returned from an [`HttpOneshotSource`].
41#[derive(Debug, Clone, Serialize, Deserialize)]
42pub struct HttpObject {
43    /// [`Url`] to access the file.
44    url: Url,
45    /// Name of the file.
46    filename: String,
47    /// Size of this file reported by the [`Content-Length`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Length) header
48    size: usize,
49    /// Any values reporting from the [`Content-Encoding`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding) header.
50    content_encoding: Vec<Encoding>,
51}
52
53impl OneshotObject for HttpObject {
54    fn name(&self) -> &str {
55        &self.filename
56    }
57
58    fn path(&self) -> &str {
59        &self.filename
60    }
61
62    fn size(&self) -> usize {
63        self.size
64    }
65
66    fn encodings(&self) -> &[Encoding] {
67        &self.content_encoding
68    }
69}
70
71#[derive(Debug, Clone, Serialize, Deserialize)]
72pub enum HttpChecksum {
73    /// No checksumming is requested.
74    None,
75    /// The HTTP [`ETag`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/ETag) header.
76    ETag(String),
77    /// The HTTP [`Last-Modified`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Last-Modified) header.
78    LastModified(String),
79}
80
81impl OneshotSource for HttpOneshotSource {
82    type Object = HttpObject;
83    type Checksum = HttpChecksum;
84
85    async fn list<'a>(&'a self) -> Result<Vec<(Self::Object, Self::Checksum)>, StorageErrorX> {
86        // TODO(cf3): Support listing files from a directory index.
87
88        // To get metadata about a file we'll first try issuing a `HEAD` request, which
89        // canonically is the right thing do.
90        let response = self
91            .client
92            .head(self.origin.clone())
93            .send()
94            .await
95            .context("HEAD request")?;
96
97        // Not all servers accept `HEAD` requests though, so we'll fallback to a `GET`
98        // request and skip fetching the body.
99        let headers = match response.error_for_status() {
100            Ok(response) => response.headers().clone(),
101            Err(err) => {
102                tracing::warn!(status = ?err.status(), "HEAD request failed");
103
104                let response = self
105                    .client
106                    .get(self.origin.clone())
107                    .send()
108                    .await
109                    .context("GET request")?;
110                let headers = response.headers().clone();
111
112                // Immediately drop the response so we don't attempt to fetch the body.
113                drop(response);
114
115                headers
116            }
117        };
118
119        let get_header = |name: &reqwest::header::HeaderName| {
120            let header = headers.get(name)?;
121            match header.to_str() {
122                Err(e) => {
123                    tracing::warn!("failed to deserialize header '{name}', err: {e}");
124                    None
125                }
126                Ok(value) => Some(value),
127            }
128        };
129
130        // Get a checksum from the content.
131        let checksum = if let Some(etag) = get_header(&reqwest::header::ETAG) {
132            HttpChecksum::ETag(etag.to_string())
133        } else if let Some(last_modified) = get_header(&reqwest::header::LAST_MODIFIED) {
134            let last_modified = last_modified.to_string();
135            HttpChecksum::LastModified(last_modified.to_string())
136        } else {
137            HttpChecksum::None
138        };
139
140        // Get the size of the object from the Conent-Length header.
141        let size = get_header(&reqwest::header::CONTENT_LENGTH)
142            .ok_or(StorageErrorXKind::MissingSize)
143            .and_then(|s| s.parse::<usize>().map_err(StorageErrorXKind::generic))
144            .context("content-length header")?;
145
146        // TODO(cf1): We should probably check the content-type as well. At least for advisory purposes.
147
148        let filename = self
149            .origin
150            .path_segments()
151            .and_then(|segments| segments.rev().next())
152            .map(|s| s.to_string())
153            .unwrap_or_default();
154        let object = HttpObject {
155            url: self.origin.clone(),
156            filename,
157            size,
158            content_encoding: vec![],
159        };
160        tracing::info!(?object, "found objects");
161
162        Ok(vec![(object, checksum)])
163    }
164
165    fn get<'s>(
166        &'s self,
167        object: Self::Object,
168        _checksum: Self::Checksum,
169        range: Option<std::ops::RangeInclusive<usize>>,
170    ) -> BoxStream<'s, Result<Bytes, StorageErrorX>> {
171        // TODO(cf1): Validate our checksum.
172
173        let initial_response = async move {
174            let mut request = self.client.get(object.url);
175
176            if let Some(range) = &range {
177                let value = range.into_range_header_value();
178                request = request.header(&reqwest::header::RANGE, value);
179            }
180
181            // TODO(parkmycar): We should probably assert that the response contains
182            // an appropriate Content-Range header in the response, and maybe that we
183            // got back an HTTP 206?
184
185            let response = request.send().await.context("get")?;
186            let bytes_stream = response.bytes_stream().err_into();
187
188            Ok::<_, StorageErrorX>(bytes_stream)
189        };
190
191        futures::stream::once(initial_response)
192            .try_flatten()
193            .boxed()
194    }
195}