use asynchronous_codec::BytesMut;
use futures::{AsyncRead, AsyncWrite};
use tracing::{event, Level};
use crate::{
client::Connection, sql_read_bytes::SqlReadBytes, BytesMutWithDataColumns, ExecuteResult,
};
use super::{
Encode, MetaDataColumn, PacketHeader, PacketStatus, TokenColMetaData, TokenDone, TokenRow,
HEADER_BYTES,
};
#[derive(Debug)]
pub struct BulkLoadRequest<'a, S>
where
S: AsyncRead + AsyncWrite + Unpin + Send,
{
connection: &'a mut Connection<S>,
packet_id: u8,
buf: BytesMut,
columns: Vec<MetaDataColumn<'a>>,
}
impl<'a, S> BulkLoadRequest<'a, S>
where
S: AsyncRead + AsyncWrite + Unpin + Send,
{
pub(crate) fn new(
connection: &'a mut Connection<S>,
columns: Vec<MetaDataColumn<'a>>,
) -> crate::Result<Self> {
let packet_id = connection.context_mut().next_packet_id();
let mut buf = BytesMut::new();
let cmd = TokenColMetaData {
columns: columns.clone(),
};
cmd.encode(&mut buf)?;
let this = Self {
connection,
packet_id,
buf,
columns,
};
Ok(this)
}
pub async fn send(&mut self, row: TokenRow<'a>) -> crate::Result<()> {
let mut buf_with_columns = BytesMutWithDataColumns::new(&mut self.buf, &self.columns);
row.encode(&mut buf_with_columns)?;
self.write_packets().await?;
Ok(())
}
pub async fn finalize(mut self) -> crate::Result<ExecuteResult> {
TokenDone::default().encode(&mut self.buf)?;
self.write_packets().await?;
let mut header = PacketHeader::bulk_load(self.packet_id);
header.set_status(PacketStatus::EndOfMessage);
let data = self.buf.split();
event!(
Level::TRACE,
"Finalizing a bulk insert ({} bytes)",
data.len() + HEADER_BYTES,
);
self.connection.write_to_wire(header, data).await?;
self.connection.flush_sink().await?;
ExecuteResult::new(self.connection).await
}
async fn write_packets(&mut self) -> crate::Result<()> {
let packet_size = (self.connection.context().packet_size() as usize) - HEADER_BYTES;
while self.buf.len() > packet_size {
let header = PacketHeader::bulk_load(self.packet_id);
let data = self.buf.split_to(packet_size);
event!(
Level::TRACE,
"Bulk insert packet ({} bytes)",
data.len() + HEADER_BYTES,
);
self.connection.write_to_wire(header, data).await?;
}
Ok(())
}
}