mysql_async/conn/routines/
helpers.rs

1//! Private routine helpers.
2
3use std::sync::Arc;
4
5use futures_util::StreamExt;
6use mysql_common::{
7    constants::MAX_PAYLOAD_LEN,
8    io::{ParseBuf, ReadMysqlExt},
9    packets::{ComStmtSendLongData, LocalInfilePacket},
10    value::Value,
11};
12
13use crate::{error::LocalInfileError, queryable::Protocol, Conn, Error};
14
15impl Conn {
16    /// Helper, that sends all `Value::Bytes` in the given list of paramenters as long data.
17    pub(super) async fn send_long_data<'a, I>(
18        &mut self,
19        statement_id: u32,
20        params: I,
21    ) -> crate::Result<()>
22    where
23        I: Iterator<Item = &'a Value>,
24    {
25        for (i, value) in params.enumerate() {
26            if let Value::Bytes(bytes) = value {
27                let chunks = bytes.chunks(MAX_PAYLOAD_LEN - 6);
28                let chunks = chunks.chain(if bytes.is_empty() {
29                    Some(&[][..])
30                } else {
31                    None
32                });
33                for chunk in chunks {
34                    let com = ComStmtSendLongData::new(statement_id, i as u16, chunk);
35                    self.write_command(&com).await?;
36                }
37            }
38        }
39
40        Ok(())
41    }
42
43    /// Will read result set and write pending result into `self` (if any).
44    pub(super) async fn read_result_set<P>(
45        &mut self,
46        is_first_result_set: bool,
47    ) -> crate::Result<()>
48    where
49        P: Protocol,
50    {
51        let packet = match self.read_packet().await {
52            Ok(packet) => packet,
53            Err(err @ Error::Server(_)) if is_first_result_set => {
54                // shortcut to emit an error right to the caller of a query/execute
55                return Err(err);
56            }
57            Err(Error::Server(error)) => {
58                // error will be consumed as a part of a multi-result set
59                self.set_pending_result_error(error)?;
60                return Ok(());
61            }
62            Err(err) => {
63                // non-server errors are fatal
64                return Err(err);
65            }
66        };
67
68        match packet.first() {
69            Some(0x00) => {
70                self.set_pending_result(Some(P::result_set_meta(Arc::from(
71                    Vec::new().into_boxed_slice(),
72                ))))?;
73            }
74            Some(0xFB) => self.handle_local_infile::<P>(&packet).await?,
75            _ => self.handle_result_set::<P>(&packet).await?,
76        }
77
78        Ok(())
79    }
80
81    /// Will handle local infile packet.
82    pub(super) async fn handle_local_infile<P>(&mut self, packet: &[u8]) -> crate::Result<()>
83    where
84        P: Protocol,
85    {
86        let local_infile = ParseBuf(packet).parse::<LocalInfilePacket>(())?;
87
88        let mut infile_data = if let Some(handler) = self.inner.infile_handler.take() {
89            handler.await?
90        } else if let Some(handler) = self.opts().local_infile_handler() {
91            handler.handle(local_infile.file_name_ref()).await?
92        } else {
93            return Err(LocalInfileError::NoHandler.into());
94        };
95
96        let mut result = Ok(());
97        while let Some(bytes) = infile_data.next().await {
98            match bytes {
99                Ok(bytes) => {
100                    // We'll skip empty chunks to stay compliant with the protocol.
101                    if !bytes.is_empty() {
102                        self.write_bytes(&bytes).await?;
103                    }
104                }
105                Err(err) => {
106                    // Abort the stream in case of an error.
107                    result = Err(LocalInfileError::from(err));
108                    break;
109                }
110            }
111        }
112        self.write_bytes(&[]).await?;
113
114        self.read_packet().await?;
115        self.set_pending_result(Some(P::result_set_meta(Arc::from(
116            Vec::new().into_boxed_slice(),
117        ))))?;
118
119        result.map_err(Into::into)
120    }
121
122    /// Helper that handles result set packet.
123    ///
124    /// Requires that `packet` contains non-zero length-encoded integer.
125    pub(super) async fn handle_result_set<P>(&mut self, mut packet: &[u8]) -> crate::Result<()>
126    where
127        P: Protocol,
128    {
129        let column_count = packet.read_lenenc_int()?;
130        let columns = self.read_column_defs(column_count as usize).await?;
131        let meta = P::result_set_meta(Arc::from(columns.into_boxed_slice()));
132        self.set_pending_result(Some(meta))?;
133        Ok(())
134    }
135}