mysql_async/io/
write_packet.rs
1use futures_core::ready;
10use futures_sink::Sink;
11
12use std::{
13 future::Future,
14 io::{Error, ErrorKind::UnexpectedEof},
15 pin::Pin,
16 task::{Context, Poll},
17};
18
19use crate::{buffer_pool::PooledBuf, connection_like::Connection, error::IoError};
20
21#[derive(Debug)]
23#[must_use = "futures do nothing unless you `.await` or poll them"]
24pub struct WritePacket<'a, 't> {
25 conn: Connection<'a, 't>,
26 data: Option<PooledBuf>,
27}
28
29impl<'a, 't> WritePacket<'a, 't> {
30 pub(crate) fn new<T: Into<Connection<'a, 't>>>(conn: T, data: PooledBuf) -> Self {
31 Self {
32 conn: conn.into(),
33 data: Some(data),
34 }
35 }
36}
37
38impl Future for WritePacket<'_, '_> {
39 type Output = std::result::Result<(), IoError>;
40
41 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
42 let Self {
43 ref mut conn,
44 ref mut data,
45 } = *self;
46
47 match conn.stream_mut() {
48 Ok(stream) => {
49 if data.is_some() {
50 let codec = Pin::new(stream.codec.as_mut().expect("must be here"));
51 ready!(codec.poll_ready(cx))?;
52 }
53
54 if let Some(data) = data.take() {
55 let codec = Pin::new(stream.codec.as_mut().expect("must be here"));
56 codec.start_send(data)?;
58 }
59
60 let codec = Pin::new(stream.codec.as_mut().expect("must be here"));
61
62 ready!(codec.poll_flush(cx))?;
63
64 Poll::Ready(Ok(()))
65 }
66 Err(err) => Poll::Ready(Err(Error::new(UnexpectedEof, err).into())),
67 }
68 }
69}