tower_lsp/service/client/
socket.rs
1use std::pin::Pin;
4use std::sync::Arc;
5use std::task::{Context, Poll};
6
7use futures::channel::mpsc::Receiver;
8use futures::sink::Sink;
9use futures::stream::{FusedStream, Stream, StreamExt};
10
11use super::{ExitedError, Pending, ServerState, State};
12use crate::jsonrpc::{Request, Response};
13
14#[derive(Debug)]
16pub struct ClientSocket {
17 pub(super) rx: Receiver<Request>,
18 pub(super) pending: Arc<Pending>,
19 pub(super) state: Arc<ServerState>,
20}
21
22impl ClientSocket {
23 pub fn split(self) -> (RequestStream, ResponseSink) {
30 let ClientSocket { rx, pending, state } = self;
31 let state_ = state.clone();
32
33 (
34 RequestStream { rx, state: state_ },
35 ResponseSink { pending, state },
36 )
37 }
38}
39
40impl Stream for ClientSocket {
42 type Item = Request;
43
44 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45 if self.state.get() == State::Exited || self.rx.is_terminated() {
46 Poll::Ready(None)
47 } else {
48 self.rx.poll_next_unpin(cx)
49 }
50 }
51
52 fn size_hint(&self) -> (usize, Option<usize>) {
53 self.rx.size_hint()
54 }
55}
56
57impl FusedStream for ClientSocket {
58 fn is_terminated(&self) -> bool {
59 self.rx.is_terminated()
60 }
61}
62
63impl Sink<Response> for ClientSocket {
65 type Error = ExitedError;
66
67 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
68 if self.state.get() == State::Exited || self.rx.is_terminated() {
69 Poll::Ready(Err(ExitedError(())))
70 } else {
71 Poll::Ready(Ok(()))
72 }
73 }
74
75 fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
76 self.pending.insert(item);
77 Ok(())
78 }
79
80 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
81 Poll::Ready(Ok(()))
82 }
83
84 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
85 Poll::Ready(Ok(()))
86 }
87}
88
89#[derive(Debug)]
91#[must_use = "streams do nothing unless polled"]
92pub struct RequestStream {
93 rx: Receiver<Request>,
94 state: Arc<ServerState>,
95}
96
97impl Stream for RequestStream {
98 type Item = Request;
99
100 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
101 if self.state.get() == State::Exited || self.rx.is_terminated() {
102 Poll::Ready(None)
103 } else {
104 self.rx.poll_next_unpin(cx)
105 }
106 }
107
108 fn size_hint(&self) -> (usize, Option<usize>) {
109 self.rx.size_hint()
110 }
111}
112
113impl FusedStream for RequestStream {
114 fn is_terminated(&self) -> bool {
115 self.rx.is_terminated()
116 }
117}
118
119#[derive(Debug)]
121pub struct ResponseSink {
122 pending: Arc<Pending>,
123 state: Arc<ServerState>,
124}
125
126impl Sink<Response> for ResponseSink {
127 type Error = ExitedError;
128
129 fn poll_ready(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
130 if self.state.get() == State::Exited {
131 Poll::Ready(Err(ExitedError(())))
132 } else {
133 Poll::Ready(Ok(()))
134 }
135 }
136
137 fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
138 self.pending.insert(item);
139 Ok(())
140 }
141
142 fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
143 Poll::Ready(Ok(()))
144 }
145
146 fn poll_close(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
147 Poll::Ready(Ok(()))
148 }
149}