openssh_mux_client/
connection.rs

1#![forbid(unsafe_code)]
2
3use crate::{
4    constants,
5    request::{Fwd, Request, SessionZeroCopy},
6    shutdown_mux_master::shutdown_mux_master_from,
7    utils::{serialize_u32, SliceExt},
8    Error, ErrorExt, EstablishedSession, Response, Result, Session, Socket,
9};
10
11use std::{
12    borrow::Cow,
13    convert::TryInto,
14    io,
15    io::IoSlice,
16    num::{NonZeroU32, Wrapping},
17    os::unix::io::RawFd,
18    path::Path,
19};
20
21use sendfd::SendWithFd;
22use serde::{de::DeserializeOwned, Serialize};
23use ssh_format::{from_bytes, Serializer};
24use tokio::net::UnixStream;
25use tokio_io_utility::{read_to_vec_rng, write_vectored_all};
26
27#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
28pub enum ForwardType {
29    Local,
30    Remote,
31}
32
33/// # Cancel safety
34///
35/// All methods of this struct is not cancellation safe.
36#[derive(Debug)]
37pub struct Connection {
38    raw_conn: UnixStream,
39    serializer: Serializer,
40    read_buffer: Vec<u8>,
41    request_id: Wrapping<u32>,
42}
43impl Connection {
44    fn reset_serializer(&mut self) {
45        self.serializer.reset_counter();
46        self.serializer.output.clear();
47    }
48
49    async fn write(&mut self, value: &Request) -> Result<()> {
50        self.reset_serializer();
51        value.serialize(&mut self.serializer)?;
52
53        let header = self.serializer.create_header(0)?;
54
55        write_vectored_all(
56            &mut self.raw_conn,
57            &mut [IoSlice::new(&header), IoSlice::new(&self.serializer.output)],
58        )
59        .await?;
60
61        Ok(())
62    }
63
64    fn deserialize<T: DeserializeOwned>(read_buffer: &[u8]) -> Result<T> {
65        // Ignore any trailing bytes to be forward compatible
66        Ok(from_bytes(read_buffer)?.0)
67    }
68
69    pub(crate) async fn read_response(&mut self) -> Result<Response> {
70        let buffer = &mut self.read_buffer;
71
72        if buffer.len() < 4 {
73            let n = 4 - buffer.len();
74            read_to_vec_rng(&mut self.raw_conn, buffer, n..).await?;
75        }
76
77        // Read in the header
78        let packet_len: u32 = Self::deserialize(&buffer[..4])?;
79
80        let packet_len: usize = packet_len.try_into().unwrap();
81
82        // The first 4 bytes are not counted as the packet body
83        let buffer_len = buffer.len() - 4;
84
85        if buffer_len < packet_len {
86            // Read in rest of the packet
87            let n = packet_len - buffer_len;
88            read_to_vec_rng(&mut self.raw_conn, buffer, n..).await?;
89        }
90
91        // Deserialize the response
92        let response = Self::deserialize(&buffer[4..(4 + packet_len)])?;
93
94        // Remove the packet from buffer
95        buffer.drain(..(4 + packet_len));
96
97        Ok(response)
98    }
99
100    /// Send fds with "\0"
101    async fn send_with_fds(&self, fds: &[RawFd]) -> Result<()> {
102        let byte = &[0];
103
104        loop {
105            self.raw_conn.writable().await?;
106
107            // send_with_fd calls `UnixStream::try_io`
108            match SendWithFd::send_with_fd(&self.raw_conn, byte, fds) {
109                Ok(n) => {
110                    if n == 1 {
111                        break Ok(());
112                    } else {
113                        debug_assert_eq!(n, 0);
114                        break Err(io::Error::from(io::ErrorKind::UnexpectedEof).into());
115                    }
116                }
117                Err(e) => {
118                    if e.kind() != io::ErrorKind::WouldBlock {
119                        break Err(e.into());
120                    }
121                }
122            }
123        }
124    }
125
126    fn get_request_id(&mut self) -> u32 {
127        let request_id = self.request_id.0;
128        self.request_id += Wrapping(1);
129
130        request_id
131    }
132
133    fn check_response_id(request_id: u32, response_id: u32) -> Result<()> {
134        if request_id != response_id {
135            Err(Error::UnmatchedRequestId)
136        } else {
137            Ok(())
138        }
139    }
140
141    async fn exchange_hello(mut self) -> Result<Self> {
142        self.write(&Request::Hello {
143            version: constants::SSHMUX_VER,
144        })
145        .await?;
146
147        let response = self.read_response().await?;
148        if let Response::Hello { version } = response {
149            if version != constants::SSHMUX_VER {
150                Err(Error::UnsupportedMuxProtocol)
151            } else {
152                Ok(self)
153            }
154        } else {
155            Err(Error::invalid_server_response(&"Hello message", &response))
156        }
157    }
158
159    pub async fn connect<P: AsRef<Path>>(path: P) -> Result<Self> {
160        Self {
161            raw_conn: UnixStream::connect(path).await?,
162            // All request packets are at least 12 bytes large,
163            // and variant [`Request::NewSession`] takes 36 bytes to
164            // serialize.
165            serializer: Serializer::new(Vec::with_capacity(36)),
166            // All reponse packets are at least 16 bytes large.
167            read_buffer: Vec::with_capacity(32),
168            request_id: Wrapping(0),
169        }
170        .exchange_hello()
171        .await
172    }
173
174    /// Send a ping to the server and return pid of the ssh mux server
175    /// if it is still alive.
176    pub async fn send_alive_check(&mut self) -> Result<NonZeroU32> {
177        let request_id = self.get_request_id();
178
179        self.write(&Request::AliveCheck { request_id }).await?;
180
181        let response = self.read_response().await?;
182        if let Response::Alive {
183            response_id,
184            server_pid,
185        } = response
186        {
187            Self::check_response_id(request_id, response_id)?;
188            NonZeroU32::new(server_pid).ok_or(Error::InvalidPid)
189        } else {
190            Err(Error::invalid_server_response(
191                &"Response::Alive",
192                &response,
193            ))
194        }
195    }
196
197    /// Return session_id
198    async fn open_new_session_impl(
199        &mut self,
200        session: &Session<'_>,
201        fds: &[RawFd; 3],
202    ) -> Result<u32> {
203        use Response::*;
204
205        let request_id = self.get_request_id();
206
207        // Prepare to serialize
208        let term = session.term.as_ref().into_inner();
209        let cmd = session.cmd.as_ref().into_inner();
210
211        let term_len: u32 = term.get_len_as_u32()?;
212        let cmd_len: u32 = cmd.get_len_as_u32()?;
213
214        let request = Request::NewSession {
215            request_id,
216            session: SessionZeroCopy {
217                tty: session.tty,
218                x11_forwarding: session.x11_forwarding,
219                agent: session.agent,
220                subsystem: session.subsystem,
221                escape_ch: session.escape_ch,
222            },
223        };
224
225        // Serialize
226        self.reset_serializer();
227
228        request.serialize(&mut self.serializer)?;
229        let serialized_header = self.serializer.create_header(
230            /* len of term */ 4 + term_len + /* len of cmd */ 4 + cmd_len,
231        )?;
232
233        let serialized_cmd_len = serialize_u32(cmd_len);
234        let serialized_term_len = serialize_u32(term_len);
235
236        // Write them to self.raw_conn
237        let mut io_slices = [
238            IoSlice::new(&serialized_header),
239            IoSlice::new(&self.serializer.output),
240            IoSlice::new(&serialized_term_len),
241            IoSlice::new(term),
242            IoSlice::new(&serialized_cmd_len),
243            IoSlice::new(cmd),
244        ];
245
246        write_vectored_all(&mut self.raw_conn, &mut io_slices).await?;
247
248        for fd in fds {
249            self.send_with_fds(&[*fd]).await?;
250        }
251
252        let session_id = match self.read_response().await? {
253            SessionOpened {
254                response_id,
255                session_id,
256            } => {
257                Self::check_response_id(request_id, response_id)?;
258                session_id
259            }
260            PermissionDenied {
261                response_id,
262                reason,
263            } => {
264                Self::check_response_id(request_id, response_id)?;
265                return Err(Error::PermissionDenied(reason));
266            }
267            Failure {
268                response_id,
269                reason,
270            } => {
271                Self::check_response_id(request_id, response_id)?;
272                return Err(Error::RequestFailure(reason));
273            }
274            response => {
275                return Err(Error::invalid_server_response(
276                    &"SessionOpened, PermissionDenied or Failure",
277                    &response,
278                ))
279            }
280        };
281
282        Result::Ok(session_id)
283    }
284
285    /// Opens a new session.
286    ///
287    /// Consumes `self` so that users would not be able to create multiple sessions
288    /// or perform other operations during the session that might complicates the
289    /// handling of packets received from the ssh mux server.
290    ///
291    /// Two additional cases that the client must cope with are it receiving
292    /// a signal itself (from the ssh mux server) and the server disconnecting
293    /// without sending an exit message.
294    ///
295    /// * `fds` - must be in blocking mode
296    pub async fn open_new_session(
297        mut self,
298        session: &Session<'_>,
299        fds: &[RawFd; 3],
300    ) -> Result<EstablishedSession> {
301        let session_id = self.open_new_session_impl(session, fds).await?;
302
303        // EstablishedSession does not send any request
304        // It merely wait for response.
305        self.serializer.output = Vec::new();
306
307        Ok(EstablishedSession {
308            conn: self,
309            session_id,
310        })
311    }
312
313    /// Convenient function for opening a new sftp session, uses
314    /// `open_new_session` underlying.
315    pub async fn sftp(self, fds: &[RawFd; 3]) -> Result<EstablishedSession> {
316        let session = Session::builder()
317            .subsystem(true)
318            .term(Cow::Borrowed("".try_into().unwrap()))
319            .cmd(Cow::Borrowed("sftp".try_into().unwrap()))
320            .build();
321
322        self.open_new_session(&session, fds).await
323    }
324
325    async fn send_fwd_request(&mut self, request_id: u32, fwd: &Fwd<'_>) -> Result<()> {
326        let (fwd_mode, listen_socket, connect_socket) = fwd.as_serializable();
327        let (listen_addr, listen_port) = listen_socket.as_serializable();
328        let (connect_addr, connect_port) = connect_socket.as_ref().as_serializable();
329
330        let serialized_listen_port = serialize_u32(listen_port);
331        let serialized_connect_port = serialize_u32(connect_port);
332
333        let listen_addr_len: u32 = listen_addr.get_len_as_u32()?;
334        let connect_addr_len: u32 = connect_addr.get_len_as_u32()?;
335
336        let request = Request::OpenFwd {
337            request_id,
338            fwd_mode,
339        };
340
341        // Serialize
342        self.reset_serializer();
343
344        request.serialize(&mut self.serializer)?;
345        let serialized_header = self.serializer.create_header(
346            // len
347            4 +
348            listen_addr_len +
349            // port
350            4 +
351            // len
352            4 +
353            connect_addr_len
354            // port
355            + 4,
356        )?;
357
358        let serialized_listen_addr_len = serialize_u32(listen_addr_len);
359        let serialized_connect_addr_len = serialize_u32(connect_addr_len);
360
361        // Write them to self.raw_conn
362        let mut io_slices = [
363            IoSlice::new(&serialized_header),
364            IoSlice::new(&self.serializer.output),
365            IoSlice::new(&serialized_listen_addr_len),
366            IoSlice::new(listen_addr.into_inner()),
367            IoSlice::new(&serialized_listen_port),
368            IoSlice::new(&serialized_connect_addr_len),
369            IoSlice::new(connect_addr.into_inner()),
370            IoSlice::new(&serialized_connect_port),
371        ];
372
373        write_vectored_all(&mut self.raw_conn, &mut io_slices).await?;
374
375        Ok(())
376    }
377
378    async fn send_close_fwd_request(&mut self, request_id: u32, fwd: &Fwd<'_>) -> Result<()> {
379        let (fwd_mode, listen_socket, connect_socket) = fwd.as_serializable();
380        let (listen_addr, listen_port) = listen_socket.as_serializable();
381        let (connect_addr, connect_port) = connect_socket.as_ref().as_serializable();
382
383        let serialized_listen_port = serialize_u32(listen_port);
384        let serialized_connect_port = serialize_u32(connect_port);
385
386        let listen_addr_len: u32 = listen_addr.get_len_as_u32()?;
387        let connect_addr_len: u32 = connect_addr.get_len_as_u32()?;
388
389        let request = Request::CloseFwd {
390            request_id,
391            fwd_mode,
392        };
393
394        // Serialize
395        self.reset_serializer();
396
397        request.serialize(&mut self.serializer)?;
398        let serialized_header = self.serializer.create_header(
399            // len
400            4 +
401            listen_addr_len +
402            // port
403            4 +
404            // len
405            4 +
406            connect_addr_len
407            // port
408            + 4,
409        )?;
410
411        let serialized_listen_addr_len = serialize_u32(listen_addr_len);
412        let serialized_connect_addr_len = serialize_u32(connect_addr_len);
413
414        // Write them to self.raw_conn
415        let mut io_slices = [
416            IoSlice::new(&serialized_header),
417            IoSlice::new(&self.serializer.output),
418            IoSlice::new(&serialized_listen_addr_len),
419            IoSlice::new(listen_addr.into_inner()),
420            IoSlice::new(&serialized_listen_port),
421            IoSlice::new(&serialized_connect_addr_len),
422            IoSlice::new(connect_addr.into_inner()),
423            IoSlice::new(&serialized_connect_port),
424        ];
425
426        write_vectored_all(&mut self.raw_conn, &mut io_slices).await?;
427
428        Ok(())
429    }
430
431    /// Request for local/remote port forwarding.
432    pub async fn request_port_forward(
433        &mut self,
434        forward_type: ForwardType,
435        listen_socket: &Socket<'_>,
436        connect_socket: &Socket<'_>,
437    ) -> Result<()> {
438        use ForwardType::*;
439        use Response::*;
440
441        let fwd = match forward_type {
442            Local => Fwd::Local {
443                listen_socket,
444                connect_socket,
445            },
446            Remote => Fwd::Remote {
447                listen_socket,
448                connect_socket,
449            },
450        };
451
452        let request_id = self.get_request_id();
453        self.send_fwd_request(request_id, &fwd).await?;
454
455        match self.read_response().await? {
456            Ok { response_id } => Self::check_response_id(request_id, response_id),
457            PermissionDenied {
458                response_id,
459                reason,
460            } => {
461                Self::check_response_id(request_id, response_id)?;
462                Err(Error::PermissionDenied(reason))
463            }
464            Failure {
465                response_id,
466                reason,
467            } => {
468                Self::check_response_id(request_id, response_id)?;
469                Err(Error::RequestFailure(reason))
470            }
471            response => Err(Error::invalid_server_response(
472                &"Ok, PermissionDenied or Failure",
473                &response,
474            )),
475        }
476    }
477
478    /// Request for local/remote port forwarding closure.
479    pub async fn close_port_forward(
480        &mut self,
481        forward_type: ForwardType,
482        listen_socket: &Socket<'_>,
483        connect_socket: &Socket<'_>,
484    ) -> Result<()> {
485        use ForwardType::*;
486        use Response::*;
487
488        let fwd = match forward_type {
489            Local => Fwd::Local {
490                listen_socket,
491                connect_socket,
492            },
493            Remote => Fwd::Remote {
494                listen_socket,
495                connect_socket,
496            },
497        };
498
499        let request_id = self.get_request_id();
500        self.send_close_fwd_request(request_id, &fwd).await?;
501
502        match self.read_response().await? {
503            Ok { response_id } => Self::check_response_id(request_id, response_id),
504            PermissionDenied {
505                response_id,
506                reason,
507            } => {
508                Self::check_response_id(request_id, response_id)?;
509                Err(Error::PermissionDenied(reason))
510            }
511            Failure {
512                response_id,
513                reason,
514            } => {
515                Self::check_response_id(request_id, response_id)?;
516                Err(Error::RequestFailure(reason))
517            }
518            response => Err(Error::invalid_server_response(
519                &"Ok, PermissionDenied or Failure",
520                &response,
521            )),
522        }
523    }
524
525    /// **UNTESTED** Return remote port opened for dynamic forwarding.
526    pub async fn request_dynamic_forward(
527        &mut self,
528        listen_socket: &Socket<'_>,
529    ) -> Result<NonZeroU32> {
530        use Response::*;
531
532        let fwd = Fwd::Dynamic { listen_socket };
533
534        let request_id = self.get_request_id();
535        self.send_fwd_request(request_id, &fwd).await?;
536
537        match self.read_response().await? {
538            RemotePort {
539                response_id,
540                remote_port,
541            } => {
542                Self::check_response_id(request_id, response_id)?;
543                NonZeroU32::new(remote_port).ok_or(Error::InvalidPort)
544            }
545            PermissionDenied {
546                response_id,
547                reason,
548            } => {
549                Self::check_response_id(request_id, response_id)?;
550                Err(Error::PermissionDenied(reason))
551            }
552            Failure {
553                response_id,
554                reason,
555            } => {
556                Self::check_response_id(request_id, response_id)?;
557                Err(Error::RequestFailure(reason))
558            }
559            response => Err(Error::invalid_server_response(
560                &"RemotePort, PermissionDenied or Failure",
561                &response,
562            )),
563        }
564    }
565
566    /// Request the master to stop accepting new multiplexing requests
567    /// and remove its listener socket.
568    pub async fn request_stop_listening(&mut self) -> Result<()> {
569        use Response::*;
570
571        let request_id = self.get_request_id();
572        self.write(&Request::StopListening { request_id }).await?;
573
574        match self.read_response().await? {
575            Ok { response_id } => {
576                Self::check_response_id(request_id, response_id)?;
577                Result::Ok(())
578            }
579            PermissionDenied {
580                response_id,
581                reason,
582            } => {
583                Self::check_response_id(request_id, response_id)?;
584                Err(Error::PermissionDenied(reason))
585            }
586            Failure {
587                response_id,
588                reason,
589            } => {
590                Self::check_response_id(request_id, response_id)?;
591                Err(Error::RequestFailure(reason))
592            }
593            response => Err(Error::invalid_server_response(
594                &"Ok, PermissionDenied or Failure",
595                &response,
596            )),
597        }
598    }
599
600    /// Request the master to stop accepting new multiplexing requests
601    /// and remove its listener socket.
602    ///
603    /// **Only suitable to use in `Drop::drop`.**
604    pub fn request_stop_listening_sync(self) -> Result<()> {
605        shutdown_mux_master_from(self.raw_conn.into_std()?)
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612    use crate::SessionStatus;
613
614    use std::convert::TryInto;
615    use std::env;
616    use std::io;
617    use std::os::unix::io::AsRawFd;
618    use std::time::Duration;
619
620    use tokio::io::{AsyncReadExt, AsyncWriteExt};
621    use tokio::net::{TcpListener, TcpStream};
622    use tokio::time::sleep;
623
624    use tokio_pipe::{pipe, PipeRead, PipeWrite};
625
626    const PATH: &str = "/tmp/openssh-mux-client-test.socket";
627
628    macro_rules! run_test {
629        ( $test_name:ident, $func:ident ) => {
630            #[tokio::test(flavor = "current_thread")]
631            async fn $test_name() {
632                $func(Connection::connect(PATH).await.unwrap()).await;
633            }
634        };
635    }
636
637    macro_rules! run_test2 {
638        ( $test_name:ident, $func:ident ) => {
639            #[tokio::test(flavor = "current_thread")]
640            async fn $test_name() {
641                $func(
642                    Connection::connect(PATH).await.unwrap(),
643                    Connection::connect(PATH).await.unwrap(),
644                )
645                .await;
646            }
647        };
648    }
649
650    async fn test_connect_impl(_conn: Connection) {}
651    run_test!(test_unordered_connect, test_connect_impl);
652
653    async fn test_alive_check_impl(mut conn: Connection) {
654        let expected_pid = env::var("ControlMasterPID").unwrap();
655        let expected_pid: u32 = expected_pid.parse().unwrap();
656
657        let actual_pid = conn.send_alive_check().await.unwrap().get();
658        assert_eq!(expected_pid, actual_pid);
659    }
660    run_test!(test_unordered_alive_check, test_alive_check_impl);
661
662    async fn test_roundtrip<const SIZE: usize>(
663        stdios: &mut (PipeWrite, PipeRead),
664        data: &'static [u8; SIZE],
665    ) {
666        stdios.0.write_all(data).await.unwrap();
667
668        let mut buffer = [0_u8; SIZE];
669        stdios.1.read_exact(&mut buffer).await.unwrap();
670
671        assert_eq!(data, &buffer);
672    }
673
674    async fn create_remote_process(
675        conn: Connection,
676        cmd: &str,
677    ) -> (EstablishedSession, (PipeWrite, PipeRead)) {
678        let session = Session::builder()
679            .cmd(Cow::Borrowed(cmd.try_into().unwrap()))
680            .build();
681
682        // pipe() returns (PipeRead, PipeWrite)
683        let (stdin_read, stdin_write) = pipe().unwrap();
684        let (stdout_read, stdout_write) = pipe().unwrap();
685
686        let established_session = conn
687            .open_new_session(
688                &session,
689                &[
690                    stdin_read.as_raw_fd(),
691                    stdout_write.as_raw_fd(),
692                    io::stderr().as_raw_fd(),
693                ],
694            )
695            .await
696            .unwrap();
697
698        (established_session, (stdin_write, stdout_read))
699    }
700
701    async fn test_open_new_session_impl(conn: Connection) {
702        let (established_session, mut stdios) = create_remote_process(conn, "/bin/cat").await;
703
704        // All test data here must end with '\n', otherwise cat would output nothing
705        // and the test would hang forever.
706
707        test_roundtrip(&mut stdios, b"0134131dqwdqdx13as\n").await;
708        test_roundtrip(&mut stdios, b"Whats' Up?\n").await;
709
710        drop(stdios);
711
712        let session_status = established_session.wait().await.unwrap();
713        assert_matches!(
714            session_status,
715            SessionStatus::Exited { exit_value, .. }
716                if exit_value.unwrap() == 0
717        );
718    }
719    run_test!(test_unordered_open_new_session, test_open_new_session_impl);
720
721    async fn test_remote_socket_forward_impl(mut conn0: Connection, mut conn1: Connection) {
722        let path = Path::new("/tmp/openssh-remote-forward.socket");
723
724        let output_listener = TcpListener::bind(("127.0.0.1", 1234)).await.unwrap();
725
726        eprintln!("Requesting port forward");
727        conn0
728            .request_port_forward(
729                ForwardType::Remote,
730                &Socket::UnixSocket { path: path.into() },
731                &Socket::TcpSocket {
732                    port: 1234,
733                    host: "127.0.0.1".into(),
734                },
735            )
736            .await
737            .unwrap();
738
739        eprintln!("Creating remote process");
740        let cmd = format!("/usr/bin/socat OPEN:/data,rdonly UNIX-CONNECT:{:#?}", path);
741        let (established_session, stdios) = create_remote_process(conn0, &cmd).await;
742
743        eprintln!("Waiting for connection");
744        let (mut output, _addr) = output_listener.accept().await.unwrap();
745
746        eprintln!("Reading");
747
748        const DATA: &[u8] = "0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n".as_bytes();
749
750        let mut buffer = [0_u8; DATA.len()];
751        output.read_exact(&mut buffer).await.unwrap();
752
753        assert_eq!(DATA, &buffer);
754
755        eprintln!("Closing port forward");
756        conn1
757            .close_port_forward(
758                ForwardType::Remote,
759                &Socket::UnixSocket { path: path.into() },
760                &Socket::TcpSocket {
761                    port: 1234,
762                    host: "127.0.0.1".into(),
763                },
764            )
765            .await
766            .unwrap();
767
768        eprintln!("Checking whether the forwarded socket is closed");
769        assert_eq!(output.read(&mut buffer).await.unwrap(), 0);
770
771        drop(output_listener);
772        drop(output);
773        drop(stdios);
774
775        eprintln!("Waiting for session to end");
776        let session_status = established_session.wait().await.unwrap();
777        assert_matches!(
778            session_status,
779            SessionStatus::Exited { exit_value, .. }
780                if exit_value.unwrap() == 0
781        );
782    }
783    run_test2!(
784        test_unordered_remote_socket_forward,
785        test_remote_socket_forward_impl
786    );
787
788    async fn test_local_socket_forward_impl(conn0: Connection, mut conn1: Connection) {
789        let path: Cow<'_, _> = Path::new("/tmp/openssh-local-forward.socket").into();
790
791        eprintln!("Creating remote process");
792        let cmd = format!("socat -u OPEN:/data UNIX-LISTEN:{:#?} >/dev/stderr", path);
793        let (established_session, stdios) = create_remote_process(conn0, &cmd).await;
794
795        sleep(Duration::from_secs(1)).await;
796
797        eprintln!("Requesting port forward");
798        conn1
799            .request_port_forward(
800                ForwardType::Local,
801                &Socket::TcpSocket {
802                    port: 1235,
803                    host: "127.0.0.1".into(),
804                },
805                &Socket::UnixSocket { path: path.clone() },
806            )
807            .await
808            .unwrap();
809
810        eprintln!("Connecting to forwarded socket");
811        let mut output = TcpStream::connect(("127.0.0.1", 1235)).await.unwrap();
812
813        eprintln!("Reading");
814
815        const DATA: &[u8] = "0\n1\n2\n3\n4\n5\n6\n7\n8\n9\n10\n".as_bytes();
816        let mut buffer = [0_u8; DATA.len()];
817        output.read_exact(&mut buffer).await.unwrap();
818
819        assert_eq!(DATA, buffer);
820
821        eprintln!("Closing port forward");
822        conn1
823            .close_port_forward(
824                ForwardType::Local,
825                &Socket::TcpSocket {
826                    port: 1235,
827                    host: "127.0.0.1".into(),
828                },
829                &Socket::UnixSocket { path },
830            )
831            .await
832            .unwrap();
833
834        eprintln!("Checking whether the forwarded socket is closed");
835        assert_eq!(output.read(&mut buffer).await.unwrap(), 0);
836
837        drop(output);
838        drop(stdios);
839
840        eprintln!("Waiting for session to end");
841        let session_status = established_session.wait().await.unwrap();
842        assert_matches!(
843            session_status,
844            SessionStatus::Exited { exit_value, .. }
845                if exit_value.unwrap() == 0
846        );
847    }
848    run_test2!(
849        test_unordered_local_socket_forward,
850        test_local_socket_forward_impl
851    );
852
853    async fn test_request_stop_listening_impl(mut conn: Connection) {
854        conn.request_stop_listening().await.unwrap();
855
856        eprintln!("Verify that existing connection is still usable.");
857        test_open_new_session_impl(conn).await;
858
859        eprintln!(
860            "Verify that after the last connection is dropped, the multiplex server \
861            indeed shutdown."
862        );
863        assert_matches!(Connection::connect(PATH).await, Err(_));
864    }
865    run_test!(
866        test_request_stop_listening,
867        test_request_stop_listening_impl
868    );
869}