tower_lsp/service/
pending.rs

1//! Types for tracking cancelable client-to-server JSON-RPC requests.
2
3use std::fmt::{self, Debug, Formatter};
4use std::future::Future;
5use std::sync::Arc;
6
7use dashmap::{mapref::entry::Entry, DashMap};
8use futures::future::{self, Either};
9use tracing::{debug, info};
10
11use super::ExitedError;
12use crate::jsonrpc::{Error, Id, Response};
13
/// A hashmap containing pending server requests, keyed by request ID.
///
/// Each entry maps the request [`Id`] to the [`future::AbortHandle`] of the handler future that
/// is servicing it. The map is wrapped in an [`Arc`] so that the future returned by
/// [`Pending::execute`] can keep its own handle to the map and remove its entry once the handler
/// has finished.
pub struct Pending(Arc<DashMap<Id, future::AbortHandle>>);
16
17impl Pending {
18    /// Creates a new pending server requests map.
19    pub fn new() -> Self {
20        Pending(Arc::new(DashMap::new()))
21    }
22
23    /// Executes the given async request handler, keyed by the given request ID.
24    ///
25    /// If a cancel request is issued before the future is finished resolving, this will resolve to
26    /// a "canceled" error response, and the pending request handler future will be dropped.
27    pub fn execute<F>(
28        &self,
29        id: Id,
30        fut: F,
31    ) -> impl Future<Output = Result<Option<Response>, ExitedError>> + Send + 'static
32    where
33        F: Future<Output = Result<Option<Response>, ExitedError>> + Send + 'static,
34    {
35        if let Entry::Vacant(entry) = self.0.entry(id.clone()) {
36            let (handler_fut, abort_handle) = future::abortable(fut);
37            entry.insert(abort_handle);
38
39            let requests = self.0.clone();
40            Either::Left(async move {
41                let abort_result = handler_fut.await;
42                requests.remove(&id); // Remove abort handle now to avoid double cancellation.
43
44                if let Ok(handler_result) = abort_result {
45                    handler_result
46                } else {
47                    Ok(Some(Response::from_error(id, Error::request_cancelled())))
48                }
49            })
50        } else {
51            Either::Right(async { Ok(Some(Response::from_error(id, Error::invalid_request()))) })
52        }
53    }
54
55    /// Attempts to cancel the running request handler corresponding to this ID.
56    ///
57    /// This will force the future to resolve to a "canceled" error response. If the future has
58    /// already completed, this method call will do nothing.
59    pub fn cancel(&self, id: &Id) {
60        if let Some((_, handle)) = self.0.remove(id) {
61            handle.abort();
62            info!("successfully cancelled request with ID: {}", id);
63        } else {
64            debug!(
65                "client asked to cancel request {}, but no such pending request exists, ignoring",
66                id
67            );
68        }
69    }
70
71    /// Cancels all pending request handlers, if any.
72    pub fn cancel_all(&self) {
73        self.0.retain(|_, handle| {
74            handle.abort();
75            false
76        });
77    }
78}
79
80impl Debug for Pending {
81    fn fmt(&self, f: &mut Formatter) -> fmt::Result {
82        f.debug_set()
83            .entries(self.0.iter().map(|entry| entry.key().clone()))
84            .finish()
85    }
86}
87
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// A handler that runs to completion has its response passed through unchanged.
    #[tokio::test(flavor = "current_thread")]
    async fn executes_server_request() {
        let tracker = Pending::new();

        let request_id = Id::Number(1);
        let expected = Response::from_ok(request_id.clone(), json!({}));

        let handler_id = request_id.clone();
        let outcome = tracker
            .execute(request_id, async move {
                Ok(Some(Response::from_ok(handler_id, json!({}))))
            })
            .await;

        assert_eq!(outcome, Ok(Some(expected)));
    }

    /// Cancelling a handler that never completes yields a "request cancelled" error response.
    #[tokio::test(flavor = "current_thread")]
    async fn cancels_server_request() {
        let tracker = Pending::new();

        let request_id = Id::Number(1);
        let task = tokio::spawn(tracker.execute(request_id.clone(), future::pending()));

        tracker.cancel(&request_id);

        let outcome = task.await.expect("task panicked");
        let expected = Response::from_error(request_id, Error::request_cancelled());
        assert_eq!(outcome, Ok(Some(expected)));
    }
}