1use std::future::Future;
19use std::sync::Arc;
20use std::time::Duration;
21
22use crate::raw::*;
23use crate::*;
24
25#[derive(Clone)]
115pub struct TimeoutLayer {
116 timeout: Duration,
117 io_timeout: Duration,
118}
119
120impl Default for TimeoutLayer {
121 fn default() -> Self {
122 Self {
123 timeout: Duration::from_secs(60),
124 io_timeout: Duration::from_secs(10),
125 }
126 }
127}
128
129impl TimeoutLayer {
130 pub fn new() -> Self {
132 Self::default()
133 }
134
135 pub fn with_timeout(mut self, timeout: Duration) -> Self {
139 self.timeout = timeout;
140 self
141 }
142
143 pub fn with_io_timeout(mut self, timeout: Duration) -> Self {
147 self.io_timeout = timeout;
148 self
149 }
150
151 #[deprecated(note = "with speed is not supported anymore, please use with_io_timeout instead")]
162 pub fn with_speed(self, _: u64) -> Self {
163 self
164 }
165}
166
167impl<A: Access> Layer<A> for TimeoutLayer {
168 type LayeredAccess = TimeoutAccessor<A>;
169
170 fn layer(&self, inner: A) -> Self::LayeredAccess {
171 let info = inner.info();
172 info.update_executor(|exec| {
173 Executor::with(TimeoutExecutor::new(exec.into_inner(), self.io_timeout))
174 });
175
176 TimeoutAccessor {
177 inner,
178
179 timeout: self.timeout,
180 io_timeout: self.io_timeout,
181 }
182 }
183}
184
185#[derive(Debug, Clone)]
186pub struct TimeoutAccessor<A: Access> {
187 inner: A,
188
189 timeout: Duration,
190 io_timeout: Duration,
191}
192
193impl<A: Access> TimeoutAccessor<A> {
194 async fn timeout<F: Future<Output = Result<T>>, T>(&self, op: Operation, fut: F) -> Result<T> {
195 tokio::time::timeout(self.timeout, fut).await.map_err(|_| {
196 Error::new(ErrorKind::Unexpected, "operation timeout reached")
197 .with_operation(op)
198 .with_context("timeout", self.timeout.as_secs_f64().to_string())
199 .set_temporary()
200 })?
201 }
202
203 async fn io_timeout<F: Future<Output = Result<T>>, T>(
204 &self,
205 op: Operation,
206 fut: F,
207 ) -> Result<T> {
208 tokio::time::timeout(self.io_timeout, fut)
209 .await
210 .map_err(|_| {
211 Error::new(ErrorKind::Unexpected, "io timeout reached")
212 .with_operation(op)
213 .with_context("timeout", self.io_timeout.as_secs_f64().to_string())
214 .set_temporary()
215 })?
216 }
217}
218
219impl<A: Access> LayeredAccess for TimeoutAccessor<A> {
220 type Inner = A;
221 type Reader = TimeoutWrapper<A::Reader>;
222 type Writer = TimeoutWrapper<A::Writer>;
223 type Lister = TimeoutWrapper<A::Lister>;
224 type Deleter = TimeoutWrapper<A::Deleter>;
225
226 fn inner(&self) -> &Self::Inner {
227 &self.inner
228 }
229
230 async fn create_dir(&self, path: &str, args: OpCreateDir) -> Result<RpCreateDir> {
231 self.timeout(Operation::CreateDir, self.inner.create_dir(path, args))
232 .await
233 }
234
235 async fn read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::Reader)> {
236 self.io_timeout(Operation::Read, self.inner.read(path, args))
237 .await
238 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
239 }
240
241 async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> {
242 self.io_timeout(Operation::Write, self.inner.write(path, args))
243 .await
244 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
245 }
246
247 async fn copy(&self, from: &str, to: &str, args: OpCopy) -> Result<RpCopy> {
248 self.timeout(Operation::Copy, self.inner.copy(from, to, args))
249 .await
250 }
251
252 async fn rename(&self, from: &str, to: &str, args: OpRename) -> Result<RpRename> {
253 self.timeout(Operation::Rename, self.inner.rename(from, to, args))
254 .await
255 }
256
257 async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
258 self.timeout(Operation::Stat, self.inner.stat(path, args))
259 .await
260 }
261
262 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
263 self.timeout(Operation::Delete, self.inner.delete())
264 .await
265 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
266 }
267
268 async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
269 self.io_timeout(Operation::List, self.inner.list(path, args))
270 .await
271 .map(|(rp, r)| (rp, TimeoutWrapper::new(r, self.io_timeout)))
272 }
273
274 async fn presign(&self, path: &str, args: OpPresign) -> Result<RpPresign> {
275 self.timeout(Operation::Presign, self.inner.presign(path, args))
276 .await
277 }
278}
279
280pub struct TimeoutExecutor {
281 exec: Arc<dyn Execute>,
282 timeout: Duration,
283}
284
285impl TimeoutExecutor {
286 pub fn new(exec: Arc<dyn Execute>, timeout: Duration) -> Self {
287 Self { exec, timeout }
288 }
289}
290
291impl Execute for TimeoutExecutor {
292 fn execute(&self, f: BoxedStaticFuture<()>) {
293 self.exec.execute(f)
294 }
295
296 fn timeout(&self) -> Option<BoxedStaticFuture<()>> {
297 Some(Box::pin(tokio::time::sleep(self.timeout)))
298 }
299}
300
301pub struct TimeoutWrapper<R> {
302 inner: R,
303
304 timeout: Duration,
305}
306
307impl<R> TimeoutWrapper<R> {
308 fn new(inner: R, timeout: Duration) -> Self {
309 Self { inner, timeout }
310 }
311
312 #[inline]
313 async fn io_timeout<F: Future<Output = Result<T>>, T>(
314 timeout: Duration,
315 op: &'static str,
316 fut: F,
317 ) -> Result<T> {
318 tokio::time::timeout(timeout, fut).await.map_err(|_| {
319 Error::new(ErrorKind::Unexpected, "io operation timeout reached")
320 .with_operation(op)
321 .with_context("timeout", timeout.as_secs_f64().to_string())
322 .set_temporary()
323 })?
324 }
325}
326
327impl<R: oio::Read> oio::Read for TimeoutWrapper<R> {
328 async fn read(&mut self) -> Result<Buffer> {
329 let fut = self.inner.read();
330 Self::io_timeout(self.timeout, Operation::Read.into_static(), fut).await
331 }
332}
333
334impl<R: oio::Write> oio::Write for TimeoutWrapper<R> {
335 async fn write(&mut self, bs: Buffer) -> Result<()> {
336 let fut = self.inner.write(bs);
337 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
338 }
339
340 async fn close(&mut self) -> Result<Metadata> {
341 let fut = self.inner.close();
342 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
343 }
344
345 async fn abort(&mut self) -> Result<()> {
346 let fut = self.inner.abort();
347 Self::io_timeout(self.timeout, Operation::Write.into_static(), fut).await
348 }
349}
350
351impl<R: oio::List> oio::List for TimeoutWrapper<R> {
352 async fn next(&mut self) -> Result<Option<oio::Entry>> {
353 let fut = self.inner.next();
354 Self::io_timeout(self.timeout, Operation::List.into_static(), fut).await
355 }
356}
357
358impl<R: oio::Delete> oio::Delete for TimeoutWrapper<R> {
359 fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
360 self.inner.delete(path, args)
361 }
362
363 async fn flush(&mut self) -> Result<usize> {
364 let fut = self.inner.flush();
365 Self::io_timeout(self.timeout, Operation::Delete.into_static(), fut).await
366 }
367}
368
369#[cfg(test)]
370mod tests {
371 use std::future::Future;
372 use std::future::pending;
373 use std::sync::Arc;
374 use std::time::Duration;
375
376 use futures::StreamExt;
377 use tokio::time::sleep;
378 use tokio::time::timeout;
379
380 use crate::layers::TimeoutLayer;
381 use crate::layers::TypeEraseLayer;
382 use crate::raw::*;
383 use crate::*;
384
385 #[derive(Debug, Clone, Default)]
386 struct MockService;
387
388 impl Access for MockService {
389 type Reader = MockReader;
390 type Writer = ();
391 type Lister = MockLister;
392 type Deleter = ();
393
394 fn info(&self) -> Arc<AccessorInfo> {
395 let am = AccessorInfo::default();
396 am.set_native_capability(Capability {
397 read: true,
398 delete: true,
399 ..Default::default()
400 });
401
402 am.into()
403 }
404
405 async fn read(&self, _: &str, _: OpRead) -> Result<(RpRead, Self::Reader)> {
407 Ok((RpRead::new(), MockReader))
408 }
409
410 async fn delete(&self) -> Result<(RpDelete, Self::Deleter)> {
412 sleep(Duration::from_secs(u64::MAX)).await;
413
414 Ok((RpDelete::default(), ()))
415 }
416
417 async fn list(&self, _: &str, _: OpList) -> Result<(RpList, Self::Lister)> {
418 Ok((RpList::default(), MockLister))
419 }
420 }
421
422 #[derive(Debug, Clone, Default)]
423 struct MockReader;
424
425 impl oio::Read for MockReader {
426 fn read(&mut self) -> impl Future<Output = Result<Buffer>> {
427 pending()
428 }
429 }
430
431 #[derive(Debug, Clone, Default)]
432 struct MockLister;
433
434 impl oio::List for MockLister {
435 fn next(&mut self) -> impl Future<Output = Result<Option<oio::Entry>>> {
436 pending()
437 }
438 }
439
440 #[tokio::test]
441 async fn test_operation_timeout() {
442 let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
443 let op = Operator::from_inner(acc)
444 .layer(TimeoutLayer::new().with_timeout(Duration::from_secs(1)));
445
446 let fut = async {
447 let res = op.delete("test").await;
448 assert!(res.is_err());
449 let err = res.unwrap_err();
450 assert_eq!(err.kind(), ErrorKind::Unexpected);
451 assert!(err.to_string().contains("timeout"))
452 };
453
454 timeout(Duration::from_secs(2), fut)
455 .await
456 .expect("this test should not exceed 2 seconds")
457 }
458
459 #[tokio::test]
460 async fn test_io_timeout() {
461 let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
462 let op = Operator::from_inner(acc)
463 .layer(TimeoutLayer::new().with_io_timeout(Duration::from_secs(1)));
464
465 let reader = op.reader("test").await.unwrap();
466
467 let res = reader.read(0..4).await;
468 assert!(res.is_err());
469 let err = res.unwrap_err();
470 assert_eq!(err.kind(), ErrorKind::Unexpected);
471 assert!(err.to_string().contains("timeout"))
472 }
473
474 #[tokio::test]
475 async fn test_list_timeout() {
476 let acc = Arc::new(TypeEraseLayer.layer(MockService)) as Accessor;
477 let op = Operator::from_inner(acc).layer(
478 TimeoutLayer::new()
479 .with_timeout(Duration::from_secs(1))
480 .with_io_timeout(Duration::from_secs(1)),
481 );
482
483 let mut lister = op.lister("test").await.unwrap();
484
485 let res = lister.next().await.unwrap();
486 assert!(res.is_err());
487 let err = res.unwrap_err();
488 assert_eq!(err.kind(), ErrorKind::Unexpected);
489 assert!(err.to_string().contains("timeout"))
490 }
491
492 #[tokio::test]
493 async fn test_list_timeout_raw() {
494 use oio::List;
495
496 let acc = MockService;
497 let timeout_layer = TimeoutLayer::new()
498 .with_timeout(Duration::from_secs(1))
499 .with_io_timeout(Duration::from_secs(1));
500 let timeout_acc = timeout_layer.layer(acc);
501
502 let (_, mut lister) = Access::list(&timeout_acc, "test", OpList::default())
503 .await
504 .unwrap();
505
506 let res = lister.next().await;
507 assert!(res.is_err());
508 let err = res.unwrap_err();
509 assert_eq!(err.kind(), ErrorKind::Unexpected);
510 assert!(err.to_string().contains("timeout"));
511 }
512}