// azure_core/pipeline.rs
use crate::policies::TransportPolicy;
use crate::policies::{CustomHeadersPolicy, Policy, TelemetryPolicy};
use crate::{ClientOptions, Context, Request, Response};
use std::sync::Arc;

6/// Execution pipeline.
7///
8/// A pipeline follows a precise flow:
9///
10/// 1. Client library-specified per-call policies are executed. Per-call policies can fail and bail out of the pipeline
11/// immediately.
12/// 2. User-specified per-call policies are executed.
13/// 3. Telemetry policy.
14/// 4. Retry policy. It allows to re-execute the following policies.
15/// 5. Client library-specified per-retry policies. Per-retry polices are always executed at least once but are re-executed
16/// in case of retries.
17/// 6. User-specified per-retry policies are executed.
18/// 7. Authorization policy. Authorization can depend on the HTTP headers and/or the request body so it
19/// must be executed right before sending the request to the transport. Also, the authorization
20/// can depend on the current time so it must be executed at every retry.
21/// 8. Transport policy. Transport policy is always the last policy and is the policy that
22/// actually constructs the `Response` to be passed up the pipeline.
23///
24/// A pipeline is immutable. In other words a policy can either succeed and call the following
25/// policy of fail and return to the calling policy. Arbitrary policy "skip" must be avoided (but
26/// cannot be enforced by code). All policies except Transport policy can assume there is another following policy (so
27/// `self.pipeline[0]` is always valid).
28///
29/// The `C` generic contains the pipeline-specific context. Different crates can pass
30/// different contexts using this generic. This way each crate can have its own specific pipeline
31/// context. For example, in `CosmosDB`, the generic carries the operation-specific information used by
32/// the authorization policy.
33#[derive(Debug, Clone)]
34pub struct Pipeline {
35 pipeline: Vec<Arc<dyn Policy>>,
36}
37
38impl Pipeline {
39 /// Creates a new pipeline given the client library crate name and version,
40 /// alone with user-specified and client library-specified policies.
41 ///
42 /// Crates can simply pass `option_env!("CARGO_PKG_NAME")` and `option_env!("CARGO_PKG_VERSION")` for the
43 /// `crate_name` and `crate_version` arguments respectively.
44 pub fn new(
45 crate_name: Option<&'static str>,
46 crate_version: Option<&'static str>,
47 options: ClientOptions,
48 per_call_policies: Vec<Arc<dyn Policy>>,
49 per_retry_policies: Vec<Arc<dyn Policy>>,
50 ) -> Self {
51 let mut pipeline: Vec<Arc<dyn Policy>> = Vec::with_capacity(
52 options.per_call_policies.len()
53 + per_call_policies.len()
54 + options.per_retry_policies.len()
55 + per_retry_policies.len()
56 + 3,
57 );
58
59 pipeline.extend_from_slice(&per_call_policies);
60 pipeline.extend_from_slice(&options.per_call_policies);
61
62 let telemetry_policy = TelemetryPolicy::new(crate_name, crate_version, &options.telemetry);
63 pipeline.push(Arc::new(telemetry_policy));
64
65 pipeline.push(Arc::new(CustomHeadersPolicy::default()));
66
67 let retry_policy = options.retry.to_policy();
68 pipeline.push(retry_policy);
69
70 pipeline.extend_from_slice(&per_retry_policies);
71 pipeline.extend_from_slice(&options.per_retry_policies);
72
73 let transport: Arc<dyn Policy> = Arc::new(TransportPolicy::new(options.transport.clone()));
74
75 pipeline.push(transport);
76
77 Self { pipeline }
78 }
79
80 pub fn replace_policy(&mut self, policy: Arc<dyn Policy>, position: usize) -> Arc<dyn Policy> {
81 std::mem::replace(&mut self.pipeline[position], policy)
82 }
83
84 pub fn policies(&self) -> &[Arc<dyn Policy>] {
85 &self.pipeline
86 }
87
88 pub async fn send(&self, ctx: &Context, request: &mut Request) -> crate::Result<Response> {
89 self.pipeline[0]
90 .send(ctx, request, &self.pipeline[1..])
91 .await
92 }
93}