mysql_async/conn/routines/
helpers.rsuse std::sync::Arc;
use futures_util::StreamExt;
use mysql_common::{
constants::MAX_PAYLOAD_LEN,
io::{ParseBuf, ReadMysqlExt},
packets::{ComStmtSendLongData, LocalInfilePacket},
value::Value,
};
use crate::{error::LocalInfileError, queryable::Protocol, Conn, Error};
impl Conn {
pub(super) async fn send_long_data<'a, I>(
&mut self,
statement_id: u32,
params: I,
) -> crate::Result<()>
where
I: Iterator<Item = &'a Value>,
{
for (i, value) in params.enumerate() {
if let Value::Bytes(bytes) = value {
let chunks = bytes.chunks(MAX_PAYLOAD_LEN - 6);
let chunks = chunks.chain(if bytes.is_empty() {
Some(&[][..])
} else {
None
});
for chunk in chunks {
let com = ComStmtSendLongData::new(statement_id, i as u16, chunk);
self.write_command(&com).await?;
}
}
}
Ok(())
}
pub(super) async fn read_result_set<P>(
&mut self,
is_first_result_set: bool,
) -> crate::Result<()>
where
P: Protocol,
{
let packet = match self.read_packet().await {
Ok(packet) => packet,
Err(err @ Error::Server(_)) if is_first_result_set => {
return Err(err);
}
Err(Error::Server(error)) => {
self.set_pending_result_error(error)?;
return Ok(());
}
Err(err) => {
return Err(err);
}
};
match packet.first() {
Some(0x00) => {
self.set_pending_result(Some(P::result_set_meta(Arc::from(
Vec::new().into_boxed_slice(),
))))?;
}
Some(0xFB) => self.handle_local_infile::<P>(&packet).await?,
_ => self.handle_result_set::<P>(&packet).await?,
}
Ok(())
}
pub(super) async fn handle_local_infile<P>(&mut self, packet: &[u8]) -> crate::Result<()>
where
P: Protocol,
{
let local_infile = ParseBuf(packet).parse::<LocalInfilePacket>(())?;
let mut infile_data = if let Some(handler) = self.inner.infile_handler.take() {
handler.await?
} else if let Some(handler) = self.opts().local_infile_handler() {
handler.handle(local_infile.file_name_ref()).await?
} else {
return Err(LocalInfileError::NoHandler.into());
};
let mut result = Ok(());
while let Some(bytes) = infile_data.next().await {
match bytes {
Ok(bytes) => {
if !bytes.is_empty() {
self.write_bytes(&bytes).await?;
}
}
Err(err) => {
result = Err(LocalInfileError::from(err));
break;
}
}
}
self.write_bytes(&[]).await?;
self.read_packet().await?;
self.set_pending_result(Some(P::result_set_meta(Arc::from(
Vec::new().into_boxed_slice(),
))))?;
result.map_err(Into::into)
}
pub(super) async fn handle_result_set<P>(&mut self, mut packet: &[u8]) -> crate::Result<()>
where
P: Protocol,
{
let column_count = packet.read_lenenc_int()?;
let columns = self.read_column_defs(column_count as usize).await?;
let meta = P::result_set_meta(Arc::from(columns.into_boxed_slice()));
self.set_pending_result(Some(meta))?;
Ok(())
}
}