domain/net/client/
dgram_stream.rs1use crate::base::Message;
7use crate::net::client::dgram;
8use crate::net::client::multi_stream;
9use crate::net::client::protocol::{
10    AsyncConnect, AsyncDgramRecv, AsyncDgramSend,
11};
12use crate::net::client::request::{
13    ComposeRequest, Error, GetResponse, SendRequest,
14};
15use bytes::Bytes;
16use std::boxed::Box;
17use std::fmt::Debug;
18use std::future::Future;
19use std::pin::Pin;
20use std::sync::Arc;
21
22#[derive(Clone, Debug, Default)]
26pub struct Config {
27    dgram: dgram::Config,
29
30    multi_stream: multi_stream::Config,
32}
33
34impl Config {
35    pub fn new() -> Self {
37        Default::default()
38    }
39
40    pub fn from_parts(
42        dgram: dgram::Config,
43        multi_stream: multi_stream::Config,
44    ) -> Self {
45        Self {
46            dgram,
47            multi_stream,
48        }
49    }
50
51    pub fn dgram(&self) -> &dgram::Config {
53        &self.dgram
54    }
55
56    pub fn dgram_mut(&mut self) -> &mut dgram::Config {
58        &mut self.dgram
59    }
60
61    pub fn set_dgram(&mut self, dgram: dgram::Config) {
63        self.dgram = dgram
64    }
65
66    pub fn stream(&self) -> &multi_stream::Config {
68        &self.multi_stream
69    }
70
71    pub fn stream_mut(&mut self) -> &mut multi_stream::Config {
73        &mut self.multi_stream
74    }
75
76    pub fn set_stream(&mut self, stream: multi_stream::Config) {
78        self.multi_stream = stream
79    }
80}
81
82#[derive(Clone, Debug)]
87pub struct Connection<DgramS, Req> {
88    udp_conn: Arc<dgram::Connection<DgramS>>,
90
91    tcp_conn: multi_stream::Connection<Req>,
93}
94
95impl<DgramS, Req> Connection<DgramS, Req>
96where
97    DgramS: AsyncConnect + Clone + Send + Sync + 'static,
98    DgramS::Connection:
99        AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin + 'static,
100{
101    pub fn new<StreamS>(
103        dgram_remote: DgramS,
104        stream_remote: StreamS,
105    ) -> (Self, multi_stream::Transport<StreamS, Req>) {
106        Self::with_config(dgram_remote, stream_remote, Default::default())
107    }
108
109    pub fn with_config<StreamS>(
111        dgram_remote: DgramS,
112        stream_remote: StreamS,
113        config: Config,
114    ) -> (Self, multi_stream::Transport<StreamS, Req>) {
115        let udp_conn =
116            dgram::Connection::with_config(dgram_remote, config.dgram).into();
117        let (tcp_conn, transport) = multi_stream::Connection::with_config(
118            stream_remote,
119            config.multi_stream,
120        );
121        (Self { udp_conn, tcp_conn }, transport)
122    }
123}
124
125impl<DgramS, Req> SendRequest<Req> for Connection<DgramS, Req>
128where
129    DgramS: AsyncConnect + Clone + Debug + Send + Sync + 'static,
130    DgramS::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
131    Req: ComposeRequest + Clone + 'static,
132{
133    fn send_request(
134        &self,
135        request_msg: Req,
136    ) -> Box<dyn GetResponse + Send + Sync> {
137        Box::new(Request::new(
138            request_msg,
139            self.udp_conn.clone(),
140            self.tcp_conn.clone(),
141        ))
142    }
143}
144
145#[derive(Debug)]
149pub struct Request<S, Req> {
150    request_msg: Req,
152
153    udp_conn: Arc<dgram::Connection<S>>,
155
156    tcp_conn: multi_stream::Connection<Req>,
158
159    state: QueryState,
161}
162
163#[derive(Debug)]
165enum QueryState {
166    StartUdpRequest,
168
169    GetUdpResponse(Box<dyn GetResponse + Send + Sync>),
171
172    StartTcpRequest,
174
175    GetTcpResponse(Box<dyn GetResponse + Send + Sync>),
177}
178
179impl<S, Req> Request<S, Req>
180where
181    S: AsyncConnect + Clone + Send + Sync + 'static,
182    Req: ComposeRequest + Clone + 'static,
183{
184    fn new(
188        request_msg: Req,
189        udp_conn: Arc<dgram::Connection<S>>,
190        tcp_conn: multi_stream::Connection<Req>,
191    ) -> Request<S, Req> {
192        Self {
193            request_msg,
194            udp_conn,
195            tcp_conn,
196            state: QueryState::StartUdpRequest,
197        }
198    }
199
200    async fn get_response_impl(&mut self) -> Result<Message<Bytes>, Error>
204    where
205        S::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
206    {
207        loop {
208            match &mut self.state {
209                QueryState::StartUdpRequest => {
210                    let msg = self.request_msg.clone();
211                    let request = self.udp_conn.send_request(msg);
212                    self.state = QueryState::GetUdpResponse(request);
213                    continue;
214                }
215                QueryState::GetUdpResponse(ref mut request) => {
216                    let response = request.get_response().await?;
217                    if response.header().tc() {
218                        self.state = QueryState::StartTcpRequest;
219                        continue;
220                    }
221                    return Ok(response);
222                }
223                QueryState::StartTcpRequest => {
224                    let msg = self.request_msg.clone();
225                    let request = self.tcp_conn.send_request(msg);
226                    self.state = QueryState::GetTcpResponse(request);
227                    continue;
228                }
229                QueryState::GetTcpResponse(ref mut query) => {
230                    let response = query.get_response().await?;
231                    return Ok(response);
232                }
233            }
234        }
235    }
236}
237
238impl<S, Req> GetResponse for Request<S, Req>
239where
240    S: AsyncConnect + Clone + Debug + Send + Sync + 'static,
241    S::Connection: AsyncDgramRecv + AsyncDgramSend + Send + Sync + Unpin,
242    Req: ComposeRequest + Clone + 'static,
243{
244    fn get_response(
245        &mut self,
246    ) -> Pin<
247        Box<
248            dyn Future<Output = Result<Message<Bytes>, Error>>
249                + Send
250                + Sync
251                + '_,
252        >,
253    > {
254        Box::pin(self.get_response_impl())
255    }
256}