1use futures_util::FutureExt;
10use mysql_common::{
11 constants::MAX_PAYLOAD_LEN,
12 io::ParseBuf,
13 proto::{Binary, Text},
14 row::RowDeserializer,
15 value::ServerSide,
16};
17
18use std::{fmt, sync::Arc};
19
20use self::{
21 query_result::QueryResult,
22 stmt::Statement,
23 transaction::{Transaction, TxStatus},
24};
25
26use crate::{
27 conn::routines::{PingRoutine, QueryRoutine},
28 consts::CapabilityFlags,
29 error::*,
30 prelude::{FromRow, StatementLike},
31 query::AsQuery,
32 queryable::query_result::ResultSetMeta,
33 tracing_utils::{LevelInfo, LevelTrace, TracingLevel},
34 BoxFuture, Column, Conn, Connection, Params, ResultSetStream, Row,
35};
36
37pub mod query_result;
38pub mod stmt;
39pub mod transaction;
40
41pub trait Protocol: fmt::Debug + Send + Sync + 'static {
42 fn result_set_meta(columns: Arc<[Column]>) -> ResultSetMeta;
44 fn read_result_set_row(packet: &[u8], columns: Arc<[Column]>) -> Result<Row>;
45 fn is_last_result_set_packet(capabilities: CapabilityFlags, packet: &[u8]) -> bool {
46 if capabilities.contains(CapabilityFlags::CLIENT_DEPRECATE_EOF) {
47 packet[0] == 0xFE && packet.len() < MAX_PAYLOAD_LEN
48 } else {
49 packet[0] == 0xFE && packet.len() < 8
50 }
51 }
52}
53
/// Zero-sized marker type that selects the text protocol.
///
/// It carries no data; it is only used as a type parameter wherever a [`Protocol`]
/// implementation is required.
#[derive(Debug)]
pub struct TextProtocol;
57
/// Zero-sized marker type that selects the binary protocol.
///
/// It carries no data; it is only used as a type parameter wherever a [`Protocol`]
/// implementation is required.
#[derive(Debug)]
pub struct BinaryProtocol;
61
62impl Protocol for TextProtocol {
63 fn result_set_meta(columns: Arc<[Column]>) -> ResultSetMeta {
64 ResultSetMeta::Text(columns)
65 }
66
67 fn read_result_set_row(packet: &[u8], columns: Arc<[Column]>) -> Result<Row> {
68 ParseBuf(packet)
69 .parse::<RowDeserializer<ServerSide, Text>>(columns)
70 .map(Into::into)
71 .map_err(Into::into)
72 }
73}
74
75impl Protocol for BinaryProtocol {
76 fn result_set_meta(columns: Arc<[Column]>) -> ResultSetMeta {
77 ResultSetMeta::Binary(columns)
78 }
79
80 fn read_result_set_row(packet: &[u8], columns: Arc<[Column]>) -> Result<Row> {
81 ParseBuf(packet)
82 .parse::<RowDeserializer<ServerSide, Binary>>(columns)
83 .map(Into::into)
84 .map_err(Into::into)
85 }
86}
87
88impl Conn {
89 pub(crate) async fn clean_dirty(&mut self) -> Result<()> {
97 self.drop_result().await?;
98 if self.get_tx_status() == TxStatus::RequiresRollback {
99 self.rollback_transaction().await?;
100 }
101 Ok(())
102 }
103
104 pub(crate) async fn raw_query<'a, Q, L: TracingLevel>(&'a mut self, query: Q) -> Result<()>
106 where
107 Q: AsQuery + 'a,
108 {
109 self.routine(QueryRoutine::<'_, L>::new(query.as_query().as_ref()))
110 .await
111 }
112
113 pub(crate) fn query_internal<'a, T, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Option<T>>
118 where
119 Q: AsQuery + 'a,
120 T: FromRow + Send + 'static,
121 {
122 async move {
123 self.raw_query::<'_, _, LevelTrace>(query).await?;
124 Ok(QueryResult::<'_, '_, TextProtocol>::new(self)
125 .collect_and_drop::<T>()
126 .await?
127 .pop())
128 }
129 .boxed()
130 }
131}
132
133pub trait Queryable: Send {
137 fn ping(&mut self) -> BoxFuture<'_, ()>;
139
140 fn query_iter<'a, Q>(
142 &'a mut self,
143 query: Q,
144 ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
145 where
146 Q: AsQuery + 'a;
147
148 fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
157 where
158 Q: AsQuery + 'a;
159
160 fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()>;
167
168 fn exec_iter<'a: 's, 's, Q, P>(
172 &'a mut self,
173 stmt: Q,
174 params: P,
175 ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
176 where
177 Q: StatementLike + 'a,
178 P: Into<Params>;
179
180 fn query<'a, T, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Vec<T>>
188 where
189 Q: AsQuery + 'a,
190 T: FromRow + Send + 'static,
191 {
192 async move { self.query_iter(query).await?.collect_and_drop::<T>().await }.boxed()
193 }
194
195 fn query_first<'a, T, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Option<T>>
203 where
204 Q: AsQuery + 'a,
205 T: FromRow + Send + 'static,
206 {
207 async move {
208 let mut result = self.query_iter(query).await?;
209 let output = if result.is_empty() {
210 None
211 } else {
212 result.next().await?.map(crate::from_row)
213 };
214 result.drop_result().await?;
215 Ok(output)
216 }
217 .boxed()
218 }
219
220 fn query_map<'a, T, F, Q, U>(&'a mut self, query: Q, mut f: F) -> BoxFuture<'a, Vec<U>>
228 where
229 Q: AsQuery + 'a,
230 T: FromRow + Send + 'static,
231 F: FnMut(T) -> U + Send + 'a,
232 U: Send,
233 {
234 async move {
235 self.query_fold(query, Vec::new(), |mut acc, row| {
236 acc.push(f(crate::from_row(row)));
237 acc
238 })
239 .await
240 }
241 .boxed()
242 }
243
244 fn query_fold<'a, T, F, Q, U>(&'a mut self, query: Q, init: U, mut f: F) -> BoxFuture<'a, U>
252 where
253 Q: AsQuery + 'a,
254 T: FromRow + Send + 'static,
255 F: FnMut(U, T) -> U + Send + 'a,
256 U: Send + 'a,
257 {
258 async move {
259 self.query_iter(query)
260 .await?
261 .reduce_and_drop(init, |acc, row| f(acc, crate::from_row(row)))
262 .await
263 }
264 .boxed()
265 }
266
267 fn query_drop<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, ()>
269 where
270 Q: AsQuery + 'a,
271 {
272 async move { self.query_iter(query).await?.drop_result().await }.boxed()
273 }
274
275 fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
279 where
280 S: StatementLike + 'b,
281 I: IntoIterator<Item = P> + Send + 'b,
282 I::IntoIter: Send,
283 P: Into<Params> + Send;
284
285 fn exec<'a: 'b, 'b, T, S, P>(&'a mut self, stmt: S, params: P) -> BoxFuture<'b, Vec<T>>
295 where
296 S: StatementLike + 'b,
297 P: Into<Params> + Send + 'b,
298 T: FromRow + Send + 'static,
299 {
300 async move {
301 self.exec_iter(stmt, params)
302 .await?
303 .collect_and_drop::<T>()
304 .await
305 }
306 .boxed()
307 }
308
309 fn exec_first<'a: 'b, 'b, T, S, P>(&'a mut self, stmt: S, params: P) -> BoxFuture<'b, Option<T>>
319 where
320 S: StatementLike + 'b,
321 P: Into<Params> + Send + 'b,
322 T: FromRow + Send + 'static,
323 {
324 async move {
325 let mut result = self.exec_iter(stmt, params).await?;
326 let row = if result.is_empty() {
327 None
328 } else {
329 result.next().await?
330 };
331 result.drop_result().await?;
332 Ok(row.map(crate::from_row))
333 }
334 .boxed()
335 }
336
337 fn exec_map<'a: 'b, 'b, T, S, P, U, F>(
347 &'a mut self,
348 stmt: S,
349 params: P,
350 mut f: F,
351 ) -> BoxFuture<'b, Vec<U>>
352 where
353 S: StatementLike + 'b,
354 P: Into<Params> + Send + 'b,
355 T: FromRow + Send + 'static,
356 F: FnMut(T) -> U + Send + 'a,
357 U: Send + 'a,
358 {
359 async move {
360 self.exec_fold(stmt, params, Vec::new(), |mut acc, row| {
361 acc.push(f(crate::from_row(row)));
362 acc
363 })
364 .await
365 }
366 .boxed()
367 }
368
369 fn exec_fold<'a: 'b, 'b, T, S, P, U, F>(
379 &'a mut self,
380 stmt: S,
381 params: P,
382 init: U,
383 mut f: F,
384 ) -> BoxFuture<'b, U>
385 where
386 S: StatementLike + 'b,
387 P: Into<Params> + Send + 'b,
388 T: FromRow + Send + 'static,
389 F: FnMut(U, T) -> U + Send + 'a,
390 U: Send + 'a,
391 {
392 async move {
393 self.exec_iter(stmt, params)
394 .await?
395 .reduce_and_drop(init, |acc, row| f(acc, crate::from_row(row)))
396 .await
397 }
398 .boxed()
399 }
400
401 fn exec_drop<'a: 'b, 'b, S, P>(&'a mut self, stmt: S, params: P) -> BoxFuture<'b, ()>
403 where
404 S: StatementLike + 'b,
405 P: Into<Params> + Send + 'b,
406 {
407 async move { self.exec_iter(stmt, params).await?.drop_result().await }.boxed()
408 }
409
410 fn query_stream<'a, T, Q>(
416 &'a mut self,
417 query: Q,
418 ) -> BoxFuture<'a, ResultSetStream<'a, 'a, 'static, T, TextProtocol>>
419 where
420 T: Unpin + FromRow + Send + 'static,
421 Q: AsQuery + 'a,
422 {
423 async move {
424 self.query_iter(query)
425 .await?
426 .stream_and_drop()
427 .await
428 .transpose()
429 .expect("At least one result set is expected")
430 }
431 .boxed()
432 }
433
434 fn exec_stream<'a: 's, 's, T, Q, P>(
440 &'a mut self,
441 stmt: Q,
442 params: P,
443 ) -> BoxFuture<'s, ResultSetStream<'a, 'a, 'static, T, BinaryProtocol>>
444 where
445 T: Unpin + FromRow + Send + 'static,
446 Q: StatementLike + 'a,
447 P: Into<Params> + Send + 's,
448 {
449 async move {
450 self.exec_iter(stmt, params)
451 .await?
452 .stream_and_drop()
453 .await
454 .transpose()
455 .expect("At least one result set is expected")
456 }
457 .boxed()
458 }
459}
460
461impl Queryable for Conn {
462 fn ping(&mut self) -> BoxFuture<'_, ()> {
463 async move {
464 self.routine(PingRoutine).await?;
465 Ok(())
466 }
467 .boxed()
468 }
469
470 fn query_iter<'a, Q>(
471 &'a mut self,
472 query: Q,
473 ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
474 where
475 Q: AsQuery + 'a,
476 {
477 async move {
478 self.raw_query::<'_, _, LevelInfo>(query).await?;
479 Ok(QueryResult::new(self))
480 }
481 .boxed()
482 }
483
484 fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
485 where
486 Q: AsQuery + 'a,
487 {
488 async move { self.get_statement(query.as_query()).await }.boxed()
489 }
490
491 fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
492 async move {
493 self.stmt_cache_mut().remove(stmt.id());
494 self.close_statement(stmt.id()).await
495 }
496 .boxed()
497 }
498
499 fn exec_iter<'a: 's, 's, Q, P>(
500 &'a mut self,
501 stmt: Q,
502 params: P,
503 ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
504 where
505 Q: StatementLike + 'a,
506 P: Into<Params>,
507 {
508 let params = params.into();
509 async move {
510 let statement = self.get_statement(stmt).await?;
511 self.execute_statement(&statement, params).await?;
512 Ok(QueryResult::new(self))
513 }
514 .boxed()
515 }
516
517 fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
518 where
519 S: StatementLike + 'b,
520 I: IntoIterator<Item = P> + Send + 'b,
521 I::IntoIter: Send,
522 P: Into<Params> + Send,
523 {
524 async move {
525 let statement = self.get_statement(stmt).await?;
526 for params in params_iter {
527 self.execute_statement(&statement, params).await?;
528 QueryResult::<BinaryProtocol>::new(&mut *self)
529 .drop_result()
530 .await?;
531 }
532 Ok(())
533 }
534 .boxed()
535 }
536}
537
538impl Queryable for Transaction<'_> {
539 fn ping(&mut self) -> BoxFuture<'_, ()> {
540 self.0.as_mut().ping()
541 }
542
543 fn query_iter<'a, Q>(
544 &'a mut self,
545 query: Q,
546 ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
547 where
548 Q: AsQuery + 'a,
549 {
550 self.0.as_mut().query_iter(query)
551 }
552
553 fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
554 where
555 Q: AsQuery + 'a,
556 {
557 self.0.as_mut().prep(query)
558 }
559
560 fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
561 self.0.as_mut().close(stmt)
562 }
563
564 fn exec_iter<'a: 's, 's, Q, P>(
565 &'a mut self,
566 stmt: Q,
567 params: P,
568 ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
569 where
570 Q: StatementLike + 'a,
571 P: Into<Params>,
572 {
573 self.0.as_mut().exec_iter(stmt, params)
574 }
575
576 fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
577 where
578 S: StatementLike + 'b,
579 I: IntoIterator<Item = P> + Send + 'b,
580 I::IntoIter: Send,
581 P: Into<Params> + Send,
582 {
583 self.0.as_mut().exec_batch(stmt, params_iter)
584 }
585}
586
587impl<'c, 't: 'c> Queryable for Connection<'c, 't> {
588 #[inline]
589 fn ping(&mut self) -> BoxFuture<'_, ()> {
590 self.as_mut().ping()
591 }
592
593 #[inline]
594 fn query_iter<'a, Q>(
595 &'a mut self,
596 query: Q,
597 ) -> BoxFuture<'a, QueryResult<'a, 'static, TextProtocol>>
598 where
599 Q: AsQuery + 'a,
600 {
601 self.as_mut().query_iter(query)
602 }
603
604 fn prep<'a, Q>(&'a mut self, query: Q) -> BoxFuture<'a, Statement>
605 where
606 Q: AsQuery + 'a,
607 {
608 self.as_mut().prep(query)
609 }
610
611 fn close(&mut self, stmt: Statement) -> BoxFuture<'_, ()> {
612 self.as_mut().close(stmt)
613 }
614
615 fn exec_iter<'a: 's, 's, Q, P>(
616 &'a mut self,
617 stmt: Q,
618 params: P,
619 ) -> BoxFuture<'s, QueryResult<'a, 'static, BinaryProtocol>>
620 where
621 Q: StatementLike + 'a,
622 P: Into<Params>,
623 {
624 self.as_mut().exec_iter(stmt, params)
625 }
626
627 fn exec_batch<'a: 'b, 'b, S, P, I>(&'a mut self, stmt: S, params_iter: I) -> BoxFuture<'b, ()>
628 where
629 S: StatementLike + 'b,
630 I: IntoIterator<Item = P> + Send + 'b,
631 I::IntoIter: Send,
632 P: Into<Params> + Send,
633 {
634 self.as_mut().exec_batch(stmt, params_iter)
635 }
636}
637
#[cfg(test)]
mod tests {
    use crate::{error::Result, prelude::*, test_misc::get_opts, Conn};

    /// Checks that the same query gives the same row in all four combinations of
    /// prepared statement vs. query string and named vs. positional parameters.
    #[tokio::test]
    async fn should_prep() -> Result<()> {
        const NAMED: &str = "SELECT :foo, :bar, :foo";
        const POSITIONAL: &str = "SELECT ?, ?, ?";

        let mut conn = Conn::new(get_opts()).await?;

        let stmt_named = conn.prep(NAMED).await?;
        let stmt_positional = conn.prep(POSITIONAL).await?;

        let result_stmt_named: Option<(String, u8, String)> = conn
            .exec_first(&stmt_named, params! { "foo" => "bar", "bar" => 42 })
            .await?;
        let result_str_named: Option<(String, u8, String)> = conn
            .exec_first(NAMED, params! { "foo" => "bar", "bar" => 42 })
            .await?;

        let result_stmt_positional: Option<(String, u8, String)> = conn
            .exec_first(&stmt_positional, ("bar", 42, "bar"))
            .await?;
        // Uses the positional query text: this is the "string + positional params" case.
        // It previously re-used `NAMED`, which left that combination uncovered.
        let result_str_positional: Option<(String, u8, String)> =
            conn.exec_first(POSITIONAL, ("bar", 42, "bar")).await?;

        assert_eq!(
            Some(("bar".to_owned(), 42_u8, "bar".to_owned())),
            result_stmt_named
        );
        assert_eq!(result_stmt_named, result_str_named);
        assert_eq!(result_str_named, result_stmt_positional);
        assert_eq!(result_stmt_positional, result_str_positional);

        conn.disconnect().await?;

        Ok(())
    }
}