mysql_async/conn/routines/
helpers.rs
1use std::sync::Arc;
4
5use futures_util::StreamExt;
6use mysql_common::{
7 constants::MAX_PAYLOAD_LEN,
8 io::{ParseBuf, ReadMysqlExt},
9 packets::{ComStmtSendLongData, LocalInfilePacket},
10 value::Value,
11};
12
13use crate::{error::LocalInfileError, queryable::Protocol, Conn, Error};
14
15impl Conn {
16 pub(super) async fn send_long_data<'a, I>(
18 &mut self,
19 statement_id: u32,
20 params: I,
21 ) -> crate::Result<()>
22 where
23 I: Iterator<Item = &'a Value>,
24 {
25 for (i, value) in params.enumerate() {
26 if let Value::Bytes(bytes) = value {
27 let chunks = bytes.chunks(MAX_PAYLOAD_LEN - 6);
28 let chunks = chunks.chain(if bytes.is_empty() {
29 Some(&[][..])
30 } else {
31 None
32 });
33 for chunk in chunks {
34 let com = ComStmtSendLongData::new(statement_id, i as u16, chunk);
35 self.write_command(&com).await?;
36 }
37 }
38 }
39
40 Ok(())
41 }
42
43 pub(super) async fn read_result_set<P>(
45 &mut self,
46 is_first_result_set: bool,
47 ) -> crate::Result<()>
48 where
49 P: Protocol,
50 {
51 let packet = match self.read_packet().await {
52 Ok(packet) => packet,
53 Err(err @ Error::Server(_)) if is_first_result_set => {
54 return Err(err);
56 }
57 Err(Error::Server(error)) => {
58 self.set_pending_result_error(error)?;
60 return Ok(());
61 }
62 Err(err) => {
63 return Err(err);
65 }
66 };
67
68 match packet.first() {
69 Some(0x00) => {
70 self.set_pending_result(Some(P::result_set_meta(Arc::from(
71 Vec::new().into_boxed_slice(),
72 ))))?;
73 }
74 Some(0xFB) => self.handle_local_infile::<P>(&packet).await?,
75 _ => self.handle_result_set::<P>(&packet).await?,
76 }
77
78 Ok(())
79 }
80
81 pub(super) async fn handle_local_infile<P>(&mut self, packet: &[u8]) -> crate::Result<()>
83 where
84 P: Protocol,
85 {
86 let local_infile = ParseBuf(packet).parse::<LocalInfilePacket>(())?;
87
88 let mut infile_data = if let Some(handler) = self.inner.infile_handler.take() {
89 handler.await?
90 } else if let Some(handler) = self.opts().local_infile_handler() {
91 handler.handle(local_infile.file_name_ref()).await?
92 } else {
93 return Err(LocalInfileError::NoHandler.into());
94 };
95
96 let mut result = Ok(());
97 while let Some(bytes) = infile_data.next().await {
98 match bytes {
99 Ok(bytes) => {
100 if !bytes.is_empty() {
102 self.write_bytes(&bytes).await?;
103 }
104 }
105 Err(err) => {
106 result = Err(LocalInfileError::from(err));
108 break;
109 }
110 }
111 }
112 self.write_bytes(&[]).await?;
113
114 self.read_packet().await?;
115 self.set_pending_result(Some(P::result_set_meta(Arc::from(
116 Vec::new().into_boxed_slice(),
117 ))))?;
118
119 result.map_err(Into::into)
120 }
121
122 pub(super) async fn handle_result_set<P>(&mut self, mut packet: &[u8]) -> crate::Result<()>
126 where
127 P: Protocol,
128 {
129 let column_count = packet.read_lenenc_int()?;
130 let columns = self.read_column_defs(column_count as usize).await?;
131 let meta = P::result_set_meta(Arc::from(columns.into_boxed_slice()));
132 self.set_pending_result(Some(meta))?;
133 Ok(())
134 }
135}