1use std::fmt::{Debug, Display};
13use std::sync::Mutex;
14
15use mz_sql_parser::ast::NamedPlan;
16use tracing::{Level, span, subscriber};
17use tracing_core::{Interest, Metadata};
18use tracing_subscriber::{field, layer};
19
20use crate::explain::UsedIndexes;
21use smallvec::SmallVec;
22
23#[allow(missing_debug_implementations)]
25pub struct PlanTrace<T> {
26 filter: Option<SmallVec<[NamedPlan; 4]>>,
30 path: Mutex<String>,
33 start: Mutex<Option<std::time::Instant>>,
35 times: Mutex<Vec<std::time::Instant>>,
39 entries: Mutex<Vec<TraceEntry<T>>>,
41}
42
/// A single plan recorded by a [`PlanTrace`], together with timing
/// information and the path under which it was recorded.
#[derive(Clone, Debug)]
pub struct TraceEntry<T> {
    /// The instant at which the entry was recorded.
    pub instant: std::time::Instant,
    /// Time elapsed between the most recent span entry that is still on the
    /// timing stack and `instant`.
    pub span_duration: std::time::Duration,
    /// Time elapsed between the first span entry seen by the trace and
    /// `instant`.
    pub full_duration: std::time::Duration,
    /// The `/`-separated span path that was current when the entry was
    /// recorded.
    pub path: String,
    /// The recorded plan.
    pub plan: T,
}
60
61pub fn trace_plan<T: Clone + 'static>(plan: &T) {
109 tracing::Span::current().with_subscriber(|(_id, subscriber)| {
110 if let Some(trace) = subscriber.downcast_ref::<PlanTrace<T>>() {
111 trace.push(plan)
112 }
113 });
114}
115
116pub fn dbg_plan<S: Display, T: Clone + 'static>(segment: S, plan: &T) {
123 span!(target: "optimizer", Level::DEBUG, "segment", path.segment = %segment).in_scope(|| {
124 trace_plan(plan);
125 });
126}
127
128pub fn dbg_misc<S: Display, T: Display>(segment: S, misc: T) {
135 span!(target: "optimizer", Level::DEBUG, "segment", path.segment = %segment).in_scope(|| {
136 trace_plan(&misc.to_string());
137 });
138}
139
/// A 64-bit hash of some context value, displayed in abbreviated form.
#[allow(missing_debug_implementations)]
pub struct ContextHash(u64);

impl ContextHash {
    /// Hashes `t` with the standard library's [`DefaultHasher`] and wraps the
    /// resulting 64-bit value.
    ///
    /// [`DefaultHasher`]: std::collections::hash_map::DefaultHasher
    pub fn of<T: std::hash::Hash>(t: T) -> Self {
        use std::hash::Hasher as _;

        let mut hasher = std::collections::hash_map::DefaultHasher::new();
        t.hash(&mut hasher);
        Self(hasher.finish())
    }
}

impl Display for ContextHash {
    /// Writes the lowest 28 bits of the hash as lowercase hexadecimal.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let low_bits = self.0 & 0x0FFF_FFFF_u64;
        write!(f, "{low_bits:x}")
    }
}
164
165impl<S, T> layer::Layer<S> for PlanTrace<T>
171where
172 S: subscriber::Subscriber,
173 T: 'static,
174{
175 fn on_new_span(
176 &self,
177 attrs: &span::Attributes<'_>,
178 _id: &span::Id,
179 _ctx: layer::Context<'_, S>,
180 ) {
181 let mut path = self.path.lock().expect("path shouldn't be poisoned");
183 let segment = attrs.get_str("path.segment");
184 let segment = segment.unwrap_or_else(|| attrs.metadata().name().to_string());
185 if !path.is_empty() {
186 path.push('/');
187 }
188 path.push_str(segment.as_str());
189 }
190
191 fn on_enter(&self, _id: &span::Id, _ctx: layer::Context<'_, S>) {
192 let now = std::time::Instant::now();
193 let mut start = self.start.lock().expect("start shouldn't be poisoned");
195 start.get_or_insert(now);
196 let mut times = self.times.lock().expect("times shouldn't be poisoned");
198 times.push(now);
199 }
200
201 fn on_exit(&self, _id: &span::Id, _ctx: layer::Context<'_, S>) {
202 let mut path = self.path.lock().expect("path shouldn't be poisoned");
204 let new_len = path.rfind('/').unwrap_or(0);
205 path.truncate(new_len);
206 let mut times = self.times.lock().expect("times shouldn't be poisoned");
208 times.pop();
209 }
210}
211
212impl<S, T> layer::Filter<S> for PlanTrace<T>
213where
214 S: subscriber::Subscriber,
215 T: 'static + Clone,
216{
217 fn enabled(&self, meta: &Metadata<'_>, _cx: &layer::Context<'_, S>) -> bool {
218 self.is_enabled(meta)
219 }
220
221 fn callsite_enabled(&self, meta: &'static Metadata<'static>) -> Interest {
222 if self.is_enabled(meta) {
223 Interest::always()
224 } else {
225 Interest::never()
226 }
227 }
228}
229
230impl<T: 'static + Clone> PlanTrace<T> {
231 pub fn new(filter: Option<SmallVec<[NamedPlan; 4]>>) -> Self {
234 Self {
235 filter,
236 path: Mutex::new(String::with_capacity(256)),
237 start: Mutex::new(None),
238 times: Mutex::new(Default::default()),
239 entries: Mutex::new(Default::default()),
240 }
241 }
242
243 fn is_enabled(&self, meta: &Metadata<'_>) -> bool {
246 meta.is_span() && meta.target() == "optimizer"
247 }
248
249 pub fn drain_as_vec(&self) -> Vec<TraceEntry<T>> {
255 let mut entries = self.entries.lock().expect("entries shouldn't be poisoned");
256 entries.split_off(0)
257 }
258
259 pub fn collect_as_vec(&self) -> Vec<TraceEntry<T>> {
261 let entries = self.entries.lock().expect("entries shouldn't be poisoned");
262 (*entries).clone()
263 }
264
265 pub fn find(&self, path: &str) -> Option<TraceEntry<T>>
267 where
268 T: Clone,
269 {
270 let entries = self.entries.lock().expect("entries shouldn't be poisoned");
271 entries.iter().find(|entry| entry.path == path).cloned()
272 }
273
274 fn push(&self, plan: &T)
280 where
281 T: Clone,
282 {
283 if let Some(current_path) = self.current_path() {
284 let times = self.times.lock().expect("times shouldn't be poisoned");
285 let start = self.start.lock().expect("start shouldn't is poisoned");
286 if let (Some(full_start), Some(span_start)) = (start.as_ref(), times.last()) {
287 let mut entries = self.entries.lock().expect("entries shouldn't be poisoned");
288 let time = std::time::Instant::now();
289 entries.push(TraceEntry {
290 instant: time,
291 span_duration: time.duration_since(*span_start),
292 full_duration: time.duration_since(*full_start),
293 path: current_path,
294 plan: plan.clone(),
295 });
296 }
297 }
298 }
299
300 fn current_path(&self) -> Option<String> {
305 let path = self.path.lock().expect("path shouldn't be poisoned");
306 let path = path.as_str();
307 match self.filter.as_ref() {
308 Some(named_paths) => {
309 if named_paths.iter().any(|named| path == named.path()) {
310 Some(path.to_owned())
311 } else {
312 None
313 }
314 }
315 None => Some(path.to_owned()),
316 }
317 }
318}
319
320impl PlanTrace<UsedIndexes> {
321 pub fn used_indexes_for(&self, plan_path: &str) -> UsedIndexes {
326 let path = match NamedPlan::of_path(plan_path) {
329 Some(NamedPlan::Global) => Some(NamedPlan::Global),
330 Some(NamedPlan::Physical) => Some(NamedPlan::Global),
331 Some(NamedPlan::FastPath) => Some(NamedPlan::FastPath),
332 _ => None,
333 };
334 let entry = match path {
336 Some(path) => self.find(path.path()),
337 None => None,
338 };
339 entry.map_or(Default::default(), |e| e.plan)
342 }
343}
344
345trait GetStr {
347 fn get_str(&self, key: &'static str) -> Option<String>;
348}
349
350impl<'a> GetStr for span::Attributes<'a> {
351 fn get_str(&self, key: &'static str) -> Option<String> {
352 let mut extract_str = ExtractStr::new(key);
353 self.record(&mut extract_str);
354 extract_str.val()
355 }
356}
357
/// State for extracting the value of one field by name.
struct ExtractStr {
    /// Name of the field to look for.
    key: &'static str,
    /// The captured value, if any was seen so far.
    val: Option<String>,
}

impl ExtractStr {
    /// Creates an extractor for the field named `key` with no captured value.
    fn new(key: &'static str) -> Self {
        ExtractStr { val: None, key }
    }

    /// Consumes the extractor and returns the captured value, if any.
    fn val(self) -> Option<String> {
        let Self { val: captured, .. } = self;
        captured
    }
}
374
375impl field::Visit for ExtractStr {
376 fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
377 if field.name() == self.key {
378 self.val = Some(value.to_string())
379 }
380 }
381
382 fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
383 if field.name() == self.key {
384 self.val = Some(format!("{value:?}"))
385 }
386 }
387}
388
#[cfg(test)]
mod test {
    use mz_ore::instrument;
    use tracing::dispatcher;
    use tracing_subscriber::prelude::*;

    use super::{PlanTrace, trace_plan};

    /// Runs a small instrumented "optimizer" and checks both the paths and
    /// the plans recorded by the `PlanTrace` layer, in push order.
    #[mz_ore::test]
    fn test_optimizer_trace() {
        let subscriber = tracing_subscriber::registry().with(Some(PlanTrace::<String>::new(None)));
        let dispatch = dispatcher::Dispatch::new(subscriber);

        dispatcher::with_default(&dispatch, || {
            optimize();
        });

        // Fail loudly if the layer cannot be reached: skipping the assertions
        // in that case would let the test pass without checking anything.
        let trace = dispatch
            .downcast_ref::<PlanTrace<String>>()
            .expect("the PlanTrace layer should be reachable through the dispatch")
            .drain_as_vec();

        let actual: Vec<(&str, &str)> = trace
            .iter()
            .map(|entry| (entry.path.as_str(), entry.plan.as_str()))
            .collect();
        let expected = vec![
            ("optimize", "RawPlan(#42)"),
            ("optimize/logical/my_optimization", "RawPlan(#47)"),
            ("optimize/logical", "LogicalPlan(#47)"),
            ("optimize/physical", "PhysicalPlan(#47)"),
            ("optimize", "PhysicalPlan(#47)"),
        ];
        assert_eq!(actual, expected);
    }

    #[instrument(level = "info")]
    fn optimize() {
        let mut plan = constant_plan(42);
        trace_plan(&plan);
        logical_optimizer(&mut plan);
        physical_optimizer(&mut plan);
        trace_plan(&plan);
    }

    #[instrument(level = "info", name = "logical")]
    fn logical_optimizer(plan: &mut String) {
        some_optimization(plan);
        // `str::replace` returns a new string; assign it back so that the
        // plan actually changes.
        *plan = plan.replace("RawPlan", "LogicalPlan");
        trace_plan(plan);
    }

    #[instrument(level = "info", name = "physical")]
    fn physical_optimizer(plan: &mut String) {
        *plan = plan.replace("LogicalPlan", "PhysicalPlan");
        trace_plan(plan);
    }

    #[mz_ore::instrument(level = "debug", fields(path.segment ="my_optimization"))]
    fn some_optimization(plan: &mut String) {
        *plan = plan.replace("42", "47");
        trace_plan(plan);
    }

    /// Builds the initial plan string for the constant `i`.
    fn constant_plan(i: usize) -> String {
        format!("RawPlan(#{i})")
    }
}